线程智能,进程硬核:掌握Python并行编程手册

Python经常被指责"太慢"。虽然Python在原始计算方面确实不如C或Rust快,但使用正确的技术,你可以显著加速你的Python代码——特别是当你处理I/O密集型工作负载时。
在本文中,我们将深入探讨:
何时以及如何在Python中使用threading
它与multiprocessing的区别
如何识别I/O绑定CPU绑定工作负载
可以提升应用性能的实用示例
让我们开始穿针引线。
🧠 理解I/O绑定 vs CPU绑定
在选择线程还是多进程之前,你必须理解你要优化的任务类型
类型
描述
示例
最佳工具
I/O绑定
大部分时间在等待外部资源
网络爬虫、文件下载
threading, asyncio
CPU绑定
大部分时间在执行重计算
图像处理、机器学习推理
multiprocessing
💡 经验法则
如果你的程序因为等待而变慢,使用线程
如果它因为计算而变慢,使用进程
🧵 在Python中使用线程
Python的全局解释器锁(GIL)限制了CPU绑定线程的真正并行性,但对于I/O绑定任务threading可以带来巨大的速度提升。
示例:I/O绑定任务的线程化
  1. import threading
  2. import requests
  3. import time

  4. urls = [
  5. 'https://example.com',
  6. 'https://httpbin.org/delay/2',
  7. 'https://httpbin.org/get'
  8. ]

  9. def fetch(url):
  10. print(f"正在获取 {url}")
  11. response = requests.get(url)
  12. print(f"完成: {url} - 状态码 {response.status_code}")

  13. start = time.time()
  14. threads = []

  15. for url in urls:
  16. t = threading.Thread(target=fetch, args=(url,))
  17. threads.append(t)
  18. t.start()

  19. for t in threads:
  20. t.join()

  21. print(f"总时间: {time.time() - start:.2f} 秒")
python
🕒 没有线程,这将需要约6秒(每个请求2秒)。
使用线程,它在约2秒内运行,显示出真正的加速。
💡 线程注意事项
线程共享内存 可能出现竞态条件。
使用threading.Lock()避免共享资源冲突。
适合I/O,但对CPU密集型工作无效。
🧮 CPU绑定任务的多进程
对于CPU密集型工作负载,GIL成为瓶颈。这就是multiprocessing模块发挥作用的地方。它生成单独的进程,每个进程都有自己的Python解释器。
示例:使用多进程的CPU绑定任务
  1. from multiprocessing import Process, cpu_count
  2. import math
  3. import time

  4. def compute():
  5. print(f"进程启动")
  6. for _ in range(10**6):
  7. math.sqrt(12345.6789)

  8. if __name__ == "__main__":
  9. start = time.time()
  10. processes = []

  11. for _ in range(cpu_count()):
  12. p = Process(target=compute)
  13. processes.append(p)
  14. p.start()

  15. for p in processes:
  16. p.join()

  17. print(f"总时间: {time.time() - start:.2f} 秒")
python
在这里,我们将工作分配到所有可用的CPU核心——对计算密集型任务来说是一个巨大的提升。
🔍 如何判断任务是CPU绑定还是I/O绑定
使用分析工具或观察:
视觉检查
等待API调用、文件读取 I/O绑定
数学循环、数据处理 CPU绑定
使用分析工具
  1. pip install line_profiler
  2. kernprof -l script.py
  3. python -m line_profiler script.py.lprof
bash
或使用cProfile:
  1. python -m cProfile myscript.py
  2. 检查时间花在哪里:I/O调用还是计算。
bash
🧰 奖励:concurrent.futures用于清洁代码
不要手动管理线程或进程,使用:
  1. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
python
I/O的ThreadPool:
  1. with ThreadPoolExecutor(max_workers=5) as executor:
  2. executor.map(fetch, urls)
python
CPU的ProcessPool:
  1. with ProcessPoolExecutor() as executor:
  2. executor.map(compute, range(cpu_count()))
