miércoles, 3 de enero de 2024

22. Tareas Asincrónicas

Cuando se recibe una solicitud HTTP, es necesario devolver una respuesta al usuario lo más rápido posible. Cada tarea ejecutada durante el ciclo de solicitud/respuesta se suma al tiempo total de respuesta. Las tareas de larga duración pueden ralentizar seriamente la respuesta del servidor. ¿Cómo podemos devolver una respuesta rápida al usuario mientras completamos tareas que consumen mucho tiempo? 

Podemos hacerlo con la ejecución asíncrona.

Trabajar con tareas asíncronas

Podemos aligerar la carga del ciclo solicitud/respuesta ejecutando ciertas tareas en segundo plano. Por ejemplo, una plataforma de intercambio de videos permite a los usuarios subir videos, pero necesita mucho tiempo para transcodificar los videos subidos. Cuando un usuario sube un video, el sitio podría devolver una respuesta informando que la transcodificación comenzará pronto y empezar a transcodificar el video de manera asíncrona. Otro ejemplo es el envío de correos electrónicos a los usuarios. Si tu sitio envía notificaciones por correo electrónico desde una vista, la conexión del Protocolo Simple de Transferencia de Correo (SMTP) podría fallar o ralentizar la respuesta. Al enviar el correo electrónico de manera asíncrona, evitas bloquear la ejecución del código.

La ejecución asíncrona es especialmente relevante para procesos intensivos en datos, intensivos en recursos y que consumen mucho tiempo, o procesos sujetos a fallos, que podrían requerir una política de reintento.


Trabajadores, colas de mensajes y corredores de mensajes


Mientras tu servidor web procesa solicitudes y devuelve respuestas, necesitas un segundo servidor basado en tareas, llamado trabajador, para procesar las tareas asíncronas. Puede haber uno o varios trabajadores en ejecución, llevando a cabo tareas en segundo plano. Estos trabajadores pueden acceder a la base de datos, procesar archivos, enviar correos electrónicos, etc. Incluso pueden poner en cola futuras tareas. Todo esto mientras se mantiene el servidor web principal libre para procesar las solicitudes HTTP.

Para indicar a los trabajadores qué tareas ejecutar, necesitamos enviar mensajes. Nos comunicamos con los corredores añadiendo mensajes a una cola de mensajes, que básicamente es una estructura de datos de tipo "primero en entrar, primero en salir" (FIFO, por sus siglas en inglés). Cuando un corredor está disponible, toma el primer mensaje de la cola y comienza a ejecutar la tarea correspondiente. Una vez terminada, el corredor toma el siguiente mensaje de la cola y ejecuta la tarea correspondiente. Los corredores quedan inactivos cuando la cola de mensajes está vacía. Cuando se usan múltiples corredores, cada uno toma el primer mensaje disponible en orden cuando están disponibles. La cola asegura que cada corredor reciba solo una tarea a la vez y que ninguna tarea sea procesada por más de un trabajador.

Asynchronous execution using a message queue and workers


Un productor envía un mensaje a la cola, y el/los trabajador(es) consumen los mensajes en orden de llegada; el primer mensaje añadido a la cola de mensajes es el primero en ser procesado por el/los trabajador(es).

Para gestionar la cola de mensajes, necesitamos un corredor de mensajes. El corredor de mensajes se utiliza para traducir mensajes a un protocolo formal de mensajería y gestionar colas de mensajes para múltiples receptores. Proporciona almacenamiento confiable y entrega garantizada de mensajes. El corredor de mensajes nos permite crear colas de mensajes, dirigir mensajes, distribuir mensajes entre trabajadores, etc.


Usando Django con Celery y RabbitMQ


Celery es una cola de tareas distribuida que puede procesar grandes cantidades de mensajes. Usaremos Celery para definir tareas asíncronas como funciones de Python dentro de nuestras aplicaciones de Django. Ejecutaremos trabajadores de Celery que escucharán al corredor de mensajes para obtener nuevos mensajes y procesar tareas asíncronas.

