Celery 定时任务

1.安装celery /redis

pip install celery -i https://pypi.douban.com/simple
pip install redis -i https://pypi.douban.com/simple

2.编写定时任务

# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab


# 初始化celery celery 中间件选择redis
# celery_config_tasks: 起个名字 broker: 中间件的连接
crontab_app = Celery("celery_config_tasks", broker="redis://127.0.0.1:6379/1")

# 配置celery broker_heartbeat是redis断开后重连 
# worker_max_tasks_per_child是防止celery内存泄露 定期销毁work
crontab_app.conf.update(
    broker_heartbeat=None,
    worker_max_tasks_per_child=20
)

# @crontab_app.task 此装饰器代表是celery的任务
@crontab_app.task
def cron_task_one(number:int) -> int:
    sum_number = number + number
    print(f"sum_number--------------{sum_number}")
    return sum_number


@crontab_app.task
def cron_task_two(x:int, y:int) -> int:
    sum_xy = x + y
    print(f"sum_xy--------------{sum_xy}")
    return sum_xy


# 第一种 配置定时任务 celery_conrtab_task 是文件名
crontab_app.conf.beat_schedule = {
    "task_one": {
        "task": "celery_conrtab_task.cron_task_one",  # 定时任务
        "schedule": 1.0, # 秒
        "args": (100, )  # 参数
    },
    "task_two": {
        "task": "celery_conrtab_task.cron_task_two",
        "schedule": 1.0,
        "args": (1, 3)
    }
}
# schedule的另外一种写法 
# crontab()每分钟  crontab(minute='*/15')每十五分钟 
# 具体链接: https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules

#第二种方法配置定时任务
@crontab_app.on_after_configure.connect
def crontab_task(sender, **kwargs):
    sender.add_periodic_task(1.0, cron_task_one.s(100, ), name="celery_config_tasks")
    sender.add_periodic_task(3.0, cron_task_two.s(10, 20), name="celery_config_tasks")

3.启动任务

celery -A xxx(文件名) beat -l INFO # 发布定时任务 
celery -A xxx(文件名) worker -l INFO # worker处理任务 -c 可以指定并发数 默认是CPU核数

django+celery连接redis带密码凭证和无密码

celery连接redis不需要带密码

1.在项目中安装celery环境,在settings同一级文件创建个celeryconfig配置文件
2.在setting连接

CELERY_BROKER_BACKEND = 'redis'

CELERY_BROKER_URL = 'redis://127.0.0.1:6666/1'    #连接本地redis 不需要密码条件

CELERY_RESULT_BACKEND = 'django-db'
# 任务结果,使用Django的ORM
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置任务接收的序列化类型
CELERY_TASK_SERIALIZER = 'json'
# 设置任务序列化方式
CELERY_RESULT_SERIALIZER = 'json'

celery连接redis需要带密码

from urllib.parse import quote  
PASSWORD = quote('xxxxx')  #使用有特殊字符密码,带有特殊字符需要进行转换才能识别

CELERY_BROKER_BACKEND = 'redis'

CELERY_BROKER_URL = 'redis://mast:{}@xxxx:6379/2'.format(PASSWORD)
#mast 代表 账号  {} 存放密码   xxx 连接的 ip  6379 端口  2 redis库
CELERY_RESULT_BACKEND = 'django-db'
# 任务结果,使用Django的ORM
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置任务接收的序列化类型
CELERY_TASK_SERIALIZER = 'json'
# 设置任务序列化方式
CELERY_RESULT_SERIALIZER = 'json'

在端口运行如下图:

在这里插入图片描述

Django中使用celery+redis,当redis连接需要密码

当使用redis做broker,redis连接需要密码时:

BROKER_URL='redis://:xxxxx@127.0.0.1:6379/2',

其中xxxxx是密码,密码前必须加冒号。

文档更新时间: 2022-09-16 15:58   作者:李延召