Celery 使用详解

前言

前段时间需要使用rabbitmq做写缓存,一直使用pika+rabbitmq的组合,pika这个模块虽然可以很直观地操作rabbitmq,但是官方给的例子太简单,对其底层原理了解又不是很深,遇到很多坑,尤其是需要自己写连接池管理和channel池管理。虽然也有用过celery,一直也是celery+redis的组合,涉及很浅;目前打算深研一下celery+redis+rabbitmq的使用。

celery + rabbitmq初步

我们先不在集成框架如flask或Django中使用celery,而仅仅单独使用。

简单介绍

Celery 是一个异步任务队列,一个Celery有三个核心组件:

  • Celery 客户端: 用于发布后台作业;当与 Flask 一起工作的时候,客户端与 Flask 应用一起运行。
  • Celery workers: 运行后台作业的进程。Celery 支持本地和远程的 workers,可以在本地服务器上启动一个单独的 worker,也可以在远程服务器上启动worker,需要拷贝代码;
  • 消息代理: 客户端通过消息队列和 workers 进行通信,Celery 支持多种方式来实现这些队列。最常用的代理就是 RabbitMQ 和 Redis。

安装rabbitmq和redis

为了提高性能,官方推荐使用librabbitmq,这是一个连接rabbitmq的C++的库;

初步使用

一般我们使用redis做结果存储,使用rabbitmq做任务队列;

第一步:创建并发送一个异步任务

发生了什么事

  • app.task装饰add函数成一个Task实例,add.delay函数将task实例序列化后,通过librabbitmq库的方法将任务发送到rabbitmq;
  • 该过程创建一个名字为celery的exchange交换机,类型为direct(直连交换机);创建一个名为celery的queue,队列和交换机使用路由键celery绑定;
  • 打开rabbitmq管理后台,可以看到有一条消息已经在celery队列中;

记住:当有多个装饰器的时候,app.task一定要在最外层;

扩展

如果使用redis作为任务队列中间人,在redis中存在两个键 celery和_kombu.binding.celery, _kombu.binding.celery表示有一名为 celery 的任务队列(Celery 默认),而键celery为默认队列中的任务列表,使用list类型,可以看看添加进去的任务数据。

第二步:开启worker执行任务

在项目目录下执行命令:

任务执行完毕后结果存储在redis中,查看redis中的数据,发现存在一个string类型的键值对:

该键值对的失效时间默认为24小时。

分析序列化的消息

add.delay将Task实例序列化后发送到rabbitmq,那么序列化的过程是怎样的呢?

下面是添加到rabbitmq任务队列中的消息数据,使用的是pickle模块对body部分的数据进行序列化:

将序列化消息反序列化

我们可以看到body里面有我们需要执行的函数的一切信息,celery的worker接收到消息后就会反序列化body数据,执行相应的方法。

  • 常见的数据序列化方式

经过比较,为了保持跨语言的兼容性和速度,采用msgpack或json方式;

celery配置

celery的性能和许多因素有关,比如序列化的方式,连接rabbitmq的方式,多进程、单线程等等,我们可以指定配置;

基本配置项

加载配置

也可以直接加载配置

此外还有两个方法可以加载配置,但开发不会直接调用:

一份比较常用的配置文件

在celery4.0以后配置参数改成了小写,对于4.0以后的版本替代参数:

Celery对象

核心的对象就是Celery了,初始化方法:

这些参数都是celery实例化的配置,我们也可以不写,然后使用config_from_object方法加载配置;

创建异步任务的方法task

任何被task修饰的方法都会被创建一个Task对象,变成一个可序列化并发送到远程服务器的任务;它有多种修饰方式:

  • 使用默认的参数
  • 指定相关参数
  • 自定义Task基类

Task的一般属性

调用异步任务

调用异步任务有三个方法,如下:

app.send_task

Task.delay

delay方法是apply_async方法的简化版,不支持执行选项,只能传递任务的参数。

Task.apply_async

apply_async支持执行选项,它会覆盖全局的默认参数和定义该任务时指定的执行选项,本质上还是调用了send_task方法;

  • 自定义发布者,交换机,路由键, 队列, 优先级,序列方案和压缩方法:

获取任务结果和状态

由于celery发送的都是去其他进程执行的任务,如果需要在客户端监控任务的状态,有如下方法:

但是一般业务中很少用到,因为获取任务执行的结果需要阻塞,celery使用场景一般是不关心结果的。

使用celery

本文为转载文章,贵在分享,版权归原作者及原出处所有,如涉及版权等问题,请及时与我联系。
原文出处:天宇之游
原文链接:https://www.cnblogs.com/cwp-bg/p/8759638.html

发表评论

邮箱地址不会被公开。 必填项已用*标注