Python任務調(diào)度框架Rocketry | 您所在的位置:網(wǎng)站首頁 › 屬雞2月出生人命運女 › Python任務調(diào)度框架Rocketry |
文章目錄
定時任務庫對比簡介與其余框架的區(qū)別安裝初試調(diào)度器基礎測試方法字符串格式具體時間間隔周期某時間段
條件 API條件邏輯方法對比
執(zhí)行選項在主進程和線程中執(zhí)行進程線程異步設置默認選項
日志流水線在一個任務后執(zhí)行輸入作為輸出會話級參數(shù)函數(shù)參數(shù)TODO:元參數(shù)
自定義條件
元任務遇到的坑參考文獻
定時任務庫對比
推薦閱讀 Python timing task - schedule vs. Celery vs. APScheduler 庫大小優(yōu)點缺點適用場景Schedule輕量級易用無配置不能動態(tài)添加任務或持久化任務簡單任務Celery重量級①任務隊列②分布式①不能動態(tài)添加定時任務到系統(tǒng)中,如Flask(Django可以)②設置起來較累贅任務隊列APScheduler相對重量級①靈活,可動態(tài)增刪定時任務并持久化②支持多種存儲后端③集成框架多,用戶廣重量級,學習成本大通用Rocketry輕量級易用功能強尚未成熟,文檔不清晰通用 簡介Rocketry 是 Python 的任務調(diào)度框架,易用、簡潔、強大。可通過 Python 裝飾器語法進行任務調(diào)度,支持定時、并發(fā)(異步、多線程、多進程)、條件觸發(fā)等。 感覺沒有 Celery 和 APScheduler 成熟 與其余框架的區(qū)別常見任務調(diào)度框架有: CrontabAPSchedulerAirflowRocketry 的調(diào)度程序基于語句,有相同的調(diào)度策略,也可以使用自定義調(diào)度語句進行擴展。 此外,Rocketry 非常易用,無需復雜配置,但可用于大型應用程序。 安裝 pip install rocketry 初試 import datetime from rocketry import Rocketry app = Rocketry() @app.task('every 5 seconds') def do_things(): print(datetime.datetime.now()) if __name__ == "__main__": app.run() 調(diào)度器基礎創(chuàng)建調(diào)度器規(guī)則的方式,可通過與、或、非組合,還可用于任務的開始、結(jié)束、終止: 字符串格式條件 API條件類 測試方法判斷當前時間是否在 10:00 到 14:00 之間 from rocketry.conds import time_of_day condition = time_of_day.between('10:00', '14:00') print(condition.observe()) 字符串格式簡單易寫,但靜態(tài)代碼分析器無法檢查語句是否正確 具體時間間隔 import sys from rocketry import Rocketry app = Rocketry() @app.task('every 10 seconds') def do_constantly(): """每10秒執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 1 minute') def do_minutely(): """每1分鐘執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 1 hour') def do_hourly(): """每1小時執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 1 day') def do_daily(): """每1天執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 2 days 2 hours 20 seconds') def do_custom(): """每2天2小時20秒執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 周期 import sys from rocketry import Rocketry app = Rocketry() @app.task('secondly') def do_secondly(): """每1秒鐘執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('minutely') def do_minutely(): """每1分鐘執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('hourly') def do_hourly(): """每1小時執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('daily') def do_daily(): """每1天執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('weekly') def do_weekly(): """每1周執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('monthly') def do_monthly(): """每1個月執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 某時間段 before:之前after:之后between:之間starting:開始 import sys from rocketry import Rocketry app = Rocketry() @app.task('minutely before 45') def do_minutely(): """每分鐘的45秒前執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('hourly after 45:00') def do_hourly(): """每小時的45分后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('daily between 08:00 and 14:00') def do_daily(): """每天的08:00到14:00這段時間內(nèi)執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('weekly on Monday') def do_weekly(): """每周的周一執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('monthly starting 3rd') def do_monthly(): """每個月的3號開始執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('time of day between 10:00 and 18:00') def do_constantly_during_day(): """每天的10:00到18:00這段時間內(nèi)執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('time of week between Saturday and Sunday') def do_constantly_during_weekend(): """每周的周六到周日這段時間內(nèi)執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 條件 API import sys from rocketry import Rocketry from rocketry.conds import every, hourly, daily, after_success, true, false app = Rocketry() @app.task(every('10 seconds')) def do_constantly(): """每10秒執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(hourly) def do_hourly(): """每1小時執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(daily.between('08:00', '14:00')) def do_daily(): """每天08:00到14:00執(zhí)行一次""" print(sys._getframe().f_code.co_name) @app.task(after_success(do_daily)) def do_after(): """do_daily成功后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(true & false & ~(true | false)) def do_logic(): """邏輯執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 條件邏輯 &:與|:或~:非 import sys from rocketry import Rocketry from rocketry.conds import true, false app = Rocketry() @app.task(true) def do_constantly(): print(sys._getframe().f_code.co_name) @app.task(false) def do_never(): print(sys._getframe().f_code.co_name) @app.task(true & false) def do_and(): """與""" print(sys._getframe().f_code.co_name) @app.task(true | false) def do_or(): """或""" print(sys._getframe().f_code.co_name) @app.task(~false) def do_not(): """非""" print(sys._getframe().f_code.co_name) @app.task((true | false) & ~(true | false)) def do_nested(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 方法對比 執(zhí)行選項 main:在主進程和線程中執(zhí)行(默認)process:在單獨進程中執(zhí)行thread:在單獨線程中執(zhí)行async:異步執(zhí)行 執(zhí)行選項是否并發(fā)能否被終止能否修改 sessionmain???process???thread部分??async部分??threading.current_thread():獲取當前線程 os.getpid():獲取當前進程 ID 在主進程和線程中執(zhí)行 import os import sys import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='main') def do_main(): """在主進程和線程中執(zhí)行""" print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid()) if __name__ == '__main__': app.run() # do_main 23448 # do_main 23448 # do_main 23448 進程 import os import sys import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='process') def do_process(): """在單獨進程中執(zhí)行""" print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid()) if __name__ == '__main__': app.run() # do_process 14612 # do_process 25996 # do_process 18504 線程 import os import sys import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='thread') def do_thread(): """""" print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid()) if __name__ == '__main__': app.run() # do_thread 26768 # do_thread 26768 # do_thread 26768 異步 import os import sys import asyncio import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='async') async def do_async(): """異步執(zhí)行""" print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid()) async def main(): rocketry_task = asyncio.create_task(app.serve()) await rocketry_task if __name__ == '__main__': asyncio.run(main()) # do_async 24976 # do_async 24976 # do_async 24976 設置默認選項 import os import sys import threading from rocketry import Rocketry app = Rocketry(config={'task_execution': 'thread'}) @app.task('secondly') def do_thread(): print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid()) if __name__ == '__main__': app.run() # do_thread 26768 # do_thread 26768 # do_thread 26768 日志內(nèi)置日志格式有: rocketry.log.MinimalRecord: 最簡略的日志rocketry.log.LogRecord: 經(jīng)典的日志元素rocketry.log.TaskLogRecord: 類似 LogRecord,同時包含開始、結(jié)束、運行次數(shù) import os import datetime from rocketry import Rocketry from redbird.repos import CSVFileRepo from rocketry.log import MinimalRecord, LogRecord, TaskLogRecord filename = 'logs.csv' if os.path.exists(filename): os.remove(filename) app = Rocketry(logger_repo=CSVFileRepo(filename=filename, model=MinimalRecord)) @app.task('secondly') def do_things(): print(datetime.datetime.now()) if __name__ == '__main__': app.run() 流水線 在一個任務執(zhí)行后、成功后、失敗后,執(zhí)行任務將一個任務的輸出作為另一個任務的輸入 在一個任務后執(zhí)行 after_success:成功后after_fail:失敗后after_finish:完成后 import sys import random from rocketry import Rocketry from rocketry.conds import after_success, after_fail, after_finish app = Rocketry(execution='main') @app.task('every 3 seconds') def do_things(): if random.randint(0, 10) % 2 == 0: print(sys._getframe().f_code.co_name, '\tfail!') raise Exception print(sys._getframe().f_code.co_name, '\tsuccess!') @app.task(after_success(do_things)) def do_after_success(): """成功后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(after_fail(do_things)) def do_after_fail(): """失敗后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(after_finish(do_things)) def do_after_fail_or_success(): """完成后執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 輸入作為輸出 import sys from rocketry import Rocketry from rocketry.args import Return from rocketry.conds import after_success app = Rocketry(execution='main') @app.task('every 3 seconds') def do_first(): print(sys._getframe().f_code.co_name) return 'Hello World' @app.task(after_success(do_first)) def do_second(arg=Return(do_first)): print(sys._getframe().f_code.co_name, arg) if __name__ == '__main__': app.run() 會話級參數(shù)有兩種參數(shù): 任務級別會話級別(多數(shù)情況下使用) Arg:SimpleArg:只傳遞值FuncArg:會話級函數(shù)實參 import sys from rocketry import Rocketry from rocketry.args import Arg, SimpleArg app = Rocketry(execution='main') app.params( my_arg='Hello world' ) @app.task('every 3 seconds') def do_arg(item=Arg('my_arg')): print(sys._getframe().f_code.co_name, item) @app.task('every 3 seconds') def do_simple_arg(item=SimpleArg('Hello world')): print(sys._getframe().f_code.co_name, item) if __name__ == '__main__': app.run() 函數(shù)參數(shù)會話級別 import sys import datetime from rocketry import Rocketry from rocketry.args import Arg app = Rocketry(execution='main') @app.param('my_arg') def get_item(): return datetime.datetime.now() @app.task('every 3 seconds') def do_func_arg(item=Arg('my_arg')): print(sys._getframe().f_code.co_name, item) if __name__ == '__main__': app.run()任務級別 import sys import datetime from rocketry import Rocketry from rocketry.args import FuncArg app = Rocketry(execution='main') def get_item(): return datetime.datetime.now() @app.task('every 3 seconds') def do_func_arg(item=FuncArg(get_item)): print(sys._getframe().f_code.co_name, item) if __name__ == '__main__': app.run() TODO:元參數(shù)元參數(shù)包含調(diào)度系統(tǒng)組件的參數(shù),用于任務作會話,可關閉調(diào)度器或添加、刪除任務等 會話參數(shù) from rocketry import Rocketry from rocketry.args import Session app = Rocketry(execution='main') @app.task('every 3 seconds') def manipulate_session(session=Session()): print(session) if __name__ == '__main__': app.run()任務參數(shù) from rocketry import Rocketry from rocketry.args import Task app = Rocketry(execution='main') @app.task() def do_things(): ... @app.task('every 3 seconds') def manipulate_task(this_task=Task(), another_task=Task('do_things')): print(this_task) print(another_task) if __name__ == '__main__': app.run() 自定義條件 import sys from rocketry import Rocketry from rocketry.conds import daily app = Rocketry(execution='main') @app.cond() def things_ready(): return True or False @app.task(daily & things_ready) def do_things(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()傳參,判斷文件是否存在 import sys from pathlib import Path from rocketry import Rocketry from rocketry.conds import daily app = Rocketry(execution='main') @app.cond() def file_exists(file): return Path(file).is_file() @app.task(daily & file_exists(__file__)) def do_things(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()傳參,判斷任務名 import sys from pathlib import Path from rocketry import Rocketry from rocketry.args import Task from rocketry.conds import daily app = Rocketry(execution='main') @app.cond() def is_right_task(this_task=Task()): return this_task.name.startswith('do_') @app.task(daily & is_right_task) def do_things(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run() 元任務可以在運行時: 終止調(diào)度器重啟調(diào)度器強制任務運行禁用任務創(chuàng)建、更新、刪除任務 遇到的坑FutureWarning: Default execution will be changed to ‘a(chǎn)sync’. To suppress this warning, specify task_execution, ie. Rocketry(execution=‘a(chǎn)sync’) 實例化 Rocketry 對象時指定 execution,如 from rocketry import Rocketry app = Rocketry(execution='main') 參考文獻 Rocketry DocumentationRocketry GitHub |
今日新聞 |
推薦新聞 |
專題文章 |
CopyRight 2018-2019 實驗室設備網(wǎng) 版權(quán)所有 |