Celery 定时任务
1.安装celery /redis
pip install celery -i https://pypi.douban.com/simple
pip install redis -i https://pypi.douban.com/simple
2.编写定时任务
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontab
# 初始化celery celery 中间件选择redis
# celery_config_tasks: 起个名字 broker: 中间件的连接
crontab_app = Celery("celery_config_tasks", broker="redis://127.0.0.1:6379/1")
# 配置celery broker_heartbeat是redis断开后重连
# worker_max_tasks_per_child是防止celery内存泄露 定期销毁work
crontab_app.conf.update(
broker_heartbeat=None,
worker_max_tasks_per_child=20
)
# @crontab_app.task 此装饰器代表是celery的任务
@crontab_app.task
def cron_task_one(number:int) -> int:
sum_number = number + number
print(f"sum_number--------------{sum_number}")
return sum_number
@crontab_app.task
def cron_task_two(x:int, y:int) -> int:
sum_xy = x + y
print(f"sum_xy--------------{sum_xy}")
return sum_xy
# 第一种 配置定时任务 celery_conrtab_task 是文件名
crontab_app.conf.beat_schedule = {
"task_one": {
"task": "celery_conrtab_task.cron_task_one", # 定时任务
"schedule": 1.0, # 秒
"args": (100, ) # 参数
},
"task_two": {
"task": "celery_conrtab_task.cron_task_two",
"schedule": 1.0,
"args": (1, 3)
}
}
# schedule的另外一种写法
# crontab()每分钟 crontab(minute='*/15')每十五分钟
# 具体链接: https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules
#第二种方法配置定时任务
@crontab_app.on_after_configure.connect
def crontab_task(sender, **kwargs):
sender.add_periodic_task(1.0, cron_task_one.s(100, ), name="celery_config_tasks")
sender.add_periodic_task(3.0, cron_task_two.s(10, 20), name="celery_config_tasks")
3.启动任务
celery -A xxx(文件名) beat -l INFO # 发布定时任务
celery -A xxx(文件名) worker -l INFO # worker处理任务 -c 可以指定并发数 默认是CPU核数
django+celery连接redis带密码凭证和无密码
celery连接redis不需要带密码
1.在项目中安装celery环境,在settings同一级文件创建个celeryconfig配置文件
2.在setting连接
CELERY_BROKER_BACKEND = 'redis'
CELERY_BROKER_URL = 'redis://127.0.0.1:6666/1' #连接本地redis 不需要密码条件
CELERY_RESULT_BACKEND = 'django-db'
# 任务结果,使用Django的ORM
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置任务接收的序列化类型
CELERY_TASK_SERIALIZER = 'json'
# 设置任务序列化方式
CELERY_RESULT_SERIALIZER = 'json'
celery连接redis需要带密码
from urllib.parse import quote
PASSWORD = quote('xxxxx') #使用有特殊字符密码,带有特殊字符需要进行转换才能识别
CELERY_BROKER_BACKEND = 'redis'
CELERY_BROKER_URL = 'redis://mast:{}@xxxx:6379/2'.format(PASSWORD)
#mast 代表 账号 {} 存放密码 xxx 连接的 ip 6379 端口 2 redis库
CELERY_RESULT_BACKEND = 'django-db'
# 任务结果,使用Django的ORM
CELERY_ACCEPT_CONTENT = ['application/json']
# 设置任务接收的序列化类型
CELERY_TASK_SERIALIZER = 'json'
# 设置任务序列化方式
CELERY_RESULT_SERIALIZER = 'json'
在端口运行如下图:
Django中使用celery+redis,当redis连接需要密码
当使用redis做broker,redis连接需要密码时:
BROKER_URL='redis://:xxxxx@127.0.0.1:6379/2',
其中xxxxx是密码,密码前必须加冒号。
文档更新时间: 2022-09-16 15:58 作者:李延召