Usando Celery, no solo puedes crear tareas asíncronas fácilmente y permitir que sean ejecutadas por los trabajadores tan pronto como sea posible, sino que también puedes programarlas para que se ejecuten en un momento específico. Puedes encontrar la documentación de Celery en https://docs.celeryq.dev/es/stable/index.html.

Celery se comunica a través de mensajes y requiere un corredor de mensajes para intermediar entre los clientes y los trabajadores. Hay varias opciones para un corredor de mensajes para Celery, incluyendo almacenes de llave/valor como Redis, o un corredor de mensajes real como RabbitMQ.

RabbitMQ es el mediador o corredor de mensajes más ampliamente desplegado. Soporta múltiples protocolos de mensajería, como el Protocolo Avanzado de Cola de Mensajes (AMQP), y es el mediador de mensajes recomendado para Celery. RabbitMQ es liviano, fácil de desplegar y se puede configurar para escalabilidad y alta disponibilidad.

La siguiente Figura muestra cómo utilizaremos Django, Celery y RabbitMQ para ejecutar tareas asíncronas.

Architecture for asynchronous tasks with Django, RabbitMQ, and Celery


Instalando Celery


Vamos a instalar Celery e integrarlo en el proyecto. Instala Celery vía pip utilizando el siguiente comando:

```

pip install celery

```

Puedes encontrar una introducción a Celery en https://docs.celeryq.dev/en/stable/getting-started/introduction.html.


Instalando RabbitMQ


La comunidad de RabbitMQ proporciona una imagen de Docker que facilita mucho desplegar un servidor RabbitMQ con una configuración estándar. 

Docker es una popular plataforma de contenerización de código abierto. Permite a los desarrolladores empaquetar aplicaciones en contenedores, simplificando el proceso de construcción, ejecución, gestión y distribución de aplicaciones.

Primero, descarga e instala Docker para tu sistema operativo. Encontrarás instrucciones para descargar e instalar Docker en Linux, macOS y Windows en https://docs.docker.com/get-docker/.

Después de instalar Docker en tu máquina, puedes fácilmente descargar la imagen de RabbitMQ de Docker ejecutando el siguiente comando desde la terminal (para una distribución basada en Debian en mi ordenador). En este caso necesitaremos utilizar una versión de RabbitMQ que incluya la interfaz de gestión web (RabbitMQ Management Plugin) y que sea ligera:

```

sudo docker pull rabbitmq:management-alpine

```

Esto descargará la imagen de RabbitMQ de Docker a tu máquina local. Puedes encontrar información sobre la imagen oficial de RabbitMQ en Docker en https://hub.docker.com/_/rabbitmq.

Si prefieres instalar RabbitMQ directamente en tu máquina en lugar de utilizar Docker, encontrarás guías detalladas de instalación para diferentes sistemas operativos en https://www.rabbitmq.com/download.html.

Ejecuta el siguiente comando en la terminal para iniciar el servidor de RabbitMQ con Docker:

sudo docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine

Con este comando, estamos indicando a RabbitMQ que se ejecute en el puerto 5672, y estamos ejecutando su interfaz de usuario de gestión basada en web en el puerto 15672.

Verás una salida que incluye las siguientes líneas:

 Starting broker...

completed with 4 plugins.

... Server startup complete; 4 plugins started.

RabbitMQ está en funcionamiento en el puerto 5672 y listo para recibir mensajes.


Accediendo a la interfaz de gestión de RabbitMQ

Abre http://127.0.0.1:15672/ en tu navegador. Verás la pantalla de inicio de sesión para la interfaz de usuario de gestión de RabbitMQ. Se verá así:

The RabbitMQ management UI login screen


Introduce "guest" tanto en el nombre de usuario (Username) como en la contraseña (Password).

The RabbitMQ management UI dashboard

Este es el usuario administrador predeterminado para RabbitMQ. En esta pantalla, puedes monitorear la actividad actual de RabbitMQ. Puedes ver que hay un nodo en funcionamiento sin conexiones o colas registradas.

