python-socketioとFastAPIを組み合わせて動かす

python-socketioのsioインスタンスを操作するWebAPIを作りたくて調べていました。

試した環境はUbuntu 18.04、Python 3.8、python-socketio 4.4.0、python-engineio 3.11.2、FastAPI 0.52.0です。

python-socketioのASGIAppクラスに other_asgi_app という引数があり、ここにASGIアプリケーションを指定すれば、socketio以外のトラフィックを指定したASGIアプリケーションに流してくれるようです。

python-socketio.readthedocs.io

コード

server.py:

import socketio
from fastapi import FastAPI

# setup fastapi
app_fastapi = FastAPI()
# setup socketio
sio = socketio.AsyncServer(async_mode='asgi')
app_socketio = socketio.ASGIApp(sio, other_asgi_app=app_fastapi)


@app_fastapi.get("/")
async def index():
    """fastapiのAPI実装(socketioに関係ない)
    """
    return {"result": "Index"}


@app_fastapi.get("/ping/{sid}")
async def ping(sid: str):
    """指定されたsidにemitするエンドポイント
    """
    sio.start_background_task(
        sio.emit,
        "ping", {"message": "ping from server"}, room=sid)
    return {"result": "OK"}


@sio.event
async def connect(sid, environ):
    """socketioのconnectイベント
    """
    print('connect ', sid)


@sio.event
async def disconnect(sid):
    """socketioのdisconnectイベント
    """
    print('disconnect ', sid)

client.py:

import asyncio
import socketio

sio = socketio.AsyncClient(reconnection=False)


@sio.on('ping')
async def on_ping(data):
    print('on_ping: ', data)


@sio.event
async def connect():
    print('connected.')


@sio.event
async def disconnect():
    print('disconnected.')


async def main():
    await sio.connect('http://localhost:8000')
    await sio.wait()

asyncio.run(main())

requirements.txt:

fastapi
uvicorn
python-socketio
aiohttp

実行結果

server:

# uvicornでサーバーアプリケーションを実行
$ uvicorn server:app_socketio
INFO:     Server initialized for asgi.
INFO:     Started server process [14975]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     db0b541370c040bea135855bbd0384eb: Sending packet OPEN data {'sid': 'db0b541370c040bea135855bbd0384eb', 'upgrades': ['websocket'], 'pingTimeout': 60000, 'pingInterval': 25000}
connect  db0b541370c040bea135855bbd0384eb
INFO:     db0b541370c040bea135855bbd0384eb: Sending packet MESSAGE data 0
INFO:     127.0.0.1:50052 - "GET /socket.io/?transport=polling&EIO=3&t=1583778107.0980704 HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 50052) - "WebSocket /socket.io/" [accepted]
INFO:     db0b541370c040bea135855bbd0384eb: Received request to upgrade to websocket
INFO:     db0b541370c040bea135855bbd0384eb: Upgrade to websocket successful
INFO:     db0b541370c040bea135855bbd0384eb: Received packet PING data None
INFO:     db0b541370c040bea135855bbd0384eb: Sending packet PONG data None
INFO:     127.0.0.1:50054 - "GET /ping/db0b541370c040bea135855bbd0384eb HTTP/1.1" 200 OK
INFO:     emitting event "ping" to db0b541370c040bea135855bbd0384eb [/]
INFO:     db0b541370c040bea135855bbd0384eb: Sending packet MESSAGE data 2["ping",{"message":"ping from server"}]

client:

# サーバー起動後にクライアントを起動
$ python client.py
connected.
on_ping:  {'message': 'ping from server'}
disconnected.

API呼び出し(curl):

# sidを指定してcurlでAPI呼び出し
$ curl -w '\n' http://localhost:8000/ping/db0b541370c040bea135855bbd0384eb
{"result":"OK"}

VagrantのUbuntu環境をアップグレードする

作業環境としてVagrant(VirtualBox)でUbuntu16.04を2017年ごろから使ってます。 そろそろ20.04も近くなってきたので、今更ですが既存の環境をUbuntu18.04に上げておこうかと思い、色々調べていたのでメモを残します。

使っているboxは ubuntu/xenial64ubuntu/bionic64

Vagrant box ubuntu/bionic64 - Vagrant Cloud

Q. Vagrantの仕組みで既存のUbuntu環境のメジャーアップデートはできるのか?

A.できない。Ubuntuなら通常通りdo-release-upgradeコマンドを使おう。新規にVMを作り直してもよいなら、そちらのほうが手軽。

Q. Vagrantfileのboxを変更したら既存のVMは使えない?

A.そのまま使える。boxの指定を ubuntu/xenial64 から ubuntu/bionic64 に変更しましたが、特に問題はなかった。

他に気をつけた点

  • アップグレードするとadd-apt-repositoryで追加したaptリポジトリが無効化されるので、 vagrant provision を再度実行すれば有効になるようにしとくのがよい。
  • ホスト名はアップグレードで変わらないので、気になるなら変更すればいい。
  • Vagrantfileに書いていたパッケージのインストールが、パッケージの名称変更により一部失敗していたので修正した。

