Pyramid で Celery を使って非同期処理をする

Celery

Celery とは Python 製のタスクキューです。 非同期処理をするのに使います。

情弱なのでこの前の PyconJP 2012 で Celery の存在を知りました。 存在を知る前は、 Kestrel と MongoDB を使ってオレオレタスクキューを作って使っていました。 それ以前はリクエスト毎にスレッドを生成してそいつを走らせつつレスポンスを返してしまうような実装をしていました。

Django からこの Celery を使う日本語解説記事は複数見かけましたが、 Pyramid から使う解説記事は見つけられなかったのでぼくが書いておきます。

pyramid_celery をインストールする

$ pip install pyramid_celery

pip を使ってサクッと pyramid_celery をインストールします。

設定ファイルを編集する

Pyramid から先ほどインストールした pyramid_celery を使えるように、 PasteDeploy 設定ファイル( development.ini や production.ini )を編集します。

具体的には、 pyramid.includes に pyramid_celery を追加し、[app:main] の中に BROKER_URL を追加します。 BROKER_URL には、タスクを保管するメッセージキューの URL を設定します。 今回は、 RabbitMQ を使う前提でサンプルの値を入れてあります。

[app:main]
pyramid.includes =
    pyramid_celery
BROKER_URL = amqp://user:password@host:port//vhost
CELERY_RESULT_BACKEND = amqp://user:password@host:port//vhost

CELERY_RESULT_BACKEND の設定は Celery を使ってただ単に非同期処理をする上では必要のない設定ですが、これを設定してやることでタスクの状態(実行待ちか、実行後か)やタスクを実行した返り値などを取得することができます。

BROKER_URL や CELERY_RESULT_BACKEND としてデフォルトでは RabbitMQ を使うことになっていますが、 RabbitMQ 以外にも、 RedisMongoDBAmazon Simple Queue Service 、また SQLAlchemyDjango のORM を介して RDBMS を使うこともできるようです。 (それぞれのリンク先は Celery でこれらをストアとして使うための公式ドキュメント(英語)の該当する記事です)

タスクをプログラミングする

既存の Pyramid アプリケーションの中にタスクを定義します。 ここでは、 myapp.tasks としてタスクを定義していく事にします。

タスクの内容として、メールを送信したり画像のサムネイルを生成したりなどが考えられると思うのですが、どこの解説記事を読んでも(公式ドキュメントでさえも)足し算をタスクの例としていたので、ここでもそれにならって足し算を例として取り上げます。

from celery import task


@task
def add(x, y):
    return x + y

pceleryd を実行する

$ pceleryd ./development.ini

こいつがメッセージキューから実行すべきタスクを受け取ってタスクを実行し、その結果をストアに保存する役目を果たしてくれます。

タスクを呼び出す

$ pshell ./development.ini
>>> from myapp import tasks
>>> result = tasks.add.delay(5, 10)
>>> result.task_name
'myapp.tasks.add'
>>> result.ready()
True
>>> result.successful()
True
>>> result.result
15

このとおり、タスクが成功していることがわかります。 ここではインタラクティブシェルから直接タスクを呼び出していますが、実際に使うときはアプリケーションの任意の場所からリクエストに応じてタスクを実行します。

タスクを定期的に実行する

タスクを定期的に実行する方法として、伝統的には cron を用いる方法があります。 しかし Celery でもタスクを定期的に実行することができます。 Celery を使うことで、タスクを実行するスケジュールをアプリケーションのコードに含められるという利点があります。

タスクをプログラミングする

from celery.task import periodic_task
from celery.schedules import crontab
from datetime import timedelta


# 30秒おきに実行する
@periodic_task(run_every=timedelta(seconds=30))
def run_every_30seconds():
    # 何かの処理


# 毎月1日の8時0分に実行する
@periodic_task(run_every=crontab(minute='0', hour='8', day_of_month='1'))
def run_first_of_every_month_at_8():
    # 何かの処理

pceleryd と pcelerybeat を実行する

$ pceleryd ./development.ini
$ pcelerybeat /.development.ini

pceleryd の役目は上で説明したとおりです。 pcelerybeat の役目はタスクを実行すべき時刻になったら pceleryd にそのことを知らせるというものです。 従って、 pcelerybeat を実行しておきながら pceleryd を実行していない場合はそのタスクが実行されることはありません。

また、上記の2コマンドを実行する方法の他に、 pcelerybeat は実行せずに、 pceleryd に --beat オプションを追加することで1コマンドにまとめる方法もあります。

$ pceleryd ./development.ini --beat

注意

Celery 3.0 からは、 CELERY_ENABLE_UTC がデフォルトで True になっているため、 crontab に指定した時刻はすべて UTC として扱われます。 他のタイムゾーンの時刻で指定したい場合には、設定ファイルの CELERY_TIMEZONE に使いたいタイムゾーンを指定してやるか、サーバーのローカル時刻を使いたい場合は CELERY_ENABLE_UTC を False にしてやります。

また、 CELERY_TIMEZONE を指定する方法の場合、 pytz モジュールが必要なるので、 pip などでインストールしてください。

$ pip install pytz

最後に

Celery では celery.task を古いモジュールとして、もうこれ以上使われるべきでないとして使用を推奨していません。

from celery import Celery
celery = Celery()

@celery.task
def some_task():
    # 何らかの処理

以上のようにすることが推奨されていますが、 pyramid_celery のドキュメントには celery.task を使ってタスクが定義されていること、 pceleryd は celeryd の config オプションを無効化しているために PasteDeploy 設定ファイルに CELERYBEAT_SCHEDULE を指定する必要があり、その場合1行に Python の dict を文字列として記述し timedelta や crontab を使うには eval を使う必要があるなどカオスになることから古いスタイルを使っています。

StackOverFlow に、「 pyramid_celeryd は非常に薄いラッパーで、これがなくて Celery 単体で十分に使えるから pyramid_celery は必要とされてないよね!」って書いてある記事がありましたが、タスクがデータベースを使った処理をするときには PasteDeploy 設定ファイルから SQLAlchemy の engine を作ってそれを session に bind する処理が必要になり、また開発用と製品用でデータベースを分けている場合にはさらに処理が複雑になるので、ここでは pyramid_celery を使う方法を書きました。

参考 URL