<kbd id="9plqc"><label id="9plqc"></label></kbd>

        <th id="9plqc"></th>
        1. <center id="9plqc"><video id="9plqc"></video></center>
          <sub id="9plqc"><form id="9plqc"><pre id="9plqc"></pre></form></sub>
          <nav id="9plqc"><form id="9plqc"><legend id="9plqc"></legend></form></nav>
          Python任務調(diào)度框架Rocketry 您所在的位置:網(wǎng)站首頁 屬雞2月出生人命運女 Python任務調(diào)度框架Rocketry

          Python任務調(diào)度框架Rocketry

          2025-07-15 08:25| 來源: 網(wǎng)絡整理| 查看: 265

          文章目錄 定時任務庫對比簡介與其余框架的區(qū)別安裝初試調(diào)度器基礎測試方法字符串格式具體時間間隔周期某時間段 條件 API條件邏輯方法對比 執(zhí)行選項在主進程和線程中執(zhí)行進程線程異步設置默認選項 日志流水線在一個任務后執(zhí)行輸入作為輸出會話級參數(shù)函數(shù)參數(shù)TODO:元參數(shù) 自定義條件 元任務遇到的坑參考文獻

          定時任務庫對比

          推薦閱讀 Python timing task - schedule vs. Celery vs. APScheduler

          庫大小優(yōu)點缺點適用場景Schedule輕量級易用無配置不能動態(tài)添加任務或持久化任務簡單任務Celery重量級①任務隊列②分布式①不能動態(tài)添加定時任務到系統(tǒng)中,如Flask(Django可以)②設置起來較累贅任務隊列APScheduler相對重量級①靈活,可動態(tài)增刪定時任務并持久化②支持多種存儲后端③集成框架多,用戶廣重量級,學習成本大通用Rocketry輕量級易用功能強尚未成熟,文檔不清晰通用

          簡介

          Rocketry 是 Python 的任務調(diào)度框架,易用、簡潔、強大。可通過 Python 裝飾器語法進行任務調(diào)度,支持定時、并發(fā)(異步、多線程、多進程)、條件觸發(fā)等。

          感覺沒有 Celery 和 APScheduler 成熟

          與其余框架的區(qū)別

          常見任務調(diào)度框架有:

          CrontabAPSchedulerAirflow

          Rocketry 的調(diào)度程序基于語句,有相同的調(diào)度策略,也可以使用自定義調(diào)度語句進行擴展。

          此外,Rocketry 非常易用,無需復雜配置,但可用于大型應用程序。

          安裝 pip install rocketry

          初試 import datetime from rocketry import Rocketry app = Rocketry() @app.task('every 5 seconds') def do_things(): print(datetime.datetime.now()) if __name__ == "__main__": app.run()

          調(diào)度器基礎

          創(chuàng)建調(diào)度器規(guī)則的方式,可通過與、或、非組合,還可用于任務的開始、結(jié)束、終止:

          字符串格式條件 API條件類

          測試方法

          判斷當前時間是否在 10:00 到 14:00 之間

          from rocketry.conds import time_of_day condition = time_of_day.between('10:00', '14:00') print(condition.observe())

          字符串格式

          簡單易寫,但靜態(tài)代碼分析器無法檢查語句是否正確

          具體時間間隔 import sys from rocketry import Rocketry app = Rocketry() @app.task('every 10 seconds') def do_constantly(): """每10秒執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 1 minute') def do_minutely(): """每1分鐘執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 1 hour') def do_hourly(): """每1小時執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 1 day') def do_daily(): """每1天執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('every 2 days 2 hours 20 seconds') def do_custom(): """每2天2小時20秒執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          周期 import sys from rocketry import Rocketry app = Rocketry() @app.task('secondly') def do_secondly(): """每1秒鐘執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('minutely') def do_minutely(): """每1分鐘執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('hourly') def do_hourly(): """每1小時執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('daily') def do_daily(): """每1天執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('weekly') def do_weekly(): """每1周執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('monthly') def do_monthly(): """每1個月執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          某時間段 before:之前after:之后between:之間starting:開始 import sys from rocketry import Rocketry app = Rocketry() @app.task('minutely before 45') def do_minutely(): """每分鐘的45秒前執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('hourly after 45:00') def do_hourly(): """每小時的45分后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('daily between 08:00 and 14:00') def do_daily(): """每天的08:00到14:00這段時間內(nèi)執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('weekly on Monday') def do_weekly(): """每周的周一執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('monthly starting 3rd') def do_monthly(): """每個月的3號開始執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('time of day between 10:00 and 18:00') def do_constantly_during_day(): """每天的10:00到18:00這段時間內(nèi)執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task('time of week between Saturday and Sunday') def do_constantly_during_weekend(): """每周的周六到周日這段時間內(nèi)執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          條件 API import sys from rocketry import Rocketry from rocketry.conds import every, hourly, daily, after_success, true, false app = Rocketry() @app.task(every('10 seconds')) def do_constantly(): """每10秒執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(hourly) def do_hourly(): """每1小時執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(daily.between('08:00', '14:00')) def do_daily(): """每天08:00到14:00執(zhí)行一次""" print(sys._getframe().f_code.co_name) @app.task(after_success(do_daily)) def do_after(): """do_daily成功后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(true & false & ~(true | false)) def do_logic(): """邏輯執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          條件邏輯 &:與|:或~:非 import sys from rocketry import Rocketry from rocketry.conds import true, false app = Rocketry() @app.task(true) def do_constantly(): print(sys._getframe().f_code.co_name) @app.task(false) def do_never(): print(sys._getframe().f_code.co_name) @app.task(true & false) def do_and(): """與""" print(sys._getframe().f_code.co_name) @app.task(true | false) def do_or(): """或""" print(sys._getframe().f_code.co_name) @app.task(~false) def do_not(): """非""" print(sys._getframe().f_code.co_name) @app.task((true | false) & ~(true | false)) def do_nested(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          方法對比

          執(zhí)行選項 main:在主進程和線程中執(zhí)行(默認)process:在單獨進程中執(zhí)行thread:在單獨線程中執(zhí)行async:異步執(zhí)行 執(zhí)行選項是否并發(fā)能否被終止能否修改 sessionmain???process???thread部分??async部分??

          threading.current_thread():獲取當前線程

          os.getpid():獲取當前進程 ID

          在主進程和線程中執(zhí)行 import os import sys import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='main') def do_main(): """在主進程和線程中執(zhí)行""" print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid()) if __name__ == '__main__': app.run() # do_main 23448 # do_main 23448 # do_main 23448

          進程 import os import sys import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='process') def do_process(): """在單獨進程中執(zhí)行""" print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid()) if __name__ == '__main__': app.run() # do_process 14612 # do_process 25996 # do_process 18504

          線程 import os import sys import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='thread') def do_thread(): """""" print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid()) if __name__ == '__main__': app.run() # do_thread 26768 # do_thread 26768 # do_thread 26768

          異步 import os import sys import asyncio import threading from rocketry import Rocketry app = Rocketry() @app.task('secondly', execution='async') async def do_async(): """異步執(zhí)行""" print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid()) async def main(): rocketry_task = asyncio.create_task(app.serve()) await rocketry_task if __name__ == '__main__': asyncio.run(main()) # do_async 24976 # do_async 24976 # do_async 24976

          設置默認選項 import os import sys import threading from rocketry import Rocketry app = Rocketry(config={'task_execution': 'thread'}) @app.task('secondly') def do_thread(): print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid()) if __name__ == '__main__': app.run() # do_thread 26768 # do_thread 26768 # do_thread 26768

          日志

          內(nèi)置日志格式有:

          rocketry.log.MinimalRecord: 最簡略的日志rocketry.log.LogRecord: 經(jīng)典的日志元素rocketry.log.TaskLogRecord: 類似 LogRecord,同時包含開始、結(jié)束、運行次數(shù) import os import datetime from rocketry import Rocketry from redbird.repos import CSVFileRepo from rocketry.log import MinimalRecord, LogRecord, TaskLogRecord filename = 'logs.csv' if os.path.exists(filename): os.remove(filename) app = Rocketry(logger_repo=CSVFileRepo(filename=filename, model=MinimalRecord)) @app.task('secondly') def do_things(): print(datetime.datetime.now()) if __name__ == '__main__': app.run()

          流水線 在一個任務執(zhí)行后、成功后、失敗后,執(zhí)行任務將一個任務的輸出作為另一個任務的輸入

          在一個任務后執(zhí)行 after_success:成功后after_fail:失敗后after_finish:完成后 import sys import random from rocketry import Rocketry from rocketry.conds import after_success, after_fail, after_finish app = Rocketry(execution='main') @app.task('every 3 seconds') def do_things(): if random.randint(0, 10) % 2 == 0: print(sys._getframe().f_code.co_name, '\tfail!') raise Exception print(sys._getframe().f_code.co_name, '\tsuccess!') @app.task(after_success(do_things)) def do_after_success(): """成功后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(after_fail(do_things)) def do_after_fail(): """失敗后執(zhí)行""" print(sys._getframe().f_code.co_name) @app.task(after_finish(do_things)) def do_after_fail_or_success(): """完成后執(zhí)行""" print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          輸入作為輸出 import sys from rocketry import Rocketry from rocketry.args import Return from rocketry.conds import after_success app = Rocketry(execution='main') @app.task('every 3 seconds') def do_first(): print(sys._getframe().f_code.co_name) return 'Hello World' @app.task(after_success(do_first)) def do_second(arg=Return(do_first)): print(sys._getframe().f_code.co_name, arg) if __name__ == '__main__': app.run()

          會話級參數(shù)

          有兩種參數(shù):

          任務級別會話級別(多數(shù)情況下使用) Arg:SimpleArg:只傳遞值FuncArg:會話級函數(shù)實參 import sys from rocketry import Rocketry from rocketry.args import Arg, SimpleArg app = Rocketry(execution='main') app.params( my_arg='Hello world' ) @app.task('every 3 seconds') def do_arg(item=Arg('my_arg')): print(sys._getframe().f_code.co_name, item) @app.task('every 3 seconds') def do_simple_arg(item=SimpleArg('Hello world')): print(sys._getframe().f_code.co_name, item) if __name__ == '__main__': app.run()

          函數(shù)參數(shù)

          會話級別

          import sys import datetime from rocketry import Rocketry from rocketry.args import Arg app = Rocketry(execution='main') @app.param('my_arg') def get_item(): return datetime.datetime.now() @app.task('every 3 seconds') def do_func_arg(item=Arg('my_arg')): print(sys._getframe().f_code.co_name, item) if __name__ == '__main__': app.run()

          任務級別

          import sys import datetime from rocketry import Rocketry from rocketry.args import FuncArg app = Rocketry(execution='main') def get_item(): return datetime.datetime.now() @app.task('every 3 seconds') def do_func_arg(item=FuncArg(get_item)): print(sys._getframe().f_code.co_name, item) if __name__ == '__main__': app.run()

          TODO:元參數(shù)

          元參數(shù)包含調(diào)度系統(tǒng)組件的參數(shù),用于任務作會話,可關閉調(diào)度器或添加、刪除任務等

          會話參數(shù)

          from rocketry import Rocketry from rocketry.args import Session app = Rocketry(execution='main') @app.task('every 3 seconds') def manipulate_session(session=Session()): print(session) if __name__ == '__main__': app.run()

          任務參數(shù)

          from rocketry import Rocketry from rocketry.args import Task app = Rocketry(execution='main') @app.task() def do_things(): ... @app.task('every 3 seconds') def manipulate_task(this_task=Task(), another_task=Task('do_things')): print(this_task) print(another_task) if __name__ == '__main__': app.run()

          自定義條件 import sys from rocketry import Rocketry from rocketry.conds import daily app = Rocketry(execution='main') @app.cond() def things_ready(): return True or False @app.task(daily & things_ready) def do_things(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          傳參,判斷文件是否存在

          import sys from pathlib import Path from rocketry import Rocketry from rocketry.conds import daily app = Rocketry(execution='main') @app.cond() def file_exists(file): return Path(file).is_file() @app.task(daily & file_exists(__file__)) def do_things(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          傳參,判斷任務名

          import sys from pathlib import Path from rocketry import Rocketry from rocketry.args import Task from rocketry.conds import daily app = Rocketry(execution='main') @app.cond() def is_right_task(this_task=Task()): return this_task.name.startswith('do_') @app.task(daily & is_right_task) def do_things(): print(sys._getframe().f_code.co_name) if __name__ == '__main__': app.run()

          元任務

          可以在運行時:

          終止調(diào)度器重啟調(diào)度器強制任務運行禁用任務創(chuàng)建、更新、刪除任務

          遇到的坑

          FutureWarning: Default execution will be changed to ‘a(chǎn)sync’. To suppress this warning, specify task_execution, ie. Rocketry(execution=‘a(chǎn)sync’)

          實例化 Rocketry 對象時指定 execution,如

          from rocketry import Rocketry app = Rocketry(execution='main')

          參考文獻 Rocketry DocumentationRocketry GitHub


          【本文地址】

          公司簡介

          聯(lián)系我們

          今日新聞

          推薦新聞

          專題文章
            CopyRight 2018-2019 實驗室設備網(wǎng) 版權(quán)所有
            黄色免费网站在线看,韩国精品在线观看,韩国美女一区二区,99国产热 南漳县| 潞西市| 容城县| 海阳市| 利川市| 镇坪县| 静安区| 山东| 靖宇县| 多伦县| 南乐县| 光山县| 茶陵县| 兴山县| 霍林郭勒市| 黑龙江省| 蒙城县| 句容市| 宣汉县| 铅山县| 宾阳县| 静乐县| 永州市| 怀安县| 吉首市| 九龙坡区| 凤阳县| 平罗县| 江陵县| 维西| 洮南市| 沾化县| 个旧市| 莲花县| 廉江市| 黎平县| 科技| 门源| 临夏县| 德钦县| 龙岩市| http://444 http://444 http://444 http://444 http://444 http://444