Technology

FastAPI で Server-Sent Events(SSE)を使い、処理の途中経過をストリーミング配信する

時間のかかる処理をWeb APIで実行するとき、「全部終わってから結果をまとめて返す」だけだと、利用者は完了まで何も表示されない時間をただ待つことになります。処理が数秒から数十秒かかる場合は、「いまどのくらい進捗しているか」を逐次画面に出せたほうが、体感の待ち時間は短くなります。

このような「サーバー側の進捗をリアルタイムにブラウザへ送り続ける」用途に使われるのがServer-Sent Events (SSE)です。SSEはHTMLの仕様の一部で、サーバーが一本の、開いたままのHTTPレスポンスを通じて、イベント(出来事の通知)を順次クライアントに送る仕組みです。言い換えると、通常はクライアントからリクエストが送られるとレスポンスを返して終了しますが、SSEではレスポンスを終了せずに通知が来るまで待機し、通知が来る度にクライアントに送るという形です。

サーバーからの一方向の通知・進捗・フィードで十分なら、双方向の WebSocket よりもSSEを使う方がシンプルです。

本記事では、FastAPIでSSEを使い、サーバー側の進捗を少しずつ作ってブラウザに送る方法を解説します。完了の伝え方や、処理の途中でエラーが起きたときの取り扱いも含みます。

前提

本記事は、以下の環境を用いています。

  • Python言語を用いて、fastapiライブラリを使います。
  • FastAPIは0.135.0 以降を用います。SSE専用のサポート(fastapi.sse)を使うためです。
  • Python 3.12で動作確認をしています。

※ この記事の動作確認は2026年6月中頃に実施しています。

目次

事前準備

FastAPI と開発用サーバー(Uvicorn)が入っていれば、SSE のために追加で入れるものはありません。FastAPIとUvicornは次のコマンドでインストールします。

pip install "fastapi[standard]"
# uv を使っている場合
uv add "fastapi[standard]"

fastapi[standard][standard]を付けると、FastAPI本体に加えて、Uvicornというサーバーや、後で使うfastapiコマンド(FastAPI CLI)などがまとめて入ります。

Uvicorn はASGI サーバーの一つです。ASGIとは、Pythonの非同期処理(async/await)に対応したWebアプリとサーバーの接続仕様のことです。ASGIはAsynchronous Server Gateway Interfaceの略です。

Pythonのバージョンは3.10以上が必須で、3.12以上が推奨です。

インストール後、FastAPI のバージョンを確認しておきます。

pip show fastapi
# uv の場合
uv pip show fastapi

表示される Version: が 0.135.0 以上であれば、本記事の内容を実行できます。

SSEとWebSocketの違い

SSEWebSocketとよく比較されます。違いを先に整理します。

項目SSEWebSocket
方向サーバーからクライアントへの一方向。クライアントからサーバーにメッセージを送ることはできない双方向。どちらからでも送れる
プロトコルふつうのHTTPの上で動く専用プロトコルに切り替える必要がある
再接続ブラウザ標準の受信APIであるEventSourceが、接続が切れると自動で再接続を試みる自動再接続は組み込まれていない
データテキスト(UTF-8)専用であるテキストとバイナリの両方を送れる

つまり「サーバーからの一方向の通知・進捗・フィードで十分」なら、SSE のほうがシンプルです。

SSE のメッセージ形式

SSEが「どんなテキストを流すのか」を表す仕様はとても簡潔です。

  • イベントストリーム(=流れていくデータ)はUTF-8のテキストです。
  • 1つのメッセージは、data:のような[名前:値]の形をしたフィールド行をいくつか並べ、空行(改行2つ)で区切ります。
  • 行頭が:(コロン)の行はコメントで、無視されます。接続が切れないようにする生存確認によく使われます。

いちばん単純なメッセージは次の通りです。最後に空行を1つ入れる点が重要です。

data: こんにちは
(空行)

data:の行は複数並べられます。

data: 1行目
data: 2行目
(空行)

受信側(EventSource)は、上の例のように連続するdata:行を改行でつなげて1 つのデータとして受け取ります。

