Python开发者必学:如何使用Celery进行任务调度和消息队列 随着互联网的发展,需要进行异步任务处理的情况越来越多。比如发送邮件、生成缩略图、处理数据、推送消息等,这些任务比较耗时,不能在请求处理中直接执行。一般情况下,我们采用异步任务进行处理,从而提高系统的性能和可用性。Celery就是一款强大的分布式任务队列,用于处理大量异步任务。 1. 简介 Celery是一个分布式任务队列,它支持多种调度器(beat)、消息队列(broker)和后端存储(backend)。它可以将任务分发到多个worker节点中执行,从而实现分布式和异步任务处理。 Celery的架构如下图所示: ![Celery架构图](https://cdn.jsdelivr.net/gh/itfanr/image-repo/blog/celery-architecture.png) 在这个图中,我们可以看到 Celery 由如下几部分组成: - Client:客户端,它可以向 Celery 提交任务,并获取任务执行结果。 - Broker:消息队列,用于存放任务。Celery 支持多种 Broker,比如 RabbitMQ、Redis、MongoDB 等。 - Workers:执行任务的工作者,它们从 Broker 中获取任务,并执行任务。Celery 支持多种 Workers。 - Beat:定时任务调度器,用于执行周期性的任务。 在本文中,我们将重点介绍如何使用 Celery 完成任务调度和消息队列的功能。 2. 安装 在启动之前,我们需要首先安装 Celery,可以使用pip进行安装: ```bash pip install Celery ``` 3. 快速入门 现在让我们来看一个简单的例子,使用 Celery 来执行一个异步任务。下面我们定义一个函数,这个函数用于计算两个数的和,但是这个函数会比较耗时,我们希望用 Celery 来异步执行这个任务。 ```python # tasks.py import time from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') @app.task def add(x, y): time.sleep(5) # 模拟耗时操作 return x + y ``` 首先,在 tasks.py 文件中导入 celery 库和 Celery 类。然后我们创建了一个 Celery 实例,它的名字为 tasks,并设置了 Broker 的地址。在 add 函数头部增加了 @app.task 装饰器,将这个函数注册为任务。 接下来我们来编写客户端代码,使用 Celery 来异步执行 add 函数。 ```python # client.py from tasks import add result = add.delay(4, 4) print(result.get()) ``` 在 client.py 中,我们从 tasks 模块导入了 add 函数。我们通过 add.delay(4, 4) 来异步执行任务,这个函数会返回一个 AsyncResult 对象,它表示了被调用任务的状态。在这个例子中,我们可以使用 result.get() 来获取任务的执行结果。 4. 定时任务 使用 Celery 可以很方便地实现定时任务。我们只需要使用 Beat 调度器,就可以轻松地实现定时任务的功能。 下面我们来看一个例子,我们需要每 5 秒执行一次 add 任务。 ```python # tasks.py import time from celery import Celery app = Celery('tasks', broker='pyamqp://guest@localhost//') @app.task def add(x, y): return x + y @app.task def run_add(): result = add.delay(4, 4) print(result.get()) app.conf.beat_schedule = { 'run-add-every-5-seconds': { 'task': 'tasks.run_add', 'schedule': 5.0 }, } ``` 上面我们定义了一个名为 run_add 的任务,这个任务每次会调用 add 函数来执行任务。然后我们使用 app.conf.beat_schedule 来设置 Beat 调度器,它会每 5 秒钟执行一次 run_add 任务。 我们还需要在启动时启动 Beat 调度器。在命令行中执行下面的代码: ```bash celery -A tasks beat ``` 5. 消息队列 Celery 支持多种消息队列,例如 RabbitMQ、Redis、MongoDB 等。在本文中,我们将使用 RabbitMQ 作为消息队列。 如果您还没有安装 RabbitMQ,可以参考这个[安装指南](https://www.rabbitmq.com/download.html)。 我们首先需要定义 task.py 中的 Broker: ```python # tasks.py import time from celery import Celery app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//') ... ``` 这里我们将 Broker 修改为 amqp://guest:guest@localhost:5672//,表示我们使用 RabbitMQ 作为消息队列。其中 guest:guest 表示用户名和密码,localhost:5672 表示 RabbitMQ 服务器的地址和端口。 接下来,我们需要启动 RabbitMQ 服务器。在命令行中执行以下命令: ```bash rabbitmq-server ``` 然后,我们就可以运行任务并将它们提交到 RabbitMQ 队列中了: ```python # client.py from tasks import add result = add.delay(4, 4) print(result.get()) ``` 在上面的代码中,我们仅仅修改了 task.py 中 Broker 的地址,其他的代码都没有做任何修改。 6. 结论 在本文中,我们介绍了 Celery 的基本用法,包括任务调度和消息队列的功能。Celery 可以帮助我们实现异步任务处理,提高系统的性能和可用性。如果您需要进行异步任务处理,那么 Celery 绝对是一个值得尝试的工具。