celery中API Reference(celery.app.control)

命运对每个人都是一样的,不一样的是各自的努力和付出不同,付出的越多,努力的越多,得到的回报也越多,在你累的时候请看一下身边比你成功却还比你更努力的人,这样,你就会更有动力。

导读:本篇文章讲解 celery中API Reference(celery.app.control),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

官方文档:celery.app.control — Celery 5.2.7 documentation

worker远程控制命令的客户端。

服务器实现在celery.worker.control. 有两种类型的远程控制命令:

Inspect commands: 

        没有副作用,通常只会返回在 worker 中找到的一些值,如当前注册任务列表、活动任务列表等。命令可通过Inspect类访问。

Control commands:

        执行副作用,比如添加一个新的队列来消费。命令可通过Control类访问。

class celery.app.control.Control(app=None):

pass

创建一个多任务python环境

目录结构如下所示:

celery中API Reference(celery.app.control)

其中,tasks是消费者模块,有管理的celery文件,有多个任务函数文件。produce_task1模拟消费者,测试异步任务调用,produce_task2模拟定时任务调用。

celery启动

celery -A celery_demo2.tasks.celery worker --loglevel=info -P eventlet

注意:在windows中执行时,添加-P eventlet,否则可能报错。

celery中API Reference(celery.app.control)

Ping

ping(destination=None)

Ping 所有(或特定)worker。

destination ( List ) – 如果设置,则向所有worker广播时向其发送命令的主机列表。

celery启动,指定名称-n,这儿启动两个来演示:

celery -A celery_demo2.tasks.celery worker -n dgw_node1 --loglevel=info -P eventlet
celery -A celery_demo2.tasks.celery worker -n dgw_node2 --loglevel=info -P eventlet

celery中API Reference(celery.app.control)

celery中API Reference(celery.app.control)

示例代码:   【此时任务被两个worker都进行消费了】

from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app


result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")
ret = app.control.ping()
print(ret)

ret2 = app.control.ping(destination=['celery@dgw_node1'])
print(ret2)

ret3 = app.control.ping(destination=['celery@dgw_node2'])
print(ret3)

print("*" * 100)

ret4 = app.control.inspect().ping()
print(ret4)

ret5 = app.control.inspect().ping(destination=['celery@dgw_node1'])
print(ret5)

print("*" * 100)

ret6 = current_app.control.ping()
print(ret6)

ret7 = current_app.control.ping(destination=['celery@dgw_node1'])
print(ret7)

print("*" * 100)

ret8 = current_app.control.inspect().ping()
print(ret8)

ret9 = current_app.control.inspect().ping(destination=['celery@dgw_node1'])
print(ret9)

ret10 = current_app.control.inspect().ping(destination=['celery@dgw_node2'])
print(ret10)

print("程序执行完毕!")

运行结果:

celery中API Reference(celery.app.control)

registered

registered(*taskinfoitems)

返回每个worker的所有注册任务。

taskinfoitems ( Sequence [ str ] ) –Task 要包含的属性列表。

启动celery两个worker。

示例代码:

from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app


result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")

ret1 = app.control.inspect().registered()
print(ret1)

ret2 = app.control.inspect().registered('serializer', 'max_retries')
print(ret2)

print("*" * 100)

ret3 = current_app.control.inspect().registered()
print(ret3)

ret4 = current_app.control.inspect().registered('serializer', 'max_retries')
print(ret4)

print("程序执行完毕!")

运行结果:

celery中API Reference(celery.app.control)

registered_tasks

该方法同registered。

current_app.control.inspect().registered_tasks()
current_app.control.inspect().registered_tasks('serializer', 'max_retries')

report

report( ):为每个worker返回人类可读的报告。

同样启动celery两个worker。

示例代码:

from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app


result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")

ret1 = app.control.inspect().report()
print(ret1)

print("*" * 100)

ret2 = current_app.control.inspect().report()
print(ret2)

print("程序执行完毕!")

运行结果:

celery中API Reference(celery.app.control)

reserved

reserved(safe=None):当前保留任务的返回列表,不包括scheduled/active。

同样启动celery两个worker。

示例代码:

from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app


result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")

ret1 = app.control.inspect().reserved()
print(ret1)

print("*" * 100)

ret2 = current_app.control.inspect().reserved()
print(ret2)

print("程序执行完毕!")

运行结果:

celery中API Reference(celery.app.control)

revoked

revoked():返回已撤销任务的列表。

同样启动celery两个worker。

示例代码:

from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app


result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")

ret1 = app.control.inspect().revoked()
print(ret1)

print("*" * 100)

ret2 = current_app.control.inspect().revoked()
print(ret2)

print("*" * 100)
print("停止某个任务")

app.control.revoke(result1.id, terminate=True)

ret3 = app.control.inspect().revoked()
print(ret3)

print("*" * 100)

ret4 = current_app.control.inspect().revoked()
print(ret4)

print("程序执行完毕!")

运行结果:

celery中API Reference(celery.app.control)

参考博文:

celery简单实现异步任务和定时任务_IT之一小佬的博客-CSDN博客

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/142801.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!