Python 標準ライブラリだけで非同期に HTTP リクエストを投げる

非同期にリクエストを投げる HTTP クライアントライブラリはすでに存在する (e.g. httpx)が、 今回はあえて標準ライブラリだけで挑戦してみる。 利用したバージョンは Python 3.12.1。

Streams

標準ライブラリの asyncio モジュールには非同期にストリームを操作する関数やクラスが定義されている。 asyncio モジュールのストリームを使うと、async/await を使ってデータの送受信ができる。

接続の確立

asyncio.open_connection() でネットワーク接続を確立すると、 ストリームに読み書きするための StreamReaderStreamWriter を返り値で取得できる。

1
2
3
4
import asyncio


reader, writer = await asyncio.open_connection(host="httpbin.org", port=443, ssl=True)

汎用的な書き方をすると次のようになる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import asyncio
import urllib.parse


async def http_get(url: str):
    url_split = urllib.parse.urlsplit(url)
    port = url_split.port or (443 if url_split.scheme == "https" else 80)
    reader, writer = await asyncio.open_connection(
        host=url_split.hostname, port=port, ssl=(port == 443)
    )

    # do something

HTTP リクエストの送信

StreamWriter.write() を使って書き込むことでリクエストができる。 HTTP 用の便利なメソッドなどは無いので、HTTP の仕様に則ってリクエストを書き込んでいく。

1
2
3
4
5
6
query = (
    f"GET /headers HTTP/1.1\r\n"
    f"Host: httpbin.org\r\n"
    f"\r\n"
)
writer.write(query.encode("ascii"))

HTTP レスポンスの受信

StreamReader.readline()StreamReader.read() でレスポンスを読み取る。

1
(await reader.readline()).decode("ascii").rstrip()

当然ここでも HTTP 用のメソッドなどないので、HTTP の仕様に従ってレスポンスを読み取っていく。 HTTP レスポンスの1行目はステータスで、その次の行から空行までがヘッダーになる。 ストリームから読み取ったデータはただのテキストなので必要に応じてパースしないといけない。 また、HTTP ヘッダーのキーはケースインセンシティブであることに注意する。 レスポンスヘッダー部分を読み取るコードを関数にまとめると次のようになる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
async def read_headers(reader: asyncio.StreamReader):
    headers = {}
    while True:
        line = (await reader.readline()).decode("ascii").rstrip()
        if not line:
            break

        [key, value] = line.split(":", maxsplit=1)
        headers[key.lower()] = value.strip()
    return headers

レスポンスボディは content-length ヘッダーで指定されたサイズを StreamReader.read() で読み取る。

1
2
3
4
5
6
headers = await read_headers(reader)
content_length = int(headers["content-length"])
raw_body = await reader.read(content_length)

writer.close()
await writer.wait_closed()

レスポンスボディを読み終えたら StreamWriter.close() を呼び出してネットワーク接続を閉じている。 StreamWriter.close() は接続自体を閉じるので、その後は対応する StreamReader も使用できなくなる。 したがって書き込み(リクエストの送信)を終えた時点ではなく、レスポンスの読み取りを終えたところで呼び出さないといけない。

実装

以上をふまえて非同期に HTTP リクエストを投げるサンプルコードを次のように実装した。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from argparse import ArgumentParser
import asyncio
import base64
import json
import urllib.parse


async def http_get(url: str):
    url_split = urllib.parse.urlsplit(url)
    port = url_split.port or (443 if url_split.scheme == "https" else 80)
    reader, writer = await asyncio.open_connection(
        host=url_split.hostname, port=port, ssl=(port == 443)
    )

    query = (
        f"GET {url_split.path or '/'} HTTP/1.1\r\n"
        f"Host: {url_split.hostname}\r\n"
        f"\r\n"
    )
    print("===== request =====")
    writer.write(query.encode("ascii"))

    status_line = (await reader.readline()).decode("ascii").rstrip()
    print("===== response status =====")
    print(status_line)

    headers = await read_headers(reader)
    print("\n===== response headers =====")
    print(json.dumps(headers, indent=2))

    content_length = int(headers["content-length"])
    raw_body = await reader.read(content_length)

    writer.close()
    await writer.wait_closed()

    print("\n===== response body =====")
    (mime, charset) = parse_content_type(headers["content-type"])
    if mime:
        (media, sub_type) = parse_mime_type(mime)
        if media == "application" and sub_type == "json":
            body_json = json.loads(raw_body)
            print(json.dumps(body_json, indent=2))
        elif media == "text":
            print(raw_body.decode(charset or "ascii"))
        else:
            print(base64.standard_b64encode(raw_body))
    else:
        print(base64.standard_b64encode(raw_body))


async def read_headers(reader: asyncio.StreamReader):
    headers = {}
    while True:
        line = (await reader.readline()).decode("ascii").rstrip()
        if not line:
            break

        [key, value] = line.split(":", maxsplit=1)
        headers[key.lower()] = value.strip()
    return headers


def parse_content_type(content_type: str) -> tuple[str | None, str | None]:
    return tuple(
        [*map(str.strip, content_type.split(";")), None, None][:2]
    )  # type: ignore


def parse_mime_type(mime_type: str) -> tuple[str | None, str | None]:
    return tuple([*mime_type.split("/"), None, None][:2])  # type: ignore


async def another_task():
    for i in range(3):
        await asyncio.sleep(1)
        print(f"another task: {i}")


async def main():
    parser = ArgumentParser("http get")
    parser.add_argument("url", type=str)
    args = parser.parse_args()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(http_get(args.url))
        tg.create_task(another_task())


if __name__ == "__main__":
    asyncio.run(main())

実行例は次のようになる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ python main.py 'https://httpbin.org/delay/3'
===== request =====
another task: 0
another task: 1
another task: 2
===== response status =====
HTTP/1.1 200 OK

===== response headers =====
{
  "date": "Sat, 23 Mar 2024 03:52:04 GMT",
  "content-type": "application/json",
  "content-length": "249",
  "connection": "keep-alive",
  "server": "gunicorn/19.9.0",
  "access-control-allow-origin": "*",
  "access-control-allow-credentials": "true"
}

===== response body =====
{
  "args": {},
  "data": "",
  "files": {},
  "form": {},
  "headers": {
    "Host": "httpbin.org",
    "X-Amzn-Trace-Id": "Root=1-65fe51e1-2001f80d7ff3b6641e479b0c"
  },
  "origin": "106.73.25.129",
  "url": "https://httpbin.org/delay/3"
}

目標であった、標準ライブラリだけでの非同期 HTTP リクエストができた。