Celery学习

Celery实现异步执行任务,拥有分布式的任务队列!!!


灵活且高效,不用担心高并发量导致过重的系统负担,有效处理复杂的逻辑任务,提高性能。

celery架构由三部分组成,消息中间件,任务执行单元和任务执行结果存储组成。

celery2{:width=”100%” align=”center”}

celery任务队列是一种跨线程、跨机器工作的一种机制.任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

我们将耗时任务放到后台异步执行。不会影响用户其他操作。除了注册功能,例如上传,图形处理等等耗时的任务,都可以按照这种思路来解决。 如何实现异步执行任务呢?我们可使用celery. celery除了刚才所涉及到的异步执行任务之外,还可以实现定时处理某些任务。

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。celery看起来似乎很庞大,本章节我们先对其进行简单的了解,然后再去学习其他一些高级特性。 celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。 celery的特点是:

简单,易于使用和维护,有丰富的文档。

高效,单个celery进程每分钟可以处理数百万个任务。

灵活,celery中几乎每个部分都可以自定义扩展。

1.普通方式启动Celery Worker

在当前目录,使用如下方式启动Celery Worker:

1
celery worker -A tasks --loglevel=info   # 以默认方式prefork多进程启动celery

其中:

①参数 -A 指定了 Celery 实例的位置,本例是在 tasks.py 中,Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用-A tasks.app;

②参数 –loglevel 指定了日志级别,默认为 warning,也可以使用 -l info 来表示;

③在生产环境中,我们通常会使用 Supervisor 来控制 Celery Worker 进程。


2.绿色协程启动celery:

1
celery -A mblog.celery worker -l info (-P eventlet) #最后是启动绿色协程

分析:添加了eventlet可能会导致django的db对数据库操作在关闭数据库时,会对创建这个连接进行验证是否是同一个线程所做的,如果不是同一个线程,则会报错。eventlet是支持wsgi的异步框架

https://www.cnblogs.com/giotto95827/p/8761055.html

注:而就是因为eventlet会对Python原生模块(os,thread)进行修改,也就是运行时打上猴子不定,使得原生的get_ident()取得的值和猴子补丁后的get_ident()取得的值不同。产生错误!为此,可以再Monkey_patch(thread=False)来取消其打补丁。但是有可能会影响eventlet带来的效果!


3.单进程启动celery:

1
celery -A mblog.celery worker --pool=solo -l info  # --pool=solo规定的单进程模式,以串行的方式执行任务队列

注:其中-A后面指定的是含有celery实例的py文件


4.django中使用celery

(1)单独配置一个py文件,用来配置celery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from __future__ import absolute_import, unicode_literals  # 绝对导入,把下一个新版本的特性导入到当前版本,例如python3取消了python2.u前缀

from celery import Celery
from django.conf import settings
from os import path, environ

project_name = path.split(path.dirname(__file__))[-1] # 当前目录的上一级==去掉文件名,返回目录
project_settings = "{}.settings".format(project_name)

# 设置环境变量,让django能够识别启动celery.py这个文件
environ.setdefault("DJANGO_SETTINGS_MODULE", project_settings)

# 实例化Celery
app = Celery(project_name)

# 使用django的settings文件配置celery,一些基础参数,比如任务队列存放的位置redis中,执行返回的结果保存的位置,BROKER_URL,CELERY_RESULT_BACKEND等
app.config_from_object("django.conf:settings", namespace='CELERY')

# Celery加载所有注册的应用
app.autodiscover_tasks(settings.INSTALLED_APPS)

(2)在setting.py文件中配置celery所需的属性

1
2
3
4
5
6
7
8
9
10
11
# celery 设置,用于实例化
# celery 中间人 redis://redis服务所在的ip地址:端口号/数据库号
BROKER_URL = 'redis://:<password>@127.0.0.1:6379/0'

# celery结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = 'redis://:<password>@127.0.0.1:6379/1'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIME_ZONE = TIME_ZONE

注:其中的为你的redis的密码

task任务函数返回值参考

https://www.cnblogs.com/wanghong1994/p/12144548.html

https://www.jianshu.com/p/66707b1e7787

https://www.cnblogs.com/zhaopanpan/p/10032853.html

注:中间人broker采用json格式保存任务通过保存其任务函数标识,任务函数名,调用参数。

执行任务函数的返回值必须是序列化的对象!!!


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!