胖蔡说技术
随便扯扯

django-celery 实现Django框架分布式队列

Celery 是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为维护此类系统所需的工具提供操作。这是一个任务队列,重点是实时处理,同时还支持任务计划。本文将介绍如何在Django框架中通过django-celery实现部分是任务队列。

图1.1 celery分布式任务队列

Django中使用Celery

以前版本的 Celery 需要一个单独的库才能与 Django 一起工作,但从 3.1 开始,情况就不再如此了。 Django 现在支持开箱即用,因此本文档仅包含集成 Celery Django 的基本方法。您将使用与非 Django 用户相同的 APICelery 5.0.x支持Django 1.11 LTS或更新版本。请使用Celery 4.4.x用于比Django 1.11的版本。

要在您的 Django 项目中使用 Celery,您必须首先定义一个 Celery 库的实例(称为“app”)。如果你有一个像这样的现代 Django 项目布局:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py


那么推荐的方法是创建一个新的 proj/proj/celery.py 模块来定义 Celery 实例:

# proj/proj/celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

然后,您需要在proj/proj/__ init__.py模块中导入此应用程序。这样可以确保当Django启动时加载该应用程序,以便@shared_task Decorator(后面提到)将使用它:

# proj/proj/__init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

让我们分解一下第一个模块中发生的事情,首先,我们为 celery 命令行程序设置默认的 DJANGO_SETTINGS_MODULE 环境变量:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

大写命名空间意味着所有 Celery 配置选项必须以大写而不是小写指定,并以 CELERY_ 开头,例如 task_always_eager 设置变为 CELERY_TASK_ALWAYS_EAGERbroker_url 设置变为 CELERY_BROKER_URL。这也适用于 workers 设置,例如,worker_concurrency 设置变为 CELERY_WORKER_CONCURRENCY。例如,一个 Django 项目的配置文件可能包括:

# settings.py

...
# Celery Configuration Options
CELERY_TIMEZONE = "Australia/Tasmania"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

您可以直接传递设置对象,但使用字符串更好,因为这样工作人员就不必序列化对象。 CELERY_ namespace 也是可选的,但推荐使用(以防止与其他 Django 设置重叠)。接下来,可重用应用程序的常见做法是在单独的 tasks.py 模块中定义所有任务,而 Celery 确实有一种自动发现这些模块的方法:

app.autodiscover_tasks()

使用上面的行,Celery 将按照 tasks.py 约定自动从所有已安装的应用程序中发现任务:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

这样您就不必手动将各个模块添加到 CELERY_IMPORTS 设置中。

使用@shared_task 装饰器

您编写的任务可能会存在于可重用应用程序中,而可重用应用程序不能依赖于项目本身,因此您也无法直接导入您的应用程序实例。@shared_task 装饰器可以让你在没有任何具体应用实例的情况下创建任务:

# demoapp/tasks.py
# Create your tasks here

from demoapp.models import Widget

from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()
赞(0) 打赏
转载请附上原文出处链接:胖蔡说技术 » django-celery 实现Django框架分布式队列
分享到: 更多 (0)

请小编喝杯咖啡~

支付宝扫一扫打赏

微信扫一扫打赏