Design considerations while evaluating, developing, and deploying a distributed task processing system

Konark Modi

@konarkmodi

MakeMyTrip.com

EuroPython - 2014


Expectations from the Talk

  • Definition of Job / Task.
  • Use cases
  • Celery and Its Architecture
  • Design Choices
  • Various types of workflows.
  • Tools Available

Note: Each of us has our own use cases; this talk is primarily about understanding the whole ecosystem of tasks. Because we use Celery, I'll be using Celery in the examples.

What is a task & what is distributed task processing?

  • Clickstream events (no. of hotels being watched, likes, etc.)
  • Generate graphs from a large data set for display on a site’s home page / dashboard every 15 minutes
  • Admin applications, like:
    • Thumbnail generation
    • Re-generating static files when certain items in the admin have been modified
  • Connecting to third-party APIs
  • Mass e-mailers

Celery and Its Architecture

  • What is Celery ?
    • Asynchronous task queue / job queue.
    • Uses distributed message passing.
    • Supports both real-time processing and scheduled jobs.
    • Tasks can be run concurrently on one or more worker servers, taking advantage of multiprocessing.
  • Ridiculously simple to get started with: we'll see that in a moment :)
  • Flexible & Reliable
    • Configurable and Extensible
    • Reliable delivery and execution of tasks
  • Everything is message passing
    • Executing tasks
    • Broadcasting commands
    • Publish Results
  • Out-of-the-box tools for operations and management of the system
  • Our use-cases over Celery
    • Poll 300+ DB queries every 15 minutes.
    • Custom metric collection
    • Inventory management system
    • Asynchronous web interface
    • Periodic tasks
    • Execute millions of data-processing tasks, in chunks, running over several days
  • Has now become a de facto framework for distributed task processing and background execution.

Architecture


Install Celery

pip install celery

Celery config

BROKER_URL = 'amqp://guest:guest@localhost:5672/'
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("tasks",)

tasks.py

from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')

@celery.task
def test_demo():
    print("name")
    return True

@celery.task
def add(a1, a2):
    a3 = a1 + a2
    print(a3)
    return a3

Starting celery

celery -A tasks worker --loglevel=info -c 1

Executing tasks

from tasks import add

s = add.delay(2, 3)
s.status   # e.g. 'PENDING', then 'SUCCESS'
s.result   # 5, once the task has finished

Design Choices

  • Scheduling Capabilities
  • Task Management
  • Worker-Management
  • Admin and Reporting

Scheduling Capabilities

  • Scheduling not just based on time, but on the nature of the task too.
    • Cron based syntax.
    • Humanized form of entries.
    • Interval based.
    • Immediate execution
    • Countdown based
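The scheduling styles above can be sketched in Celery configuration; this fragment uses the Celery 3.x-era setting name `CELERYBEAT_SCHEDULE` and the `tasks.add` task from the earlier slides, with hypothetical entry names:

```python
# celeryconfig.py -- a sketch of the scheduling styles, assuming Celery 3.x
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Cron-based syntax: every 15 minutes
    'refresh-dashboard': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/15'),
        'args': (2, 3),
    },
    # Interval-based: every 30 seconds
    'collect-metrics': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (4, 4),
    },
}

# Immediate vs. countdown-based execution, from application code:
#   add.delay(2, 3)                        # run as soon as a worker is free
#   add.apply_async((2, 3), countdown=60)  # run ~60 seconds from now
```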

Task Management

  • Routing of tasks
    • Priority of execution
    • Based on OS
    • Based on hardware-capabilities
  • Conflict Management
  • Retries
    • Exception Handling
    • Expiration
      • Time-Limits (Soft / Hard)
  • Task state management:
    • Sent / Received / Started / Succeeded / Failed / Revoked / Retried
  • Controls : Pause, Kill, Delete
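In Celery, retries with exception handling are usually expressed by calling `retry()` on a bound task; as a broker-free illustration of the same idea, here is a minimal retry-with-countdown helper (the names `run_with_retries` and `flaky` are hypothetical, not Celery APIs):

```python
import time

def run_with_retries(fn, max_retries=3, countdown=0.0):
    """Call fn(); on exception, retry up to max_retries times,
    sleeping `countdown` seconds between attempts (analogous to
    Celery's retry countdown). Re-raises once retries are exhausted."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(countdown)

# Example: a task that fails twice, then succeeds on the third attempt.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(run_with_retries(flaky, max_retries=3))  # -> ok
```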

Worker Management

  • Basic Tasks
    • Start / Stop (warm shutdown / cold shutdown)
    • With Traceback
    • Online / Offline
    • Heartbeat
  • Worker inspection
    • Number of tasks being executed, scheduled, and reserved
    • Time taken by each task being executed
    • Which worker is mapped to which queues
  • Time-Limits
    • Soft-Limit
    • Hard-Limit
  • Auto scale up / scale out, and shrink back to normal
  • Broadcast message to workers
    • Helpful in case of revoking tasks
  • Result backend
  • Handling worker failures
  • Purge all waiting tasks
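Most of these capabilities are exposed through Celery's command-line tools; the commands below are as in Celery 3.x and assume a running broker plus the `tasks` app from the earlier slides:

```shell
# Worker status: which workers are online / offline
celery -A tasks status

# Worker inspection: active, scheduled, and reserved tasks; stats
celery -A tasks inspect active
celery -A tasks inspect scheduled
celery -A tasks inspect reserved
celery -A tasks inspect stats

# Broadcast commands to workers
celery -A tasks control revoke <task_id>   # revoke a task by id
celery -A tasks control shutdown           # warm shutdown of all workers

# Purge all waiting tasks
celery -A tasks purge
```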

Admin and Reporting

  • Scheduling tasks from UI

Monitoring Workers

Tasks status

Historical Trends

Types of workflows

  • Chains
  • Groups
  • Chord
  • Chunks
  • Task-Trees

Chains

  • Link tasks together ( [Task1] | Output -> Input | [Task2] -> Input | [Task3] )
  • task = chain(add.s(4, 4), add.s(5))()

Groups

  • Take a list of tasks that should be applied in parallel
  • (Task1, Task2,.....,TaskN)
  • result_group = group(add.s(2, 2), add.s(4, 4))()

Chord

  • A chord is a task that only executes after all of the tasks in a group have finished executing.
  • ((Task1, Task2,.....,TaskN)|Output - > [Input] | (TaskX))
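The chord shape above (run a group in parallel, then feed the collected results into a callback task) can be illustrated broker-free with `concurrent.futures`; `chord_like` is a hypothetical helper name, not a Celery API:

```python
from concurrent.futures import ThreadPoolExecutor

def chord_like(tasks, callback):
    """Run all zero-argument callables in parallel, then apply
    `callback` to the list of their results -- the same shape as
    Celery's chord(group(...))(callback)."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(lambda t: t(), tasks))
    return callback(results)

# (Task1, Task2, Task3) in parallel, then TaskX over their outputs:
total = chord_like([lambda: 1 + 1, lambda: 2 + 2, lambda: 3 + 3], sum)
print(total)  # -> 12
```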

Chunks

  • Divide the work into chunks, execute in parallel.
  • res_chunks = add.chunks(zip(range(100), range(100)), 10)()
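For intuition: `add.chunks(zip(range(100), range(100)), 10)` splits the 100 argument pairs into 10 chunks of 10 pairs each, and each chunk becomes one task. A plain-Python sketch of that partitioning (the helper `chunked` is hypothetical):

```python
def chunked(items, n):
    """Split a sequence into consecutive chunks of at most n items,
    mirroring how Celery's .chunks() partitions its argument list."""
    items = list(items)
    return [items[i:i + n] for i in range(0, len(items), n)]

pairs = list(zip(range(100), range(100)))
chunks = chunked(pairs, 10)
print(len(chunks))     # -> 10 chunks
print(len(chunks[0]))  # -> 10 pairs per chunk

# Each chunk would go to a worker, which runs add(a, b) per pair:
results = [[a + b for a, b in chunk] for chunk in chunks]
print(results[0][:3])  # -> [0, 2, 4]
```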

Task-Trees

  • Execute trees of tasks asynchronously in a particular order

    from celery_tasktree import task_with_callbacks, TaskTree

    @task_with_callbacks
    def some_action(...):
        ...

    def execute_actions():
        tree = TaskTree()
        task0 = tree.add_task(some_action, args=[...], kwargs={...})
        task1 = tree.add_task(some_action, args=[...], kwargs={...})
        task10 = task1.add_task(some_action, args=[...], kwargs={...})
        task11 = task1.add_task(some_action, args=[...], kwargs={...})
        task110 = task11.add_task(some_action, args=[...], kwargs={...})
        async_result = tree.apply_async()
        return async_result

Tools Available

  • Jobtastic
  • Dagobah
  • Luigi
  • Chronos
  • Azkaban
  • Apache Oozie

Thank You & Questions

@konarkmodi