event:という行を付けると、そのイベントに名前を付けられます。event:が無いメッセージは、ブラウザ側では「message」という既定の名前のイベントとして受け取られます。

event: progress
data: 進捗データ
(空行)

非同期ジェネレータで進捗を yield する

サーバー側でやることは「進捗を表すデータを、少しずつ生み出す」ことです。Pythonでは、非同期ジェネレータasync defの中でyieldする関数)が適しています。最小の例として、「重い処理を3段こなし、各段の完了を流す」場合を示します。重いI/Oの代わりに、await asyncio.sleep(1)で1秒待つことにします。

import asyncio

async def event_generator():
    steps = ["データを取得", "集計", "レポートを整形"]
    for i, name in enumerate(steps, start=1):
        await asyncio.sleep(1)        # 本来はここで重い処理(DB 問い合わせなど)
        yield {
            "type": "progress",
            "step": i,
            "total": len(steps),
            "message": f"{name} 完了",
        }

yield するたびに、その値が 1 チャンク(= 1 回の通信で送る、データのひとかたまり)としてクライアントへ送られます。

ポイントは2つあります。

  • yieldには辞書を渡します。渡した辞書はFastAPIが自動でJSONに変換し、data:行に入れてSSEのフォーマットで送ってくれます。data:や末尾の空行を自分で書く必要はありません。
  • await syncio.sleep(..)で待っている間、サーバーは他のリクエストを止めません。実際のコードでは、ここがawait付きの非同期I/O(外部API呼び出しやデータベースアクセス)になります。

これにより、この非同期関数を呼び出すことで、非同期ジェネレータが生み出す辞書を1つずつ取り出していくことができます。

補足:ジェネレータと非同期処理

ジェネレータ(yield)は、値を「少しずつ何度も」返せる関数です。ふつうの関数は return で値を 1 回返したら終わりですが、ジェネレータは return の代わりに yieldを使い、値を返すたびにいったん一時停止して、関数を抜けます。そして次の値を求められると、止まったところから再開します。

async / await は、非同期処理(待ち時間のある処理を、待っている間に他の作業を進められるようにする書き方)のための構文です。async def で定義した関数の中では await が使えます。await は「時間のかかる処理(外部との通信など)の完了を待つ」目印で、待っている間、その場で固まらずに他の処理へ順番を譲ります

イベントの種類を type で区別する

先ほどのコードでは、辞書にtypeというフィールドを持たせ、"progress"という値を入れていました。これは、進捗以外のイベントと区別するためのものです。

SSEでは、1本のストリームに複数の種類のイベントを流すことがあります。たとえば、処理の途中経過を伝える「進捗」だけでなく、すべて終わったことを伝える「完了」や、失敗を伝える「エラー」も、同じストリームで送りたくなります。受信側は、届いたイベントがどの種類なのかを見分けて、表示やその後の処理を変える必要があります。

そこで、各イベントのJSONにtypeという共通のフィールドを設け、"progress""done""error"のように種類を表す文字列を入れます。受信側はこのtypeを見るだけで、どの種類のイベントかを判別できます。先ほどの辞書にtypeを入れていたのは、このためです。

補足:event: フィールドではなく、JSONの中のtypeを使う理由

event: フィールドはユーザーが自由に名前を選べますが、EventSource 側が最初から使っている名前(error など)と偶然重なると、意味の違う同じ名前のイベントが受信口に混ざってしまいます。これを避けるため、data: の JSON に type という独自のフィールドを設け、そちらで種類を判別させます。

SSEでデータを送信する

FastAPIでSSEを構築するためにPydanticモデルとEventSourceResponseを用います。

Pydanticモデル

FastAPIでSSEを構築する場合、辞書の代わりにPydanticモデルをyieldする方法が推奨です。この場合、FastAPIがモデルを使ってデータの検証・ドキュメント化・JSONへの変換を行ってくれます。進捗イベントの形をモデルとして定義する例を示します。

from pydantic import BaseModel

class Progress(BaseModel):
    type: str
    step: int
    total: int
    message: str

モデルも辞書と同じように、自動でJSONへ変換されてdata:に入ります。Pydanticモデルを使って型を決めておくと、誤ったフィールドを送ろうとしたときにエラーになるため、間違いに気づきやすくなります。

