小さなスクリプトを決まったタイミングで動かしたい時に、Pythonのschedule
ライブラリはとても手軽です。
OSのタスクスケジューラやcronほど重装備ではありませんが、常駐プロセスの中で周期的に関数を呼び出すには十分です。
ここではインストールから頻度指定、実行ループ、エラー対策、実用サンプルまで丁寧に解説します。
Pythonのscheduleとは?定期実行の基本
scheduleの特徴と用途
scheduleは、Pythonプロセス内で軽量に定期実行を実現するライブラリです。
指定した関数やメソッドを、一定間隔や特定の時刻に合わせて実行できます。
内部的にはスレッドやプロセスを勝手に増やすことはなく、あなたが書くループの中でrun_pending()
を呼び出して「実行時刻になったジョブだけ」を順に実行します。
用途の例としては、ログの定期出力、バックアップの実行、APIのポーリング、軽量なメンテナンス処理のトリガなどが挙げられます。
ただし、プロセスが停止すればジョブも止まるため、システムレベルの常駐が必要ならOSのタスクスケジューラやcron、より本格的なAPSchedulerのようなライブラリも検討してください。
インストール
Pythonが導入済みなら、pipでインストールできます。
# 仮想環境の利用をおすすめします
python -m venv .venv
# Windows
.venv\Scripts\activate
# macOS/Linux
source .venv/bin/activate
# scheduleのインストール
pip install schedule
# 参考: バージョン確認
pip show schedule
インポートだけで使える軽量パッケージなので、外部サービスの設定などは不要です。
最小サンプル
最小の動作を確かめるために、数秒ごとにメッセージを出す例です。
実運用では分単位や日次で使うことが多いですが、学習時は秒単位が便利です。
# file: minimal_schedule.py
import schedule
import time
from datetime import datetime
def hello():
"""定期的に呼び出される関数"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] Hello, schedule!")
# 5秒ごとにhelloを実行
schedule.every(5).seconds.do(hello)
# ループして実行時刻になったジョブを動かす
# Ctrl+Cで停止できます
try:
while True:
schedule.run_pending() # そろそろ実行すべきジョブがあれば実行
time.sleep(1) # CPUを占有しないように少し待つ
except KeyboardInterrupt:
print("Stopped.")
[12:00:05] Hello, schedule!
[12:00:10] Hello, schedule!
[12:00:15] Hello, schedule!
^C
Stopped.
scheduleは「ループの中でrun_pending()
を呼ぶ」ことが必須です。
これを忘れると何も起きません。
scheduleでの書き方と頻度指定
毎分・毎時の定期実行
基本の間隔指定は数値と単位を組み合わせます。
毎分・毎時の実行例は以下の通りです。
import schedule
import time
def job_minute():
print("毎分のジョブを実行")
def job_hour():
print("毎時のジョブを実行")
# 毎分
schedule.every().minute.do(job_minute)
# 5分ごと
schedule.every(5).minutes.do(lambda: print("5分おきのジョブ"))
# 毎時
schedule.every().hour.do(job_hour)
# 各ループ
while True:
schedule.run_pending()
time.sleep(1)
特定の分で実行したい場合は毎時30分ちょうど
のように.at(':30')
が使えます。
# 各時間の30分に実行 (例: 10:30, 11:30, 12:30, ...)
schedule.every().hour.at(":30").do(lambda: print("各時間の30分に実行"))
毎日(9:00など)の定期実行
毎日9時ちょうどに実行するには、every().day.at("09:00")
を使います。
import schedule
from datetime import datetime
def backup():
print(f"{datetime.now()} バックアップを実行中...")
schedule.every().day.at("09:00").do(backup)
時刻文字列は24時間表記で"HH:MM"
または"HH:MM:SS"
です。
処理はローカル時刻で解釈されます(タイムゾーンは後述)。
平日・週次の定期実行
曜日ごとの指定も分かりやすく書けます。
import schedule
def weekday_task():
print("平日だけのタスク")
# 曜日を個別に登録 (平日のみ)
schedule.every().monday.at("10:00").do(weekday_task)
schedule.every().tuesday.at("10:00").do(weekday_task)
schedule.every().wednesday.at("10:00").do(weekday_task)
schedule.every().thursday.at("10:00").do(weekday_task)
schedule.every().friday.at("10:00").do(weekday_task)
# 週次 (毎週火曜の13:15)
schedule.every().tuesday.at("13:15").do(lambda: print("週次タスク"))
# 2週間ごと (隔週金曜の18:00) も可能
schedule.every(2).weeks.do(lambda: print("隔週タスク")).at("18:00")
平日だけ実行したい時は、上のように月〜金を個別に登録するのが簡単です。
1件のジョブで条件分岐しても良いですが、時刻指定の柔軟性は分けた方が保ちやすいです。
複数ジョブの登録
複数のジョブを登録すると、run_pending()
は「いま実行すべきジョブ」を順に処理します。
ジョブにはtag
を付けて管理できます。
import schedule
import time
def job_a():
print("Aを実行")
def job_b():
print("Bを実行")
# タグを付けて登録
schedule.every(2).minutes.do(job_a).tag("group1", "fast")
schedule.every().hour.do(job_b).tag("group1", "slow")
# タグで停止・削除も可能
def stop_group1():
print("group1をクリア")
schedule.clear("group1")
# 実験用に10分後に削除を仕込む
schedule.every(10).minutes.do(stop_group1).tag("admin")
while True:
schedule.run_pending()
time.sleep(1)
タグ無しですべて消す場合はschedule.clear()
です。
現在登録中のジョブはschedule.get_jobs()
で一覧できます。
引数付きジョブ
ジョブに引数を渡すには.do(func, arg1, kw=value)
と書きます。
import schedule
from datetime import datetime
def greet(name, punctuation="!"):
print(f"{datetime.now():%H:%M:%S} Hello, {name}{punctuation}")
# 3分おきに引数付きで実行
schedule.every(3).minutes.do(greet, "Alice", punctuation="!!!")
引数の評価はスケジュール登録時に行われるため、副作用のあるオブジェクトをその場で生成して渡す場合は注意してください。
必要ならfunctools.partial
を活用します。
実行ループとエラー対策
while Trueとtime.sleep
典型的な実行ループは次の通りです。
CPU使用率を抑えるためtime.sleep
を入れます。
import schedule
import time
def loop_forever():
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("Stopped.")
loop_forever()
待ち時間を最適化したい場合、schedule.idle_seconds()
を使うと次回実行までの秒数が取れます。
import schedule
import time
import math
def efficient_loop():
try:
while True:
schedule.run_pending()
idle = schedule.idle_seconds() # 次のジョブまでの秒数 or None
if idle is None:
time.sleep(1)
else:
# 0以下の場合はすぐにrun_pendingに戻る
time.sleep(max(0.0, min(idle, 60))) # 長くても60秒まで
except KeyboardInterrupt:
print("Stopped.")
efficient_loop()
例外処理とリトライ
ジョブの中で例外が起きても、run_pending()
の呼び出し自体は継続できます。
各ジョブの内部でtry/exceptし、ログを残しつつリトライのポリシーを実装すると安定します。
import schedule
import time
from random import random
def with_retry(task, *, retries=3, base_delay=1.0, factor=2.0):
"""指数バックオフで簡易リトライを行うラッパー"""
def wrapper(*args, **kwargs):
delay = base_delay
for attempt in range(1, retries + 1):
try:
return task(*args, **kwargs)
except Exception as e:
print(f"[{task.__name__}] エラー: {e} (試行{attempt}/{retries})")
if attempt == retries:
print(f"[{task.__name__}] 失敗を記録して中断")
break
time.sleep(delay)
delay *= factor
return wrapper
@with_retry
def flaky_job():
# たまに失敗する処理の例
if random() < 0.5:
raise RuntimeError("一時的な失敗")
print("成功!")
schedule.every(30).seconds.do(flaky_job)
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("Stopped.")
[flaky_job] エラー: 一時的な失敗 (試行1/3)
[flaky_job] エラー: 一時的な失敗 (試行2/3)
成功!
永久リトライは無限ループの原因になるため、回数制限やアラート通知を併用するのがおすすめです。
ジョブを中止したい場合
条件によってそれ以降のスケジュールをやめたい時は、ジョブの中でreturn schedule.CancelJob
を返します。
import schedule
counter = 0
def limited_job():
global counter
counter += 1
print(f"実行{counter}回目")
if counter >= 3:
print("このジョブは終了します")
return schedule.CancelJob
schedule.every(5).seconds.do(limited_job)
長時間処理の重複防止
scheduleはデフォルトでは1スレッドで順番に処理するため、同じジョブが重複起動することは基本的にありません。
ただし、長時間処理を別スレッドで動かす設計にした場合や、複数プロセスから同じジョブを実行し得る場合は排他制御が必要です。
以下は同一プロセス内での簡易ロックの例です。
import schedule
import threading
import time
lock = threading.Lock()
def long_running_task():
# 今動作中ならスキップ
if not lock.acquire(blocking=False):
print("前回の処理がまだ終わっていないためスキップしました")
return
try:
print("長時間処理を開始...")
time.sleep(10) # 実際の重い処理を想定
print("長時間処理が完了しました")
finally:
lock.release()
# 1分おきに起動要求が来ても、重複は起こさない
schedule.every().minute.do(long_running_task)
while True:
schedule.run_pending()
time.sleep(1)
他プロセスや他ホストと排他したい場合は、ファイルロックやDBロック、分散ロック(Redisなど)を用意します。
ローカル時刻とタイムゾーン
scheduleはローカル時刻を使います。
サーバのタイムゾーンがUTCならevery().day.at("09:00")
は「UTCの9時」です。
日本時間9時に合わせたいなら、以下のいずれかを検討します。
- プロセスが動作するOSのタイムゾーンをAsia/Tokyoに設定する
- コンテナやサービス単位でTZ環境変数を設定する
- どうしてもUTCで動かすなら、UTCでの対応時刻(例: 日本時間9時はUTCで0時 or 0時/1時(DSTなしなら固定))に変換して登録する
- 応急的に「毎分実行して、指定タイムゾーンでの時刻が条件に合う時だけ本処理を行う」ガードを入れる
from datetime import datetime
from zoneinfo import ZoneInfo
import schedule
import time
last_run_date = None # 同じ日での二重実行防止
def notify_tokyo_1730():
global last_run_date
now_tokyo = datetime.now(ZoneInfo("Asia/Tokyo"))
# 17:30ちょうどの分で、未実行なら実行
if now_tokyo.hour == 17 and now_tokyo.minute == 30:
if last_run_date != now_tokyo.date():
print(f"Tokyo 17:30 通知を送信: {now_tokyo.isoformat()}")
last_run_date = now_tokyo.date()
else:
# 同じ分内で複数回呼ばれないようガード
pass
# 毎分チェック
schedule.every().minute.do(notify_tokyo_1730)
while True:
schedule.run_pending()
time.sleep(1)
サマータイム(DST)がある地域の時刻指定は特に注意です。
DSTに強いスケジューリングが必要なら、APSchedulerなどのタイムゾーン対応が豊富なライブラリも検討しましょう。
コピペOKのscheduleサンプル
ログを毎分出力
最小限のログ出力を毎分行い、停止はCtrl+Cで行います。
# file: heartbeat_every_minute.py
import schedule
import time
import logging
from datetime import datetime
# ロガー設定
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s"
)
def heartbeat():
"""アプリが生きていることを示すハートビート"""
logging.info("heartbeat")
# 毎分実行
schedule.every().minute.do(heartbeat)
def main():
logging.info("start scheduler")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logging.info("stopped by user")
if __name__ == "__main__":
main()
2025-09-21 12:00:00,000 INFO start scheduler
2025-09-21 12:01:00,001 INFO heartbeat
2025-09-21 12:02:00,001 INFO heartbeat
2025-09-21 12:03:00,001 INFO heartbeat
2025-09-21 12:03:10,123 INFO stopped by user
バックアップを毎日9:00に実行
指定ディレクトリを日付で丸ごとコピーする簡単バックアップです。
失敗時はリトライします。
# file: daily_backup_0900.py
import schedule
import time
import shutil
from datetime import datetime
from pathlib import Path
SRC = Path("data") # バックアップ元
DST_BASE = Path("backup") # バックアップ先のルート
RETRIES = 3
def dated_backup_dir(base: Path, now: datetime) -> Path:
"""日付ベースのバックアップディレクトリ"""
return base / now.strftime("%Y%m%d_%H%M%S")
def backup():
"""ディレクトリを丸ごとコピーするバックアップ"""
now = datetime.now()
dst = dated_backup_dir(DST_BASE, now)
for attempt in range(1, RETRIES + 1):
try:
dst.parent.mkdir(parents=True, exist_ok=True)
print(f"[{now}] backup start -> {dst}")
shutil.copytree(SRC, dst) # 存在する場合は例外
print(f"[{datetime.now()}] backup done")
return
except FileExistsError:
# 同一時刻での重複を避けるため1秒待って再生成
time.sleep(1)
dst = dated_backup_dir(DST_BASE, datetime.now())
except Exception as e:
print(f"backup error: {e} (attempt {attempt}/{RETRIES})")
time.sleep(2 ** attempt)
print("backup failed after retries")
# 毎日9:00にバックアップ
schedule.every().day.at("09:00").do(backup)
if __name__ == "__main__":
print("scheduler started")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("stopped")
scheduler started
[2025-09-21 09:00:00.001234] backup start -> backup/20250921_090000
[2025-09-21 09:00:05.678901] backup done
平日17:30に通知を送る
月〜金の17:30にメッセージを送る例です。
ここでは標準出力に出すだけにしています。
# file: notify_weekdays_1730.py
import schedule
from datetime import datetime
import time
def send_notification():
now = datetime.now()
print(f"{now:%Y-%m-%d %H:%M} 平日17:30の通知を送信しました")
# 月〜金の17:30に登録
for day in (schedule.every().monday,
schedule.every().tuesday,
schedule.every().wednesday,
schedule.every().thursday,
schedule.every().friday):
day.at("17:30").do(send_notification)
if __name__ == "__main__":
print("notification scheduler started")
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("stopped")
notification scheduler started
2025-09-22 17:30 平日17:30の通知を送信しました
複数ジョブをまとめて管理
タグでジョブをグループ化し、引数付きジョブも含めて管理する小さな管理クラス例です。
安全に終了できるようシグナルも補助しています。
# file: manage_multiple_jobs.py
import schedule
import time
import signal
from datetime import datetime
from typing import Callable, Any
STOP = False # 終了フラグ
def setup_signal_handlers():
"""Ctrl+CやSIGTERMで安全に停止する"""
def handler(signum, frame):
global STOP
print(f"signal received: {signum}, stopping...")
STOP = True
signal.signal(signal.SIGINT, handler)
# WindowsではSIGTERMが無い場合があるため存在チェック
if hasattr(signal, "SIGTERM"):
signal.signal(signal.SIGTERM, handler)
def add_job(func: Callable[..., Any], spec: str, *, tag: str, **kwargs):
"""
spec: 簡易的な頻度指定
- "min" -> 毎分
- "5min" -> 5分おき
- "hour:30" -> 毎時30分
- "daily@09:00" -> 毎日09:00
kwargs: funcに渡すキーワード引数
"""
if spec == "min":
return schedule.every().minute.do(func, **kwargs).tag(tag)
if spec.endswith("min") and spec[:-3].isdigit():
n = int(spec[:-3])
return schedule.every(n).minutes.do(func, **kwargs).tag(tag)
if spec.startswith("hour:"):
mm = spec.split(":", 1)[1]
return schedule.every().hour.at(f":{mm}").do(func, **kwargs).tag(tag)
if spec.startswith("daily@"):
t = spec.split("@", 1)[1]
return schedule.every().day.at(t).do(func, **kwargs).tag(tag)
raise ValueError(f"unsupported spec: {spec}")
def job_log(message: str):
print(f"[{datetime.now():%H:%M:%S}] {message}")
def job_sum(a: int, b: int):
print(f"{a} + {b} = {a + b}")
def main():
setup_signal_handlers()
# ログを毎分
add_job(job_log, "min", tag="log", message="heartbeat")
# 毎時30分にお知らせ
add_job(job_log, "hour:30", tag="news", message="30分の定期メッセージ")
# 毎日09:00に計算実行(引数付き)
add_job(job_sum, "daily@09:00", tag="calc", a=40, b=2)
print("jobs loaded:")
for j in schedule.get_jobs():
print(" -", j)
# 実行ループ
while not STOP:
schedule.run_pending()
time.sleep(1)
# 終了時に特定タグのみクリアする例
schedule.clear("news")
print("scheduler stopped")
if __name__ == "__main__":
main()
jobs loaded:
- Every 1 minute do job_log() (last run: [never], next run: 2025-09-21 12:01:00)
- Every 1 hour at :30 do job_log() (last run: [never], next run: 2025-09-21 12:30:00)
- Every 1 day at 09:00 do job_sum() (last run: [never], next run: 2025-09-22 09:00:00)
[12:01:00] heartbeat
[12:02:00] heartbeat
signal received: 2, stopping...
scheduler stopped
参考: 代表的な頻度指定の対応表
分かりやすさのために、よく使う指定とコード例をまとめます。
実行タイミング | 記述例 |
---|---|
毎秒 | schedule.every().seconds.do(job) |
5秒ごと | schedule.every(5).seconds.do(job) |
毎分 | schedule.every().minute.do(job) |
5分ごと | schedule.every(5).minutes.do(job) |
毎時 | schedule.every().hour.do(job) |
毎時30分 | schedule.every().hour.at(":30").do(job) |
毎日9時 | schedule.every().day.at("09:00").do(job) |
毎週月曜10時 | schedule.every().monday.at("10:00").do(job) |
隔週金曜18時 | schedule.every(2).weeks.at("18:00").do(job) |
プロセスが動いている間だけ有効である点を常に意識し、サービスとして動かす場合はsystemdやWindowsタスクスケジューラなどでプロセスを常駐させるとよいです。
まとめ
scheduleは、「Pythonプロセス内で簡単に定期実行を組み込みたい」という要件に最適な軽量ライブラリです。
インストールや書き方は極めてシンプルで、every(...).seconds/minutes/hours/day.at(...)
とrun_pending()
の組み合わせだけで基本機能は足ります。
さらに、タグ管理、引数付きジョブ、例外処理とリトライ、長時間処理の重複防止などの実務的な工夫を加えれば、日次バックアップや平日通知、定期ログ出力といったタスクを堅牢に運用できます。
タイムゾーンやプロセス常駐の課題は運用環境側の設定とあわせて対処し、もし要件が増えてきたらAPSchedulerやOSのスケジューラへの移行も検討すると安心です。