APScheduler:定时任务框架

APScheduler:定时任务框架

安装

文档: https://apscheduler.readthedocs.io/en/stable/userguide.html

安装

$ pip install apscheduler
>>> import apscheduler
>>> apscheduler.version
'3.6.3'

组件

APScheduler由一下四部分组成

  • triggers:触发器,指定定时任务执行的时机,每个任务都有自己的触发器.
  • job stores:存储器,持久存储,默认存储在内存中.
  • executors:执行器,在定时任务执行时,以进程或线程方式执行
  • scheduler:调度器,包含BackgroundScheduler(后台运行)和BlockingScheduler(阻塞运行).他会合理安排作业存储器,执行器,触发器进行工作.并进行添加和删除任务等.调度器通常是只有一个的,开发人员很少直接操作触发器,存储器,执行器等.因为这些都由调度器自动来实现了.
APScheduler:定时任务框架
10334

触发器(triggers)

1.date在特定时间执行

示例:

from datetime import date, datetime

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()

def my_job(text):
   print(text)

# run_date 接受 date, datetime 数据类型
sched.add_job(my_job, 'date', run_date=datetime(20201127193330), args=['text'])

sched.start()

更多:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html

2.interval间隔执行

在固定的时间间隔后触发事件.参数如下:

weeks 周,整形
days 一个月中的第几天,整形
hours 时,整形
minutes 分,整形
seconds 秒,整形
start_date 起始时间
end_date 结束时间
jitter 触发的时间误差
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime


sched = BlockingScheduler()

def job_funciton():
   print('hello world')


sched.add_job(job_funciton, trigger='interval',seconds=5)
# 指定小时
# sched.add_job(job_funciton, trigger='interval', hours=2)
# 指定开始,结束时间
# sched.add_job(job_funciton, trigger='interval', start_date='2020-11-27 20:30:00', end_date='2010-11-27 21:30:00)
sched.start() 

3.crontab

在某个确切的时间周期性触发事件.

year month 1-12 day 1-31
week 1-53 day_of_week 一周中的第几天(0/Monday) hour 0-23
minute 0-59 second 0-59 start_date datetime数据类型,或者字符串类型
end_date 结束时间 timezone 时区 jitter 触发的误差时间

也可以用表达式类型,可以用以下方式:

表达式 字段 描述
* 任何 在每个值都触发
*/a 任何 每隔 a触发一次
a-b 任何 在 a-b区间内任何一个时间触发( a必须小于 b)
a-b/c 任何 在 a-b区间内每隔 c触发一次
xth y day 第 x个星期 y触发
lastx day 最后一个星期 x触发
last day 一个月中的最后一天触发
x,y,z 任何 可以把上面的表达式进行组合
from apscheduler.schedulers.blocking import BlockingScheduler


def job_function():
   print "Hello World"

sched = BlockingScheduler()

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()

调度器(schedulers)

  1. BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。
  2. BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。
  3. AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。
  4. GeventScheduler:适用于使用 gevent模块的应用程序。
  5. TwistedScheduler:适用于构建 Twisted的应用程序。
  6. QtScheduler:适用于构建 Qt的应用程序。
  7. TornadoScheduler: tornado

任务存储器(job stores)

有2中方式,一种是加载在内存中(默认配置),一种是使用数据库.使用内存简单高效,但是程序出现问题,从新运行,会把以前的任务从新再执行一次.数据库则可以在中断的地方恢复正常使用.

  • MemoryJobStore:使用内存
  • MongoDBJobStore:使用mongodb
  • RedisJobStore:使用redis
  • SQLAlchemy:使用SQLAlchemy框架

1.RedisJobStore

RedisJobStore(db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args)

有2种创建的方法:

  • add_jobstore:需要指定redis的相关参数.
from datetime import datetime, timedelta
import sys
import os

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore



