読者です 読者をやめる 読者になる 読者になる

Celeryでタスクをプリフェッチする数を変更する

Python Celery

celerydのメインプロセスは、タスクのメッセージをブローカーから受け取った後、ワーカープロセスに処理を実行させる。
メッセージをブローカーから受け取る際、実行中のものとは別に、事前にメッセージを取得(プリフェッチ)するようになっている。
全てのワーカープロセスが処理中であってもプリフェッチは行われる。
それぞれのタスクの処理に長い時間がかかる場合、複数のマシンでcelerydを動かしていても、このプリフェッチによってタスクが待ち状態になってしまう状況が発生する。
このプリフェッチ数は CELERYD_PREFETCH_MULTIPLIER の値を設定することで変更できる。デフォルト値は4。これは1ワーカーごとの値となる。(例: concurrencyを3に設定した場合は12)
http://docs.celeryproject.org/en/latest/configuration.html#celeryd-prefetch-multiplier
CELERYD_PREFETCH_MULTIPLIERを0に指定した場合、制限なくプリフェッチする?ようです。
余分なプリフェッチをしない場合は、CELERYD_PREFETCH_MULTIPLIERを1にして、CELERY_ACKS_LATEにTrueを指定する。
Optimizing — Celery 3.1.23 documentation
検証した環境は、Python2.7, Celery3.1.8。

検証用のタスク

spam.py
import time
from celery import Celery

app = Celery('spam', backend='amqp', broker='amqp://guest@localhost//')

@app.task
def test():
    return time.sleep(60)

60秒待って終了するだけのタスク。

デフォルト設定での実行結果

CeleryのworkerはあらかじめログレベルINFOで起動しておく。

$ celery -A spam worker -l info -c 1

testタスクを6回実行する。

$ for a in $(seq 1 6); do python -c "__import__('spam').test.delay()"; done;

こうすると、celeryのログでは、次のようになる。

[2014-02-06 18:25:36,658: INFO/MainProcess] Received task: spam.test[0b469a10-8d53-4975-bb69-66c6b906995a]
[2014-02-06 18:25:37,082: INFO/MainProcess] Received task: spam.test[070dfed8-72c4-4fb7-8339-991d1f8ad707]
[2014-02-06 18:25:37,501: INFO/MainProcess] Received task: spam.test[e76fca81-086e-48b3-8a29-2526b775528d]
[2014-02-06 18:25:37,928: INFO/MainProcess] Received task: spam.test[9ef8b908-badb-467d-ada7-89ef52fcaac3]
[2014-02-06 18:25:38,345: INFO/MainProcess] Received task: spam.test[1620fe6c-3b2d-4af8-88c8-361e925d7f07]
[2014-02-06 18:26:36,737: INFO/MainProcess] Task spam.test[0b469a10-8d53-4975-bb69-66c6b906995a] succeeded in 60.076744666s: None
[2014-02-06 18:26:36,742: INFO/MainProcess] Received task: spam.test[053d723a-7b3a-413d-a259-ce96b0dd3f11]
[2014-02-06 18:27:36,800: INFO/MainProcess] Task spam.test[070dfed8-72c4-4fb7-8339-991d1f8ad707] succeeded in 60.059771921s: None

最初に4つのタスクをプリフェッチし、1つ目のタスクがワーカーによって実行状態になる。その後、追加で1つプリフェッチされる(4個までプリフェッチ)
実行中のタスクが終わると、2つ目のタスクが実行され、新たに1つのタスクがプリフェッチされる。

設定値の変更
import time
from celery import Celery

app = Celery('spam', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_ACKS_LATE = True
app.conf.CELERYD_PREFETCH_MULTIPLIER = 1

@app.task
def test():
    return time.sleep(60)

この状態で同様に実行すると、余分にプリフェッチせずにタスクが実行される。

[2014-02-06 18:49:47,553: INFO/MainProcess] Received task: spam.test[026b88f5-1902-4d2e-885e-cea0e05dd319]
[2014-02-06 18:50:47,659: INFO/MainProcess] Task spam.test[026b88f5-1902-4d2e-885e-cea0e05dd319] succeeded in 60.102571496s: None
[2014-02-06 18:50:47,664: INFO/MainProcess] Received task: spam.test[4cac6dfb-c645-4f7a-8017-9f4a7e0f4099]
[2014-02-06 18:51:47,733: INFO/MainProcess] Task spam.test[4cac6dfb-c645-4f7a-8017-9f4a7e0f4099] succeeded in 60.066362381s: None
# 以下略

処理時間の長いタスクがあって、複数サーバーでCeleryを実行してる、キューも分けてないような場合にはこういう設定で問題を回避できることがあります。