閉じる

Pythonのscheduleで定期実行を実装する方法とサンプル

小さなスクリプトを決まったタイミングで動かしたい時に、Pythonのscheduleライブラリはとても手軽です。

OSのタスクスケジューラやcronほど重装備ではありませんが、常駐プロセスの中で周期的に関数を呼び出すには十分です。

ここではインストールから頻度指定、実行ループ、エラー対策、実用サンプルまで丁寧に解説します。

Pythonのscheduleとは?定期実行の基本

scheduleの特徴と用途

scheduleは、Pythonプロセス内で軽量に定期実行を実現するライブラリです。

指定した関数やメソッドを、一定間隔や特定の時刻に合わせて実行できます。

内部的にはスレッドやプロセスを勝手に増やすことはなく、あなたが書くループの中でrun_pending()を呼び出して「実行時刻になったジョブだけ」を順に実行します。

用途の例としては、ログの定期出力、バックアップの実行、APIのポーリング、軽量なメンテナンス処理のトリガなどが挙げられます。

ただし、プロセスが停止すればジョブも止まるため、システムレベルの常駐が必要ならOSのタスクスケジューラやcron、より本格的なAPSchedulerのようなライブラリも検討してください

インストール

Pythonが導入済みなら、pipでインストールできます。

Shell
# 仮想環境の利用をおすすめします
python -m venv .venv
# Windows
.venv\Scripts\activate
# macOS/Linux
source .venv/bin/activate

# scheduleのインストール
pip install schedule

# 参考: バージョン確認
pip show schedule

インポートだけで使える軽量パッケージなので、外部サービスの設定などは不要です。

最小サンプル

最小の動作を確かめるために、数秒ごとにメッセージを出す例です。

実運用では分単位や日次で使うことが多いですが、学習時は秒単位が便利です。

Python
# file: minimal_schedule.py
import schedule
import time
from datetime import datetime

def hello():
    """定期的に呼び出される関数"""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Hello, schedule!")

# 5秒ごとにhelloを実行
schedule.every(5).seconds.do(hello)

# ループして実行時刻になったジョブを動かす
# Ctrl+Cで停止できます
try:
    while True:
        schedule.run_pending()  # そろそろ実行すべきジョブがあれば実行
        time.sleep(1)           # CPUを占有しないように少し待つ
except KeyboardInterrupt:
    print("Stopped.")
実行結果
[12:00:05] Hello, schedule!
[12:00:10] Hello, schedule!
[12:00:15] Hello, schedule!
^C
Stopped.

scheduleは「ループの中でrun_pending()を呼ぶ」ことが必須です。

これを忘れると何も起きません。

scheduleでの書き方と頻度指定

毎分・毎時の定期実行

基本の間隔指定は数値と単位を組み合わせます。

毎分・毎時の実行例は以下の通りです。

Python
import schedule
import time

def job_minute():
    print("毎分のジョブを実行")

def job_hour():
    print("毎時のジョブを実行")

# 毎分
schedule.every().minute.do(job_minute)

# 5分ごと
schedule.every(5).minutes.do(lambda: print("5分おきのジョブ"))

# 毎時
schedule.every().hour.do(job_hour)

# 各ループ
while True:
    schedule.run_pending()
    time.sleep(1)

特定の分で実行したい場合毎時30分ちょうどのように.at(':30')が使えます。

Python
# 各時間の30分に実行 (例: 10:30, 11:30, 12:30, ...)
schedule.every().hour.at(":30").do(lambda: print("各時間の30分に実行"))

毎日(9:00など)の定期実行

毎日9時ちょうどに実行するには、every().day.at("09:00")を使います。

Python
import schedule
from datetime import datetime

def backup():
    print(f"{datetime.now()} バックアップを実行中...")

schedule.every().day.at("09:00").do(backup)

時刻文字列は24時間表記で"HH:MM"または"HH:MM:SS"です。

処理はローカル時刻で解釈されます(タイムゾーンは後述)。

平日・週次の定期実行

曜日ごとの指定も分かりやすく書けます。