EventSourceResponseで送信する

FastAPIでSSEを配信するには、fastapi.sseEventSourceResponseを使います。エンドポイントを定義するデコレータにresponse_class=EventSourceResponseを指定し、関数の中で先ほどのProgressモデルをyieldします。

import asyncio
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Progress(BaseModel):
    type: str
    step: int
    total: int
    message: str


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[Progress]:
    steps = ["データを取得", "集計", "レポートを整形"]
    for i, name in enumerate(steps, start=1):
        await asyncio.sleep(1)
        yield Progress(
            type="progress",
            step=i,
            total=len(steps),
            message=f"{name} 完了",
        )

response_classは、このエンドポイントが返すレスポンスのクラスを指定する引数です。response_class=EventSourceResponseを指定すると、このエンドポイントはSSE形式のレスポンスを返すようになります。関数の中でyieldした値は、自動的にJSONへ変換され、data:フィールドに入って送られます。Content-TypeもSSE用のtext/event-streamに設定されます。

戻り値の型注釈-> AsyncIterable[Progress]は、「Progressモデルを次々に生み出す非同期イテラブル」を返すという意味です。イテラブルとは、forで1つずつ順に取り出せるオブジェクトのことです。

EventSourceResponseは、SSEを実運用するうえで必要になる定型処理を、最初から行ってくれます。具体的には次の3つです。

項目内容
キープアライブメッセージが流れない時間が続くと、一部のプロキシが接続を切ってしまうことがあります。これを防ぐため、15秒ごとに:(コロン)で始まるキープアライブのコメント行が自動で送られます。
Cache-Control: no-cacheストリームがキャッシュされないよう、このヘッダーが自動で設定されます。
X-Accel-Buffering: nonginxなどの一部のプロキシで、レスポンスがためこまれる(バッファリングされる)のを防ぐためのヘッダーが自動で設定されます。

以前は、これらのヘッダーやキープアライブを自分でコードに書く必要がありましたが、EventSourceResponseを使う場合は、書き手が何もしなくても有効になります。

補足:キャッシュとバッファリングがなぜSSEで問題になるのか

1つ目のキャッシュは、進捗を伝える通知が「その時その時の最新の状態」を表すものなので、ブラウザや途中の機器に古い内容をキャッシュされて使いまわされると、利用者が古い進捗を見せられてしまう、という問題です。

2つ目のバッファリングは、サーバーが正しく少しずつ送っていても、途中にいるリバースプロキシ(ブラウザとサーバーの間に立って通信を中継するソフト。nginxはその代表例)がデータを一旦ためこんでからまとめて送ってしまうと、リアルタイム性が失われ、SSEを使う意味がなくなってしまう、という問題です。

EventSourceResponseは、この2つに対応するヘッダーを最初から付けてくれます。

動作を確認する

これまでのソースをまとめてmain.pyに書き込んだ後に、サーバーを起動します。

fastapi dev main.py
# uv の場合
uv run fastapi dev main.py

サーバーを起動したら、curlコマンドで動作確認します。

curl -N http://127.0.0.1:8000/stream

curlは、コマンドラインからHTTPリクエストを送れるツールです。-Nオプションを付けると、curl側がデータをため込まずに、イベントが届くたびにすぐ表示してくれます。

1秒ごとに、次のように1行ずつ出力されれば、正しく動作しています(文章の自動整形で空行が消えるため、代わりに文字で書いています)。


data: {"type":"progress","step":1,"total":3,"message":"データを取得 完了"}
(空行)
data: {"type":"progress","step":2,"total":3,"message":"集計 完了"}
(空行)
data: {"type":"progress","step":3,"total":3,"message":"レポートを整形 完了"}
(空行)

なお、メッセージの合間に:で始まる行(キープアライブのコメント)が混ざることがあります。これは前述のとおり、接続を保つために自動で送られているものです。

ここまでは、進捗(progress)だけを送ってきましたが、同じ要領で別の状態を表すメッセージも送れます。例えば、次のコードでは処理が終了したときに完了(done)もイベントとして送っています。

import asyncio
from collections.abc import AsyncIterable
from typing import Union

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Progress(BaseModel):
    type: str
    step: int
    total: int
    message: str