python
最终思考
Python本身并不慢——它只是需要正确的工具。
任务类型 | 使用这个
---------|----------
I/O绑定 | threading, asyncio, ThreadPoolExecutor
CPU绑定 | multiprocessing, ProcessPoolExecutor
从小开始,分析你的代码,选择正确的并行化策略。你的应用——和你的用户——会感谢你。
实际应用示例
让我们看一些更实际的例子来展示这些概念:
示例1:并行文件处理
  1. import os
  2. import threading
  3. from concurrent.futures import ThreadPoolExecutor
  4. import time

  5. def process_file(filename):
  6. """模拟文件处理"""
  7. print(f"处理文件: {filename}")
  8. time.sleep(1) # 模拟I/O操作
  9. return f"已处理 {filename}"

  10. def process_files_sequential(files):
  11. """顺序处理文件"""
  12. results = []
  13. for file in files:
  14. results.append(process_file(file))
  15. return results

  16. def process_files_parallel(files):
  17. """并行处理文件"""
  18. with ThreadPoolExecutor(max_workers=4) as executor:
  19. results = list(executor.map(process_file, files))
  20. return results

  21. # 测试
  22. files = [f"file_{i}.txt" for i in range(10)]

  23. # 顺序处理
  24. start = time.time()
  25. process_files_sequential(files)
  26. sequential_time = time.time() - start

  27. # 并行处理
  28. start = time.time()
  29. process_files_parallel(files)
  30. parallel_time = time.time() - start

  31. print(f"顺序处理时间: {sequential_time:.2f}秒")
  32. print(f"并行处理时间: {parallel_time:.2f}秒")
  33. print(f"加速比: {sequential_time/parallel_time:.2f}x")
python
示例2:CPU密集型计算
  1. from multiprocessing import Pool, cpu_count
  2. import time

  3. def heavy_computation(n):
  4. """模拟CPU密集型计算"""
  5. result = 0
  6. for i in range(n):
  7. result += i ** 2
  8. return result

  9. def compute_sequential(numbers):
  10. """顺序计算"""
  11. results = []
  12. for num in numbers:
  13. results.append(heavy_computation(num))
  14. return results

  15. def compute_parallel(numbers):
  16. """并行计算"""
  17. with Pool(processes=cpu_count()) as pool:
  18. results = pool.map(heavy_computation, numbers)
  19. return results

  20. # 测试
  21. numbers = [1000000] * 8 # 8个相同的计算任务

  22. # 顺序计算
  23. start = time.time()
  24. compute_sequential(numbers)
  25. sequential_time = time.time() - start

  26. # 并行计算
  27. start = time.time()
  28. compute_parallel(numbers)
  29. parallel_time = time.time() - start

  30. print(f"顺序计算时间: {sequential_time:.2f}秒")
  31. print(f"并行计算时间: {parallel_time:.2f}秒")
  32. print(f"加速比: {sequential_time/parallel_time:.2f}x")
python
最佳实践和注意事项
1. 选择合适的并行化策略
  1. import asyncio
  2. import aiohttp
  3. import time

  4. # 对于I/O密集型任务,asyncio可能是更好的选择
  5. async def fetch_url(session, url):
  6. async with session.get(url) as response:
  7. return await response.text()

  8. async def fetch_all_urls(urls):
  9. async with aiohttp.ClientSession() as session:
  10. tasks = [fetch_url(session, url) for url in urls]
  11. return await asyncio.gather(*tasks)

  12. # 使用示例
  13. urls = ['https://httpbin.org/delay/1'] * 10
  14. start = time.time()
  15. asyncio.run(fetch_all_urls(urls))
  16. print(f"异步获取时间: {time.time() - start:.2f}秒")
python
2. 避免常见的陷阱
  1. import threading
  2. import time

  3. # 错误示例:共享状态没有保护
  4. counter = 0

  5. def increment_bad():
  6. global counter
  7. for _ in range(1000):
  8. counter += 1

  9. # 正确示例:使用锁保护共享状态
  10. counter = 0
  11. lock = threading.Lock()

  12. def increment_good():
  13. global counter
  14. for _ in range(1000):
  15. with lock:
  16. counter += 1

  17. # 测试
  18. threads = []
  19. for _ in range(10):
  20. t = threading.Thread(target=increment_good)
  21. threads.append(t)
  22. t.start()

  23. for t in threads:
  24. t.join()

  25. print(f"最终计数: {counter}")
python
3. 性能监控
  1. import cProfile
  2. import pstats
  3. import io

  4. def profile_function(func, *args, **kwargs):
  5. """分析函数性能"""
  6. pr = cProfile.Profile()
  7. pr.enable()
  8. result = func(*args, **kwargs)
  9. pr.disable()
  10. s = io.StringIO()
  11. ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
  12. ps.print_stats()
  13. print(s.getvalue())
  14. return result

  15. # 使用示例
  16. def example_function():
  17. time.sleep(1)
  18. return "完成"

  19. profile_function(example_function)
python
总结
Python的并行编程并不复杂,但需要理解正确的工具和时机:
I/O绑定任务:使用threadingasyncio
CPU绑定任务:使用multiprocessing
简单场景:使用concurrent.futures
复杂场景:考虑asyncio或专门的并行库
记住,过早优化是万恶之源。首先确保你的代码是正确的,然后测量性能瓶颈,最后应用适当的并行化策略。
通过掌握这些技术,你可以显著提升Python应用的性能,特别是在处理I/O密集型或CPU密集型任务时。
本文翻译自DEV Community上的原创文章,旨在帮助中文开发者掌握Python并行编程技术。