• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Celery使用优秀的python异步任务框架

武飞扬头像
coder Ethan
帮助3

Celery 简介

介绍

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度。

Celery 通过消息机制进行通信,通常需要中间人(Broker)和工作者(Worker)来进行调节。其中Broker就是消息中间件(常用的rabbitmq和redis),主要用来进行发送和接收消息;Worker就是任务的执行单元,通常是开发者来自己定义任务的内容。

Celery 特点:

  • 简单:Celery 上手比较简单,不需要配置文件就可以直接运行;
  • 高可用:如果出现丢失连接或连接失败,职程(Worker)和客户端会自动重试,并且中间人通过 主/主 主/从 的方式来进行提高可用性;
  • 快速:单个 Celery 进行每分钟可以处理数以百万的任务,而且延迟仅为亚毫秒(使用 RabbitMQ、 librabbitmq 在优化过后);
  • 灵活:Celery 的每个部分几乎都可以自定义扩展和单独使用,例如自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)等。

Celery 主要用途:

  • 异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等;
  • 延迟执行:解决延迟任务,比如有些跑批接口的任务,需要耗时比较长;
  • 调度:可以通过调度功能在一段时间内指定任务的执行时间 datetime,也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天或某一年的Crontab表达式。

Celery 主要架构:

celery 的5个主要角色:

  1. Task:就是任务,有异步任务和定时任务;
  2. Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务;
  3. Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它;
  4. Beat:定时任务调度器,根据配置定时将任务发送给Broker;
  5. Backend:用于存储任务的执行结果(例如:SQLAlchemy/Django] ORM、Memcached、Redis、 RPC RabbitMQ/AMQP)以及自定义的后端结果存储中间件)。

学新通

安装

安装比较简单,命令如下:

pip install Celery

另外Celery 自定义了一组用于安装 Celery 和特定功能的依赖。可以在中括号加入您需要依赖,并可以通过逗号分割需要安装的多个依赖包,例如安装rabbitmq和redis依赖,如下:

pip install "celery[librabbitmq,redis]"

注意:RabbitMQ 是默认的中间人(Broker),只需要配置连接的URL即可,不需要安装额外的的配置以及初始化配置信息。但是使用redis作为中间件时(或作为结果存储),必须要安装 Celery 的依赖库,按照上面的命令即可。

基本使用

选用rabbitmq作为Broker。

创建一个工程,保存为一个 app 文件中。针对大型的项目,可能需要创建 独立的模块。

首先创建 tasks.py:

from celery import Celery


app = Celery('tasks',
             broker='amqp://guest:guest@localhost:5672',
             backend='redis://localhost')


@app.task
def add(x, y):
    return x   y

Celery参数:

第一个参数为当前模块的名称,只有在 __main__模块中定义任务时才会生产名称。

第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。

第三个参数为后端结果链接,实例中使用的redis。

这里创建了一个名称为 add 的任务,返回的俩个数字的和。

然后启动celery服务:

celery -A tasks worker --loglevel=info

成功运行后,打印如下信息:

 -------------- celery@testdeMacBook-Pro.local v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2023-01-03 21:39:48
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fcbf8ce4e20
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . tasks.add

[2023-01-03 21:39:48,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2023-01-03 21:39:48,916: INFO/MainProcess] mingle: searching for neighbors
[2023-01-03 21:39:49,957: INFO/MainProcess] mingle: all alone
[2023-01-03 21:39:50,007: INFO/MainProcess] celery@testdeMacBook-Pro.local ready.

其中可以看到配置信息和任务信息。

最后调用任务:

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay()apply_async() 的快捷方法,可以更好的控制任务的执行

>>> from tasks import add
>>> add.delay(4, 4)
<AsyncResult: e4722dc5-1ccd-45b5-ba89-75ff3edf4255>

调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。默认这个功能是不开启的,到那时我们配置了backend,所以可以查看任务的状态,如下:

>>> result.ready() # ready() 可以检测是否已经处理完毕:
True
>>> result.get(timeout=1)  # get获取任务执行结果
8

如果任务出现异常,可以通过以下命令进行回溯:

>>> result.traceback

同时该任务已经有被celery Worker开始处理,可以在启动celery的控制台输出的日志进行查看执行情况,如下可以看到任务被成功执行:

