django-database-taskをCloudTasks対応にした

引き続きDjango 6.0のタスクフレームワークを使うライブラリの整備をしています。

先日作成したdjango-database-task はタスクをデータベースに保存して、実行はコマンドツールや管理画面からの実行だけを最初に提供していました。

昨日の Django 6.0のタスクフレームワークでCloudTasksを利用するバックエンドを作った - 偏った言語信者の垂れ流し では、GCPのCloud Tasksだけを使うバックエンドを作成していました。

社内での利用を考えた場合、django-database-taskについてもCloud Tasks対応が必須のため、対応するバックエンドを作成しました。 Cloud Tasksを適用する部分は、タスクのキューイングのメッセージ通知と実行用のエンドポイントを呼び出しです。 READMEに使い方を記載しています。

Google Cloud Tasks Integration・tokibito/django-database-task

インストールはオプション指定すると依存パッケージが追加でインストールされます。

pip install django-database-task[cloudtasks]

サービスアカウントを設定していれば、ほとんど追加の設定なしで動かせます。

アーキテクチャ

タスクのエンキューから実行までの流れ。エンドポイントを /tasks/execute/<task_id>/ にした場合の例。

sequenceDiagram
    participant App as Application
    participant Backend as CloudTasksDatabaseBackend
    participant DB as Database
    participant CT as Cloud Tasks
    participant Handler as /tasks/execute/

    Note over App,Handler: Task Enqueue
    App->>Backend: task.enqueue(args, kwargs)
    Backend->>DB: INSERT task (status=READY)
    DB-->>Backend: Task ID
    Backend->>CT: Create Cloud Task (task_id only)
    CT-->>Backend: OK
    Backend-->>App: TaskResult (id, status=READY)

    Note over App,Handler: Task Execution (triggered by Cloud Tasks)
    CT->>Handler: POST /tasks/execute/<task_id>/<br/>(with OIDC token if configured)
    Handler->>Handler: Verify OIDC token (optional)
    Handler->>DB: SELECT task by ID
    DB-->>Handler: Task record
    Handler->>DB: UPDATE status=RUNNING
    Handler->>Handler: Execute task function
    alt Success
        Handler->>DB: UPDATE status=SUCCESSFUL
        Handler-->>CT: HTTP 200
    else Failure
        Handler->>DB: UPDATE status=FAILED
        Handler-->>CT: HTTP 500 (triggers retry)
    end

django-database-taskの場合、Cloud Tasksにはtask_idのみを預けるようにしていて、パラメータはペイロードに含めていません。 タスク実行用のパラメータはデータベースに保存されているはずなので、重複を避けるようにしています。

これを使うとどうなるか

django-database-taskのCloudTasks対応のバックエンドを利用すると、こんな感じのことができます。

  • Django 6.0のタスクフレームワークでタスクを定義できる
  • タスクを実行したときの引数や戻り値などはDBに格納され、ステータスも管理される
  • タスクをキューイングしたらCloudTasks経由で実行される(タスク実行用エンドポイント経由)
  • CloudTasksのキューを使って流量制御、再送制御ができる
  • AppEngineやCloudRunで動かすとき、Blue/Green環境で呼び出し元のバージョンでタスクを実行できる

流量制御や再送制御が結構うれしいポイントです。CloudTasksはエンドポイントがステータスコード200以外を返すと再送してくれるため、500エラーなどが出たときにそのままCloudTasksの基盤側から再実行がかかります。 たくさんのタスクが短時間にキューイングされた場合も、CloudTasksのキューの設定で単位時間あたりの実行数を設定しておけば、ゆっくり処理させたりできます。

AppEngineやCloudRunの場合、バージョンごとにドメイン名が割当てられるので、そのあたりも自動検出して呼び出し元のバージョンのホストのエンドポイントを呼ぶようにしています。Blue/Greenデプロイの対応は昔AppEngine SDKでも同様の対応がなされていたので、それを参考にした感じです。

いい感じにできたので、社内で使っていきたい気持ちです。