class Done(BaseModel):
    type: str
    result: str


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[Union[Progress, Done]]:
    steps = ["データを取得", "集計", "レポートを整形"]
    for i, name in enumerate(steps, start=1):
        await asyncio.sleep(1)
        yield Progress(
            type="progress",
            step=i,
            total=len(steps),
            message=f"{name} 完了",
        )
    yield Done(type="done", result="全ステップ完了")

Doneを新しいモデルとして定義し、進捗のループがすべて終わった後にyieldします。ProgressDoneのどちらも流れるため、戻り値の型注釈はAsyncIterable[Union[Progress, Done]]とします。Unionは「これらのいずれかの型」を表す型注釈で、typingから読み込みます。

同様に、curl -Nで動作確認すると、以下の結果が返ってきます。

data: {"type":"progress","step":1,"total":3,"message":"データを取得 完了"}
(空行)
data: {"type":"progress","step":2,"total":3,"message":"集計 完了"}
(空行)
data: {"type":"progress","step":3,"total":3,"message":"レポートを整形 完了"}
(空行)
data: {"type":"done","result":"全ステップ完了"}
(空行)

SSEによって、種類の異なるJSONデータを逐次送れることが確認できました。

例外時にerrorイベントを流す

普通のAPIなら、エラー時にHTTPExceptionで、サーバー側の失敗を示すステータス(500など)を返せます。しかし、ストリーミングでは、最初のチャンクを送った時点でステータスコード(200)とヘッダが既に送出されているため、途中からそのステータスを変えることはできません。

そこで、処理の途中で起きたエラーは、ストリームの中の「エラーイベント」として流します。ジェネレータ全体をtry/exceptで囲み、例外を捕まえたらtype: "error"のイベントをyieldして終了します。エラーイベント用にErrorモデルを追加します。

次のコードでは、デモのため「集計」の段で必ずエラーを発生させ、それをexceptで捕まえてerrorイベントとして流しています。

import asyncio
from collections.abc import AsyncIterable
from typing import Union

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Progress(BaseModel):
    type: str
    step: int
    total: int
    message: str


class Done(BaseModel):
    type: str
    result: str


class Error(BaseModel):
    type: str
    message: str


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[Union[Progress, Done, Error]]:
    steps = ["データを取得", "集計", "レポートを整形"]
    try:
        for i, name in enumerate(steps, start=1):
            await asyncio.sleep(1)
            # (デモ用)2 段目で失敗させる
            if name == "集計":
                raise ValueError("集計でエラーが発生しました")
            yield Progress(
                type="progress",
                step=i,
                total=len(steps),
                message=f"{name} 完了",
            )
        yield Done(type="done", result="全ステップ完了")
    except Exception as e:
        yield Error(type="error", message=str(e))

先ほどと同様に、curl -Nで見ると、1段目まで流れ、2段目でerrorに切り替わって終わります。

data: {"type":"progress","step":1,"total":3,"message":"データを取得 完了"}
(空行)
data: {"type":"error","message":"集計でエラーが発生しました"}
(空行)

受信側では、記事の後半で扱っているブラウザ側の受信例のように data.type === "error" のようなif文により、エラー表示に切り替えることになります。

注意

本番では、str(e) をそのまま外部(ブラウザなど)に送ると内部情報が漏れることがあります。利用者向けには定型のメッセージを流し、詳細はサーバー側のログに記録する、といった切り分けを検討します。

サーバー側の全体コード

ここまでのサーバー側のコードをまとめたものを以下に示します。typeフィールドで進捗・完了・エラーを区別し、エラーをイベントとして流す形です。別のオリジンからAPIを呼び込むためにCORSの設定も行っています。CORSについては前記事(FastAPIでCORSを設定して別ポートのフロントから呼べるようにする)で取り扱っています。ここではGETとPOSTそれぞれを使用した場合について示します。

GETを使用する

import asyncio
from collections.abc import AsyncIterable
from typing import Union

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI()

# 許可するオリジンの列挙
origins = [
    "http://localhost:5500",
    "http://127.0.0.1:5500",
]