Python
import schedule

def weekday_task():
    print("平日だけのタスク")

# 曜日を個別に登録 (平日のみ)
schedule.every().monday.at("10:00").do(weekday_task)
schedule.every().tuesday.at("10:00").do(weekday_task)
schedule.every().wednesday.at("10:00").do(weekday_task)
schedule.every().thursday.at("10:00").do(weekday_task)
schedule.every().friday.at("10:00").do(weekday_task)

# 週次 (毎週火曜の13:15)
schedule.every().tuesday.at("13:15").do(lambda: print("週次タスク"))
# 2週間ごと (隔週金曜の18:00) も可能
schedule.every(2).weeks.do(lambda: print("隔週タスク")).at("18:00")

平日だけ実行したい時は、上のように月〜金を個別に登録するのが簡単です。

1件のジョブで条件分岐しても良いですが、時刻指定の柔軟性は分けた方が保ちやすいです。

複数ジョブの登録

複数のジョブを登録すると、run_pending()は「いま実行すべきジョブ」を順に処理します。

ジョブにはtagを付けて管理できます。

Python
import schedule
import time

def job_a():
    print("Aを実行")

def job_b():
    print("Bを実行")

# タグを付けて登録
schedule.every(2).minutes.do(job_a).tag("group1", "fast")
schedule.every().hour.do(job_b).tag("group1", "slow")

# タグで停止・削除も可能
def stop_group1():
    print("group1をクリア")
    schedule.clear("group1")

# 実験用に10分後に削除を仕込む
schedule.every(10).minutes.do(stop_group1).tag("admin")

while True:
    schedule.run_pending()
    time.sleep(1)

タグ無しですべて消す場合はschedule.clear()です。

現在登録中のジョブはschedule.get_jobs()で一覧できます。

引数付きジョブ

ジョブに引数を渡すには.do(func, arg1, kw=value)と書きます。

Python
import schedule
from datetime import datetime

def greet(name, punctuation="!"):
    print(f"{datetime.now():%H:%M:%S} Hello, {name}{punctuation}")

# 3分おきに引数付きで実行
schedule.every(3).minutes.do(greet, "Alice", punctuation="!!!")

引数の評価はスケジュール登録時に行われるため、副作用のあるオブジェクトをその場で生成して渡す場合は注意してください。

必要ならfunctools.partialを活用します。

実行ループとエラー対策

while Trueとtime.sleep

典型的な実行ループは次の通りです。

CPU使用率を抑えるためtime.sleepを入れます。

Python
import schedule
import time

def loop_forever():
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopped.")

loop_forever()

待ち時間を最適化したい場合、schedule.idle_seconds()を使うと次回実行までの秒数が取れます。

Python
import schedule
import time
import math

def efficient_loop():
    try:
        while True:
            schedule.run_pending()
            idle = schedule.idle_seconds()  # 次のジョブまでの秒数 or None
            if idle is None:
                time.sleep(1)
            else:
                # 0以下の場合はすぐにrun_pendingに戻る
                time.sleep(max(0.0, min(idle, 60)))  # 長くても60秒まで
    except KeyboardInterrupt:
        print("Stopped.")

efficient_loop()

例外処理とリトライ

ジョブの中で例外が起きても、run_pending()の呼び出し自体は継続できます。

各ジョブの内部でtry/exceptし、ログを残しつつリトライのポリシーを実装すると安定します。

Python
import schedule
import time
from random import random

def with_retry(task, *, retries=3, base_delay=1.0, factor=2.0):
    """指数バックオフで簡易リトライを行うラッパー"""
    def wrapper(*args, **kwargs):
        delay = base_delay
        for attempt in range(1, retries + 1):
            try:
                return task(*args, **kwargs)
            except Exception as e:
                print(f"[{task.__name__}] エラー: {e} (試行{attempt}/{retries})")
                if attempt == retries:
                    print(f"[{task.__name__}] 失敗を記録して中断")
                    break
                time.sleep(delay)
                delay *= factor
    return wrapper