Si usas RabbitMQ en un entorno de producción, necesitarás crear un nuevo usuario administrador y eliminar el usuario invitado predeterminado. Puedes hacerlo en la sección de Administración de la interfaz de gestión.

Ahora vamos a agregar Celery al proyecto. Luego, ejecutaremos Celery y probaremos la conexión con RabbitMQ.

Agregar Celery a tu proyecto

Debes proporcionar una configuración para la instancia de Celery. Crea un nuevo archivo junto al archivo settings.py de ''PracticaDjango" y llámalo celery.py. Este archivo contendrá la configuración de Celery para tu proyecto.

Añade el siguiente código a él:


PracticaDjango/PracticaDjango/celery.py

import os
from celery import Celery
# Establece las opciones de configuración para el modulo Celery.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PracticaDjango.settings')
app = Celery('PracticaDjango')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

  • Estableces la variable DJANGO_SETTINGS_MODULE para el programa de línea de comandos de Celery.
  • Creas una instancia de la aplicación con app = Celery('PracticaDjango').
  • Cargas cualquier configuración personalizada desde la configuración de tu proyecto utilizando el método config_from_object(). El atributo namespace especifica el prefijo que tendrán las configuraciones relacionadas con Celery en tu archivo settings.py. Al establecer el namespace CELERY, todas las configuraciones de Celery deben incluir el prefijo CELERY_ en su nombre (por ejemplo, CELERY_BROKER_URL).
  • Finalmente, le indicas a Celery que descubra automáticamente tareas asíncronas para tus aplicaciones. Celery buscará un archivo tasks.py en cada directorio de aplicación agregado a INSTALLED_APPS para cargar tareas asíncronas definidas en él.

Necesitas importar el módulo celery en el archivo __init__.py de tu proyecto para asegurarte de que se cargue cuando Django se inicie.

Edita el archivo PracticaDjango/init.py y agrega el siguiente código:

PracticaDjango/PracticaDjango/__init__.py

# import celery
from .celery import app as celery_app

__all__ = ['celery_app']
Con esto ya tenemos añadido Celery a nuestro proyecto de Django, y podemos empezar a utilizarlo.


Ejecutando un trabajador de Celery


Un trabajador de Celery es un proceso que maneja funciones como enviar/recibir mensajes en colas, registrar tareas, finalizar tareas bloqueadas, seguir el estado, etc. Una instancia de trabajador puede consumir cualquier cantidad de colas de mensajes.

Abre otra terminal y inicia un trabajador de Celery desde el directorio de tu proyecto, utilizando el siguiente comando:

celery -A PracticaDjango worker -l info

El trabajador de Celery está ahora en ejecución y listo para procesar tareas. Verifiquemos si hay una conexión entre Celery y RabbitMQ.

Abre http://127.0.0.1:15672/ en tu navegador para acceder a la interfaz de gestión de RabbitMQ. Ahora verás un gráfico bajo 'Mensajes en cola' (Queued messages) y otro gráfico bajo 'Tasas de mensajes' (message rates), similar a la siguiente imagen:

The RabbitMQ management dashboard displaying connections and queues

Obviamente, no hay mensajes en cola ya que aún no hemos enviado ningún mensaje a la cola de mensajes. El gráfico bajo 'Tasas de mensajes' debería actualizarse cada cinco segundos; puedes ver la frecuencia de actualización en la esquina superior derecha de la pantalla. Esta vez, tanto 'Conexiones' como 'Colas' deberían mostrar un número mayor que cero.

Ahora podemos comenzar a programar tareas asíncronas.


Agregar tareas asíncronas a tu aplicación


Enviemos un correo electrónico de confirmación al usuario cada vez que se realice un pedido en la tienda en línea. Implementaremos el envío del correo electrónico en una función de Python y la registraremos como una tarea con Celery. Luego, la agregaremos a la vista order_create para ejecutar la tarea de forma asíncrona.

La configuración CELERY_ALWAYS_EAGER te permite ejecutar tareas localmente de manera sincrónica, en lugar de enviarlas a la cola. Esto es útil para ejecutar pruebas unitarias o ejecutar la aplicación en tu entorno local sin ejecutar Celery.