# CORS の設定
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class Progress(BaseModel):
    type: str
    step: int
    total: int
    message: str


class Done(BaseModel):
    type: str
    result: str


class Error(BaseModel):
    type: str
    message: str


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[Union[Progress, Done, Error]]:
    steps = ["データを取得", "集計", "レポートを整形"]
    try:
        for i, name in enumerate(steps, start=1):
            await asyncio.sleep(1)  # 本来はここで重い非同期 I/O
            yield Progress(
                type="progress",
                step=i,
                total=len(steps),
                message=f"{name} 完了",
            )
        yield Done(type="done", result="全ステップ完了")
    except Exception as e:
        # 本番では str(e) ではなく定型文にする
        yield Error(type="error", message=str(e))

POSTを使用する

import asyncio
from collections.abc import AsyncIterable
from typing import Union

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI()

# 許可するオリジンの列挙
origins = [
    "http://localhost:5500",
    "http://127.0.0.1:5500",
]

# CORS の設定
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class Question(BaseModel):
    text: str


class Progress(BaseModel):
    type: str
    step: int
    total: int
    message: str


class Done(BaseModel):
    type: str
    result: str


@app.post("/ask", response_class=EventSourceResponse)
async def ask(question: Question) -> AsyncIterable[Union[Progress, Done]]:
    steps = ["データを取得", "集計", "レポートを整形"]
    for i, name in enumerate(steps, start=1):
        await asyncio.sleep(1)
        yield Progress(
            type="progress",
            step=i,
            total=len(steps),
            message=f"{name} 完了(質問: {question.text})",
        )
    yield Done(type="done", result="全ステップ完了")

サーバーの起動方法

サーバーは次のコマンドで起動します。

fastapi dev main.py
# uv の場合
uv run fastapi dev main.py

ブラウザで受け取る

GETおよびPOSTのリクエストに対するレスポンスをブラウザで受けとるために、それぞれEventSourcefetch + getReaderを使用する方法を示します。

EventSourceを使う

受信側には、ブラウザ標準のEventSourceを使うのが最も簡単です。EventSourceResponseは標準的なSSE形式で配信するため、EventSourceでそのまま受け取れます。

ただし、EventSourceにはURLしか渡せないため、対象は常にGETのエンドポイントに限られます。POSTを送りたい場合には、そのレスポンスをEventSourceでは受けられません(その方法は次節で扱います)。index.htmlという名前で保存します。

<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset="UTF-8">
  <title>SSE 受信テスト</title>
</head>
<body>
  <h1>SSE 受信テスト</h1>
  <pre id="log"></pre>

  <script>
    const logEl = document.getElementById("log");
    const es = new EventSource("http://127.0.0.1:8000/stream");

    es.onmessage = (e) => {
      const data = JSON.parse(e.data);
      console.log(data.type, data);
      logEl.textContent += JSON.stringify(data) + "\n";
      if (data.type === "done" || data.type === "error") {
        es.close();   // 自動再接続を止める
      }
    };

    es.onerror = () => { es.close(); };
  </script>
</body>
</html>

EventSource は処理が終わっても接続を開いたままにし、勝手に再接続を試みます。完了・エラーのイベントを受け取ったら close() で明示的に閉じます。

バックエンドはGET用の全体コードを使用します。フロントエンドは簡易サーバーを以下のコマンドで立ち上げます。

python -m http.server 5500
# uv を使う場合
uv run python -m http.server 5500

http://127.0.0.1:5500/にアクセスしてブラウザで表示して、以下の文字列が出力されれば成功です。

{"type":"progress","step":1,"total":3,"message":"データを取得 完了"}
{"type":"progress","step":2,"total":3,"message":"集計 完了"}
{"type":"progress","step":3,"total":3,"message":"レポートを整形 完了"}
{"type":"done","result":"全ステップ完了"}

補足:スクリプトが実行されるタイミング

new EventSource(...)onmessage の登録は、ページ読み込み時に 1 回だけ実行されてすぐ終わります。その後サーバーからイベントが届くたびに、ブラウザが裏側で「どの EventSource の、どの種類のイベントか」を見て、対応する onmessage(や onerror)だけを呼び出します。コード自体がループしているわけではなく、イベントが来るたびにブラウザから呼び出される、という形です。

