Django, Celery & Scaling - Prevent Multiple Cron Jobs to Run at the Same Time
Use-case: You have an application running in AWS Fargate. The task consists of:
- app container
- worker container
- scheduler container
You also have a managed Redis instance (AWS ElastiCache). This way, you can run the task multiple times, scale it based on your traffic.
Nope, that's not true. With 3 tasks you would have 3 schedulers which share the same configuration - run task xxx
every y
.
These schedules don't talk to each other to decide who publishes the task based on the schedule. All of them schedule the job, so the task is run 3 times, at the same time (if you're using cron definition).
This is not the desired behaviour in most cases. Imagine a synchronisation script that should be run only once every day, at 00:00, no matter how many instances is running.
This is mentioned in Celery documentation:
Like with cron, the tasks may overlap if the first task doesn’t complete before the next. If that’s a concern you should use a locking strategy to ensure only one instance can run at a time (see for example Ensuring a task is only executed one at a time).
Which links us to this page with memcache example.
I'm not using memcache on my project, instead I use shared redis instance for Celery broker and result backend and local memory cache. The example there (using cache.add(lock stuff)
) leads me to think that the lock is only applied at memory level, which isn't shared across multiple workers. That wouldn't work for me, I need to use the shared redis for locking.
Other StackOverflow answers just create a new Redis()
class instance and expect the redis to exist open on the same machine (localhost:6379
). Also not my case - my redis resides on someawshostname:6379
, creating Redis()
instance without configuring it wouldn't work. Why would I even need a new instance, when I already defined Celery broker and result backend to redis?
This led me to create this decorator (complete file included for reference):
from __future__ import absolute_import, unicode_literals
import functools
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
app = Celery("project")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
REDIS_CLIENT = app.backend.client # re-use result backend Redis client
def single_instance_task(timeout):
def task_exc(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
ret_value = None
have_lock = False
lock_id = "celery-single-instance-" + func.__name__
lock = REDIS_CLIENT.lock(lock_id, timeout=timeout)
try:
have_lock = lock.acquire(blocking=False)
if have_lock:
ret_value = func(*args, **kwargs)
finally:
if have_lock:
lock.release()
return ret_value
return wrapper
return task_exc
@app.task(bind=True)
@single_instance_task(timeout=60)
def debug_task(self):
print("Request: {0!r}".format(self.request))
With this decorator, tasks scheduled while the same task is already running, will simply finish without doing anything (as they didn't acquire the lock).
The timeout
is there for safety, if the task with lock got stuck - other runs will be allowed to start after the timeout.
Also, if it's not obvious, the lock is released right when the task is finished - it's not necessarily locked for the full timeout
period. If you need to be sure that the task is not run more than once per interval, you'd need to implement other locking mechanism.
This is basically my combination of codes from these sources: