当前位置:首页 > Python > 正文内容

Python 异步编程实战:从零构建高性能 Web 爬虫

admin4周前 (03-20)Python36

一、为什么需要异步编程?

在构建 Web 爬虫时,同步代码会面临一个严重的性能瓶颈。当我们用传统的 requests 库发送 HTTP 请求时,程序必须等待服务器响应后才能继续执行下一个请求。如果我们要爬取 100 个页面,而每个请求平均耗时 1 秒,那么总耗时就是 100 秒。

异步编程的核心思想是利用等待时间做其他事情。当我们发送 HTTP 请求后,不需要一直等待响应,而是可以去处理其他请求。当响应到达时,再回来继续处理。这样可以同时处理多个请求,大幅提升效率。

import time
import asyncio
import aiohttp

# 同步方式
async def fetch_sync(session, url):
    # 模拟同步请求
    await asyncio.sleep(1)  # 模拟 I/O 等待
    return f"数据: {url}"

# 异步方式  
async def fetch_async(session, url):
    async with session.get(url) as response:
        return await response.text()

二、async/await 基础语法

Python 3.5 引入了 async/await 语法,让异步代码看起来像同步代码一样直观。

关键概念:

  • async def:定义协程函数,调用它不会立即执行,而是返回一个协程对象
  • await:暂停当前协程,等待异步操作完成
  • asyncio.run():运行协程的入口点
  • asyncio.gather():并发运行多个协程
import asyncio

async def say_hello(name):
    print(f"开始问候: {name}")
    await asyncio.sleep(1)  # 模拟耗时操作
    print(f"完成问候: {name}")
    return f"Hello, {name}"

async def main():
    # 串行执行
    result1 = await say_hello("Alice")
    result2 = await say_hello("Bob")
    
    # 并发执行
    results = await asyncio.gather(
        say_hello("Alice"),
        say_hello("Bob"),
        say_hello("Charlie")
    )
    print(results)

# 运行协程
asyncio.run(main())

三、构建异步 HTTP 客户端

aiohttp 是 Python 异步 HTTP 客户端的标准库,它基于 asyncio 构建,性能出色。

import aiohttp
import asyncio

class AsyncCrawler:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.session = None
        
    async def __aenter__(self):
        # 创建会话,支持连接池
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.session = aiohttp.ClientSession(connector=connector)
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 关闭会话
        await self.session.close()
        
    async def fetch(self, url, headers=None, timeout=30):
        """获取单个 URL 的内容"""
        try:
            async with self.session.get(
                url, 
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as response:
                if response.status == 200:
                    return {
                        url: url,
                        status: response.status,
                        content: await response.text(),
                        size: len(await response.text())
                    }
                else:
                    return {
                        url: url,
                        status: response.status,
                        error: f"HTTP {response.status}"
                    }
        except Exception as e:
            return {
                url: url,
                status: 0,
                error: str(e)
            }
    
    async def fetch_all(self, urls, headers=None):
        """并发获取多个 URL"""
        tasks = [self.fetch(url, headers) for url in urls]
        return await asyncio.gather(*tasks)

# 使用示例
async def demo():
    urls = [
        https://httpbin.org/delay/1,
        https://httpbin.org/delay/2,
        https://httpbin.org/delay/1,
    ]
    
    async with AsyncCrawler(max_concurrent=5) as crawler:
        results = await crawler.fetch_all(urls)
        for result in results:
            print(f"{result[url]}: {result[status]}")

asyncio.run(demo())

四、实战:高性能新闻爬虫

现在我们来构建一个完整的新闻爬虫,展示异步编程的威力。

import aiohttp
import asyncio
import json
from datetime import datetime
from typing import List, Dict

class NewsCrawler:
    def __init__(self, max_concurrent=20, request_delay=0.1):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.results = []
        
    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector
        )
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
        
    async def fetch_with_semaphore(self, url: str, headers: dict = None):
        """使用信号量控制并发"""
        async with self.semaphore:
            await asyncio.sleep(self.request_delay)
            return await self.fetch(url, headers)
    
    async def fetch(self, url: str, headers: dict = None) -> Dict:
        """获取页面内容"""
        try:
            async with self.session.get(url, headers=headers) as response:
                if response.status == 200:
                    text = await response.text()
                    return {
                        url: url,
                        status: response.status,
                        content: text,
                        size: len(text),
                        timestamp: datetime.now().isoformat()
                    }
                return {
                    url: url,
                    status: response.status,
                    error: f"HTTP {response.status}"
                }
        except Exception as e:
            return {
                url: url,
                status: 0,
                error: str(e)
            }
    
    def parse_news(self, result: Dict) -> Dict:
        """解析新闻内容(示例)"""
        if result.get(status) != 200:
            return result
            
        content = result[content]
        # 这里可以根据实际网站结构进行解析
        # 示例:提取标题和部分内容
        news_item = {
            url: result[url],
            title: f"新闻标题 - {len(content)} 字符",
            summary: content[:200] if len(content) > 200 else content,
            timestamp: result[timestamp]
        }
        return news_item
    
    async def crawl(self, urls: List[str], headers: dict = None) -> List[Dict]:
        """爬取多个页面"""
        tasks = [self.fetch_with_semaphore(url, headers) for url in urls]
        raw_results = await asyncio.gather(*tasks)
        
        # 解析结果
        self.results = [self.parse_news(result) for result in raw_results]
        return self.results
    
    def save_results(self, filename: str = news_results.json):
        """保存结果到文件"""
        with open(filename, w, encoding=utf-8) as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"结果已保存到 {filename}")

