En el post de hoy vamos a ver como integrar Celery con Django para programar una tarea periódica.
Qué es Celery
Celery es una cola de tareas asíncrona Open Source, basada en el envío distribuido de mensajes, que nos permite operaciones a tiempo real, pero también, operaciones programadas o periódicas. Es decir, nos permite enviar tareas a una cola, para que se ejecuten cuando sea posible.
¿Para que me sirve celery?
Hay veces que necesitamos ejecutar trabajos intensivos en recursos, que requieren tiempos de ejecución prolongados, lo que generaría un problema para cualquier aplicación donde haya interacción con el usuario, ya que bloquearía esa interacción hasta que finalizara el trabajo.
Si hablamos de aplicaciones web, donde hay diversos timeouts asociados (Web Server, navegador), Celery permite desacoplar la ejecución de estos procesos, de la aplicación con la que interactua el usuario, enviando los trabajos a un message broker, que los distribuirá en celery workers para que los ejecuten.
Las unidades de ejecución, denominadas tasks se ejecutan de forma concurrente por uno o varios servidores de trabajo, y se pueden ejecutar tanto en background, como de forma síncrona (esperando hasta que se completen)
La gracia en el entorno web es que podemos distribuir las tasks entre varios threads de trabajo, que pueden estar en diferentes servidores del que ejecuta la aplicación, para no afectar al rendimiento del servidor de la aplicación web.
Message Broker
Para enviar tareas a los threads de trabajo, Celery utiliza un sistema de envío y recepeción mensajes (cola de mensajes), lo que soluciona a través de un servicio aparte denominado message broker. El Message Broker recomendado para usar con Celery es RabbitMQ, pero también ofrece soporte limitado para usar Redis, Beanstalk, MongoDB, CouchDB y también bases de datos (usando SQLAlchemy o Django ORM).
RabbitMQ
RabbitMQ (MQ viene de Messages Queue), es un sistema de mensajería para aplicaciones basado en colas, muy potente y escalable, desarrollado con Erlang. Es muy completo en cuanto a características, estable, y de facil instalación. Desde Celery lo proponen como una elección excelente para el entorno de producción. Sí queréis indagar más en el tema, podéis mirar en Using RabbitMQ.
Instalando Celery y RabbitMQ
Recordemos que los comandos son para el supuesto de trabajo sobre Debian o Ubuntu, y usamos entornos virtuales mediante virtualenvwrapper
Instalemos RabbitMQ
$ sudo apt-get install rabbitmq-server
Tras la instalación, el broker debería estar listo y corriendo en background preparado para dirigir mensajes: Starting rabbitmq-server: SUCCESS.
Instalemos Celery
Nos movemos a nuestro entorno virtual e instalamos Celery:
$: workon mientorno
(mientorno)$:pip install celery
Estructura de un proyecto Django con Celery
Lo primero que vamos a hacer es ver como deberíamos estructurar el proyecto. Considerando que disponemos de la estructura típica, añadiremos los archivos celery.py y tasks.py como se muestra a continuación:
miProyecto
/db.sqlite3
/manage.py
/miAplicacion
/tasks.py
/…
/miProyecto
/__init__.py
/urls.py
/wsgi.py
/settings.py
/celery.py
Fichero celery.py
En este fichero definiremos la instancia de Celery:
#Activamos los imports absolutos para evitar conflictos entre packages
from __future__ import absolute_importimport os
from celery import Celery
from django.conf import settings
#indicamos el Django settings module por defecto para nuestro programa celery (no es necesario pero asi evitamos tener que pasarle siempre al programa celery nuestro módulo settings)
os.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘miProyecto.settings‘)app = Celery(‘miProyecto‘)
#Añadimos el Django settings module como fuente de configuración de Celery (nos permite configurar Celery directamente desde el Django settings). Al pasarlo como string nos ahorramos un problema si trabajasemos con windows.
app.config_from_object(‘django.conf:settings’)# Si tenemos nuestras tareas en un fichero de nombre tasks.py, esto nos permite indicarle a celery que encuentre automáticamente dicho módulo dentro del proyecto. De este modo no tenemos que añadirlo a la variable CELERY_IMPORTS del settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)#ejemplo de tarea que muestra su propia información. El bind=True indica que hace referencia a su instancia de tarea actual.
@app.task(bind=True)
def debug_task(self):
print(‘Request: {0!r}’.format(self.request))
Nótese que he resaltado en cursiva la llamada al fichero settings de nuestro proyecto así como la instanciación de Celery (en ambos casos tendremos que personalizarlo con el nombre del directorio adecuado) así commo la definición de una tarea demostrativa mediante el decorador task
DETALLE: Imports Absolutos
En Python 2.x, cuando realizamos algo del estilo import string, primero se busca en nuestros packages relativos del proyecto, y si no encuentra nada, busca en los directorios absolutos del sys.path. Normalmente no queremos ese comportamiento, y existe la posibilidad de indicar que busque en los directorios relativos con la inclusión de un punto (from . import string), así que para asegurarnos de que los imports que hacemos sin usar el . son absolutos, habilitamos esta característica mediante la llamada from __future__ import absolute_import
Importar la instancia de Celery
La instancia app de Celery que acabamos de definir, tiene que importarse con nuestro módulo, para asegurarnos que se importa al iniciar Django para que sea utilizada por el decorador @shared_task (que veremos más adelante), por lo que la llamaremos en el fichero miProyecto/miProyecto/__init__.py como vemos a continuación:
from __future__ import absolute_import
from .celery import app as celery_app
Fichero tasks.py
Creamos un fichero de demostración de tareas dentro de miProyecto/miAplicacion/tasks.py
from __future__ import absolute_import
from celery import shared_task@shared_task
def add(x, y):
return x + y@shared_task
def mul(x, y):
return x * y@shared_task
def xsum(numbers):
return sum(numbers)
Decorador @shared_task
La idea es que nuestras tareas vivan en una app reusable, que por definición no puede depender del propio proyecto, así que no podremos importar la instancia directamente. En lugar de eso, utilizamos el decorador @shared_task que nos permite crear tareas sin tener ninguna instancia concreta de nuestro programa Celery.
Usando RabbitMQ como Message Broker
Anteriormente hemos tomado la decisión de unsar RabbitMQ como Message Broker, y lo hemos instalado en el sistema, pero tenemos que indicarle a Celery que lo use. Para eso, y dado que hemos establecido el archivo settings.py como fuente de configuración de Celery en pasos anteriores, solo tenemos que editar el fichero miProyecto/miProyecto/settings.py para incluir la siguiente variable:
BROKER_URL = ‘amqp://guest:guest@localhost//’
Es interesante destacar que el broker podría no estar en el mismo servidor que ejecuta las aplicaciones Django, es por eso que el broker se especifica con una URL.
Backend de Resultados: guardando los resultados en Django
Para almacenar información del estado de las tareas, Celery necesita guardar o enviar los estados a algún sitio. Hay varios backends disponibles para esto, por lo que podemos elegir entre:SQLAlchemy, Django ORM, Memcached, Redis, AMQP (RabbitMQ), y MongoDB – o incluso puedes definirte el tuyo propio.
Evidentemente, en nuestro caso vamos a usar Django ORM
Para utilizar Django como nuestro backend de resultados, deberemos seguir 4 pasos:
-
Instalar primero la librería django-celery
(mientorno)$:pip install django-celery
- Añadir djcelery como INSTALED_APPS en el fichero de settings (miProyecto/miProyecto/settings.py)
INSTALLED_APPS = (
‘django.contrib.auth’,
‘django.contrib.contenttypes’,
‘django.contrib.sessions’,
‘django.contrib.sites’,
‘django.contrib.messages’,
‘django.contrib.staticfiles’,
‘miAplicacion’,
‘djcelery’,
# otras apps que use mi proyecto,
)
-
Crear las tablas de la base de datos de celery (tanto para guardar los resultados, como para el scheduler de tareas periódicas)
-
En caso de utilizar south
(miEntorno)$:./manage.py migrate djcelery
-
En caso contrario
(miEntorno)$:./manage.py syncdb
-
-
Configurar Celery para usar el backend django-celery, añadiendo la variable correspondiente directamente en nuestro fichero settings.py:
-
Si queremos usar la base de datos como backend:
CELERY_RESULT_BACKEND=’djcelery.backends.database:DatabaseBackend’
-
Si queremos usar la cache como backend:
CELERY_RESULT_BACKEND=’djcelery.backends.cache:CacheBackend’,
-
Iniciando el proceso de trabajo
En un entorno de producción, habría que ejecutar el worker en background como un daemon, pero para probarlo en nuestro entorno de desarrollo será suficiente con ejecutar el siguiente comando:
(miEntorno)$:celery -A miProyecto worker -l info
Probando la ejecución de tareas
Una vez hemos iniciado desde terminal el proceso de trabajo (comando anterior), si no ha habido ningún error, nos habrá salido el mensaje de que está listo, y el proceso estará vivo en ese terminal, por lo que:
- Abrimos un nuevo terminal
-
Cargamos nuestro entorno virtual
$:workon miEntorno
-
lanzamos la consola de django
(miEntorno)$: ./manage.py shell
-
Y ahora podemos utilizar los métodos add, mul y xsum que hemos definido en el modulo tasks:
>>> from miAplicacion.tasks import add
>>> result = add.delay(2, 7)
Llegado este punto podemos ver como en el otro terminal sale un mensaje que dice Received task: miAplicacion.tasks.add
Podemos mirar el valor resultante desde nuestra consola django con:
>>> result.ready()
True
>>> result.get()
9
Método delay()
Para llamar a una tarea, debemos usar el nombre de la misma (siguiendo nuestro ejemplo, add) y llamar al método delay, junto con los argumentos de la tarea.
Así pues, si la tarea add la hemos definido como:
@shared_task
def add(x,y)
A la hora de llamarla como tarea con los argumentos x=1, e y=3, ejecutaremos:
>>>add.delay(1,3)
Tareas Periódicas
Para ejecutar tareas periódicas, Celery utiliza Celery Beats, que es un planificador que nos permitirá lanzar tasks a intérvalos regulares que serán ejecutados por los workers disponibles en el cluster.
Algo que hay que recordar al usar Celery Beats es su planteamiento centralizado: Tienes que garantizar que solo haya funcionando un único planificador a la vez para un mismo calendario, ya que en caso contrario se podrían dar casos de tareas duplicadas.
En este sentido hay que tener en cuenta que las tareas se pueden llegar a solapar si su tiempo de ejecución es superior al intérvalo planificado por lo que habrá que vigilar con este tema, y en caso de ser un punto conflictivo, habría que diseñar una estratégia mediante locks
Entradas
Para planificar una tarea periódicamente, tenemos que añadir una entrada en el parámetro CELERYBEAT_SCHEDULE de los settings.
En nuestro caso, vamos a ejecutar cada 30 segundos la tarea add que hemos definido anteriormente para realizar la suma de los valores 15 y 35, por lo que abrimos nuestro fichero miProyecto/miProyecto/settings.py y añadimos:
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
‘add-every-30-seconds’: {
‘task’: ‘miAplicacion.tasks.add’,
‘schedule’: timedelta(seconds=30),
‘args’: (15, 35)
},
}#Si no usamos CELERY_TIMEZONE, se usará por defecto el TIME_ZONE que use nuestra app Django
CELERY_TIMEZONE = ‘Europe/London’
Además del nombre de la tarea, la frecuencia de ejecución, y los args (esto permite listas y tuplas), podemos pasarle un diccionario como kwargs, opciones de ejecución, etc
Consideraciones de tiempo
La zona horaria por defecto de Celery es UTC, pero como véis, podemos usar la variable CELERY_TIMEZONE en los ajustes para cambiarlo.
Además, en el caso de Django, si no se especifica el CELERY_TIMEZONE se toma por defecto el TIME_ZONE que hayamos definido en nuestros settings, que es el que usará nuestro proyecto Django.
En caso de efectuar cambios en la zona horaria, habrá que actualizar la base de datos de Celery Beat para que se actualice el planificador. Para eso, hacemos:
(miEntorno)$:./manage.py shell
>>> from djcelery.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)
Planificaciones
No sólo podemos definir con qué frecuencia debe ejecutarse la tarea, si no que podemos decirle que se ejecute un día y hora concretos, mediante el tipo de planificador crontab
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
‘add-every-monday-morning’: {
‘task’: ‘miAplicacion.tasks.add’,
‘schedule’: crontab(hour=7, minute=30, day_of_week=1),
‘args’: (15, 35),
},
}
En la documentación de Celery para Celery Beat, hay una tabla detallada que explica la sintaxis para crontab, y muestra lo complejas que se pueden hacer las planificaciones (cosas del estilo de ejecútate cada 10 minutos, pero solo de 3-4 am, 5-6 pm 9-10 pm los Jueves y los Viernes)
Lanzando el Planificador
Para lanzar el servicio celery-beats, hay un planificador por defecto (que guarda el registro de las últimas ejecuciones en una base de datos local), pero se pueden usar otros, indicándolo con el parámetro -S
En nuestro caso, utilizaremos el planificador que nos proporciona django-celery, que guarda las planificaciones en la BBDD de Django, y que nos permitirá añadir, modificar o elminar dichas planificaciones desde el propio panel de administración Django Admin.
Para lanzar nuestro planificador, ejecutamos:
(miEntorno)$:celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
Entorno de producción
Para lanzar tanto el worker como el planificador en el entorno de producción, deberíamos ejecutarlos como daemons.
Ampliaré este tema cuando tenga algo más de tiempo, pero por ahora puedo dirigiros al tutorial de daemonización de Celery
Espero que este tutorial haya sido de ayuda. ¡Saludos!
Hola cómo estás, es el mejor tutorial que he visto en la web sobre celery. Por fa, cuando puedes subir un tuto sobre cómo poner en un ambiente de producción las tareas asíncrona de celery.
Gracias, me alegro de que te guste.
Celery en producción es un dolor de cabeza ¿eh? La documentación no es demasiado correcta. Cuando tenga algo de tiempo le dedicaré un POST, lo tengo pendiente, pero últimamente estoy metido en demasiados proyectos, y no me queda casi tiempo para el blog 🙁
Un saludo,
Por qué Celery es un dolor cabeza? Quisiera saber por que estaba pensando en usalo en un proyecto.
Enrique, muchas gracias por el trabajo de documentar (¡y bien documentado!) cómo programar tareas periódicas. Me ha sido muy útil esta entrada de tu blog. Quisiera aprovechar también para preguntarte si conoces la manera de generar un PDF en la tarea periódica. Actualmente uso reportlab y pisa para generar PDFs, mediante una vista que hace todos los cálculos correspondientes y llama a render_to_pdf con la plantilla html pasándole una variable context. ¿Cómo puedo hacer que desde una tarea se llame a esta vista tantas veces como sea necesario, y que el PDF lo guarde en algún directorio concreto? Muchas gracias!
¡Gracias Juan Luís! Me alegro de que te haya servido. En cuanto a lo del PDF, la verdad es que nunca he utilizado tareas de Celery para hacer algo como lo que comentas, ni me he tenido que pelear con la generación de PDFs 🙁
Yo quisera saber como hacer las Periodic task con Django pero qeu expliquen con un ejemplo mas detallado, entrando por view la hora mes y dia que deceo ejecutar la tarea… saludos
Muchas gracias por facilitar la vida de los demás, compartiendo el conocimiento que has adquirido mediante la experiencia. Esto no es un simple tutorial, te pones en todos los casos y esto me ha resultado increíblemente útil. Un abrazo bro.
Hola, me preguntaba si en celery hay alguna forma de aguardar informacion sobre las entradas y salidas de celery? Gracias!
Gracias! muy bien explicado, me sirvió de base para desacoplar unos procesos de mi aplicación y hacer que funcione más rápido y mejor.