We'll use the following code throughout the documentation.
from celery import Celery
from time import sleep
celery = Celery()
celery.config_from_object({
'BROKER_URL': 'amqp://localhost',
'CELERY_RESULT_BACKEND': 'amqp://',
'CELERYD_POOL_RESTARTS': True, # Required for /worker/pool/restart API
})
@celery.task
def add(x, y):
return x + y
@celery.task
def sub(x, y):
sleep(30) # Simulate work
return x - y
You'll need a celery worker instance and a flower instance running. In one terminal window run
celery worker --loglevel INFO -A proj -E --autoscale 10,3
and in another terminal run
celery flower -A proj
The tasks API is async, meaning calls will return immediatly and you'll need to poll on task status.
# Done once for the whole docs
import requests, json
api_root = 'http://localhost:5555/api'
task_api = '{}/task'.format(api_root)
args = {'args': [1, 2]}
url = '{}/async-apply/tasks.add'.format(task_api)
print(url)
resp = requests.post(url, data=json.dumps(args))
reply = resp.json()
reply
http://localhost:5555/api/task/async-apply/tasks.add
{u'state': u'PENDING', u'task-id': u'f4a53407-30f3-42af-869f-b7f8f4fbd684'}
We can see that we created a new task and it's pending. Note that the API is async, meaning it won't wait until the task finish.
Gets the task result. This is async and will return immediatly even if the task didn't finish (with state 'PENDING')
url = '{}/result/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.get(url)
resp.json()
http://localhost:5555/api/task/result/ced6fd57-419e-4b8e-8d99-0770be717cb4
{u'result': 3, u'state': u'SUCCESS', u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}
Revoke a running task.
# Run a task
args = {'args': [1, 2]}
resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))
reply = resp.json()
# Now revoke it
url = '{}/revoke/{}'.format(task_api, reply['task-id'])
print(url)
resp = requests.post(url, data='terminate=True')
resp.json()
http://localhost:5555/api/task/revoke/bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8
{u'message': u"Revoked 'bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8'"}
Update rate limit for a task.
worker = 'miki-manjaro' # You'll need to get the worker name from the worker API (seel below)
url = '{}/rate-limit/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})
resp.json()
http://localhost:5555/api/task/rate-limit/miki-manjaro
{u'message': u'new rate limit set successfully'}
url = '{}/timeout/{}'.format(task_api, worker)
print(url)
resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'}) # You can omit soft or hard
resp.json()
http://localhost:5555/api/task/timeout/miki-manjaro
{u'message': u'time limits set successfully'}
# Once for the documentation
worker_api = '{}/worker'.format(api_root)
List workers.
url = '{}/workers'.format(api_root) # Only one not under /worker
print(url)
resp = requests.get(url)
workers = resp.json()
workers
http://localhost:5555/api/workers
{u'miki-manjaro': {u'completed_tasks': 0, u'concurrency': 1, u'queues': [u'celery'], u'running_tasks': 0, u'status': True}}
Shutdown a worker.
worker = workers.keys()[0]
url = '{}/shutdown/{}'.format(worker_api, worker)
print(url)
resp = requests.post(url)
resp.json()
http://localhost:5555/api/worker/shutdown/miki-manjaro
{u'message': u'Shutting down!'}
Restart a worker pool, you need to have CELERYD_POOL_RESTARTS enabled in your configuration).
pool_api = '{}/pool'.format(worker_api)
url = '{}/restart/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url)
resp.json()
http://localhost:5555/api/worker/pool/restart/miki-manjaro
{u'message': u"Restarting 'miki-manjaro' worker's pool"}
Grows worker pool.
url = '{}/grow/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '10'})
resp.json()
http://localhost:5555/api/worker/pool/grow/miki-manjaro
{u'message': u"Growing 'miki-manjaro' worker's pool"}
Shrink worker pool.
url = '{}/shrink/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'n': '3'})
resp.json()
http://localhost:5555/api/worker/pool/shrink/miki-manjaro
{u'message': u"Shrinking 'miki-manjaro' worker's pool"}
url = '{}/autoscale/{}'.format(pool_api, worker)
print(url)
resp = requests.post(url, params={'min': '3', 'max': '10'})
resp.json()
http://localhost:5555/api/worker/pool/autoscale/miki-manjaro
{u'message': u"Autoscaling 'miki-manjaro' worker"}
Add a consumer to a queue.
queue_api = '{}/queue'.format(worker_api)
url = '{}/add-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()
http://localhost:5555/api/worker/queue/add-consumer/miki-manjaro
{u'message': u"add consumer u'jokes'"}
Cancel a consumer queue.
url = '{}/cancel-consumer/{}'.format(queue_api, worker)
print(url)
resp = requests.post(url, params={'queue': 'jokes'})
resp.json()
http://localhost:5555/api/worker/queue/cancel-consumer/miki-manjaro
{u'message': u'no longer consuming from jokes'}