@with_retry
def flaky_job():
    # たまに失敗する処理の例
    if random() < 0.5:
        raise RuntimeError("一時的な失敗")
    print("成功!")

schedule.every(30).seconds.do(flaky_job)

try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopped.")
実行結果
[flaky_job] エラー: 一時的な失敗 (試行1/3)
[flaky_job] エラー: 一時的な失敗 (試行2/3)
成功!

永久リトライは無限ループの原因になるため、回数制限やアラート通知を併用するのがおすすめです。

ジョブを中止したい場合

条件によってそれ以降のスケジュールをやめたい時は、ジョブの中でreturn schedule.CancelJobを返します。

Python
import schedule

counter = 0
def limited_job():
    global counter
    counter += 1
    print(f"実行{counter}回目")
    if counter >= 3:
        print("このジョブは終了します")
        return schedule.CancelJob

schedule.every(5).seconds.do(limited_job)

長時間処理の重複防止

scheduleはデフォルトでは1スレッドで順番に処理するため、同じジョブが重複起動することは基本的にありません

ただし、長時間処理を別スレッドで動かす設計にした場合や、複数プロセスから同じジョブを実行し得る場合は排他制御が必要です。

以下は同一プロセス内での簡易ロックの例です。

Python
import schedule
import threading
import time

lock = threading.Lock()

def long_running_task():
    # 今動作中ならスキップ
    if not lock.acquire(blocking=False):
        print("前回の処理がまだ終わっていないためスキップしました")
        return
    try:
        print("長時間処理を開始...")
        time.sleep(10)  # 実際の重い処理を想定
        print("長時間処理が完了しました")
    finally:
        lock.release()

# 1分おきに起動要求が来ても、重複は起こさない
schedule.every().minute.do(long_running_task)

while True:
    schedule.run_pending()
    time.sleep(1)

他プロセスや他ホストと排他したい場合は、ファイルロックやDBロック、分散ロック(Redisなど)を用意します。

ローカル時刻とタイムゾーン

scheduleはローカル時刻を使います

サーバのタイムゾーンがUTCならevery().day.at("09:00")は「UTCの9時」です。

日本時間9時に合わせたいなら、以下のいずれかを検討します。

  • プロセスが動作するOSのタイムゾーンをAsia/Tokyoに設定する
  • コンテナやサービス単位でTZ環境変数を設定する
  • どうしてもUTCで動かすなら、UTCでの対応時刻(例: 日本時間9時はUTCで0時 or 0時/1時(DSTなしなら固定))に変換して登録する
  • 応急的に「毎分実行して、指定タイムゾーンでの時刻が条件に合う時だけ本処理を行う」ガードを入れる
Python毎分動かしつつ、Asia/Tokyoの17:30だけ実処理を実行
from datetime import datetime
from zoneinfo import ZoneInfo
import schedule
import time

last_run_date = None  # 同じ日での二重実行防止

def notify_tokyo_1730():
    global last_run_date
    now_tokyo = datetime.now(ZoneInfo("Asia/Tokyo"))
    # 17:30ちょうどの分で、未実行なら実行
    if now_tokyo.hour == 17 and now_tokyo.minute == 30:
        if last_run_date != now_tokyo.date():
            print(f"Tokyo 17:30 通知を送信: {now_tokyo.isoformat()}")
            last_run_date = now_tokyo.date()
        else:
            # 同じ分内で複数回呼ばれないようガード
            pass

# 毎分チェック
schedule.every().minute.do(notify_tokyo_1730)

while True:
    schedule.run_pending()
    time.sleep(1)

サマータイム(DST)がある地域の時刻指定は特に注意です。

DSTに強いスケジューリングが必要なら、APSchedulerなどのタイムゾーン対応が豊富なライブラリも検討しましょう。

コピペOKのscheduleサンプル

ログを毎分出力

最小限のログ出力を毎分行い、停止はCtrl+Cで行います。

Python
# file: heartbeat_every_minute.py
import schedule
import time
import logging
from datetime import datetime

# ロガー設定
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)

