引き続きDjango 6.0のタスクフレームワークを使うライブラリの整備をしています。
先日作成したdjango-database-task はタスクをデータベースに保存して、実行はコマンドツールや管理画面からの実行だけを最初に提供していました。
昨日の Django 6.0のタスクフレームワークでCloudTasksを利用するバックエンドを作った - 偏った言語信者の垂れ流し では、GCPのCloud Tasksだけを使うバックエンドを作成していました。
社内での利用を考えた場合、django-database-taskについてもCloud Tasks対応が必須のため、対応するバックエンドを作成しました。 Cloud Tasksを適用する部分は、タスクのキューイングのメッセージ通知と実行用のエンドポイントを呼び出しです。 READMEに使い方を記載しています。
Google Cloud Tasks Integration・tokibito/django-database-task
インストールはオプション指定すると依存パッケージが追加でインストールされます。
pip install django-database-task[cloudtasks]
サービスアカウントを設定していれば、ほとんど追加の設定なしで動かせます。
アーキテクチャ
タスクのエンキューから実行までの流れ。エンドポイントを /tasks/execute/<task_id>/ にした場合の例。
sequenceDiagram
participant App as Application
participant Backend as CloudTasksDatabaseBackend
participant DB as Database
participant CT as Cloud Tasks
participant Handler as /tasks/execute/
Note over App,Handler: Task Enqueue
App->>Backend: task.enqueue(args, kwargs)
Backend->>DB: INSERT task (status=READY)
DB-->>Backend: Task ID
Backend->>CT: Create Cloud Task (task_id only)
CT-->>Backend: OK
Backend-->>App: TaskResult (id, status=READY)
Note over App,Handler: Task Execution (triggered by Cloud Tasks)
CT->>Handler: POST /tasks/execute/<task_id>/<br/>(with OIDC token if configured)
Handler->>Handler: Verify OIDC token (optional)
Handler->>DB: SELECT task by ID
DB-->>Handler: Task record
Handler->>DB: UPDATE status=RUNNING
Handler->>Handler: Execute task function
alt Success
Handler->>DB: UPDATE status=SUCCESSFUL
Handler-->>CT: HTTP 200
else Failure
Handler->>DB: UPDATE status=FAILED
Handler-->>CT: HTTP 500 (triggers retry)
end
django-database-taskの場合、Cloud Tasksにはtask_idのみを預けるようにしていて、パラメータはペイロードに含めていません。 タスク実行用のパラメータはデータベースに保存されているはずなので、重複を避けるようにしています。
これを使うとどうなるか
django-database-taskのCloudTasks対応のバックエンドを利用すると、こんな感じのことができます。
- Django 6.0のタスクフレームワークでタスクを定義できる
- タスクを実行したときの引数や戻り値などはDBに格納され、ステータスも管理される
- タスクをキューイングしたらCloudTasks経由で実行される(タスク実行用エンドポイント経由)
- CloudTasksのキューを使って流量制御、再送制御ができる
- AppEngineやCloudRunで動かすとき、Blue/Green環境で呼び出し元のバージョンでタスクを実行できる
流量制御や再送制御が結構うれしいポイントです。CloudTasksはエンドポイントがステータスコード200以外を返すと再送してくれるため、500エラーなどが出たときにそのままCloudTasksの基盤側から再実行がかかります。 たくさんのタスクが短時間にキューイングされた場合も、CloudTasksのキューの設定で単位時間あたりの実行数を設定しておけば、ゆっくり処理させたりできます。
AppEngineやCloudRunの場合、バージョンごとにドメイン名が割当てられるので、そのあたりも自動検出して呼び出し元のバージョンのホストのエンドポイントを呼ぶようにしています。Blue/Greenデプロイの対応は昔AppEngine SDKでも同様の対応がなされていたので、それを参考にした感じです。
いい感じにできたので、社内で使っていきたい気持ちです。