Python深入-16-异步编程

一、术语

1.1 原生协程

使用 async def 定义的协程函数。在原生协程内可以使用 await 关键字委托另一个原生协程,这类似于在经典协程中使用 yield fromasync def 语句定义的始终是原生协程,即使主体中没有使用 await 关键字。await 关键字不能在原生协程外部使用。

1.2 经典协程

 一种生成器函数,在表达式中使用 `yield` 读取 `my_coro.send(data)` 调用发送的数据。经典协程可以使用 `yield from` 委托其他经典协程。经典协程不能由 `await` 驱动,而且 `asyncio`库不再支持。 

1.3 基于生成器的协程

一种使用 @types.coroutine(Python 3.5 引入)装饰的生成器函数。使用这个装饰器的生成器与新增的 await 关键字兼容。本章重点讨论原生协程和异步生成器。

1.4 异步生成器 一种使用

async def 定义,而且在主体中使用 yield 的生成器函数。返回一个提供 __anext__ 方 法(获取下一项)的异步生成器对象。

二、 一个 asyncio 示例:探测域名

#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4  # 设置关键字的最大长度,因为域名越短越好


async def probe(domain: str) -> tuple[str, bool]:  # probe 返回一个元组,包含域名和一个布尔值。True 表示域名可解析。同时返回域名方便显示结果
    loop = asyncio.get_running_loop()  # 获取 asyncio 事件循环的引用,供后面使用。
    try:
        await loop.getaddrinfo(domain, None)  # 协程方法 loop.getaddrinfo(...) 返回一个五元组,使用套接字连接指定的地址。在这个示例中,我们不需要返回的结果。如果有结果返回,则说明域名可解析;否则不可解析
    except socket.gaierror:
        return (domain, False)
    return (domain, True)


async def main() -> None:  # main 必定是一个协程,因此可以在主体中使用 await
    names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)  # 一个生成器,产出长度不超过 MAX_KEYWORD_LEN 的 Python 关键字
    domains = (f'{name}.dev'.lower() for name in names)  # 一个生成器,产出后缀为 .dev 的域名
    coros = [probe(domain) for domain in domains]  # 调用 probe 协程,传入各个 domain,构建一个协程对象列表
    for coro in asyncio.as_completed(coros):  # asyncio.as_completed 是一个生成器,产出协程,按照传入的协程完成的顺序(不是协程的提交顺序)返回结果。作用类似于示例 20-4 中用过的 futures.as_completed。
        domain, found = await coro  # 此时,我们知道协程已经结束,因为 as_completed 就是这个作用。因此,await 表达式不阻塞,但是我们需要从 coro 中获取结果。若 coro 抛出的异常未被处理,自然在这里重新抛出
        mark = '+' if found else ' '
        print(f'{mark} {domain}')


if __name__ == '__main__':
    asyncio.run(main())  # asyncio.run 启动事件循环,仅当事件循环退出后返回。使用 asyncio 的脚本经常这样做,即把main 实现为协程,在 if __name__ == '__main__': 块中使用 asyncio.run 驱动。

asyncio.get_running_loop 函数在 Python 3.7 中新增,供协程内部使用,例如这里的 probe。如果没有运行中的循环,那么 asyncio.get_running_loop 抛出 RuntimeErrorasyncio.get_running_loop 的实现比 asyncio.get_event_loop 更简单, 速度也更快。asyncio.get_event_loop 在必要时会启动事件循环。从 Python 3.10 开 始,asyncio.get_event_loop 已被弃用,最终将变成 asyncio.get_running_loop 的别名。

三、 可异步调用对象

for 关键字处理可迭代对象,await 关键字处理可异步调用对象。
作为 asyncio 库的终端用户,日常可见到以下两种可异步调用对象。

  • 原生协程对象,通过调用原生协程函数得到。

  • asyncio.Task,通常由把协程对象传给 asyncio.create_task() 得到。

