Celery Integration with Chaski Confluent

Chaski Confluent integrates with Celery for task management and distributed processing. The example below sets up a Celery application that uses Chaski’s transport as the message broker and Chaski’s backend to store task results.

Example tasks.py

The following script defines a Celery application and a simple task:

import os
from celery import Celery

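# These modules are not referenced again below; they are presumably imported
# for their side effects (making the chaski:// transport and backend available)
from chaski.utils import transport
from chaski.utils import backend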

# Configure the Celery application; both URLs fall back to the local default
app = Celery(
    'test',
    broker=os.getenv("CHASKI_CELERY_BROKER", "chaski://127.0.0.1:65433"),
    backend=os.getenv("CHASKI_CELERY_BACKEND", "chaski://127.0.0.1:65433"),
)

# Define a simple task
@app.task
def add(x, y):
    return x + y

Using the Task

To use the add task, call it asynchronously with delay() and retrieve the result with get(). A Celery worker must be running to process the task:

from tasks import add

# Execute the task asynchronously
result = add.delay(5, 6)

# Wait up to 10 seconds for the worker to finish, then fetch the result
value = result.get(timeout=10)

Key Features

  • Broker Configuration: The broker is set using the CHASKI_CELERY_BROKER environment variable, defaulting to chaski://127.0.0.1:65433 if not defined.

  • Backend Configuration: The backend for storing task results is configured via the CHASKI_CELERY_BACKEND environment variable, with the same default as the broker.

  • Task Definition: Tasks are decorated with @app.task for easy declaration and execution.
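The environment-variable fallback described above can be sketched in isolation as follows. This is a minimal illustration using only the standard library; the helper name resolve_celery_urls and the constant DEFAULT_CHASKI_URL are hypothetical and not part of Chaski or Celery.

```python
import os

# Default URL as stated in the text above; the constant name is hypothetical.
DEFAULT_CHASKI_URL = "chaski://127.0.0.1:65433"


def resolve_celery_urls(environ=None):
    """Return (broker_url, backend_url), using the default for unset variables.

    `environ` defaults to os.environ; passing a plain dict makes the
    function easy to exercise without touching the real environment.
    """
    env = os.environ if environ is None else environ
    broker = env.get("CHASKI_CELERY_BROKER", DEFAULT_CHASKI_URL)
    backend = env.get("CHASKI_CELERY_BACKEND", DEFAULT_CHASKI_URL)
    return broker, backend


# With neither variable set, both values are the default URL.
broker_url, backend_url = resolve_celery_urls({})
print(broker_url, backend_url)
```

The two returned values could then be passed as the broker and backend arguments of Celery(...). Note that a variable set to an empty string is returned as-is rather than replaced by the default, which matches the behavior of os.getenv.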

This integration allows for distributed task execution with the robustness and scalability of Chaski Confluent.