def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times', host='192.168.0.101', port=6379, db=0)
    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
        scheduler.remove_all_jobs()

    alarm_time = datetime.now() + timedelta(seconds=300)
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
    print('To clear the alarms, run this example with the --clear argument.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
  • RedisJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from datetime import datetime, timedelta

jobstore = {
    'default' : RedisJobStore(db=0, jobs_key='myfunc', run_times_key='myfunc_time', host='192.168.0.101', port=6379)
}

def my_func(t):
    print('hello world, %s' %t)


if __name__ == '__main__':
    sched = BlockingScheduler(jobstores=jobstore)
    alarm_time = datetime.now() + timedelta(seconds=300)
    sched.add_job(my_func,run_date=alarm_time, args=['ning'])
    sched.start()

均可在redis中查询到数据.

APScheduler:定时任务框架

2.SQLAlchemy

使用ORM框架,演示使用MySql

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

def my_func(t):
   print('hello %s' %t)

if __name__ == "__main__":
   sched = BlockingScheduler()
   url = 'mysql+pymysql://root:2008.Cn123@192.168.0.101:3306/test'
   sched.add_jobstore('sqlalchemy', url=url,tablename='api_job')

   alarm_time = datetime.now() + timedelta(seconds=300)
   sched.add_job(my_func,run_date=alarm_time, args=['ning'])
   sched.start()

如果表不存在,会自动创建表.tablename用于指定表的名称.

在数据库中可以查看到表

select * from api_job;
APScheduler:定时任务框架
10336

执行器executors

执行器取决于应用场景,默认是ThreadPoolExecutor,它可以满足大部分需求.如果是CPU密集型计算,可以选择ProcessPoolExecutor

class apscheduler.executors.pool.ThreadPoolExecutor(max_workers=10)
class apscheduler.executors.pool.ProcessPoolExecutor(max_workers=10)
max_worker 指定最多使用线程/进程
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
   'default': ThreadPoolExecutor(20),
}
conf = { # redis配置
   "host":127.0.0.1,
   "port":6379,
   "db":15# 连接15号数据库
   "max_connections":10 # redis最大支持300个连接数
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) # 添加任务持久化存储方式,如果未安装redis可省略此步骤

任务操作

  • add_job(func, id='xxx', args=None, kwargs=None)添加任务
# 添加任务func, func参数可以使用 '可导入模块:可调用对象'的方式引入,即可用模块来引入
#❯ tree -L 2
#├── func
#│   ├── add_func.py
#│   ├── __init__.py
#└── jobs.py
# add_func.py
def add(x,y):
    print(x+y)
    
# jobs.py
from apscheduler.schedulers.blocking import BlockingScheduler
from func.add_func import add

shced = BlockingScheduler()


if __name__ == "__main__":
    shced.add_job('func.add_func:add', args=[1,2], id='job1')
    shced.start()   

除去使用add_job(),还可以使用装饰器函数scheduled_job来添加任务.

  • remove_job(job_id):删除任务,需要指定job_id
  • pause_job(job_id):暂停任务
  • resume_job(job_id):恢复任务
  • modify_job(job_id, **changes):修改任务属性
  • print_jobs():作业信息
# 方法1
job = scheduler.add_job(myfunc, 'interval', minutes=2)  # 添加任务
job.remove()  # 删除任务
job.pause() # 暂定任务
job.resume()  # 恢复任务

# 方法2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # 添加任务    
scheduler.remove_job('my_job_id')  # 删除任务
scheduler.pause_job('my_job_id')  # 暂定任务
scheduler.resume_job('my_job_id')  # 恢复任务

示例

方法1

from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def tick():
   print('Tick! The time is: %s' % datetime.now())
# 选择MongoDB作为任务存储数据库
jobstores = {
   'mongo': MongoDBJobStore(),
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 默认使用线程池
executors = {
   'default': ThreadPoolExecutor(20),
   'processpool': ProcessPoolExecutor(5)
}
# 默认参数配置
job_defaults = {
   'coalesce'False,  # 积攒的任务是否只跑一次,是否合并所有错过的Job
   'max_instances'3,  # 默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。
   'misfire_grace_time'30  # 30秒的任务超时容错
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

方法2

from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
   'apscheduler.jobstores.mongo': {
        'type''mongodb'
   },
   'apscheduler.jobstores.default': {
       'type''sqlalchemy',
       'url''sqlite:///jobs.sqlite'
   },
   'apscheduler.executors.default': {
       'class''apscheduler.executors.pool:ThreadPoolExecutor',
       'max_workers''20'
   },
   'apscheduler.executors.processpool': {
       'type''processpool',
       'max_workers''5'
   },
   'apscheduler.job_defaults.coalesce''false',
   'apscheduler.job_defaults.max_instances''3',
   'apscheduler.timezone''UTC',
})

方法3

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
   'mongo': {'type''mongodb'},
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
   'default': {'type''threadpool''max_workers'20},
   'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
   'coalesce'False,
   'max_instances'3
}
scheduler = BackgroundScheduler()
# ..这里可以添加任务
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

misfire_grace_time:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。合并:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次

replace_existing: 如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使用replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。


原文始发于微信公众号(Flask学习笔记):APScheduler:定时任务框架

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36385.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!