文章目录
Flask项目中Celery做异步任务遇到的一些坑
项目目录结构
一、celery_app.py 创建celery_app
- 我选择celery_app放在celery_tasks文件夹目录下
- celery异步任务中若要用到Sqlalchemy做ORM的话,需要让application导入mysql_config
- 例如有两个异步任务分别放在了”user/tool/semantic_analysis_tasks.py”以及”globals/home.py”两个py文件里,那么需要在celery_server中用include参数进行路径的导入才能正常使用任务
- 下面是celery_app.py文件的示例代码
# -*- coding: utf8 -*-
# package import
from celery import Celery
from flask import Flask
from config import celery_config, mysql_config
from dao.mysql.a1_and_cloud.connect import MySQLConnection
from services.helpler.data_operate import LogDataOperate
from util.exts import db
"""
新初始化一个Flask,否则会造成循环引用。
名字一定不能与app.py中的Flask应用重名
例如app.py中的Flask应用的名称为app,tasks.py文件中的Flask应用名称为application
"""
application = Flask(__name__)
# import celery_config and mysql_config for application
application.config.from_object(celery_config)
application.config.from_object(mysql_config)
# init db
db.init_app(application)
# make celery function
def make_celery(input_app):
celery_server = Celery(
input_app.import_name, backend=input_app.config['RESULT_BACKEND'], broker=input_app.config['BROKER_URL'],
include=['celery_tasks.globals.home', 'celery_tasks.user.tool.semantic_analysis_tasks']
)
celery_server.conf.update(input_app.config)
task_base = celery_server.Task
class ContextTask(task_base):
abstract = True
def __call__(self, *args, **kwargs):
with input_app.app_context():
return task_base.__call__(self, *args, **kwargs)
celery_server.Task = ContextTask
return celery_server
celery_app = make_celery(application)
"""
if you don't want to use include params in celery_server, you can use following code, it's same to include.
"""
# celery_app.autodiscover_tasks(['celery_tasks.user.tool.semantic_analysis_tasks', 'celery_tasks.globals.home'])
二、manage.py 数据迁移
- Flask中若要使用ORM的话,需要自己写个manage.py文件进行迁移。而项目中要用到celery做异步任务的话,manage.py与纯Flask项目有些区别。下面是用到celery的manage.py的示例代码
"""
# -*- coding:utf-8 -*-
用命令对数据库表进行迁移
这是Flask项目进行迁移的命令
python manage.py db init
python manage.py db migrate
python manage.py db upgrade
如果想回退到上一级迁移文件的话,要用到下面这个命令
python manage.py db downgrade
"""
# package import
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
# import app and celery_app
from app import app
from celery_tasks.celery_app import celery_app
from models.models import *
from util.exts import db
"""
tips:
You must define two manager if you want to use celery, because Flask basic app and Celery app is different app
This is important !
If you don't create celery_manager, celery task will except a lot of exception, such as SqlAlchemy don't callback
"""
# define the manager for flask basic app
manager = Manager(app)
# use Migrate band db and flask basic app
migrate = Migrate(app, db)
# add migrate command to flask basic app manager
manager.add_command('db', MigrateCommand)
# define the manager for celery app
celery_manager = Manager(celery_app)
# use Migrate band db and celery app
celery_migrate = Migrate(celery_app, db)
# add migrate command to celery basic app manager
celery_manager.add_command('db', MigrateCommand)
# run flask basic app manager and celery app manager
if __name__ == "__main__":
# run manager and celery_manager
manager.run()
celery_manager.run()
三、start_celery.sh 启动celery服务
- celery_app我放在了celery_tasks目录下,所以需要用下面的命令启动celery服务
# start celery serve
celery -A celery_tasks.celery_app.celery_app worker -l info -P eventlet -c 10
四、get_celery_result.py 获取任务结果
- 如果要获取任务结果的话,AsyncResult中传入的id一定要是字符串,负责会报错,下面是获取celery任务结果的示例代码
# -*- coding: utf8 -*-
# package import
from celery.result import AsyncResult
from celery_tasks.celery_app import celery_app
# create class CeleryResultGet
class CeleryResultGet:
def __init__(self, task_id, data_type):
"""
:param task_id: celery task id
:param data_type: the type of you want to get (status, result...)
"""
self.task_id = str(task_id)
self.data_type = data_type
self.data = None
# get celery task data function
def get_task_data(self):
if self.data_type == 'status':
self.data = AsyncResult(id=self.task_id, app=celery_app).status
elif self.data_type == 'result':
self.data = AsyncResult(id=self.task_id, app=celery_app).result
return self.data
目前就遇到这些问题,有新的坑再更新
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/199452.html