匠心精神 - 良心品质腾讯认可的专业机构-IT人的高薪实战学院

咨询电话:4000806560

Python开发者必学:如何使用Celery进行任务调度和消息队列

Python开发者必学:如何使用Celery进行任务调度和消息队列

随着互联网的发展,需要进行异步任务处理的情况越来越多。比如发送邮件、生成缩略图、处理数据、推送消息等,这些任务比较耗时,不能在请求处理中直接执行。一般情况下,我们采用异步任务进行处理,从而提高系统的性能和可用性。Celery就是一款强大的分布式任务队列,用于处理大量异步任务。

1. 简介

Celery是一个分布式任务队列,它支持多种调度器(beat)、消息队列(broker)和后端存储(backend)。它可以将任务分发到多个worker节点中执行,从而实现分布式和异步任务处理。

Celery的架构如下图所示:

![Celery架构图](https://cdn.jsdelivr.net/gh/itfanr/image-repo/blog/celery-architecture.png)

在这个图中,我们可以看到 Celery 由如下几部分组成:

- Client:客户端,它可以向 Celery 提交任务,并获取任务执行结果。
- Broker:消息队列,用于存放任务。Celery 支持多种 Broker,比如 RabbitMQ、Redis、MongoDB 等。
- Workers:执行任务的工作者,它们从 Broker 中获取任务,并执行任务。Celery 支持多种 Workers。
- Beat:定时任务调度器,用于执行周期性的任务。

在本文中,我们将重点介绍如何使用 Celery 完成任务调度和消息队列的功能。

2. 安装

在启动之前,我们需要首先安装 Celery,可以使用pip进行安装:

```bash
pip install Celery
```

3. 快速入门

现在让我们来看一个简单的例子,使用 Celery 来执行一个异步任务。下面我们定义一个函数,这个函数用于计算两个数的和,但是这个函数会比较耗时,我们希望用 Celery 来异步执行这个任务。

```python
# tasks.py
import time
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    time.sleep(5) # 模拟耗时操作
    return x + y
```

首先,在 tasks.py 文件中导入 celery 库和 Celery 类。然后我们创建了一个 Celery 实例,它的名字为 tasks,并设置了 Broker 的地址。在 add 函数头部增加了 @app.task 装饰器,将这个函数注册为任务。

接下来我们来编写客户端代码,使用 Celery 来异步执行 add 函数。

```python
# client.py
from tasks import add

result = add.delay(4, 4)
print(result.get())
```

在 client.py 中,我们从 tasks 模块导入了 add 函数。我们通过 add.delay(4, 4) 来异步执行任务,这个函数会返回一个 AsyncResult 对象,它表示了被调用任务的状态。在这个例子中,我们可以使用 result.get() 来获取任务的执行结果。

4. 定时任务

使用 Celery 可以很方便地实现定时任务。我们只需要使用 Beat 调度器,就可以轻松地实现定时任务的功能。

下面我们来看一个例子,我们需要每 5 秒执行一次 add 任务。

```python
# tasks.py
import time
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

@app.task
def run_add():
    result = add.delay(4, 4)
    print(result.get())

app.conf.beat_schedule = {
    'run-add-every-5-seconds': {
        'task': 'tasks.run_add',
        'schedule': 5.0
    },
}
```

上面我们定义了一个名为 run_add 的任务,这个任务每次会调用 add 函数来执行任务。然后我们使用 app.conf.beat_schedule 来设置 Beat 调度器,它会每 5 秒钟执行一次 run_add 任务。

我们还需要在启动时启动 Beat 调度器。在命令行中执行下面的代码:

```bash
celery -A tasks beat
```

5. 消息队列

Celery 支持多种消息队列,例如 RabbitMQ、Redis、MongoDB 等。在本文中,我们将使用 RabbitMQ 作为消息队列。

如果您还没有安装 RabbitMQ,可以参考这个[安装指南](https://www.rabbitmq.com/download.html)。

我们首先需要定义 task.py 中的 Broker:

```python
# tasks.py
import time
from celery import Celery

app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')

...
```

这里我们将 Broker 修改为 amqp://guest:guest@localhost:5672//,表示我们使用 RabbitMQ 作为消息队列。其中 guest:guest 表示用户名和密码,localhost:5672 表示 RabbitMQ 服务器的地址和端口。

接下来,我们需要启动 RabbitMQ 服务器。在命令行中执行以下命令:

```bash
rabbitmq-server
```

然后,我们就可以运行任务并将它们提交到 RabbitMQ 队列中了:

```python
# client.py
from tasks import add

result = add.delay(4, 4)
print(result.get())
```

在上面的代码中,我们仅仅修改了 task.py 中 Broker 的地址,其他的代码都没有做任何修改。

6. 结论

在本文中,我们介绍了 Celery 的基本用法,包括任务调度和消息队列的功能。Celery 可以帮助我们实现异步任务处理,提高系统的性能和可用性。如果您需要进行异步任务处理,那么 Celery 绝对是一个值得尝试的工具。