Agregar tareas asíncronas a tu aplicación


Enviemos un correo electrónico de confirmación al usuario cada vez que se realice un pedido en la tienda en línea. Implementaremos el envío del correo electrónico en una función de Python y la registraremos como una tarea con Celery. Luego, la agregaremos a la vista order_create para ejecutar la tarea de forma asíncrona.

Cuando se ejecuta la vista order_create, Celery enviará el mensaje a una cola de mensajes gestionada por RabbitMQ y luego un intermediario de Celery ejecutará la tarea asíncrona que definimos con una función de Python.

La convención para facilitar el descubrimiento de tareas por parte de Celery es definir tareas asíncronas para tu aplicación en un módulo de tareas dentro del directorio de la aplicación.

Crea un nuevo archivo dentro de la aplicación de pedidos (Orders) y nómbralo tasks.py. Este es el lugar donde Celery buscará tareas asíncronas. Agrega el siguiente código a él:

PracticaBlog/PracticaDjango/Orders/tasks.py

from celery import shared_task
from django.core.mail import send_mail
from .models import Order

@shared_task
def order_created(order_id):
    """
    Función para enviar una notificación por correo electrónico cuando se realiza un pedido
    correctamente.
    """
    order = Order.objects.get(id=order_id)
    subject = f'Orden nº. {order.id}'
    message = f'Estimado {order.first_name},\n\n' \
              f'Su pedido ha sido realizado correctamente.' \
              f'Su ID del pedido es {order.id}.'
    mail_sent = send_mail(subject,
                          message,
                          'admin@myshop.com',
                          [order.email])
    return mail_sent

Hemos definido la función order_created utilizando el decorador @shared_task. Como puedes ver, una tarea de Celery es simplemente una función de Python decorada con @shared_task. La función order_created recibe un parámetro order_id. Siempre es recomendable pasar solo identificadores a las funciones de tarea y recuperar objetos de la base de datos cuando se ejecuta la tarea. De esta manera, evitamos acceder a información desactualizada, ya que los datos en la base de datos podrían haber cambiado mientras la tarea estaba en cola. Hemos utilizado la función send_mail() proporcionada por Django para enviar una notificación por correo electrónico al usuario que realizó el pedido.

Ya vimos cómo configurar Django para usar tu servidor SMTP en el Capítulo 10, Django. Formulario de contacto y envío de email con datos. Variables de Entorno. Si no quieres configurar los ajustes de correo electrónico, puedes indicarle a Django que escriba correos electrónicos en la consola agregando la siguiente configuración al archivo settings.py:

EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend

Utiliza tareas asíncronas no solo para procesos que consumen mucho tiempo, sino también para otros procesos que no requieren tanto tiempo para ejecutarse pero que están sujetos a fallos de conexión o requieren una política de reintentos.

Ahora debes agregar la tarea a tu vista order_create. Edita el archivo views.py de la aplicación de órdenes (Orders), importa la tarea y llama a la tarea asíncrona order_created después de vaciar el carrito, de la siguiente manera:

PracticaBlog/PracticaDjango/Orders/tasks.py

#...
# Crea una tarea asincronica al finalizar la orden
from .tasks import order_created


# Create your views here.
def order_create(request):
    carro = Carro(request)
    if request.method == "POST":
        form = OrderCreateForm(request.POST)
        if form.is_valid():
            order = form.save()
            for item in carro:
                OrderItem.objects.create(
                    order=order,
                    product=item["producto"],
                    price=item["precio"],
                    quantity=item["cantidad"],
                )
            # Limpia el carro
            carro.clear()
            # launch asynchronous task
            order_created.delay(order.id)
            return render(request, "Orders/order/created.html", {"order": order})
    else:
        form = OrderCreateForm()
    return render(request, "Orders/order/create.html", {"cart": carro, "form": form})

Llamas al método delay() de la tarea para ejecutarla de manera asíncrona. La tarea se añadirá a la cola de mensajes y será ejecutada por el trabajador de Celery tan pronto como sea posible.