# 性能对比测试
async def performance_test():
    test_urls = [
        https://httpbin.org/delay/1 for _ in range(10)
    ]
    
    # 异步爬取
    start_time = datetime.now()
    async with NewsCrawler(max_concurrent=10) as crawler:
        async_results = await crawler.crawl(test_urls)
    async_time = (datetime.now() - start_time).total_seconds()
    
    print(f"异步爬取 {len(test_urls)} 个页面耗时: {async_time:.2f} 秒")
    print(f"平均每个请求: {async_time/len(test_urls):.2f} 秒")
    
    # 异步结果通常比同步快 5-10 倍
    # 同步方式需要 10 秒(10 个请求 × 1 秒)
    # 异步方式只需要约 1-2 秒(并发处理)

if __name__ == __main__:
    asyncio.run(performance_test())

五、性能优化技巧

要让异步爬虫达到最佳性能,需要注意以下几点:

1. 合理设置并发数

并发数不是越大越好。过高的并发数会导致:

  • 服务器拒绝连接(429 错误)
  • 本地资源耗尽
  • 被反爬虫系统识别

建议从 10-20 开始,根据目标网站调整。

2. 使用连接池

aiohttp 的 ClientSession 内置连接池,复用 TCP 连接可以减少握手开销:

connector = aiohttp.TCPConnector(
    limit=100,        # 总连接数限制
    limit_per_host=10,  # 每个主机的连接数
    keepalive_timeout=30
)

3. 添加请求延迟

await asyncio.sleep(0.1)  # 每次请求间隔 0.1 秒

4. 错误处理和重试机制

async def fetch_with_retry(self, url, max_retries=3):
    for attempt in range(max_retries):
        result = await self.fetch(url)
        if result[status] == 200:
            return result
        await asyncio.sleep(2 ** attempt)  # 指数退避
    return result

六、完整项目示例

下面是一个可直接使用的完整爬虫项目:

import aiohttp
import asyncio
import json
from datetime import datetime
from pathlib import Path

