分布式消息队列 Celery 的最佳实践

目录

不使用数据库作为 Broker

Broker 的选择有很多,可以是消息队列,也可以是数据库,但我们应该尽量避免使用数据库作为 Broker,除非你的业务环境非常简单。

因为当任务并发高的时候,大量的 Worker 为了获取任务,会不断的轮询访问数据库,对数据库造成了很大的压力。 同时磁盘 I/O 也会一直处于高峰值状态,严重影响了系统性能。
如果说你的数据库即充当了 Broker 又兼顾着后端业务,那么应用程序就很可能会被拖垮。

使用消息队列作为 Broker,例如 RabbitMQ,就不会出现以上的问题。首先 RabbitMQ 会将任务队列放到内存中,速度快也不占磁盘 I/O。而且 RabbitMQ 会主动的将任务推送给订阅了队列的 Worker,Worker 也就不需要频繁的去轮询队列了。

实现优先级任务

不要将所有任务都堆积到默认队列,因为总有些任务是需要被即时处理的,例如用户的验证码短信比较紧急,应及时发送,而一些宣传短信却可以缓点再发。为了更好的用户体验,也为了达到更高的并发性能,任务总需要有个轻重缓急。

首先我们可以将任务进行合理的分类,不同类型的任务分别存放到不同的队列中。同时还可以为高优先级任务队列分配更多的 Worker,以此来保证高优先级任务的处理效率。常见的任务类型有:

  • 高优先级任务,低优先级任务
  • 实时任务,定时任务
  • 高频率任务,低频率任务
  • 长时间任务,短时间任务

但这样做的话,就不得不面对一个问题。当高优先级任务被消费完了,那么多个 Worker 也会随之空闲下来,极大的浪费了系统资源。所以说将所有任务都堆积到一个队列是不行的,一味地为高优先级任务队列分配 Worker 也不行,我们应该取出一个折中的方法。就是在保证高优先级任务队列始终拥有更多 Worker 的同时,当这些 Worker 空闲时,也能够处理低优先级的任务。我们可以通过 Worker 允许同时订阅多个队列的特性来实现这个效果。打个比方,现在有 high_queue、low_queue 以及 worker_1、2、3。那么我们应该让 worker_1、2、3 都订阅了 high_queue 的同时,也让 worker_2、3 订阅 low_queue。

而且将任务分类还有另一个好处就是能够有效的将关注度不同的任务日志分离(在启动 Worker 时可以分别指定日志文件路径),便于分门别类的查看。

需要注意的是,实践证明并不是 Worker 的数量越多就越好,只需要保证任务不堆积,还略有预留就可以了。

设置 Worker 启动的数量(默认数量为运行环境的 CPU 内核数)

CELERYD_CONCURRENCY = 4

应用 Worker 动态扩展

多线程和协程模式的 Worker 支持自动动态扩展,通过 CLI 启动 Worker 时使用 --autoscale 选项。该选项接收两个数值参数,分别表示 Worker 进程池的最大并发进程数和常态并发进程数。

celery -A celery_proj worker --autoscale=6,3

这两个数值如何设定要视乎实际的 Worker 运行环境,CPU 个数、Memory、任务数等。我们也提到过,Worker 并不是越多就越好,Worker 的进程数也一样。常规的设定方法为,常态并发进程数等于 CPU 个数,最大并发进程数为 CPU 个数的 2 倍即可。当然了,最优配比还是要根据任务的实际情况而定。

应用任务预取数

在介绍 RabbitMQ 的时候我们也提到过,任务执行所需的时间有长有短,对于执行时间短的任务我们应该给予 Worker 更大的任务预取数。这也是使用消息队列作为 Broker 的另一个好处。

以 Web 系统的文件上传为例,假如上传小文件(小于1MB)的数量要远大于上传大文件(大于 20MB)的数量,那么小文件上传任务就是高频率的短任务,相反,大文件上传任务就是低频率的长任务。可以分别使用 queue_small/worker_small_1、2 以及 queue_big/worker_big 来处理这些任务,并且应该为 worker_small_1、2 都设置更大的任务预取数(e.g. 每次取出 100 个任务),这样能够有效降低获取任务所花费的时间成本,得到更高的执行效率。

配置 Worker 每次从 RabbitMQ 获取的任务数:

CELERYD_PREFETCH_MULTIPLIER = 4

使用任务异常重试机制

当任务执行时触发异常了,我们会想着是否能够重跑一下,万一成功了呢。尤其是一些具有网络相关性的任务异常(e.g.调用外部 API 接口),很可能是由暂时性网络抖动触发的。对于这种情况,重试将会非常有效。Celery 提供的任务重试机制,可以定义任务等待重试时间、最大重试次数等参数。当然了,对于一些无论重试多少次也不会成功的任务(e.g.清理文件权限不足),就没必要做这些事情了。