def heartbeat():
    """アプリが生きていることを示すハートビート"""
    logging.info("heartbeat")

# 毎分実行
schedule.every().minute.do(heartbeat)

def main():
    logging.info("start scheduler")
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("stopped by user")

if __name__ == "__main__":
    main()
実行結果
2025-09-21 12:00:00,000 INFO start scheduler
2025-09-21 12:01:00,001 INFO heartbeat
2025-09-21 12:02:00,001 INFO heartbeat
2025-09-21 12:03:00,001 INFO heartbeat
2025-09-21 12:03:10,123 INFO stopped by user

バックアップを毎日9:00に実行

指定ディレクトリを日付で丸ごとコピーする簡単バックアップです。

失敗時はリトライします。

Python
# file: daily_backup_0900.py
import schedule
import time
import shutil
from datetime import datetime
from pathlib import Path

SRC = Path("data")                       # バックアップ元
DST_BASE = Path("backup")                # バックアップ先のルート
RETRIES = 3

def dated_backup_dir(base: Path, now: datetime) -> Path:
    """日付ベースのバックアップディレクトリ"""
    return base / now.strftime("%Y%m%d_%H%M%S")

def backup():
    """ディレクトリを丸ごとコピーするバックアップ"""
    now = datetime.now()
    dst = dated_backup_dir(DST_BASE, now)
    for attempt in range(1, RETRIES + 1):
        try:
            dst.parent.mkdir(parents=True, exist_ok=True)
            print(f"[{now}] backup start -> {dst}")
            shutil.copytree(SRC, dst)  # 存在する場合は例外
            print(f"[{datetime.now()}] backup done")
            return
        except FileExistsError:
            # 同一時刻での重複を避けるため1秒待って再生成
            time.sleep(1)
            dst = dated_backup_dir(DST_BASE, datetime.now())
        except Exception as e:
            print(f"backup error: {e} (attempt {attempt}/{RETRIES})")
            time.sleep(2 ** attempt)
    print("backup failed after retries")

# 毎日9:00にバックアップ
schedule.every().day.at("09:00").do(backup)

if __name__ == "__main__":
    print("scheduler started")
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("stopped")
実行結果
scheduler started
[2025-09-21 09:00:00.001234] backup start -> backup/20250921_090000
[2025-09-21 09:00:05.678901] backup done

平日17:30に通知を送る

月〜金の17:30にメッセージを送る例です。

ここでは標準出力に出すだけにしています。

Python
# file: notify_weekdays_1730.py
import schedule
from datetime import datetime
import time

def send_notification():
    now = datetime.now()
    print(f"{now:%Y-%m-%d %H:%M} 平日17:30の通知を送信しました")

# 月〜金の17:30に登録
for day in (schedule.every().monday,
            schedule.every().tuesday,
            schedule.every().wednesday,
            schedule.every().thursday,
            schedule.every().friday):
    day.at("17:30").do(send_notification)

if __name__ == "__main__":
    print("notification scheduler started")
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("stopped")
実行結果
notification scheduler started
2025-09-22 17:30 平日17:30の通知を送信しました

複数ジョブをまとめて管理

タグでジョブをグループ化し、引数付きジョブも含めて管理する小さな管理クラス例です。

安全に終了できるようシグナルも補助しています。

Python
# file: manage_multiple_jobs.py
import schedule
import time
import signal
from datetime import datetime
from typing import Callable, Any

STOP = False  # 終了フラグ

def setup_signal_handlers():
    """Ctrl+CやSIGTERMで安全に停止する"""
    def handler(signum, frame):
        global STOP
        print(f"signal received: {signum}, stopping...")
        STOP = True
    signal.signal(signal.SIGINT, handler)
    # WindowsではSIGTERMが無い場合があるため存在チェック
    if hasattr(signal, "SIGTERM"):
        signal.signal(signal.SIGTERM, handler)

