非同期にリクエストを投げる HTTP クライアントライブラリはすでに存在する
(e.g. httpx )が、
今回はあえて標準ライブラリだけで挑戦してみる。
利用したバージョンは Python 3.12.1。
標準ライブラリの asyncio モジュールには非同期にストリーム を操作する関数やクラスが定義されている。
asyncio モジュールのストリームを使うと、async/await を使ってデータの送受信ができる。
asyncio.open_connection()
でネットワーク接続を確立すると、 ストリームに読み書きするための StreamReader
、StreamWriter
を返り値で取得できる。
1
2
3
4
import asyncio
reader , writer = await asyncio . open_connection ( host = "httpbin.org" , port = 443 , ssl = True )
汎用的な書き方をすると次のようになる。
1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import urllib.parse
async def http_get ( url : str ):
url_split = urllib . parse . urlsplit ( url )
port = url_split . port or ( 443 if url_split . scheme == "https" else 80 )
reader , writer = await asyncio . open_connection (
host = url_split . hostname , port = port , ssl = ( port == 443 )
)
# do something
StreamWriter.write()
を使って書き込むことでリクエストができる。
HTTP 用の便利なメソッドなどは無いので、HTTP の仕様に則ってリクエストを書き込んでいく。
1
2
3
4
5
6
query = (
f "GET /headers HTTP/1.1 \r\n "
f "Host: httpbin.org \r\n "
f " \r\n "
)
writer . write ( query . encode ( "ascii" ))
StreamReader.readline()
や
StreamReader.read()
でレスポンスを読み取る。
1
( await reader . readline ()) . decode ( "ascii" ) . rstrip ()
当然ここでも HTTP 用のメソッドなどないので、HTTP の仕様に従ってレスポンスを読み取っていく。
HTTP レスポンスの1行目はステータスで、その次の行から空行までがヘッダーになる。
ストリームから読み取ったデータはただのテキストなので必要に応じてパースしないといけない。
また、HTTP ヘッダーのキーはケースインセンシティブであることに注意する。
レスポンスヘッダー部分を読み取るコードを関数にまとめると次のようになる。
1
2
3
4
5
6
7
8
9
10
async def read_headers ( reader : asyncio . StreamReader ):
headers = {}
while True :
line = ( await reader . readline ()) . decode ( "ascii" ) . rstrip ()
if not line :
break
[ key , value ] = line . split ( ":" , maxsplit = 1 )
headers [ key . lower ()] = value . strip ()
return headers
レスポンスボディは content-length
ヘッダーで指定されたサイズを StreamReader.read()
で読み取る。
1
2
3
4
5
6
headers = await read_headers ( reader )
content_length = int ( headers [ "content-length" ])
raw_body = await reader . read ( content_length )
writer . close ()
await writer . wait_closed ()
レスポンスボディを読み終えたら StreamWriter.close()
を呼び出してネットワーク接続を閉じている。
StreamWriter.close()
は接続自体を閉じるので、その後は対応する StreamReader
も使用できなくなる。
したがって書き込み(リクエストの送信)を終えた時点ではなく、レスポンスの読み取りを終えたところで呼び出さないといけない。
以上をふまえて非同期に HTTP リクエストを投げるサンプルコードを次のように実装した。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from argparse import ArgumentParser
import asyncio
import base64
import json
import urllib.parse
async def http_get ( url : str ):
url_split = urllib . parse . urlsplit ( url )
port = url_split . port or ( 443 if url_split . scheme == "https" else 80 )
reader , writer = await asyncio . open_connection (
host = url_split . hostname , port = port , ssl = ( port == 443 )
)
query = (
f "GET { url_split . path or '/' } HTTP/1.1 \r\n "
f "Host: { url_split . hostname } \r\n "
f " \r\n "
)
print ( "===== request =====" )
writer . write ( query . encode ( "ascii" ))
status_line = ( await reader . readline ()) . decode ( "ascii" ) . rstrip ()
print ( "===== response status =====" )
print ( status_line )
headers = await read_headers ( reader )
print ( " \n ===== response headers =====" )
print ( json . dumps ( headers , indent = 2 ))
content_length = int ( headers [ "content-length" ])
raw_body = await reader . read ( content_length )
writer . close ()
await writer . wait_closed ()
print ( " \n ===== response body =====" )
( mime , charset ) = parse_content_type ( headers [ "content-type" ])
if mime :
( media , sub_type ) = parse_mime_type ( mime )
if media == "application" and sub_type == "json" :
body_json = json . loads ( raw_body )
print ( json . dumps ( body_json , indent = 2 ))
elif media == "text" :
print ( raw_body . decode ( charset or "ascii" ))
else :
print ( base64 . standard_b64encode ( raw_body ))
else :
print ( base64 . standard_b64encode ( raw_body ))
async def read_headers ( reader : asyncio . StreamReader ):
headers = {}
while True :
line = ( await reader . readline ()) . decode ( "ascii" ) . rstrip ()
if not line :
break
[ key , value ] = line . split ( ":" , maxsplit = 1 )
headers [ key . lower ()] = value . strip ()
return headers
def parse_content_type ( content_type : str ) -> tuple [ str | None , str | None ]:
return tuple (
[ * map ( str . strip , content_type . split ( ";" )), None , None ][: 2 ]
) # type: ignore
def parse_mime_type ( mime_type : str ) -> tuple [ str | None , str | None ]:
return tuple ([ * mime_type . split ( "/" ), None , None ][: 2 ]) # type: ignore
async def another_task ():
for i in range ( 3 ):
await asyncio . sleep ( 1 )
print ( f "another task: { i } " )
async def main ():
parser = ArgumentParser ( "http get" )
parser . add_argument ( "url" , type = str )
args = parser . parse_args ()
async with asyncio . TaskGroup () as tg :
tg . create_task ( http_get ( args . url ))
tg . create_task ( another_task ())
if __name__ == "__main__" :
asyncio . run ( main ())
実行例は次のようになる。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ python main.py 'https://httpbin.org/delay/3'
===== request =====
another task: 0
another task: 1
another task: 2
===== response status =====
HTTP/1.1 200 OK
===== response headers =====
{
"date" : "Sat, 23 Mar 2024 03:52:04 GMT" ,
"content-type" : "application/json" ,
"content-length" : "249" ,
"connection" : "keep-alive" ,
"server" : "gunicorn/19.9.0" ,
"access-control-allow-origin" : "*" ,
"access-control-allow-credentials" : "true"
}
===== response body =====
{
"args" : {} ,
"data" : "" ,
"files" : {} ,
"form" : {} ,
"headers" : {
"Host" : "httpbin.org" ,
"X-Amzn-Trace-Id" : "Root=1-65fe51e1-2001f80d7ff3b6641e479b0c"
} ,
"origin" : "106.73.25.129" ,
"url" : "https://httpbin.org/delay/3"
}
目標であった、標準ライブラリだけでの非同期 HTTP リクエストができた。