保持任务的幂等性

Celery 虽然提供了任务异常重试机制,但却无法保证任务的事务性,也就是说 Celery 没有提供回滚的方法。为了让任务更易于部署和重试,我们应该尽量实现符合幂等性的任务,或者说应该尽量将一个长时间的任务拆解为多个幂等的短任务。

幂等(idempotent)是一个数学概念,常见于抽象代数中。在计算机编程中,幂等性函数(方法)的特点就是只要使用相同参数,无论重复执行多少次,都能得到相同的结果。幂等性函数不会影响系统状态,也不必担心重复执行会对系统造成改变。例如:get_user_name()set_true() 都属幂等函数。

简单来说,幂等性函数不论被执行或重跑多少次,产生的效果和返回的结果都是一样的。

适当关闭任务结果

Celery 支持任务实时处理,通过 AsyncResult 对象能够获取任务不同阶段的状态。

  • PENDING
  • RECEIVED
  • STARTED
  • SUCCESS
  • FAILURE
  • FAILURE
  • RETRY

但这种「返回」显然是有代价的,所以 Celery 默认会将它 Disabled。

  • 配置全局关闭返回任务结果:
CELERY_TASK_IGNORE_RESULT = True
  • 配置局部关闭返回任务结果:
@app.task(ignore_result=True)
def add(...):

假如说程序的业务逻辑判断的确需要根据任务执行状态来进行调整,那么我们应该考虑是否能够使用任务操作对象的状态属性来替代任务执行状态。比如:Web 系统实现的异步文件上传,文件就是任务操作对象,它有必要持久化为数据库记录,并在上传的过程中不断更新其进度和状态。那么后面的业务逻辑就可以根据数据库记录的状态属性进行判断,避免反复获取任务状态。

当然了,是否关闭任务结果需要视乎于实际场景,只是我们应该本着尽可能关闭它的原则。

需要注意的是,任务异常不属于常规的任务状态,但它同样被包含在任务返回值中。如果你仅仅是希望能够持久化这些异常信息以便用于分析错误原因,而对别的结果不感兴趣。那么你可以使用数据库作为 Backend 的同时加入下列配置选项:

CELERY_TASK_IGNORE_RESULT = False
task_store_errors_even_if_ignored = True

为任务设置超时

为了避免由于某些原因任务使得任务一直处于进行中的状态,导致队列中的其他任务一直被阻塞。应该为任务执行设置超时时间。

CELERYD_TASK_TIME_LIMIT = 1800 

合理使用 ack_late 机制

使用 RabbitMQ 作为 Broker 的话,我们就可以应用 RabbitMQ 的 ACK 机制来保证任务的有效传递。即当 Worker 从 Broker 成功接收到的任务之后,会立即回传 ACK(Acknowledgement) 确认信息给 Broker,然后 Broker 再将任务从队列中移除。否则 Broker 会在一段时间之后将任务重新发送给别的 Worker,这就是任务的有效传递。

但是,在对任务执行要求严格的场景中,「有效传递」显然是不够的,「有效执行」才可以。

为了实现「有效执行」,Celery 在 ACK 的基础上提供了 ack_late 机制。只有当任务完成(成功/失败)之后,才会向 Broker 回传 ACK 信息。ack_late 对于这种场景是很有效的,而代价就是消息队列的性能会降低,毕竟任务占用队列资源的时间变长了。

为某个任务开启 ack_late 机制:

@app.task(ack_late=True)
def add(...):

是否开启 ack_late 机制,同样需要根据实际需求来权衡。

传递对象的唯一标识

非常不推荐直接将对象序列化后作为任务传递,而是应该传递用于获取对象的唯一标识。例如:不要尝试将数据库的 ORM 对象作为任务传递,而是传递 ORM 对象的主键 id。当任务执行到需要使用 ORM 对象时,再通过 id 从数据库实时获取,避免了 ORM 对象因为队列阻塞导致过期的情况。

预防 Worker 内存泄漏

一个 Worker 在处理了大量的任务之后,可能会出现内存泄漏的问题。建议全局设置 Worker 最大的执行任务数量,即 Worker 在执行了一定数量的任务之后就会主动退出。

CELERYD_MAX_TASKS_PER_CHILD = 40

合理安排定时任务的调度周期

定时任务的调度计划应该经过慎重科学的设计,切忌不假思索的选择凌晨或某一个整点作为任务执行时间。

通过继承 Task 类来自定义任务装饰器

本站公众号
   欢迎关注本站公众号,获取更多程序园信息
开发小院