[2023-01-03 21:48:08,374: INFO/MainProcess] Task tasks.add[1b5f11de-c113-4f07-b2de-52c7f7594959] received
[2023-01-03 21:48:08,390: INFO/ForkPoolWorker-8] Task tasks.add[1b5f11de-c113-4f07-b2de-52c7f7594959] succeeded in 0.013839624999999245s: 8

Flask使用Celery

异步任务

下面来看下在实际flask项目中,如何使用Celery。

1.创建flask项目

使用pycharm创建一个flask项目,名称为flask_celery。

同时创建一个文件夹,名称为celery_tasks,专门用于管理celery相关的文件。

2.celery配置

针对较大型的项目,建议使用专用配置模块,进行针对 Celery 配置。不建议使用硬编码,建议将所有的配置项集中化配置。

需要在celery_tasks文件夹下创建一个名为 config.py 的文件,添加以下配置内容:

broker_url = 'amqp://guest:guest@localhost:5672'
result_backend = 'redis://:123456@localhost:6379/8'

更多配置后面会讲解。

创建完后可以通过 app.config_from_object() 进行加载配置模块。

3.创建Celery app实例

在celery_tasks文件夹下创建main.py文件,代码如下:

from celery import Celery
from celery_tasks import config


celery_app = Celery(__name__)

# 加载配置文件
celery_app.config_from_object(config)

# 自动搜寻异步任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])

注意:autodiscover_tasks用来自动搜寻我们指定文件夹下创建的任务,但是任务所在的文件必须叫tasks.py。

当然也可以在创建celery对象时使用参数include直接指定搜寻任务文件夹,如下:

celery_app = Celery('tasks',
                    broker='redis://127.0.0.1:6379/1',
                    backend='redis://127.0.0.1:6379/2',
                    include=[
                        'celery_tasks.sms',
                    ])

4.创建任务

创建一个发送短信验证码的任务:先在celery_tasks文件夹下创建sms文件夹,然后创建一个tasks.py(为了能被自动搜寻到)文件,编辑代码如下:

import time

from celery_tasks.celery_service import celery_app


@celery_app.task
def send_sms(phone, data):
    """模拟发送短信"""
    time.sleep(3)
    print("发送短信成功")
    return True

5.创建业务接口

一般真实业务中,我们的任务都是由flask项目中的业务去调用。这里创建一个名称为sms的接口,来调用发送短信验证码的任务,代码如下:

from flask import Flask

from celery_tasks.sms_task import send_sms

app = Flask(__name__)


@app.route('/')
def hello_world():
    return 'Hello World!'


@app.route('/sms')
def sms():
    # 做一些业务操作
    # delay函数会把任务交给celery去执行,然后立即返回,所以是异步
    send_sms.delay()

    return 'send sms success!'


if __name__ == '__main__':
    app.run()

6.启动flask项目

启动flask项目即可

7.启动celery

注意,启动时,不要切换到celery_app所在文件夹下,而是在项目根目录即可(后面会说明为什么)。

执行命令:

celery -A celery_tasks.main worker -l info
 -------------- celery@testdembp v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2023-01-05 00:14:03
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_tasks.celery_service:0x7f95f8a69f70
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://:**@localhost:6379/8
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery_tasks.sms.tasks.send_sms

[2023-01-05 00:14:04,226: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2023-01-05 00:14:04,235: INFO/MainProcess] mingle: searching for neighbors
[2023-01-05 00:14:05,273: INFO/MainProcess] mingle: all alone
[2023-01-05 00:14:05,298: INFO/MainProcess] celery@testdembp ready.

可以看到tasks里已经有我们的celery_tasks.sms.tasks.send_sms任务了,注意这里的任务名就是这个包含路径的全名,而不只是send_sms这个函数名。

注意:正式环境启动时,可以使用如下命令启动celer worker

nohup celery -A proj worker - P gevent -c 1000 > celery.log 2>&1 &

# 1.nohup 和末尾的&:后台执行,忽略所有挂断
# 2.- P gevent -c 1000 :开1000个协程执行,可根据业务调整

8.测试

访问我们的sms接口:http://127.0.0.1:5000/sms。然后可以看到接口是立即返回的,没有延迟3秒。然后再celery任务控制台看到如下信息:

[2023-01-05 00:16:54,391: INFO/MainProcess] Task celery_tasks.sms.tasks.send_sms[23b58998-ab44-4bc6-8aff-bc6cc12aa664] received
[2023-01-05 00:16:57,394: WARNING/ForkPoolWorker-8] 发送短信成功
[2023-01-05 00:16:57,402: INFO/ForkPoolWorker-8] Task celery_tasks.sms.tasks.send_sms[23b58998-ab44-4bc6-8aff-bc6cc12aa664] succeeded in 3.0086944159999973s: True