class AsyncWebCrawler:
    """异步 Web 爬虫"""
    
    def __init__(
        self,
        max_concurrent: int = 20,
        timeout: int = 30,
        request_delay: float = 0.1
    ):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.stats = {
            total: 0,
            success: 0,
            failed: 0,
            start_time: None,
            end_time: None
        }
    
    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def fetch_one(self, url: str, headers: dict = None) -> dict:
        """获取单个 URL"""
        async with self.semaphore:
            await asyncio.sleep(self.request_delay)
            try:
                async with self.session.get(url, headers=headers) as resp:
                    content = await resp.text()
                    self.stats[success]  = 1
                    return {
                        url: url,
                        status: resp.status,
                        content: content,
                        size: len(content)
                    }
            except Exception as e:
                self.stats[failed]  = 1
                return {
                    url: url,
                    status: 0,
                    error: str(e)
                }
    
    async def crawl(self, urls: list, headers: dict = None) -> list:
        """爬取多个 URL"""
        self.stats[start_time] = datetime.now()
        self.stats[total] = len(urls)
        
        tasks = [self.fetch_one(url, headers) for url in urls]
        results = await asyncio.gather(*tasks)
        
        self.stats[end_time] = datetime.now()
        return results
    
    def get_stats(self) -> dict:
        """获取统计信息"""
        if self.stats[start_time] and self.stats[end_time]:
            duration = (self.stats[end_time] - self.stats[start_time]).total_seconds()
            self.stats[duration] = f"{duration:.2f}s"
            self.stats[avg_time] = f"{duration/self.stats[total]:.2f}s"
        return self.stats
    
    def save_results(self, results: list, filename: str = results.json):
        """保存结果"""
        Path(filename).write_text(
            json.dumps(results, ensure_ascii=False, indent=2),
            encoding=utf-8
        )
        print(f"✅ 结果已保存: {filename}")

# 使用示例
async def main():
    urls = [
        https://httpbin.org/get,
        https://httpbin.org/uuid,
        https://httpbin.org/headers,
    ]
    
    async with AsyncWebCrawler(max_concurrent=10) as crawler:
        results = await crawler.crawl(urls)
        crawler.save_results(results)
        
        stats = crawler.get_stats()
        print(f"\n📊 统计信息:")
        print(f"总请求数: {stats[total]}")
        print(f"成功: {stats[success]}")
        print(f"失败: {stats[failed]}")
        print(f"总耗时: {stats.get(duration, N/A)}")
        print(f"平均耗时: {stats.get(avg_time, N/A)}")

if __name__ == __main__:
    asyncio.run(main())

七、总结

Python 异步编程通过 async/await 语法和 asyncio 库,让我们能够编写高性能的并发代码。在 Web 爬虫场景中,异步编程可以实现:

  • 10 倍以上的性能提升:并发处理多个请求
  • 更好的资源利用:在等待 I/O 时做其他工作
  • 简洁的代码:看起来像同步代码,易于理解和维护

关键要点:

  1. 使用 aiohttp 替代 requests
  2. 合理设置并发数和请求延迟
  3. 使用信号量控制并发
  4. 添加错误处理和重试机制
  5. 保存统计信息,监控性能

异步编程是 Python 高级开发的必备技能,掌握它将让你的程序性能飞跃。开始尝试吧!

相关资源:

相关文章

[Python 教程] OpenCV-Python 入门:图像处理基础详解

OpenCV-Python 入门:图像处理基础详解OpenCV 是一个跨平台计算机视觉库,轻量级且高效,支持 Python 接口。本文将系统介绍 OpenCV 的核心概念和基础操作。一、OpenCV...

Python 上下文管理器实战:从 with 语句到自定义资源管理

在 Python 编程中,上下文管理器(Context Manager)是一个强大但常被低估的特性。当你使用 open() 函数读取文件时,那个熟悉的 with 语句背后,正是上下文管理器在默默工作。...

Python 装饰器的实用技巧与高级用法

装饰器本质上是一个接受函数作为参数并返回新函数的高阶函数。理解这一点是掌握装饰器的关键。让我们从最简单的例子开始,逐步深入到复杂的应用场景。首先,我们需要理解函数在 Python 中是一等公民。这意味...

深入理解 Python 装饰器:从基础到高级的完整指南

什么是装饰器?装饰器本质上是一个 Python 函数,它可以让其他函数在不需要做任何代码变动的前提下增加额外功能。装饰器的返回值通常也是一个函数对象。这种设计模式遵循了"开放封闭原则"——对扩展开放,...

Python 数据处理实战:从零开始掌握 Pandas 核心操作

在现代数据驱动的世界中,处理和分析结构化数据已成为必备技能。无论你是数据分析师、机器学习工程师还是科研人员,Pandas 都是你工具箱中不可或缺的利器。与 Excel 相比,Pandas 能够轻松处理...

Python 数据处理三部曲:从清洗到可视化的实战指南

在现代数据驱动的工作场景中,无论是处理实验数据、分析用户行为,还是监控业务指标,高效的数据处理能力都是不可或缺的。Python 提供了一套完整的数据处理工具链,其中 NumPy、Pandas 和 Ma...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。