Flask项目中Celery的使用
一、项目目录结构
二、celery_config.py
- 在config中新增文件celery_config.py
# -*- coding: utf8 -*-
# package import
from .redis_config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
BROKER_URL = 'redis://:{}@{}:{}/1'.format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)
RESULT_BACKEND = 'redis://:{}@{}:{}/2'.format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'Asia/Shanghai'
ENABLE_UTC = True
三、task.py
- 在app.py的同级目录下创建task.py文件
-*- coding: utf8 -*-
# package import
import time
from celery import Celery
from flask import Flask
from config import celery_config
"""
新初始化一个Flask,否则会造成循环引用。
名字一定不能与app.py中的Flask应用重名
例如app.py中的Flask应用的名称为app,tasks.py文件中的Flask应用名称为application
"""
application = Flask(__name__)
application.config.from_object(celery_config)
def make_celery(app): # 官方提供的使用方法
celery_server = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
broker=app.config['BROKER_URL'])
celery_server.conf.update(app.config)
task_base = celery_server.Task
class ContextTask(task_base):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return task_base.__call__(self, *args, **kwargs)
celery_server.Task = ContextTask
return celery_server
celery = make_celery(application)
# 增加一个实例任务,add()
@celery.task
def add(x, y):
time.sleep(10)
return x + y
四、在蓝图中使用
@user_semantic_analysis.route('/hello/celery', methods=['GET', 'POST'])
def hello_celery():
# 使用add()方法
add.delay(1, 5320)
return 'hello celery'
五、启动celery服务
# 在终端中输入命令启动celery服务
celery -A tasks worker -l info -P eventlet -c 10
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/199453.html