任务被成功执行,且看到最后耗时3.0086944159999973s,返回True。

看一下整个项目的结构如下:

学新通

最后特别说明:

整个项目其实由三部分组成:客户端(flask应用)、Celery服务端、中间件(broker)。这三部分在实际成产中都可以分开单独去部署,Celery本身也是一个单独的服务,只不过为了方便,写在了flask项目中,实际也可以把celery_tasks文件夹单独拿出来,单独去部署启动服务。但是文件分开后注意一点,flask客户端去调用celery里的异步任务的时候,是通过任务名(就是上面我们看到的文件路径加函数名组成的)放到Broker中,celery服务从Broker获取任务名去执行。flask可以没有具体的异步任务逻辑代码,只要一个任务名就行,但是为了flask能顺利调用异步任务,flask项目使用celery的相关文件的目录结构要与celery本身服务的目录结构相同,这样保证了flask客户端和celery服务两边的任务名相同。

当celery异步任务中需要flask上下文时,就不宜把flask项目和celery项目分开了。

还有一点是,除非有特殊要求,否则任务最好不要返回值,能省去后端结果存储、提高性能。

定时任务

创建定时任务时,使用celery的celery beat,它是一个调度程序,定期启动任务,然后由集群中的可用节点执行任务。

默认情况下会从配置中的 beat_schedule 项中获取条目(entries,也就是要执行的具体的定时任务),但是也可以使用自定义存储,例如将entries存储在SQL数据库中。

时区设置:默认情况下,定期任务计划使用UTC时区,但是可以使用时区设置更改使用的时区。

例如配置 Asia/Shanghai:

broker_url = 'amqp://guest:guest@localhost:5672'
result_backend = 'redis://:123456@localhost:6379/8'
# 时区设置
timezone = 'Asia/Shanghai'

也可以你直接使用app.conf.timezone ='Asia/Shanghai'来直接配置。

首先创建我们的任务,在celery_tasks文件夹创建一个新文件夹scheduled,然后创建tasks.py文件,编辑 celery_tasks/scheduled/tasks.py,代码如下:

from celery_tasks.main import celery_app


@celery_app.task
def scheduled_task(name):
    """模拟定时任务"""
    print(f"{name}执行定时任务")
    return True

然后在周期调度列表中添加条目(entry),建议使用如下方式添加,编辑celery_tasks/main.py:

from datetime import timedelta

from celery import Celery

# from app import app
from celery_tasks import config


celery_app = Celery(__name__)

# 加载配置文件
celery_app.config_from_object(config)

# 自动搜寻异步任务
celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.scheduled'])

# 设置定时任务
celery_app.conf.beat_schedule = {
    # 名字,最好做到见名知意
    'add-every-10-seconds': {
        # 执行scheduled.tasks下的scheduled_task函数
        'task': 'celery_tasks.scheduled.tasks.scheduled_task',
        # 每隔3秒执行一次
        'schedule': 3.0,
        # 'schedule': timedelta(seconds=3),
        # 传递参数
        'args': ('张三',)
    },
}

设置时间间隔时,也可以使用timedelta(不仅可以设置秒,也可以设置天,年等间隔),这样更加灵活。

下面启动我们的celery worker服务

 celery -A celery_tasks.main worker -l info
 # 显示如下启动成功
  
 -------------- celery@testdembp v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2023-01-07 12:14:20
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_tasks.main:0x7fbb2066c3d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://:**@localhost:6379/8
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery_tasks.scheduled.tasks.scheduled_task
  . celery_tasks.sms.tasks.send_sms

[2023-01-07 12:14:21,070: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2023-01-07 12:14:21,083: INFO/MainProcess] mingle: searching for neighbors
[2023-01-07 12:14:22,119: INFO/MainProcess] mingle: all alone
[2023-01-07 12:14:22,145: INFO/MainProcess] celery@testdembp ready.

这时候是不会执行任何任务的。与一般的异步任务不同的是,定时任务是依赖celery beat服务的,需要单独再启动celery beat服务,如下:

celery -A celery_tasks.main beat
# 显示如下则启动成功
celery beat v5.2.7 (dawn-chorus) is starting.
__    -    ... __   -        _
LocalTime -> 2023-01-07 12:07:10
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列,然后worker进程从队列读取执行。

