Flask项目中使用Celery遇到的一些坑整理

如果你不相信努力和时光,那么成果就会是第一个选择辜负你的。不要去否定你自己的过去,也不要用你的过去牵扯你现在的努力和对未来的展望。不是因为拥有希望你才去努力,而是去努力了,你才有可能看到希望的光芒。Flask项目中使用Celery遇到的一些坑整理,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

Flask项目中Celery做异步任务遇到的一些坑

项目目录结构

在这里插入图片描述

一、celery_app.py 创建celery_app

  • 我选择celery_app放在celery_tasks文件夹目录下
    在这里插入图片描述
  • celery异步任务中若要用到Sqlalchemy做ORM的话,需要让application导入mysql_config
  • 例如有两个异步任务分别放在了”user/tool/semantic_analysis_tasks.py”以及”globals/home.py”两个py文件里,那么需要在celery_server中用include参数进行路径的导入才能正常使用任务
  • 下面是celery_app.py文件的示例代码
# -*- coding: utf8 -*-

# package import
from celery import Celery
from flask import Flask
from config import celery_config, mysql_config
from dao.mysql.a1_and_cloud.connect import MySQLConnection
from services.helpler.data_operate import LogDataOperate
from util.exts import db

"""
新初始化一个Flask,否则会造成循环引用。
名字一定不能与app.py中的Flask应用重名
例如app.py中的Flask应用的名称为app,tasks.py文件中的Flask应用名称为application
"""

application = Flask(__name__)

# import celery_config and mysql_config for application
application.config.from_object(celery_config)
application.config.from_object(mysql_config)

# init db
db.init_app(application)


# make celery function
def make_celery(input_app):
    celery_server = Celery(
        input_app.import_name, backend=input_app.config['RESULT_BACKEND'], broker=input_app.config['BROKER_URL'],
        include=['celery_tasks.globals.home', 'celery_tasks.user.tool.semantic_analysis_tasks']
    )
    celery_server.conf.update(input_app.config)

    task_base = celery_server.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with input_app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery_server.Task = ContextTask
    return celery_server


celery_app = make_celery(application)

"""
if you don't want to use include params in celery_server, you can use following code, it's same to include.  
"""
# celery_app.autodiscover_tasks(['celery_tasks.user.tool.semantic_analysis_tasks', 'celery_tasks.globals.home'])

二、manage.py 数据迁移

  • Flask中若要使用ORM的话,需要自己写个manage.py文件进行迁移。而项目中要用到celery做异步任务的话,manage.py与纯Flask项目有些区别。下面是用到celery的manage.py的示例代码
"""
# -*- coding:utf-8 -*-
用命令对数据库表进行迁移

这是Flask项目进行迁移的命令
python manage.py db init
python manage.py db migrate
python manage.py db upgrade

如果想回退到上一级迁移文件的话,要用到下面这个命令
python manage.py db downgrade
"""

# package import
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

# import app and celery_app
from app import app
from celery_tasks.celery_app import celery_app

from models.models import *
from util.exts import db


"""
tips:
    You must define two manager if you want to use celery, because Flask basic app and Celery app is different app
    This is important !
    If you don't create celery_manager, celery task will except a lot of exception, such as SqlAlchemy don't callback
"""

# define the manager for flask basic app
manager = Manager(app)

# use Migrate band db and flask basic app
migrate = Migrate(app, db)

# add migrate command to flask basic app manager
manager.add_command('db', MigrateCommand)

# define the manager for celery app
celery_manager = Manager(celery_app)

# use Migrate band db and celery app
celery_migrate = Migrate(celery_app, db)

# add migrate command to celery basic app manager
celery_manager.add_command('db', MigrateCommand)


# run flask basic app manager and celery app manager
if __name__ == "__main__":

	# run manager and celery_manager
    manager.run()

    celery_manager.run()

三、start_celery.sh 启动celery服务

  • celery_app我放在了celery_tasks目录下,所以需要用下面的命令启动celery服务
# start celery serve
celery -A celery_tasks.celery_app.celery_app worker -l info -P eventlet  -c 10

四、get_celery_result.py 获取任务结果

  • 如果要获取任务结果的话,AsyncResult中传入的id一定要是字符串,负责会报错,下面是获取celery任务结果的示例代码
# -*- coding: utf8 -*-

# package import
from celery.result import AsyncResult
from celery_tasks.celery_app import celery_app


# create class CeleryResultGet
class CeleryResultGet:
    def __init__(self, task_id, data_type):
        """
        :param task_id: celery task id
        :param data_type: the type of you want to get (status, result...)
        """

        self.task_id = str(task_id)
        self.data_type = data_type
        self.data = None

    # get celery task data function
    def get_task_data(self):

        if self.data_type == 'status':

            self.data = AsyncResult(id=self.task_id, app=celery_app).status

        elif self.data_type == 'result':

            self.data = AsyncResult(id=self.task_id, app=celery_app).result

        return self.data

目前就遇到这些问题,有新的坑再更新

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/199452.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!