Concurrency in Depth: From Threads and Processes to Coroutines
I. Threads vs. Processes
Basic Concepts
Process:
- The operating system's basic unit of resource allocation
- Owns an independent memory space (code, data, heap/stack)
- Inter-process communication requires dedicated mechanisms (IPC)
- Relatively expensive to create and context-switch
Thread:
- The basic unit of CPU scheduling
- Threads within the same process share its memory space
- Communication is comparatively simple (shared memory)
- Cheap to create and context-switch
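The IPC bullet above can be made concrete: two processes cannot share ordinary Python variables, but a `multiprocessing.Queue` gives them a message channel. A minimal sketch (the function and message text are illustrative):

```python
import multiprocessing

def child(q):
    # Runs in a separate process; the queue is the IPC channel.
    q.put(f"hello from PID {multiprocessing.current_process().pid}")

def demo_ipc():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    msg = q.get()  # blocks until the child process sends something
    p.join()
    return msg

if __name__ == "__main__":
    print(demo_ipc())
```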
Python Example Comparison
```python
import multiprocessing
import threading
import time
import os

# Process example
def process_worker(name):
    print(f"Process {name} PID: {os.getpid()}")
    time.sleep(2)
    print(f"Process {name} finished")

# Thread example
def thread_worker(name):
    print(f"Thread {name} running in process {os.getpid()}")
    time.sleep(2)
    print(f"Thread {name} finished")

def compare_process_vs_thread():
    print("=== Process example ===")
    processes = []
    start_time = time.time()
    for i in range(3):
        p = multiprocessing.Process(target=process_worker, args=(f"P{i}",))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    process_time = time.time() - start_time

    print("\n=== Thread example ===")
    threads = []
    start_time = time.time()
    for i in range(3):
        t = threading.Thread(target=thread_worker, args=(f"T{i}",))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    thread_time = time.time() - start_time
    print(f"\nSummary: processes took {process_time:.2f}s, threads took {thread_time:.2f}s")

# Memory isolation example
shared_list = []

def process_memory_demo():
    # Note: passing a nested function to Process requires the "fork" start
    # method (the Linux default); under "spawn" it cannot be pickled.
    def worker():
        shared_list.append(os.getpid())
        print(f"shared_list inside the process: {shared_list}")
    print("Memory isolation between processes:")
    p1 = multiprocessing.Process(target=worker)
    p2 = multiprocessing.Process(target=worker)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"shared_list in the main process: {shared_list}")  # still empty

def thread_memory_demo():
    def worker():
        shared_list.append(threading.current_thread().name)
        print(f"Thread {threading.current_thread().name}: {shared_list}")
    print("\nThreads in one process share memory:")
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, name=f"Thread-{i}")
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"Final shared_list: {shared_list}")
```
II. Locking: Pessimistic vs. Optimistic Locks
Pessimistic Locking
The core idea of a pessimistic lock is to "pessimistically" assume that concurrent conflicts will happen. Before touching shared data, it must therefore acquire an exclusive lock, guaranteeing that no other thread can access the data until the lock is released.
Pessimistic locking fits the following scenarios best:
- Write-heavy workloads: data is modified far more often than it is read, so conflicts are likely.
- Long critical sections: once the data is acquired, processing takes a while (complex computation, I/O), and interference during that window is unacceptable.
- Strong consistency requirements: correctness takes absolute priority over throughput; dirty reads and lost updates are not tolerated.
- Expensive conflicts: if a conflict does occur, rolling back or retrying costs so much that serializing execution up front is the better deal.
A Pessimistic Lock in the Database
At the database level, SELECT ... FOR UPDATE is the canonical pessimistic-lock statement.
Scenario: flash sale of a limited-stock product
```sql
-- 1. Start the transaction
BEGIN;
-- 2. Pessimistic read: lock the row for product id = 1 (stock = remaining inventory)
SELECT stock FROM products WHERE id = 1 FOR UPDATE;
-- 3. Application logic checks that stock > 0
-- 4. Decrement the stock
UPDATE products SET stock = stock - 1 WHERE id = 1;
-- 5. Commit the transaction, releasing the lock
COMMIT;
```
When several transactions run SELECT ... FOR UPDATE on the same row at once, the first one obtains an exclusive (X) lock on that row; the others block on their SELECT until the first transaction commits. This guarantees the stock can never be oversold.
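SQLite has no FOR UPDATE, but BEGIN IMMEDIATE takes the database write lock up front and can play the same pessimistic role. A runnable sketch of the flash-sale flow above, using the same hypothetical products table:

```python
import sqlite3

def buy_one(conn):
    """Pessimistic decrement: BEGIN IMMEDIATE acquires SQLite's write lock
    immediately, standing in for SELECT ... FOR UPDATE."""
    conn.execute("BEGIN IMMEDIATE")
    stock = conn.execute("SELECT stock FROM products WHERE id = 1").fetchone()[0]
    if stock > 0:
        conn.execute("UPDATE products SET stock = stock - 1 WHERE id = 1")
        conn.execute("COMMIT")
        return True
    conn.execute("ROLLBACK")  # nothing to sell; release the lock
    return False

# isolation_level=None puts sqlite3 in autocommit mode, so we control
# transactions explicitly with BEGIN/COMMIT/ROLLBACK.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, stock INTEGER)")
conn.execute("INSERT INTO products VALUES (1, 2)")
```

With two units in stock, the first two calls to `buy_one(conn)` succeed and the third fails, leaving stock at zero.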
Comparison with Optimistic Locking
Pessimistic locking is easier to appreciate next to its optimistic counterpart:
- Optimistic idea: assume conflicts are rare, so modify the data first; at commit time, check whether anyone else changed it in the meantime (typically via a version number or timestamp). On conflict, roll back and retry.
- Typical scenarios: read-heavy workloads with a low conflict probability, e.g. updating an article's like count or refreshing configuration data.
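The version-number flavor mentioned above can be sketched with SQLite and a hypothetical articles table for the like-count example: the UPDATE succeeds only if the version read earlier is still current, and the caller retries on conflict.

```python
import sqlite3

def like_article(conn, article_id, retries=3):
    """Optimistic update: read, compute, then commit only if the
    version column is unchanged; retry on conflict."""
    for _ in range(retries):
        likes, version = conn.execute(
            "SELECT likes, version FROM articles WHERE id = ?", (article_id,)
        ).fetchone()
        cur = conn.execute(
            "UPDATE articles SET likes = ?, version = ? "
            "WHERE id = ? AND version = ?",
            (likes + 1, version + 1, article_id, version),
        )
        conn.commit()
        if cur.rowcount == 1:  # nobody touched the row in between
            return True
    return False  # gave up after repeated conflicts

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE articles (id INTEGER PRIMARY KEY, likes INTEGER, version INTEGER)"
)
conn.execute("INSERT INTO articles VALUES (1, 10, 0)")
conn.commit()
```

`rowcount` tells us whether the guarded UPDATE matched a row; a concurrent writer would have bumped `version` first, making it 0 and triggering a retry.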
| Property | Pessimistic lock | Optimistic lock |
|---|---|---|
| Concurrency assumption | Conflicts are common (pessimistic) | Conflicts are rare (optimistic) |
| Implementations | synchronized, ReentrantLock, SELECT ... FOR UPDATE | Version column, CAS operations (e.g. AtomicInteger) |
| Flow | Lock first, then operate | Operate first, validate at commit |
| Blocks threads | Yes (suspend and wait) | No (usually retry in a loop) |
| Overhead | Lock management and context switches are costly | Tiny when conflicts are rare |
| Best for | Write-heavy, strong consistency, long critical sections | Read-heavy, latency-sensitive |
Pessimistic locking belongs where you are sure contention will occur and the cost of losing the race (corrupt data) far outweighs the cost of locking (lost performance).
Typical examples:
- Financial transactions (transfers, payments)
- Inventory decrements (flash sales)
- Order state transitions
- Any core business flow whose operations must be serialized
```python
import threading
import time

class BankAccountPessimistic:
    """Pessimistic locking example: a bank account"""
    def __init__(self, balance=1000):
        self.balance = balance
        self.lock = threading.Lock()

    def withdraw(self, amount):
        with self.lock:  # acquires and releases the lock automatically
            if self.balance >= amount:
                time.sleep(0.01)  # simulate processing time
                self.balance -= amount
                print(f"Withdrew {amount}, balance {self.balance}")
                return True
            else:
                print(f"Withdrawal of {amount} failed: insufficient funds")
                return False

    def deposit(self, amount):
        with self.lock:
            time.sleep(0.01)
            self.balance += amount
            print(f"Deposited {amount}, balance {self.balance}")

def pessimistic_lock_demo():
    account = BankAccountPessimistic()
    def customer_operations():
        for _ in range(5):
            account.withdraw(100)
            account.deposit(50)
    threads = []
    for i in range(3):
        t = threading.Thread(target=customer_operations, name=f"Customer-{i}")
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"Final balance: {account.balance}")
```
Optimistic Locking
Assume no conflict will occur: operate first, then check the version before committing.
```python
import threading
import time

class BankAccountOptimistic:
    """Optimistic locking example: a bank account"""
    def __init__(self, balance=1000):
        self._balance = balance
        self._version = 0
        self._lock = threading.Lock()  # protects the version check-and-commit

    @property
    def balance(self):
        return self._balance

    def _update_with_retry(self, operation, *args):
        """Optimistic update with retries"""
        for attempt in range(3):  # retry up to 3 times
            current_version = self._version
            current_balance = self._balance
            # compute the result without committing it yet
            new_balance, success = operation(current_balance, *args)
            if not success:
                return False
            # try to commit the update
            with self._lock:
                if self._version == current_version:
                    self._balance = new_balance
                    self._version += 1
                    return True
            # version conflict: retry
            print(f"Version conflict, retry {attempt + 1}")
            time.sleep(0.001)
        return False

    def withdraw(self, amount):
        def op(balance, amount):
            if balance >= amount:
                time.sleep(0.01)
                return balance - amount, True
            return balance, False
        success = self._update_with_retry(op, amount)
        if success:
            print(f"Withdrew {amount}, balance {self.balance}")
        else:
            print(f"Withdrawal of {amount} failed")
        return success

    def deposit(self, amount):
        def op(balance, amount):
            time.sleep(0.01)
            return balance + amount, True
        success = self._update_with_retry(op, amount)
        if success:
            print(f"Deposited {amount}, balance {self.balance}")
        return success

def optimistic_lock_demo():
    account = BankAccountOptimistic()
    def customer_operations():
        for _ in range(5):
            account.withdraw(100)
            account.deposit(50)
    threads = []
    for i in range(3):
        t = threading.Thread(target=customer_operations, name=f"Customer-{i}")
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"Final balance: {account.balance}")
```
III. Deadlock Avoidance
The Four Necessary Conditions for Deadlock
- Mutual exclusion
- Hold and wait
- No preemption
- Circular wait
A Deadlock Example
```python
import threading
import time

class DeadlockExample:
    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def method1(self):
        with self.lock_a:
            print(f"{threading.current_thread().name} acquired lock_a")
            time.sleep(0.1)  # widen the window to make deadlock likely
            with self.lock_b:
                print(f"{threading.current_thread().name} acquired lock_b")
                # do the work

    def method2(self):
        with self.lock_b:
            print(f"{threading.current_thread().name} acquired lock_b")
            time.sleep(0.1)
            with self.lock_a:
                print(f"{threading.current_thread().name} acquired lock_a")
                # do the work

def deadlock_demo():
    example = DeadlockExample()
    # daemon threads, so a deadlocked pair cannot keep the process alive
    t1 = threading.Thread(target=example.method1, name="Thread-1", daemon=True)
    t2 = threading.Thread(target=example.method2, name="Thread-2", daemon=True)
    t1.start()
    t2.start()
    t1.join(timeout=2)
    t2.join(timeout=2)
    if t1.is_alive() or t2.is_alive():
        print("Deadlock detected!")
```
Deadlock Avoidance Strategies
```python
import threading
import time
from contextlib import contextmanager

class DeadlockFreeResourceManager:
    """Avoid deadlock via lock ordering"""
    def __init__(self):
        self.locks = {
            'A': threading.Lock(),
            'B': threading.Lock(),
            'C': threading.Lock()
        }
        # the global lock-acquisition order
        self.lock_order = ['A', 'B', 'C']

    @contextmanager
    def acquire_locks(self, *lock_names):
        """Acquire locks in the predefined order"""
        sorted_locks = sorted(
            lock_names,
            key=lambda x: self.lock_order.index(x)
        )
        acquired_locks = []
        try:
            for lock_name in sorted_locks:
                lock = self.locks[lock_name]
                lock.acquire()
                acquired_locks.append(lock)
                print(f"{threading.current_thread().name} acquired {lock_name}")
                time.sleep(0.05)
            yield  # run the critical section
        finally:
            # release in reverse acquisition order
            for lock in reversed(acquired_locks):
                lock.release()
                print(f"{threading.current_thread().name} released a lock")

    def transaction1(self):
        with self.acquire_locks('A', 'B'):
            print(f"{threading.current_thread().name} running transaction 1")
            time.sleep(0.1)

    def transaction2(self):
        with self.acquire_locks('B', 'C'):
            print(f"{threading.current_thread().name} running transaction 2")
            time.sleep(0.1)

    def transaction3(self):
        with self.acquire_locks('A', 'B', 'C'):
            print(f"{threading.current_thread().name} running transaction 3")
            time.sleep(0.1)

def deadlock_avoidance_demo():
    manager = DeadlockFreeResourceManager()
    threads = [
        threading.Thread(target=manager.transaction1, name="T1"),
        threading.Thread(target=manager.transaction2, name="T2"),
        threading.Thread(target=manager.transaction3, name="T3")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("All transactions completed with no deadlock!")
```
Using Timeouts to Avoid Deadlock
```python
import threading
import time

class TimeoutLockExample:
    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def safe_method(self):
        """Use acquire timeouts to avoid deadlock"""
        attempts = 0
        while attempts < 3:
            # try to take the first lock
            acquired_a = self.lock_a.acquire(timeout=0.1)
            if not acquired_a:
                attempts += 1
                continue
            try:
                # try the second lock, also with a timeout
                acquired_b = self.lock_b.acquire(timeout=0.1)
                if not acquired_b:
                    print(f"{threading.current_thread().name} timed out on lock_b, "
                          f"releasing lock_a and retrying")
                    attempts += 1
                    time.sleep(0.01)
                    continue  # the outer finally releases lock_a
                try:
                    # do the work
                    print(f"{threading.current_thread().name} acquired both locks")
                    time.sleep(0.05)
                    return True
                finally:
                    self.lock_b.release()
            finally:
                # we only reach this try block while holding lock_a,
                # so it is always safe to release here
                self.lock_a.release()
        print(f"{threading.current_thread().name} gave up after too many retries")
        return False

def timeout_lock_demo():
    example = TimeoutLockExample()
    threads = []
    for i in range(5):
        t = threading.Thread(
            target=example.safe_method,
            name=f"Thread-{i}"
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
```
IV. Coroutines
Async I/O Examples
```python
import asyncio
import concurrent.futures
import time
import aiohttp

# Basic coroutine example
async def simple_coroutine(name, delay):
    """A simple coroutine"""
    print(f"{name} started, waiting {delay} seconds")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name}-result"

async def basic_coroutine_demo():
    """Run several coroutines concurrently"""
    start_time = time.time()
    # create the tasks
    task1 = asyncio.create_task(simple_coroutine("Task 1", 2))
    task2 = asyncio.create_task(simple_coroutine("Task 2", 1))
    task3 = asyncio.create_task(simple_coroutine("Task 3", 3))
    # wait for all of them
    results = await asyncio.gather(task1, task2, task3)
    print(f"All tasks finished, results: {results}")
    print(f"Total time: {time.time() - start_time:.2f}s")

# Coroutines vs. threads/processes
async def io_bound_coroutine(id, delay):
    """Simulated I/O-bound task (coroutine)"""
    print(f"Coroutine {id}: starting I/O")
    await asyncio.sleep(delay)  # non-blocking wait
    print(f"Coroutine {id}: I/O done")
    return id

def io_bound_thread(id, delay):
    """Thread version of the I/O task"""
    print(f"Thread {id}: starting I/O")
    time.sleep(delay)  # blocking wait
    print(f"Thread {id}: I/O done")
    return id

async def compare_concurrency():
    """Compare coroutines with threads"""
    print("=== Coroutine concurrency ===")
    start = time.time()
    tasks = [io_bound_coroutine(i, 1) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Coroutines took: {time.time() - start:.2f}s")

    print("\n=== Thread concurrency ===")
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(io_bound_thread, i, 1) for i in range(5)]
        results = [f.result() for f in futures]
    print(f"Threads took: {time.time() - start:.2f}s")

# Real-world use: concurrent HTTP requests
async def fetch_url(session, url, id):
    """Fetch a page concurrently"""
    try:
        print(f"Task {id}: fetching {url}")
        async with session.get(url, timeout=10) as response:
            text = await response.text()
            print(f"Task {id}: done, status {response.status}")
            return len(text)
    except Exception as e:
        print(f"Task {id}: error - {e}")
        return 0

async def concurrent_http_demo():
    """Concurrent HTTP request example"""
    urls = [
        "http://httpbin.org/delay/1",
        "http://httpbin.org/delay/2",
        "http://httpbin.org/delay/1",
        "https://api.github.com",
        "http://httpbin.org/get"
    ]
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, url in enumerate(urls):
            task = fetch_url(session, url, i)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
    print(f"\nFetched {len(urls)} pages, total characters: {sum(results)}")
    print(f"Total time: {time.time() - start_time:.2f}s")

# Coroutines and generators
def simple_generator():
    """Generator example"""
    print("start")
    yield 1
    print("continue")
    yield 2
    print("end")

async def coroutine_vs_generator():
    """How coroutines differ from generators"""
    print("=== Generator ===")
    gen = simple_generator()
    print(next(gen))
    print(next(gen))

    print("\n=== Coroutine ===")
    async def simple_async():
        print("coroutine start")
        await asyncio.sleep(0.1)
        print("coroutine continue")
        await asyncio.sleep(0.1)
        print("coroutine end")
    await simple_async()

# Advanced coroutine pattern: producer-consumer
async def producer(queue, count):
    """Producer coroutine"""
    for i in range(count):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(queue, id):
    """Consumer coroutine"""
    while True:
        item = await queue.get()
        print(f"Consumer {id} consumed: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()  # lets queue.join() track completed items

async def producer_consumer_demo():
    """The producer-consumer pattern"""
    queue = asyncio.Queue(maxsize=5)
    # create producer and consumer tasks
    producer_task = asyncio.create_task(producer(queue, 10))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]
    # wait for the producer to finish
    await producer_task
    # wait until every queued item has been processed
    await queue.join()
    # then cancel the now-idle consumers
    for task in consumer_tasks:
        task.cancel()

# Run all the examples
async def main():
    print("=" * 50)
    print("Basic coroutine examples")
    print("=" * 50)
    await basic_coroutine_demo()

    print("\n" + "=" * 50)
    print("Coroutines vs. threads")
    print("=" * 50)
    await compare_concurrency()

    print("\n" + "=" * 50)
    print("Concurrent HTTP requests")
    print("=" * 50)
    await concurrent_http_demo()

    print("\n" + "=" * 50)
    print("Producer-consumer pattern")
    print("=" * 50)
    await producer_consumer_demo()

# Entry point
if __name__ == "__main__":
    print("=== Threads vs. processes ===")
    compare_process_vs_thread()
    print("\n=== Process memory isolation ===")
    process_memory_demo()
    print("\n=== Thread memory sharing ===")
    thread_memory_demo()
    print("\n=== Pessimistic locking ===")
    pessimistic_lock_demo()
    print("\n=== Optimistic locking ===")
    optimistic_lock_demo()
    print("\n=== Deadlock ===")
    deadlock_demo()
    print("\n=== Deadlock avoidance ===")
    deadlock_avoidance_demo()
    print("\n=== Timeout locks ===")
    timeout_lock_demo()
    print("\n=== Coroutines ===")
    asyncio.run(main())
```
V. Summary Comparison
| Property | Process | Thread | Coroutine |
|---|---|---|---|
| Creation cost | High | Low | Minimal |
| Memory space | Independent | Shared | Shared |
| Communication | IPC (complex) | Shared memory (simple) | Direct calls |
| Switching cost | High | Moderate | Minimal |
| Concurrency level | Process level | Thread level | User level |
| Best for | CPU-bound work | I/O-bound work | High-concurrency I/O |
| Data safety | High (natural isolation) | Needs synchronization | Needs synchronization |
| Python implementation | multiprocessing | threading | asyncio |
Choosing a model:
- CPU-bound tasks → multiple processes
- I/O-bound tasks that share state → multiple threads
- High-concurrency I/O with many connections → coroutines
- Need to sidestep the GIL → multiple processes
- Simple parallelism over a few tasks → a thread pool
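These rules of thumb map directly onto `concurrent.futures`: a `ProcessPoolExecutor` for CPU-bound work and a `ThreadPoolExecutor` for I/O-bound work. A minimal sketch (the two tasks are illustrative stand-ins):

```python
import concurrent.futures
import math
import time

def cpu_task(n):
    # CPU-bound: pure computation, benefits from multiple processes (no GIL)
    return sum(math.isqrt(i) for i in range(n))

def io_task(n):
    # I/O-bound: mostly waiting, so threads (or coroutines) are enough
    time.sleep(0.05)
    return n * 2

def run_demo():
    with concurrent.futures.ProcessPoolExecutor() as pool:
        cpu_results = list(pool.map(cpu_task, [10_000] * 4))
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        io_results = list(pool.map(io_task, range(4)))
    return cpu_results, io_results

if __name__ == "__main__":
    print(run_demo())
```

Both executors share one interface (`submit`, `map`, context-manager shutdown), so switching models is often a one-line change.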
Best practices:
- Don't optimize prematurely: get it correct first, then make it fast
- Pick the right tool: match the concurrency model to the workload
- Mind thread safety: shared data needs proper synchronization
- Avoid deadlock: use lock ordering, timeouts, and similar strategies
- Manage resources: release them promptly to avoid leaks
Together, these examples cover the core concepts of concurrent programming and show where each technique applies in practice.