启动后,在worker服务的终端马山可以看到任务已经执行了,如下:

[2023-01-07 12:14:28,999: INFO/MainProcess] Task celery_tasks.scheduled.tasks.scheduled_task[64510402-3a67-43f4-b1f3-60477f873e7b] received
[2023-01-07 12:14:29,001: WARNING/ForkPoolWorker-8] 张三执行定时任务
[2023-01-07 12:14:29,008: INFO/ForkPoolWorker-8] Task celery_tasks.scheduled.tasks.scheduled_task[64510402-3a67-43f4-b1f3-60477f873e7b] succeeded in 0.007088375000000369s: True
[2023-01-07 12:14:31,973: INFO/MainProcess] Task celery_tasks.scheduled.tasks.scheduled_task[d81cf939-c24d-4cdd-b111-03bfa4a315b8] received
[2023-01-07 12:14:31,974: WARNING/ForkPoolWorker-8] 张三执行定时任务
[2023-01-07 12:14:31,976: INFO/ForkPoolWorker-8] Task celery_tasks.scheduled.tasks.scheduled_task[d81cf939-c24d-4cdd-b111-03bfa4a315b8] succeeded in 0.0017581670000001992s: True
[2023-01-07 12:14:34,970: INFO/MainProcess] Task celery_tasks.scheduled.tasks.scheduled_task[385e75e0-e9c6-4510-993d-12a82799e5dc] received
[2023-01-07 12:14:34,971: WARNING/ForkPoolWorker-8] 张三执行定时任务
[2023-01-07 12:14:34,972: INFO/ForkPoolWorker-8] Task celery_tasks.scheduled.tasks.scheduled_task[385e75e0-e9c6-4510-993d-12a82799e5dc] succeeded in 0.0008522909999992834s: True

可以看到,确实任务每3秒执行一次。

设置任务时,还可以更加灵活的使用Crontab 调度器,例如一天中的特定时间或一周中的某天执行任务。

如下,设置任务每周一的上午7:30分执行:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'celery_tasks.scheduled.tasks.scheduled_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': ('张三',),
    },
}

Celery使用Flask上下文

参考:https://flask.palletsprojects.com/en/2.2.x/patterns/celery/

关于Flask的上下文原理参考文章:https://blog.csdn.net/qq_43745578/article/details/129574039?spm=1001.2014.3001.5501

根据Flask上下文原理介绍,我们知道:

在一个非请求的线程中,想要使用应用上下文,必须要 手动把上下文push到栈中,文章中介绍了两种方法。

在Celery执行任务时,也一样,由于Celery是异步的,和请求并不是一个线程,所以Celery要想使用上下文,也需要同样把上下文手动push到栈中。这里我们使用with app.app_context()

同时我们需要使用工厂函数模式来创建appcelery_app

修改我们的项目结构如下:

学新通

主要变化如下:

1.增加一个app文件夹,用于创建Flaskapp和Celery app

2.celery_tasks文件夹下的__init__.py文件增加创建Celery app的工厂方法。

3.增加run.py取代之前的app.py,用于启动flask应用。

具体代码如下:

(1)celery_tasks/__init__.py

这个文件增加用来创建Celery app的工厂方法:

from celery import Celery, Task
from flask import Flask


def celery_init_app(app: Flask) -> Celery:
    # 重写celery的task类,主要把任务放在上下文中执行
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    # 把celery赋值到app的属性中,便于后面取
    app.extensions["celery"] = celery_app
    return celery_app

(2)app_factory/__init__.py

这个文件增加flask app的工厂方法,用于创建flask app 和 celery app:

from flask import Flask
from celery_tasks import  celery_init_app


def create_app() -> Flask:
    app = Flask(__name__)
    celery_init_app(app)
    return app


flask_app = create_app()
celery_app = flask_app.extensions["celery"]

(3)celery_tasks/main.py

修改次文件,不再创建celery app,而是使用app_factory/__init__.py文件创建的 celery app。

from celery import Celery

from celery_tasks import config
from app_factory import celery_app

# celery_app = Celery(__name__)


# 加载配置文件
celery_app.config_from_object(config)

# 自动搜寻异步任务
celery_app.autodiscover_tasks(['celery_tasks.sms', 'celery_tasks.scheduled', 'celery_tasks.context_tasks'])

