一、术语
1.1 原生协程
使用 async def
定义的协程函数。在原生协程内可以使用 await
关键字委托另一个原生协程,这类似于在经典协程中使用 yield from
。async def
语句定义的始终是原生协程,即使主体中没有使用 await
关键字。await
关键字不能在原生协程外部使用。
1.2 经典协程
一种生成器函数,在表达式中使用 `yield` 读取 `my_coro.send(data)` 调用发送的数据。经典协程可以使用 `yield from` 委托其他经典协程。经典协程不能由 `await` 驱动,而且 `asyncio`库不再支持。
1.3 基于生成器的协程
一种使用 @types.coroutine
(Python 3.5 引入)装饰的生成器函数。使用这个装饰器的生成器与新增的 await
关键字兼容。本章重点讨论原生协程和异步生成器。
1.4 异步生成器 一种使用
async def
定义,而且在主体中使用 yield
的生成器函数。返回一个提供 __anext__
方 法(获取下一项)的异步生成器对象。
二、 一个 asyncio 示例:探测域名
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4 # 设置关键字的最大长度,因为域名越短越好
async def probe(domain: str) -> tuple[str, bool]: # probe 返回一个元组,包含域名和一个布尔值。True 表示域名可解析。同时返回域名方便显示结果
loop = asyncio.get_running_loop() # 获取 asyncio 事件循环的引用,供后面使用。
try:
await loop.getaddrinfo(domain, None) # 协程方法 loop.getaddrinfo(...) 返回一个五元组,使用套接字连接指定的地址。在这个示例中,我们不需要返回的结果。如果有结果返回,则说明域名可解析;否则不可解析
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None: # main 必定是一个协程,因此可以在主体中使用 await
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # 一个生成器,产出长度不超过 MAX_KEYWORD_LEN 的 Python 关键字
domains = (f'{name}.dev'.lower() for name in names) # 一个生成器,产出后缀为 .dev 的域名
coros = [probe(domain) for domain in domains] # 调用 probe 协程,传入各个 domain,构建一个协程对象列表
for coro in asyncio.as_completed(coros): # asyncio.as_completed 是一个生成器,产出协程,按照传入的协程完成的顺序(不是协程的提交顺序)返回结果。作用类似于示例 20-4 中用过的 futures.as_completed。
domain, found = await coro # 此时,我们知道协程已经结束,因为 as_completed 就是这个作用。因此,await 表达式不阻塞,但是我们需要从 coro 中获取结果。若 coro 抛出的异常未被处理,自然在这里重新抛出
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
asyncio.run(main()) # asyncio.run 启动事件循环,仅当事件循环退出后返回。使用 asyncio 的脚本经常这样做,即把main 实现为协程,在 if __name__ == '__main__': 块中使用 asyncio.run 驱动。
asyncio.get_running_loop
函数在 Python 3.7 中新增,供协程内部使用,例如这里的 probe
。如果没有运行中的循环,那么 asyncio.get_running_loop
抛出 RuntimeError
。asyncio.get_running_loop
的实现比 asyncio.get_event_loop
更简单, 速度也更快。asyncio.get_event_loop
在必要时会启动事件循环。从 Python 3.10 开 始,asyncio.get_event_loop
已被弃用,最终将变成 asyncio.get_running_loop
的别名。
三、 可异步调用对象
for 关键字处理可迭代对象,await 关键字处理可异步调用对象。
作为 asyncio 库的终端用户,日常可见到以下两种可异步调用对象。
-
原生协程对象,通过调用原生协程函数得到。
-
asyncio.Task,通常由把协程对象传给 asyncio.create_task() 得到。
然而,终端用户编写的代码不一定要使用 await 处理 Task,还可以使用 asyncio.create_task(one_coro())
调度 one_coro
并发执行,不等待它返回。我们在 spinner_async.py
中的 spinner
协程内就是这么做的。如果不打算取消或等待任务,则无 须保存 create_task 返回的 Task 对象。仅仅创建任务就能调度协程运行。
相比之下,使用 await other_coro()
立即运行 other_coro
,等待协程运行完毕,因为继续向下执行 之前需要协程返回的结果。在 spinner_async.py 中,supervisor 协程使用 res = await slow() 执行 slow 并获得结果。
实现异步库,或者为 asyncio
库做贡献时,可能还要处理以下底层的可异步调用对象。
-
提供
await
方法、返回一个迭代器的对象;例如,asyncio.Future
实例(asyncio.Task
是asyncio.Future
的子类)。 -
以其他语言编写的对象,使用 Python/C API,提供
tp_as_async.am_await
函数(类似于 await 方法),返回一个迭代器。
现存基准代码或许还有一种可异步调用对象——基于生成器的协程对象——正在走弃用流程。
四、 增强 asyncio 版下载脚本的功能
4.1 使用 asyncio.as_completed 和一个线程
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
asyncio async/await version
"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path
import httpx
import tqdm # type: ignore
from flags2_common import main, DownloadStatus, save_flag
# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
async def get_flag(client: httpx.AsyncClient, # <1>
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await client.get(url, timeout=3.1, follow_redirects=True) # <2>
resp.raise_for_status()
return resp.content
async def download_one(client: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus:
try:
async with semaphore: # <3>
image = await get_flag(client, base_url, cc)
except httpx.HTTPStatusError as exc: # <4>
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}'
else:
raise
else:
await asyncio.to_thread(save_flag, image, f'{cc}.gif') # <5>
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:
print(cc, msg)
return status
# end::FLAGS2_ASYNCIO_TOP[]
# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2>
async with httpx.AsyncClient() as client:
to_do = [download_one(client, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
error: httpx.HTTPError | None = None # <6>
for coro in to_do_iter: # <7>
try:
status = await coro # <8>
except httpx.HTTPStatusError as exc:
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
error = exc # <9>
except httpx.RequestError as exc:
error_msg = f'{exc} {type(exc)}'.strip()
error = exc # <10>
except KeyboardInterrupt:
break
if error:
status = DownloadStatus.ERROR # <11>
if verbose:
url = str(error.request.url) # <12>
cc = Path(url).stem.upper() # <13>
print(f'{cc} error: {error_msg}')
counter[status] += 1
return counter
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]
从 Python 3.9 开始,asyncio.to_thread
协程可以轻松地把文件 I/O 委托给 asyncio 提供的一个线程池。
asyncio.Semaphore
有一个内部计时器。每次使用 await
处理协程方法 .acquire()
,计时器递 减;每次调用 .release() 方法(不是协程,因为永不阻塞),计时器递增。计时器的初始值在实例 化 Semaphore 时设定。
五、 异步迭代和异步可迭代对象
async for
处理异步可迭代对象,即实现了 __aiter__
的对象。然而,__aiter__
必须是常 规方法(不是协程方法),而且必须返回一个异步迭代器。异步迭代器提供 __anext__
协程方法,返回一个可异步调用对象,通常是一个协程对象。异步迭代器也 应实现 __aiter__
,往往返回 self。
原文始发于微信公众号(Python之家):Python深入-16-异步编程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/198357.html