当前位置:首页 > Python > 正文内容

Python asyncio 异步编程实战指南

admin3周前 (03-22)Python28

Python 的 asyncio 库自 Python 3.4 引入以来,已经成为异步编程的标准工具。相比于传统的多线程或多进程模型,asyncio 提供了更轻量级的并发方案,特别适合 I/O 密集型任务。本文将通过多个实战示例,深入探讨 asyncio 的核心概念和高级用法。

一、asyncio 核心概念

asyncio 的核心是事件循环(Event Loop)和协程(Coroutine)。事件循环负责调度和执行异步任务,而协程则是使用 async/await 语法定义的异步函数。理解这些概念是掌握 asyncio 的基础。

协程不同于普通函数,调用协程不会立即执行,而是返回一个协程对象。只有将协程交给事件循环运行时,它才会真正执行。这就是为什么我们需要使用 asyncio.run() 或 loop.run_until_complete() 来启动异步程序。

二、基础异步编程示例

让我们从一个简单的异步 HTTP 请求示例开始:


import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"成功获取 {len(results)} 个响应")

asyncio.run(main())

这个例子展示了 asyncio 的基本用法:定义异步函数、创建任务、并发执行。注意我们使用了 aiohttp 库,它是专门为 asyncio 设计的 HTTP 客户端。

三、高级任务控制

asyncio 提供了多种任务控制机制,让我们能够更精细地管理异步操作。

1. 超时控制

在异步编程中,超时控制非常重要。asyncio 提供了 asyncio.wait_for() 来实现这一点:


import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "操作完成"

async def main_with_timeout():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main_with_timeout())

2. 任务取消

有时我们需要主动取消正在运行的任务:


import asyncio

async def background_task():
    try:
        for i in range(10):
            await asyncio.sleep(1)
            print(f"进度: {i+1}/10")
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main_with_cancellation():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(3)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("主程序捕获取消异常")

asyncio.run(main_with_cancellation())

四、并发模式对比

asyncio 提供了多种并发执行模式,每种都有其适用场景。

1. asyncio.gather()

gather 是最常用的并发方式,它会并发执行所有任务并等待全部完成:


import asyncio

async def task_with_id(task_id, delay):
    await asyncio.sleep(delay)
    return f"任务 {task_id} 完成"

async def demonstrate_gather():
    tasks = [task_with_id(i, 1) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)  # 所有结果按顺序返回

asyncio.run(demonstrate_gather())

2. asyncio.wait()

wait 提供了更灵活的控制,可以指定返回条件:


import asyncio

async def demonstrate_wait():
    tasks = [asyncio.create_task(task_with_id(i, i)) for i in range(1, 5)]
    
    # 等待第一个任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    print(f"已完成 {len(done)} 个任务")
    for task in done:
        print(task.result())
    
    # 取消剩余任务
    for task in pending:
        task.cancel()

asyncio.run(demonstrate_wait())

3. asyncio.as_completed()

as_completed 按完成顺序迭代结果:


import asyncio

async def demonstrate_as_completed():
    tasks = [task_with_id(i, i) for i in range(3, 0, -1)]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)  # 按完成顺序输出

asyncio.run(demonstrate_as_completed())

五、实际应用:批量文件处理

让我们通过一个实际场景来展示 asyncio 的威力。假设我们需要处理多个大文件:


import asyncio
import aiofiles
import os

async def process_file(file_path):
    async with aiofiles.open(file_path, 'r') as f:
        content = await f.read()
    # 模拟文件处理
    await asyncio.sleep(0.5)
    return len(content)

async def batch_process_files(directory):
    files = [os.path.join(directory, f) 
             for f in os.listdir(directory) 
             if f.endswith('.txt')]
    
    tasks = [process_file(file) for file in files]
    sizes = await asyncio.gather(*tasks)
    
    total = sum(sizes)
    print(f"处理了 {len(files)} 个文件,总大小: {total} 字节")

六、性能优化技巧

1. 使用信号量控制并发数

在处理大量任务时,我们可能需要限制并发数量以避免资源耗尽:


import asyncio

semaphore = asyncio.Semaphore(3)  # 限制最多3个并发任务

async def controlled_task(task_id):
    async with semaphore:
        await asyncio.sleep(1)
        print(f"任务 {task_id} 完成")

async def limited_concurrency():
    tasks = [controlled_task(i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(limited_concurrency())

2. 避免阻塞事件循环

在协程中执行 CPU 密集型任务会阻塞事件循环。解决方法是将这些任务放到线程池中:


import asyncio

async def blocking_task():
    def cpu_bound():
        total = 0
        for i in range(10**7):
            total += i
        return total
    
    # 将阻塞任务放到线程池
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, cpu_bound)
    return result

async def main_with_executor():
    result = await blocking_task()
    print(f"计算结果: {result}")

asyncio.run(main_with_executor())

七、错误处理最佳实践

异步编程中的错误处理需要特别注意。gather 可以配置返回异常而不是抛出:


import asyncio

async def failing_task(task_id):
    if task_id == 2:
        raise ValueError("任务2失败")
    await asyncio.sleep(0.5)
    return f"任务 {task_id} 成功"

async def error_handling_example():
    tasks = [failing_task(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(result)

asyncio.run(error_handling_example())

八、总结

asyncio 为 Python 提供了强大的异步编程能力。通过掌握事件循环、协程、任务控制等核心概念,以及正确使用各种并发模式,我们可以构建出高效的异步应用程序。在实际应用中,合理使用信号量控制并发、避免阻塞操作、做好错误处理,这些都是编写高质量异步代码的关键。

记住,异步编程不是银弹,它最适合 I/O 密集型任务。对于 CPU 密集型任务,考虑使用多进程或其他并发模型。选择正确的工具,才能发挥最大的性能优势。

相关文章

[Python 教程] NumPy 数组操作详解

NumPy 数组操作详解 NumPy 是 Python 科学计算的基础库,提供高性能的多维数组对象。本文详细介绍 NumPy 数组的核心操作。 一、创建数组 import numpy as np...

[Python 教程] Python 多线程编程指南

Python 多线程编程指南 Python 的 threading 模块提供多线程支持。本文介绍多线程编程的基础和实用技巧。 一、创建线程 import threading import time...

[Python 教程] Python 网络请求与爬虫基础

Python 网络请求与爬虫基础 requests 是 Python 最常用的 HTTP 库。本文介绍网络请求和爬虫的基础知识。 一、基础请求 import requests # GET 请求 r...

Python 上下文管理器实战:从 with 语句到自定义资源管理

在 Python 编程中,上下文管理器(Context Manager)是一个强大但常被低估的特性。当你使用 open() 函数读取文件时,那个熟悉的 with 语句背后,正是上下文管理器在默默工作。...

Python 装饰器完全指南:从入门到实战

装饰器本质上是一个接受函数作为参数并返回新函数的高阶函数。理解这一点是掌握装饰器的关键。想象一下,你有一个函数,你想在它执行前后添加一些额外的逻辑,比如日志记录、性能测试、权限验证等。装饰器就是为这种...

Python 装饰器的实用技巧与高级用法

装饰器本质上是一个接受函数作为参数并返回新函数的高阶函数。理解这一点是掌握装饰器的关键。让我们从最简单的例子开始,逐步深入到复杂的应用场景。首先,我们需要理解函数在 Python 中是一等公民。这意味...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。