読者です 読者をやめる 読者になる 読者になる

Celeryでキュー名を指定してタスクをルーティングする

2つ以上のホストでcelerydを起動し、タスクを処理させるホストを明示的に指定したい場合には、キュー名を指定することで実現できる。
Routing Tasks — Celery 3.1.23 documentation
試したバージョンは、Python 2.7、Celery 3.1.8、BrokerはRabbitMQ。

ワーカーのプロセスIDを返すタスクの作成

指定したプロセスで実行されたかを確認しやすいように、プロセスIDを返すだけのタスクを作成する。

spam.py
import os
from celery import Celery

app = Celery('spam', backend='amqp', broker='amqp://guest@localhost//')

@app.task
def hello():
    return os.getpid()

キュー名はtaskデコレータのqueue引数か、実行時にapply_asyncメソッドでqueue引数に指定できる。
今回は実行時に指定する。デフォルトのキュー名は celery となる。

ワーカーを起動する

1つ目のワーカーは、デフォルトのキュー名で起動する。ログレベルをINFOにすると、タスクの実行ログが出力されるので指定しておく。

$ celery -A spam worker -l info

2つ目のワーカーは、-Qオプションでキュー名にfooを指定する。

$ celery -A spam worker -l info -Q foo

タスクを実行した結果

対話シェルからタスクを実行して結果をprintしてみる。

>>> from spam import hello
>>> print hello.apply_async().get()
22113
>>> print hello.apply_async().get()
22113
>>> print hello.apply_async(queue='foo').get()
22065
>>> print hello.apply_async(queue='foo').get()
22065

キュー名を指定した場合と、そうでない場合でプロセスIDが変わることを確認できた。
ルーティングされたタスクの実行がワーカーのログで確認できる。