実際にやった手順

  1. Vagrantfileの更新(boxの変更、パッケージ名の変更)
  2. VMsshで入って sudo do-release-upgrade
  3. ホスト名の変更 sudo hostnamectl set-hostname ubuntu-bionic
  4. sudo rm /etc/apt/sources.list.d/* (provisionでパッケージを入れ直すので)
  5. vagrant provision でaptのパッケージなど入れ直し。
  6. Pythonのvenvの作り直し

Pythonの入門者向け動画教材

去年から手伝っていて、今年の4月には公開されていたのですが、Pythonの入門者向けの動画の教材を作っていました。

JMOOCというオンライン講座の1つとして公開されています。 www.fisdom.org 講座はFisdomというプラットフォームで配信されているため、利用には登録が必要ですが、受講料はかかりません。

CMSコミュニケーションズの寺田さんと2人で半分ずつ分担してやってました。

Pythonを学習するための手助けになればいいなと思います。

2020/4/24追記

2020年度分についてはこちらです。

Pythonの入門者向け動画教材2020年4月開講分 - 偏った言語信者の垂れ流し

DjangoでLOGGING設定をしているときに、IPythonの補完に不具合が出る場合の対処

Djangoに限ったことではないのですが、PythonのloggingでrootのハンドラをDEBUGにしていると、IPythonの補完を実行した際にparsoという依存ライブラリのログが出てしまい、カーソル位置がずれてしまうことあります。

例えば、Djangoで以下のように settings.py でロギングの設定をした場合。

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'DEBUG',
    }
}

Djangomanage.py shell では、IPythonがインストールされていると、IPythonが起動します。この設定で補完機能を使おうとTabキーを押したとき、次のようにログが出力されてカーソル位置がずれます。

f:id:nullpobug:20190623164234p:plain
IPythonの補完時にログが出力される

parsoというモジュール(ipython → jedi → parsoの依存)がデバッグログを出力するのですが、これでは補完がまともに使えなくなってしまいます。

parsoのログをINFOレベルに設定することで、回避できます。(もしくは disable_existing_loggers やrootロガーの設定変更で回避することもできます)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'parso': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False,
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'DEBUG',
    }
}

参考

オープンソースカンファレンス2019 HokkaidoでDjangoの紹介をしました

6/1に札幌で開催された オープンソースカンファレンス2019 Hokkaido で、django-ja名義でDjangoフレームワークの紹介をしてきました。

資料はSpeakerDeckにアップロードしています。

Djangoフレームワークの紹介_OSC北海道2019 - Speaker Deck

日本のDjangoユーザーのコミュニティである django-ja では、Djangoフレームワークに関する雑談や、DjangoCongressJPの開催、Djangoフレームワークのドキュメントやリソースの翻訳を行っています。

Home | djangoproject.jp

DjangoのSubqueryとOuterRefを使ってサブクエリを組み立てる

DjangoのORMで、サブクエリを使う方法について。任意のSQLであればrawメソッドを使えばよいのですが、なるべくORMのAPIを使いたい。

DjangoのORMでは任意の位置にサブクエリを使えるわけではないですが、例えば「テーブル単位での問い合わせ結果にサブクエリで得た列を追加する」ぐらいのことは、annotateメソッドを使ってできます。

サブクエリ側で条件を指定した絞り込みを行う場合、SubqueryとOuterRefを組み合わせるとできます。

Query Expressions | Django documentation | Django

モデル

Categoryモデルを親に持つ、Itemモデル、という関係性のデータです。

from django.db import models
from django.utils import timezone


class Category(models.Model):
    parent = models.ForeignKey(
        'myapp.Category', null=True, blank=True,
        on_delete=models.CASCADE)
    name = models.CharField(max_length=20)

    class Meta:
        db_table = 'category'
        ordering = ['id']

    def __str__(self):
        return self.name


class Item(models.Model):
    category = models.ForeignKey(
        'myapp.Category', null=True, blank=True,
        on_delete=models.SET_NULL)
    name = models.CharField(max_length=20)
    updated = models.DateTimeField(default=timezone.now)

    class Meta:
        db_table = 'item'
        ordering = ['id']

    def __str__(self):
        return self.name

Djangoのシェルで試したコード

CountなどのAggregate Expressionでうまくクエリを作れない場合、Subqueryを継承すると、自由にクエリを作れます。

サブクエリの結果をCountするCountQueryクラスを定義して使ってます。

from datetime import timedelta
from django.utils import timezone
from django.db.models import OuterRef, Subquery, IntegerField
import sqlparse
from myapp.models import Category, Item

class CountQuery(Subquery):
    """件数をカウントするサブクエリ
    """
    template = "(SELECT COUNT(*) FROM (%(subquery)s) _count)"
    output_field = IntegerField()

categories = Category.objects.annotate(
    updated_count=CountQuery(
        Item.objects.filter(
            category_id=OuterRef('pk'),
            updated__gt=timezone.now() - timedelta(days=1)  # 24時間以内に更新されたItemの件数を取得
        )
    )
)

print(sqlparse.format(str(categories.query), reindent=True))  # sqlparseでSQL文を整形

組み立てたSQLは、sqlparseで整形して表示してみてます。

SELECT "category"."id",
       "category"."parent_id",
       "category"."name",

  (SELECT COUNT(*)
   FROM
     (SELECT U0."id",
             U0."category_id",
             U0."name",
             U0."updated"
      FROM "item" U0
      WHERE (U0."category_id" = ("category"."id")
             AND U0."updated" > 2019-03-19 08:22:03.638946)
      ORDER BY U0."id" ASC) _count) AS "updated_count"
FROM "category"
ORDER BY "category"."id" ASC

理想はもう1段減らしたいのだけど、きれいに作る方法を見つけれてないです...

参考

Django 1.11 Annotating a Subquery Aggregate - Stack Overflow