然而,终端用户编写的代码不一定要使用 await 处理 Task,还可以使用 asyncio.create_task(one_coro())调度 one_coro 并发执行,不等待它返回。我们在 spinner_async.py 中的 spinner协程内就是这么做的。如果不打算取消或等待任务,则无 须保存 create_task 返回的 Task 对象。仅仅创建任务就能调度协程运行。
相比之下,使用 await other_coro() 立即运行 other_coro,等待协程运行完毕,因为继续向下执行 之前需要协程返回的结果。在 spinner_async.py 中,supervisor 协程使用 res = await slow() 执行 slow 并获得结果。
实现异步库,或者为 asyncio 库做贡献时,可能还要处理以下底层的可异步调用对象。

  • 提供 await 方法、返回一个迭代器的对象;例如,asyncio.Future 实例(asyncio.Taskasyncio.Future 的子类)。

  • 以其他语言编写的对象,使用 Python/C API,提供 tp_as_async.am_await 函数(类似于 await 方法),返回一个迭代器。

现存基准代码或许还有一种可异步调用对象——基于生成器的协程对象——正在走弃用流程。

四、 增强 asyncio 版下载脚本的功能

4.1  使用 asyncio.as_completed 和一个线程

#!/usr/bin/env python3

"""Download flags of countries (with error handling).

asyncio async/await version

"""

# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm  # type: ignore

from flags2_common import main, DownloadStatus, save_flag

# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,  # <1>
                   base_url: str,
                   cc: str)
 -> bytes:

    url = f'{base_url}/{cc}/{cc}.gif'.lower()
    resp = await client.get(url, timeout=3.1, follow_redirects=True)   # <2>
    resp.raise_for_status()
    return resp.content

async def download_one(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool)
 -> DownloadStatus:

    try:
        async with semaphore:  # <3>
            image = await get_flag(client, base_url, cc)
    except httpx.HTTPStatusError as exc:  # <4>
        res = exc.response
        if res.status_code == HTTPStatus.NOT_FOUND:
            status = DownloadStatus.NOT_FOUND
            msg = f'not found: {res.url}'
        else:
            raise
    else:
        await asyncio.to_thread(save_flag, image, f'{cc}.gif')  # <5>
        status = DownloadStatus.OK
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return status
# end::FLAGS2_ASYNCIO_TOP[]

# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int)
 -> Counter[DownloadStatus]:
  # <1>
    counter: Counter[DownloadStatus] = Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    async with httpx.AsyncClient() as client:
        to_do = [download_one(client, cc, base_url, semaphore, verbose)
                 for cc in sorted(cc_list)]  # <3>
        to_do_iter = asyncio.as_completed(to_do)  # <4>
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
        error: httpx.HTTPError | None = None  # <6>
        for coro in to_do_iter:  # <7>
            try:
                status = await coro  # <8>
            except httpx.HTTPStatusError as exc:
                error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
                error_msg = error_msg.format(resp=exc.response)
                error = exc  # <9>
            except httpx.RequestError as exc:
                error_msg = f'{exc} {type(exc)}'.strip()
                error = exc  # <10>
            except KeyboardInterrupt:
                break

            if error:
                status = DownloadStatus.ERROR  # <11>
                if verbose:
                    url = str(error.request.url)  # <12>
                    cc = Path(url).stem.upper()   # <13>
                    print(f'{cc} error: {error_msg}')
            counter[status] += 1

    return counter

def download_many(cc_list: list[str],
                  base_url: str,
                  verbose: bool,
                  concur_req: int)
 -> Counter[DownloadStatus]:

    coro = supervisor(cc_list, base_url, verbose, concur_req)
    counts = asyncio.run(coro)  # <14>

    return counts

if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]

从 Python 3.9 开始,asyncio.to_thread 协程可以轻松地把文件 I/O 委托给 asyncio 提供的一个线程池。 
asyncio.Semaphore有一个内部计时器。每次使用 await处理协程方法 .acquire(),计时器递 减;每次调用 .release() 方法(不是协程,因为永不阻塞),计时器递增。计时器的初始值在实例 化 Semaphore 时设定。

五、 异步迭代和异步可迭代对象

async for 处理异步可迭代对象,即实现了 __aiter__ 的对象。然而,__aiter__ 必须是常 规方法(不是协程方法),而且必须返回一个异步迭代器。异步迭代器提供 __anext__ 协程方法,返回一个可异步调用对象,通常是一个协程对象。异步迭代器也 应实现 __aiter__,往往返回 self。  


原文始发于微信公众号(Python之家):Python深入-16-异步编程

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/198357.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!