def add_job(func: Callable[..., Any], spec: str, *, tag: str, **kwargs):
    """
    spec: 簡易的な頻度指定
        - "min"         -> 毎分
        - "5min"        -> 5分おき
        - "hour:30"     -> 毎時30分
        - "daily@09:00" -> 毎日09:00
    kwargs: funcに渡すキーワード引数
    """
    if spec == "min":
        return schedule.every().minute.do(func, **kwargs).tag(tag)
    if spec.endswith("min") and spec[:-3].isdigit():
        n = int(spec[:-3])
        return schedule.every(n).minutes.do(func, **kwargs).tag(tag)
    if spec.startswith("hour:"):
        mm = spec.split(":", 1)[1]
        return schedule.every().hour.at(f":{mm}").do(func, **kwargs).tag(tag)
    if spec.startswith("daily@"):
        t = spec.split("@", 1)[1]
        return schedule.every().day.at(t).do(func, **kwargs).tag(tag)
    raise ValueError(f"unsupported spec: {spec}")

def job_log(message: str):
    print(f"[{datetime.now():%H:%M:%S}] {message}")

def job_sum(a: int, b: int):
    print(f"{a} + {b} = {a + b}")

def main():
    setup_signal_handlers()

    # ログを毎分
    add_job(job_log, "min", tag="log", message="heartbeat")

    # 毎時30分にお知らせ
    add_job(job_log, "hour:30", tag="news", message="30分の定期メッセージ")

    # 毎日09:00に計算実行(引数付き)
    add_job(job_sum, "daily@09:00", tag="calc", a=40, b=2)

    print("jobs loaded:")
    for j in schedule.get_jobs():
        print(" -", j)

    # 実行ループ
    while not STOP:
        schedule.run_pending()
        time.sleep(1)

    # 終了時に特定タグのみクリアする例
    schedule.clear("news")
    print("scheduler stopped")

if __name__ == "__main__":
    main()
実行結果
jobs loaded:
 - Every 1 minute do job_log() (last run: [never], next run: 2025-09-21 12:01:00)
 - Every 1 hour at :30 do job_log() (last run: [never], next run: 2025-09-21 12:30:00)
 - Every 1 day at 09:00 do job_sum() (last run: [never], next run: 2025-09-22 09:00:00)
[12:01:00] heartbeat
[12:02:00] heartbeat
signal received: 2, stopping...
scheduler stopped

参考: 代表的な頻度指定の対応表

分かりやすさのために、よく使う指定とコード例をまとめます。

実行タイミング記述例
毎秒schedule.every().seconds.do(job)
5秒ごとschedule.every(5).seconds.do(job)
毎分schedule.every().minute.do(job)
5分ごとschedule.every(5).minutes.do(job)
毎時schedule.every().hour.do(job)
毎時30分schedule.every().hour.at(":30").do(job)
毎日9時schedule.every().day.at("09:00").do(job)
毎週月曜10時schedule.every().monday.at("10:00").do(job)
隔週金曜18時schedule.every(2).weeks.at("18:00").do(job)

プロセスが動いている間だけ有効である点を常に意識し、サービスとして動かす場合はsystemdやWindowsタスクスケジューラなどでプロセスを常駐させるとよいです。

まとめ

scheduleは、「Pythonプロセス内で簡単に定期実行を組み込みたい」という要件に最適な軽量ライブラリです。

インストールや書き方は極めてシンプルで、every(...).seconds/minutes/hours/day.at(...)run_pending()の組み合わせだけで基本機能は足ります。

さらに、タグ管理、引数付きジョブ、例外処理とリトライ、長時間処理の重複防止などの実務的な工夫を加えれば、日次バックアップや平日通知、定期ログ出力といったタスクを堅牢に運用できます。

タイムゾーンやプロセス常駐の課題は運用環境側の設定とあわせて対処し、もし要件が増えてきたらAPSchedulerやOSのスケジューラへの移行も検討すると安心です。

この記事を書いた人
エーテリア編集部
エーテリア編集部

人気のPythonを初めて学ぶ方向けに、文法の基本から小さな自動化まで、実際に手を動かして理解できる記事を書いています。

クラウドSSLサイトシールは安心の証です。

URLをコピーしました!