fetch + getReaderを使う

EventSourceはGET専用なので、リクエストボディを送れません。POSTで送ってからSSEを受け取りたい場合は、EventSourceではなくfetchでストリームを読みます。EventSourceResponsePOSTのエンドポイントでも使えます。

受信側は、fetchでストリームを取得し、届いたチャンクを自分でつなぎ合わせて、SSEのメッセージ(空行で区切られたまとまり)に分けてからパースします。EventSourceResponseが送るのは標準的なSSE形式なので、data:行を取り出してJSONとして読みます。index.htmlという名前で保存します。

<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset="UTF-8">
  <title>SSE POST 受信テスト</title>
</head>
<body>
  <h1>SSE POST 受信テスト</h1>
  <input id="question" type="text" value="テスト質問" />
  <button id="btn">送信</button>
  <pre id="log"></pre>

  <script>
    document.getElementById("btn").addEventListener("click", async () => {
      const questionText = document.getElementById("question").value;
      const logEl = document.getElementById("log");
      logEl.textContent = "";

      const res = await fetch("http://127.0.0.1:8000/ask", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ text: questionText }),
      });

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { value, done } = await reader.read();
        if (done) break;

        // 届いたバイト列を文字列にしてバッファに溜める
        buffer += decoder.decode(value, { stream: true });

        // 空行(\n\n)でイベントごとに区切る
        const parts = buffer.split("\n\n");
        // 最後の要素はまだ途中の可能性があるのでバッファに残す
        buffer = parts.pop();

        for (const part of parts) {
          // 1 つのイベントの中から data: 行だけを取り出す
          const line = part.split("\n").find((l) => l.startsWith("data:"));
          if (!line) continue;   // : で始まるコメント行(キープアライブ)などは無視

          const json = line.slice("data:".length).trim();
          const event = JSON.parse(json);

          logEl.textContent += JSON.stringify(event) + "\n";
          if (event.type === "done" || event.type === "error") {
            return;
          }
        }
      }
    });
  </script>
</body>
</html>

fetchres.body.getReader()で、レスポンスボディを少しずつ読み取るリーダーを得ます。reader.read()は、届いたチャンク(value)と、ストリームが終わったかどうか(done)を返します。

TextDecoderdecode(value, { stream: true })で、届いたバイト列を文字列に変換します。{ stream: true }を付けると、マルチバイト文字が途中で切れても、次のチャンクと正しくつないでくれます。

ネットワークのチャンクは、SSEのメッセージの区切りとは無関係に分割されて届きます。そのため、文字列をバッファに溜めながら、空行(\n\n)でメッセージに区切ります。区切った最後の要素は、まだ途中までしか届いていない可能性があるため、バッファに残して次のチャンクと結合します。

各メッセージの中からdata:で始まる行を取り出し、data:を除いた部分をJSON.parseで読みます。:で始まるコメント行(キープアライブ)はdata:で始まらないため、自然に無視されます。

この方式なら、POSTでボディを送りつつSSEを受け取れます。

バックエンドはPOST用の全体コードを使用します。フロントエンドは簡易サーバーを以下のコマンドで立ち上げます。

python -m http.server 5500
# uv を使う場合
uv run python -m http.server 5500

http://127.0.0.1:5500/にアクセスしてブラウザで表示して、以下の文字列が出力されれば成功です。

{"type":"progress","step":1,"total":3,"message":"データを取得 完了(質問: テスト質問)"}
{"type":"progress","step":2,"total":3,"message":"集計 完了(質問: テスト質問)"}
{"type":"progress","step":3,"total":3,"message":"レポートを整形 完了(質問: テスト質問)"}
{"type":"done","result":"全ステップ完了"}

補足1:SSEメッセージでeventやidを設定したい場合

ここまでは、辞書やPydanticモデルをそのままyieldし、その内容をdata:フィールドに入れて送ってきました。data:以外のSSEのフィールド(eventidretrycomment)も設定したいときは、データをそのままyieldする代わりに、fastapi.sseServerSentEventyieldします。ServerSentEventdataに辞書を渡す形が基本です。

