官方文档:celery.app.control — Celery 5.2.7 documentation
worker远程控制命令的客户端。
服务器实现在celery.worker.control. 有两种类型的远程控制命令:
Inspect commands:
没有副作用,通常只会返回在 worker 中找到的一些值,如当前注册任务列表、活动任务列表等。命令可通过Inspect类访问。
Control commands:
执行副作用,比如添加一个新的队列来消费。命令可通过Control类访问。
class celery.app.control.Control(app=None):
pass
创建一个多任务python环境
目录结构如下所示:
其中,tasks是消费者模块,有管理的celery文件,有多个任务函数文件。produce_task1模拟消费者,测试异步任务调用,produce_task2模拟定时任务调用。
celery启动
celery -A celery_demo2.tasks.celery worker --loglevel=info -P eventlet
注意:在windows中执行时,添加-P eventlet,否则可能报错。
Ping
ping(destination=None)
Ping 所有(或特定)worker。
destination ( List ) – 如果设置,则向所有worker广播时向其发送命令的主机列表。
celery启动,指定名称-n,这儿启动两个来演示:
celery -A celery_demo2.tasks.celery worker -n dgw_node1 --loglevel=info -P eventlet
celery -A celery_demo2.tasks.celery worker -n dgw_node2 --loglevel=info -P eventlet
示例代码: 【此时任务被两个worker都进行消费了】
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app
result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")
ret = app.control.ping()
print(ret)
ret2 = app.control.ping(destination=['celery@dgw_node1'])
print(ret2)
ret3 = app.control.ping(destination=['celery@dgw_node2'])
print(ret3)
print("*" * 100)
ret4 = app.control.inspect().ping()
print(ret4)
ret5 = app.control.inspect().ping(destination=['celery@dgw_node1'])
print(ret5)
print("*" * 100)
ret6 = current_app.control.ping()
print(ret6)
ret7 = current_app.control.ping(destination=['celery@dgw_node1'])
print(ret7)
print("*" * 100)
ret8 = current_app.control.inspect().ping()
print(ret8)
ret9 = current_app.control.inspect().ping(destination=['celery@dgw_node1'])
print(ret9)
ret10 = current_app.control.inspect().ping(destination=['celery@dgw_node2'])
print(ret10)
print("程序执行完毕!")
运行结果:
registered
registered(*taskinfoitems)
返回每个worker的所有注册任务。
taskinfoitems ( Sequence [ str ] ) –Task 要包含的属性列表。
启动celery两个worker。
示例代码:
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app
result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")
ret1 = app.control.inspect().registered()
print(ret1)
ret2 = app.control.inspect().registered('serializer', 'max_retries')
print(ret2)
print("*" * 100)
ret3 = current_app.control.inspect().registered()
print(ret3)
ret4 = current_app.control.inspect().registered('serializer', 'max_retries')
print(ret4)
print("程序执行完毕!")
运行结果:
registered_tasks
该方法同registered。
current_app.control.inspect().registered_tasks()
current_app.control.inspect().registered_tasks('serializer', 'max_retries')
report
report( ):为每个worker返回人类可读的报告。
同样启动celery两个worker。
示例代码:
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app
result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")
ret1 = app.control.inspect().report()
print(ret1)
print("*" * 100)
ret2 = current_app.control.inspect().report()
print(ret2)
print("程序执行完毕!")
运行结果:
reserved
reserved(safe=None):当前保留任务的返回列表,不包括scheduled/active。
同样启动celery两个worker。
示例代码:
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app
result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")
ret1 = app.control.inspect().reserved()
print(ret1)
print("*" * 100)
ret2 = current_app.control.inspect().reserved()
print(ret2)
print("程序执行完毕!")
运行结果:
revoked
revoked():返回已撤销任务的列表。
同样启动celery两个worker。
示例代码:
from celery_demo2.tasks.task1 import send_email
from celery_demo2.tasks.task2 import send_msg
from celery_demo2.tasks.celery import app
from celery import current_app
result1 = send_email.delay('张三')
print(f"任务ID{result1.id}")
result2 = send_msg.delay('王五')
print(f"任务ID{result2.id}")
ret1 = app.control.inspect().revoked()
print(ret1)
print("*" * 100)
ret2 = current_app.control.inspect().revoked()
print(ret2)
print("*" * 100)
print("停止某个任务")
app.control.revoke(result1.id, terminate=True)
ret3 = app.control.inspect().revoked()
print(ret3)
print("*" * 100)
ret4 = current_app.control.inspect().revoked()
print(ret4)
print("程序执行完毕!")
运行结果:
参考博文:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/142801.html