非同期にリクエストを投げる HTTP クライアントライブラリはすでに存在する
(e.g. httpx)が、
今回はあえて標準ライブラリだけで挑戦してみる。
利用したバージョンは Python 3.12.1。
標準ライブラリの asyncio モジュールには非同期にストリームを操作する関数やクラスが定義されている。
asyncio モジュールのストリームを使うと、async/await を使ってデータの送受信ができる。
asyncio.open_connection()
でネットワーク接続を確立すると、 ストリームに読み書きするための StreamReader
、StreamWriter
を返り値で取得できる。
1
2
3
4
|
import asyncio
reader, writer = await asyncio.open_connection(host="httpbin.org", port=443, ssl=True)
|
汎用的な書き方をすると次のようになる。
1
2
3
4
5
6
7
8
9
10
11
12
|
import asyncio
import urllib.parse
async def http_get(url: str):
url_split = urllib.parse.urlsplit(url)
port = url_split.port or (443 if url_split.scheme == "https" else 80)
reader, writer = await asyncio.open_connection(
host=url_split.hostname, port=port, ssl=(port == 443)
)
# do something
|
StreamWriter.write()
を使って書き込むことでリクエストができる。
HTTP 用の便利なメソッドなどは無いので、HTTP の仕様に則ってリクエストを書き込んでいく。
1
2
3
4
5
6
|
query = (
f"GET /headers HTTP/1.1\r\n"
f"Host: httpbin.org\r\n"
f"\r\n"
)
writer.write(query.encode("ascii"))
|
StreamReader.readline()
や
StreamReader.read()
でレスポンスを読み取る。
1
|
(await reader.readline()).decode("ascii").rstrip()
|
当然ここでも HTTP 用のメソッドなどないので、HTTP の仕様に従ってレスポンスを読み取っていく。
HTTP レスポンスの1行目はステータスで、その次の行から空行までがヘッダーになる。
ストリームから読み取ったデータはただのテキストなので必要に応じてパースしないといけない。
また、HTTP ヘッダーのキーはケースインセンシティブであることに注意する。
レスポンスヘッダー部分を読み取るコードを関数にまとめると次のようになる。
1
2
3
4
5
6
7
8
9
10
|
async def read_headers(reader: asyncio.StreamReader):
headers = {}
while True:
line = (await reader.readline()).decode("ascii").rstrip()
if not line:
break
[key, value] = line.split(":", maxsplit=1)
headers[key.lower()] = value.strip()
return headers
|
レスポンスボディは content-length
ヘッダーで指定されたサイズを StreamReader.read()
で読み取る。
1
2
3
4
5
6
|
headers = await read_headers(reader)
content_length = int(headers["content-length"])
raw_body = await reader.read(content_length)
writer.close()
await writer.wait_closed()
|
レスポンスボディを読み終えたら StreamWriter.close()
を呼び出してネットワーク接続を閉じている。
StreamWriter.close()
は接続自体を閉じるので、その後は対応する StreamReader
も使用できなくなる。
したがって書き込み(リクエストの送信)を終えた時点ではなく、レスポンスの読み取りを終えたところで呼び出さないといけない。
以上をふまえて非同期に HTTP リクエストを投げるサンプルコードを次のように実装した。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
from argparse import ArgumentParser
import asyncio
import base64
import json
import urllib.parse
async def http_get(url: str):
url_split = urllib.parse.urlsplit(url)
port = url_split.port or (443 if url_split.scheme == "https" else 80)
reader, writer = await asyncio.open_connection(
host=url_split.hostname, port=port, ssl=(port == 443)
)
query = (
f"GET {url_split.path or '/'} HTTP/1.1\r\n"
f"Host: {url_split.hostname}\r\n"
f"\r\n"
)
print("===== request =====")
writer.write(query.encode("ascii"))
status_line = (await reader.readline()).decode("ascii").rstrip()
print("===== response status =====")
print(status_line)
headers = await read_headers(reader)
print("\n===== response headers =====")
print(json.dumps(headers, indent=2))
content_length = int(headers["content-length"])
raw_body = await reader.read(content_length)
writer.close()
await writer.wait_closed()
print("\n===== response body =====")
(mime, charset) = parse_content_type(headers["content-type"])
if mime:
(media, sub_type) = parse_mime_type(mime)
if media == "application" and sub_type == "json":
body_json = json.loads(raw_body)
print(json.dumps(body_json, indent=2))
elif media == "text":
print(raw_body.decode(charset or "ascii"))
else:
print(base64.standard_b64encode(raw_body))
else:
print(base64.standard_b64encode(raw_body))
async def read_headers(reader: asyncio.StreamReader):
headers = {}
while True:
line = (await reader.readline()).decode("ascii").rstrip()
if not line:
break
[key, value] = line.split(":", maxsplit=1)
headers[key.lower()] = value.strip()
return headers
def parse_content_type(content_type: str) -> tuple[str | None, str | None]:
return tuple(
[*map(str.strip, content_type.split(";")), None, None][:2]
) # type: ignore
def parse_mime_type(mime_type: str) -> tuple[str | None, str | None]:
return tuple([*mime_type.split("/"), None, None][:2]) # type: ignore
async def another_task():
for i in range(3):
await asyncio.sleep(1)
print(f"another task: {i}")
async def main():
parser = ArgumentParser("http get")
parser.add_argument("url", type=str)
args = parser.parse_args()
async with asyncio.TaskGroup() as tg:
tg.create_task(http_get(args.url))
tg.create_task(another_task())
if __name__ == "__main__":
asyncio.run(main())
|
実行例は次のようになる。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
$ python main.py 'https://httpbin.org/delay/3'
===== request =====
another task: 0
another task: 1
another task: 2
===== response status =====
HTTP/1.1 200 OK
===== response headers =====
{
"date": "Sat, 23 Mar 2024 03:52:04 GMT",
"content-type": "application/json",
"content-length": "249",
"connection": "keep-alive",
"server": "gunicorn/19.9.0",
"access-control-allow-origin": "*",
"access-control-allow-credentials": "true"
}
===== response body =====
{
"args": {},
"data": "",
"files": {},
"form": {},
"headers": {
"Host": "httpbin.org",
"X-Amzn-Trace-Id": "Root=1-65fe51e1-2001f80d7ff3b6641e479b0c"
},
"origin": "106.73.25.129",
"url": "https://httpbin.org/delay/3"
}
|
目標であった、標準ライブラリだけでの非同期 HTTP リクエストができた。