from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    yield ServerSentEvent(comment="stream start") # コメント行(: で始まる行)
    for i in range(1, 4):
        yield ServerSentEvent(
            data={"step": i},  # data は自動で JSON 化される
            event="progress",  # event: フィールド
            id=str(i),         # id: フィールド
        )

dataに渡した値はJSONへ変換されます。JSON化せずに生の文字列を送りたい場合(整形済みのログ行など)は、dataの代わりにraw_dataを使います。dataraw_dataは、どちらか一方だけを指定します。

本記事の本筋(typeフィールドで種類を区別する方式)ではServerSentEventは必須ではありませんが、idを付けて再接続時に続きから送りたい場合などに使えます。

これまでと同様にバックエンドを起動した後に、curl -Nでテストすると次の結果が得られます。

: stream start

event: progress
data: {"step": 1}
id: 1

event: progress
data: {"step": 2}
id: 2

event: progress
data: {"step": 3}
id: 3

このようにSSEメッセージに、event:id:が設定されていることを確認できます。

補足2:SSEの生形式を手で組む場合

本記事ではEventSourceResponsedata:行や空行を自動で組み立ててくれるため、辞書やPydanticモデルをyieldするだけで済みました。仕組みを理解しておきたい場合のために、SSEの生形式を手で組むとどうなるかを示します。

EventSourceResponseを使わず、より低レベルのStreamingResponsefastapi.responsesにあります)を使うと、yieldした文字列がそのままレスポンスボディとして送られます。この場合、SSEの形式(data:と末尾の空行)は自分で組み立てる必要があります。

import asyncio
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def sse_format(payload: dict) -> str:
    # data: の行と、メッセージの区切りである空行(\n\n)を自分で付ける
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


async def event_generator():
    steps = ["データを取得", "集計", "レポートを整形"]
    for i, name in enumerate(steps, start=1):
        await asyncio.sleep(1)
        yield sse_format({"type": "progress", "step": i, "message": f"{name} 完了"})
    yield sse_format({"type": "done"})


@app.get("/stream")
async def stream():
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )

この方法では、data:の文字列や末尾の\n\n、さらに(必要なら)キープアライブやCache-Controlなどのヘッダーも、すべて自分で用意することになります。EventSourceResponseは、これらを肩代わりしてくれる仕組みだと理解すると分かりやすいです。なお、fastapi.sseEventSourceResponse)はFastAPI 0.135.0で追加されたものなので、これより古いバージョンを使わざるを得ない場合は、このStreamingResponseによる手組みが選択肢になります。

これまでと同様にcurl -Nでテストすると次のように表示されます。

data: {"type": "progress", "step": 1, "message": "データを取得 完了"}
(空行)
data: {"type": "progress", "step": 2, "message": "集計 完了"}
(空行)
data: {"type": "progress", "step": 3, "message": "レポートを整形 完了"}
(空行)
data: {"type": "done"}
(空行)

まとめ

本記事では、FastAPIのSSEサポート(fastapi.sse)を使って、処理の途中経過をストリーミング配信する方法を解説しました。要点を以下にまとめます。

  • SSEは、サーバーからクライアントへの一方向の配信に向いた、ふつうのHTTPで動く仕組みです。双方向が要らないならWebSocketより手軽です。
  • FastAPI 0.135.0以降では、fastapi.sseEventSourceResponseresponse_classに指定し、エンドポイントの中で辞書やPydanticモデルをyieldするだけでSSEを配信できます。yieldした値は自動でJSON化され、data:に入ります。
  • キープアライブ(15秒ごとのping)・Cache-Control: no-cacheX-Accel-Buffering: noは標準で自動なので、自分で書く必要はありません。
  • 複数の種類のイベントは、data:のJSONにtypeを持たせて区別すると、EventSourceのerrorイベントと衝突せず扱えます。
  • ストリーミングでは途中でステータスを変えられないので、エラーはtypeerrorのイベントとして流します。
  • 受信側は、GETならEventSourceが最も簡単です。POSTでボディを送りたい場合は、fetchgetReader()でストリームを読み、空行で区切ってdata:行をパースします。
  • eventidを設定したいときはServerSentEventyieldします。

参考資料

-Technology
-,