# 设置定时任务
celery_app.conf.beat_schedule = {
    # 名字,最好做到见名知意
    'add-every-10-seconds': {
        # 执行scheduled.tasks下的scheduled_task函数
        'task': 'celery_tasks.scheduled.tasks.scheduled_task',
        # 每隔3秒执行一次
        'schedule': 3.0,
        # 'schedule': timedelta(seconds=3),
        # 传递参数
        'args': ('张三',)
    },
}

配置文件还和以前一样。

(4)创建celery任务

新建文件夹celery_tasks/context_tasks,在里面创建文件tasks.py

from app_factory import celery_app
from flask import current_app


@celery_app.task
def context_task():
    """celery中使用flask上下文"""
    print("context_task")
    with current_app.app_context():
        print(f"current_app mysql:{current_app.mysql}")
        return str(current_app.config)

这里主要创建一个task,并且使用了flask中的应用上下文,打印出来应用的name。

(5)run.py

创建视图函数,启动app:

from app_factory import flask_app as app
from celery_tasks.context_tasks.tasks import context_task
from celery_tasks.sms.tasks import send_sms


@app.route('/sms', methods=['GET'])
def sms():
    send_sms.delay()
    return 'send sms success!'

@app.route('/app_context', methods=["GET"])
def test_flask_app_context():
    result = context_task.delay()
    print(result)
    return "success!"


if __name__ == '__main__':
    app.run()

这里的app使用我们工厂函数创建的app。

(6)启动celery

 celery -A celery_tasks.main worker -l info

(7)测试

访问路由http://127.0.0.1:5000/app_context,看到celery任务后台打印如下:

[2023-03-21 19:04:37,353: INFO/MainProcess] Task celery_tasks.context_tasks.tasks.context_task[70891026-47f7-48b1-bb9c-41aa08be51c3] received
[2023-03-21 19:04:37,356: WARNING/ForkPoolWorker-8] context_task
[2023-03-21 19:04:37,359: WARNING/ForkPoolWorker-8] current_app mysql:mysql1
[2023-03-21 19:04:37,367: INFO/ForkPoolWorker-8] Task celery_tasks.context_tasks.tasks.context_task[70891026-47f7-48b1-bb9c-41aa08be51c3] succeeded in 0.011942791999999702s: '<Config {\'ENV\': \'production\', \'DEBUG\': False, \'TESTING\': False, \'PROPAGATE_EXCEPTIONS\': None, \'PRESERVE_CONTEXT_ON_EXCEPTION\': None, \'SECRET_KEY\': None, \'PERMANENT_SESSION_LIFETIME\': datetime.timedelta(days=31), \'USE_X_SENDFILE\': False, \'SERVER_NAME\': None, \'APPLICATION_ROOT\': \'/\', \'SESSION_COOKIE_NAME\': \'session\', \'SESSION_COOKIE_DOMAIN\': None, \'SESSION_COOKIE_PATH\': None, \'SESSION_COOKIE_HTTPONLY\': True, \'SESSION_COOKIE_SECURE\': False, \'SESSION_COOKIE_SAMESITE\': None, \'SESSION_REFRESH_EACH_REQUEST\': True, \'MAX_CONTENT_LENGTH\': None, \'SEND_FILE_MAX_AGE_DEFAULT\': None, \'TRAP_BAD_REQUEST_ERRORS\': None, \'TRAP_HTTP_EXCEPTIONS\': False, \'EXPLAIN_TEMPLATE_LOADING\': False, \'PREFERRED_URL_SCHEME\': \'http\', \'JSON_AS_ASCII\': True, \'JSON_SORT_KEYS\': True, \'JSONIFY_PRETTYPRINT_REGULAR\': False, \'JSONIFY_MIMETYPE\': \'application/json\', \'TEMPLATES_AUTO_RELOAD\': None, \'MAX_COOKIE_SIZE\': 4093}>'

进阶使用参考

停止Worker

使用 Control c 就可以停止职程(Worker)

后台运行

正式环境后台启动celery命令:

nohup celery -A proj worker - P gevent -c 1000 > celery.log 2>&1 &

# 1.nohup 和末尾的&:后台执行,忽略所有挂断
# 2.- P gevent -c 1000 :开1000个协程执行,可根据业务调整

参考:

https://www.celerycn.io/ru-men/celery-jian-jie

https://blog.csdn.net/u010339879/article/details/97691231

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfehkkj
系列文章
更多 icon
同类精品
更多 icon
继续加载