Asegúrate de que RabbitMQ esté en funcionamiento. Luego, detén el proceso del trabajador de Celery y vuelve a iniciarlo con el siguiente comando:

celery -A PracticaDjango worker -l info

El trabajador de Celery ha registrado ahora la tarea. En otra terminal, inicia el servidor de desarrollo desde el directorio del proyecto con el siguiente comando:

python manage.py runserver

Abre http://127.0.0.1:8000/ en tu navegador, añade algunos productos a tu carrito de compras y completa un pedido. En la terminal donde iniciaste el trabajador de Celery, verás una salida similar a la siguiente:

[2024-01-02 20:00:07,087: WARNING/ForkPoolWorker-2] Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: =?utf-8?b?T3JkZW4gbsK6LiAz?=
From: admin@myshop.com
To: perico@gamil.com
Date: Tue, 02 Jan 2024 19:00:07 -0000
Message-ID: <170422200708.12830.17805739684368165117@machine>

Estimado fdfdf,

Su pedido ha sido realizado correctamente.Su ID del pedido es 3.

La tarea order_created ha sido ejecutada y se ha enviado una notificación por correo electrónico para el pedido. Si estás utilizando el backend de correo electrónico console.EmailBackend, no se enviará ningún correo electrónico, pero deberías ver el texto renderizado del correo electrónico en la salida de la consola.


Monitorizando Celery con Flower


Además de la interfaz de gestión de RabbitMQ, puedes usar otras herramientas para monitorear las tareas asíncronas que se ejecutan con Celery. Flower es una herramienta basada en web para monitorear Celery.

Instala Flower usando el siguiente comando:

pip install flower

Una vez instalado, puedes iniciar Flower ejecutando el siguiente comando en una nueva terminal desde el directorio de tu proyecto:

celery -A PracticaDjango flower

Abre http://localhost:5555/dashboard en tu navegador. Podrás ver los trabajadores activos de Celery y estadísticas de tareas asíncronas. La pantalla debería parecerse a esta:


página de inicio de flower


Verás un trabajador activo, cuyo nombre comienza con celery@ y cuyo estado es En línea.

Haz clic en el nombre del trabajador y luego haz clic en la pestaña Colas (Queues). Verás la siguiente pantalla:

pestaña de colas de flowers


En este lugar, puedes ver la cola activa llamada celery. Esta es la cola activa que está conectada al broker de mensajes.

Si vuelves a la pantalla inicial y haces clic en el número total de tareas procesadas, verás la siguiente pantalla:

pestaña de mensajes procesados

Aquí puedes ver las tareas que han sido procesadas y la cantidad de veces que han sido ejecutadas. Deberías ver la tarea order_created y el número total de veces que ha sido ejecutada. Este número podría variar dependiendo de cuántos pedidos hayas realizado.

Abre http://localhost:8000/ en tu navegador. Agrega algunos artículos al carrito y luego completa el proceso de compra.

Abre http://localhost:5555/dashboard en tu navegador. Flower ha registrado la tarea como procesada. Ahora deberías ver al menos un 1 bajo "Procesado" y un 1 bajo "Completado" también.


Si al refrescar Flower te da este error:

FLOWER_UNAUTHENTICATED_API environment variable is required to enable API without authentication

Si estás en un entorno Unix/Linux/macOS, puedes establecer la variable de entorno antes de ejecutar Flower en la línea de comandos: export FLOWER_UNAUTHENTICATED_API=true celery -A PracticaDjango flower

Si estás en Windows, puedes establecer la variable de entorno y luego ejecutar Flower: set FLOWER_UNAUTHENTICATED_API=true celery -A myshop flower

Al configurar FLOWER_UNAUTHENTICATED_API en true, habilitarás el acceso a la API de Flower sin autenticación. Esto solucionará el error que estás viendo al intentar refrescar Flower en tu navegador web.

Puedes encontrar más información sobre este programa en https://flower.readthedocs.io/.

Puedes encontrar el código de es Post en el siguiente enlace de GITHUB.

No hay comentarios:

Publicar un comentario