
Concurrency Programming in Depth: From Threads and Processes to Coroutines

I. Threads vs. Processes

Basic Concepts

Process

  • The basic unit of resource allocation in the operating system
  • Has its own independent memory space (code, data, heap, stack)
  • Inter-process communication requires special mechanisms (IPC)
  • Relatively expensive to create and switch between

Thread

  • The basic unit of CPU scheduling
  • Threads within the same process share its memory space
  • Communication is comparatively simple (shared memory)
  • Cheap to create and switch between
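Since processes do not share memory, the IPC mentioned above has to move data explicitly. A minimal sketch (not from the original article) using multiprocessing.Queue, one of the standard IPC channels:

```python
import multiprocessing

def worker(queue):
    # Runs in a child process with its own, separate memory space;
    # the only way to get data back out is through the queue.
    queue.put("hello from child")

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print(queue.get())  # receives the message across the process boundary
    p.join()
```

`multiprocessing.Pipe` and shared `Value`/`Array` objects are alternative channels; a thread, by contrast, could simply write to a shared variable.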

A Side-by-Side Python Example

python
import multiprocessing
import threading
import time
import os

# Process example
def process_worker(name):
    print(f"Process {name}, PID: {os.getpid()}")
    time.sleep(2)
    print(f"Process {name} finished")

# Thread example
def thread_worker(name):
    print(f"Thread {name} running in process {os.getpid()}")
    time.sleep(2)
    print(f"Thread {name} finished")

def compare_process_vs_thread():
    print("=== Process example ===")
    processes = []
    start_time = time.time()

    for i in range(3):
        p = multiprocessing.Process(target=process_worker, args=(f"P{i}",))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    process_time = time.time() - start_time

    print("\n=== Thread example ===")
    threads = []
    start_time = time.time()

    for i in range(3):
        t = threading.Thread(target=thread_worker, args=(f"T{i}",))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    thread_time = time.time() - start_time

    print(f"\nSummary: processes took {process_time:.2f}s, threads took {thread_time:.2f}s")

# Memory-isolation example
shared_list = []

def isolation_worker():
    # Defined at module level so it can be pickled under the 'spawn'
    # start method (a nested function would fail on Windows/macOS)
    shared_list.append(os.getpid())
    print(f"shared_list inside the child process: {shared_list}")

def process_memory_demo():
    print("Memory isolation between processes:")
    p1 = multiprocessing.Process(target=isolation_worker)
    p2 = multiprocessing.Process(target=isolation_worker)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"shared_list in the parent process: {shared_list}")  # still empty

def thread_memory_demo():
    def worker():
        shared_list.append(threading.current_thread().name)
        print(f"Thread {threading.current_thread().name}: {shared_list}")

    print("\nThreads in the same process share memory:")
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, name=f"Thread-{i}")
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"Final shared_list: {shared_list}")

II. Locking: Pessimistic vs. Optimistic

Pessimistic Locking

The core idea of a pessimistic lock is to "pessimistically" assume that concurrent conflicts will happen. Before touching shared data, it must therefore acquire an exclusive lock; until that lock is released, no other thread can access the data.

Pessimistic locking fits the following scenarios best:

  1. Write-heavy workloads: data is modified far more often than it is read, so conflicts are likely.
  2. Long critical sections: once the data is acquired it is processed for a long time (complex computation, I/O), during which interference is unacceptable.
  3. Strong consistency requirements: correctness takes absolute priority over throughput; dirty reads and lost updates are not tolerated.
  4. Expensive conflicts: when rolling back or retrying after a conflict is so costly that serializing execution up front is the better deal.

Pessimistic Locking in a Database

At the database level, SELECT ... FOR UPDATE is the standard pessimistic-locking statement.

Scenario: flash sale of a limited-stock product

sql
-- 1. Begin the transaction
BEGIN;

-- 2. Pessimistic read: lock the product row with id = 1 (stock column: stock)
SELECT stock FROM products WHERE id = 1 FOR UPDATE;

-- 3. Application logic checks that stock > 0
-- 4. Decrement the stock
UPDATE products SET stock = stock - 1 WHERE id = 1;

-- 5. Commit the transaction, releasing the lock
COMMIT;

When several transactions run SELECT ... FOR UPDATE concurrently, the first one obtains an exclusive (X) lock on the row; the others block on the SELECT until the first transaction commits. This guarantees the stock can never be oversold.
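The row-blocking behavior can be mimicked in plain Python with a per-row lock. This is a toy model, not database code, and the `Inventory` class and its method names are illustrative, but it shows why serializing access prevents overselling:

```python
import threading

class Inventory:
    """Toy model of pessimistic row locking: one lock per product row."""
    def __init__(self, stock):
        self.stock = stock
        self.row_lock = threading.Lock()  # plays the role of the row's X lock

    def buy_one(self):
        # "SELECT ... FOR UPDATE": block until we own the row
        with self.row_lock:
            if self.stock > 0:       # application-level stock check
                self.stock -= 1      # "UPDATE ... SET stock = stock - 1"
                return True
            return False             # sold out: no oversell possible

item = Inventory(stock=5)
results = []
threads = [threading.Thread(target=lambda: results.append(item.buy_one()))
           for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(item.stock, results.count(True))  # successes are capped at the initial stock
```

Without the lock, two buyers could both observe `stock == 1` and both decrement, selling six units of a five-unit stock.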

Comparison with Optimistic Locking

To understand pessimistic locking better, contrast it with optimistic locking:

  • Optimistic idea: assume conflicts are rare, so modify the data first and, at commit time, check whether anyone else changed it in the meantime (usually via a version number or timestamp). On conflict, roll back and retry.
  • Typical scenarios: read-heavy workloads with a low probability of conflict, e.g. updating an article's like count or some configuration values.
Feature | Pessimistic lock | Optimistic lock
--- | --- | ---
Concurrency assumption | Many conflicts (pessimistic) | Few conflicts (optimistic)
Implementation | synchronized, ReentrantLock, SELECT ... FOR UPDATE | Version number, CAS operations (e.g. AtomicInteger)
Flow | Lock first, then operate | Operate first, validate at commit
Blocks threads | Yes (suspend and wait) | No (usually retry in a loop)
Overhead | Lock management and context switches are costly | Minimal when conflicts are rare
Best for | Write-heavy, strong consistency, long critical sections | Read-heavy, latency-sensitive workloads
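The version-number/CAS approach can be shown in miniature. This is only a sketch: Python exposes no hardware compare-and-swap, so a tiny lock guards just the compare-and-set step itself, mirroring what Java's AtomicInteger.compareAndSet provides atomically:

```python
import threading

class CasCounter:
    """Optimistic counter: read, compute, then commit only if unchanged."""
    def __init__(self):
        self.value = 0
        self._guard = threading.Lock()  # protects only the CAS step itself

    def compare_and_set(self, expected, new):
        with self._guard:
            if self.value == expected:
                self.value = new
                return True
            return False  # someone else got there first; caller retries

    def increment(self):
        while True:               # the optimistic retry loop
            current = self.value  # read without holding a long-lived lock
            if self.compare_and_set(current, current + 1):
                return

counter = CasCounter()
threads = [threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
           for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter.value)  # 4000: no increments lost despite no long-held lock
```

The critical section here is a single comparison and assignment, so contention is brief; under heavy write contention the retry loop itself becomes the bottleneck, which is exactly the trade-off in the table above.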

Pessimistic locking suits cases where you are sure contention will occur and the cost of losing the race (e.g. corrupted data) far outweighs the cost of locking (lost performance).

Typical examples include:

  • Financial transactions (transfers, payments)
  • Inventory decrements (flash sales, limited releases)
  • Order state transitions
  • Any core business flow that must execute serially
python
import threading
import time

class BankAccountPessimistic:
    """Pessimistic locking example: a bank account"""
    def __init__(self, balance=1000):
        self.balance = balance
        self.lock = threading.Lock()

    def withdraw(self, amount):
        with self.lock:  # lock acquired and released automatically
            if self.balance >= amount:
                time.sleep(0.01)  # simulate processing time
                self.balance -= amount
                print(f"Withdrew {amount}, balance {self.balance}")
                return True
            else:
                print(f"Withdrawal of {amount} failed: insufficient balance")
                return False

    def deposit(self, amount):
        with self.lock:
            time.sleep(0.01)
            self.balance += amount
            print(f"Deposited {amount}, balance {self.balance}")

def pessimistic_lock_demo():
    account = BankAccountPessimistic()

    def customer_operations():
        for _ in range(5):
            account.withdraw(100)
            account.deposit(50)

    threads = []
    for i in range(3):
        t = threading.Thread(target=customer_operations, name=f"Customer-{i}")
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"Final balance: {account.balance}")

Optimistic Locking

Assume conflicts will not happen: operate first, then check the version before committing.

python
import threading
import time

class BankAccountOptimistic:
    """Optimistic locking example: a bank account"""
    def __init__(self, balance=1000):
        self._balance = balance
        self._version = 0
        self._lock = threading.Lock()  # protects the version/balance pair

    @property
    def balance(self):
        return self._balance

    def _update_with_retry(self, operation, *args):
        """Optimistic update with retries"""
        for attempt in range(3):  # up to 3 attempts
            with self._lock:  # take a consistent snapshot of version and balance
                current_version = self._version
                current_balance = self._balance

            # Compute the new state without holding the lock
            new_balance, success = operation(current_balance, *args)

            if not success:
                return False

            # Try to commit the update
            with self._lock:
                if self._version == current_version:
                    self._balance = new_balance
                    self._version += 1
                    return True

            # Version conflict: retry
            print(f"Version conflict, retry {attempt + 1}")
            time.sleep(0.001)

        return False

    def withdraw(self, amount):
        def op(balance, amount):
            if balance >= amount:
                time.sleep(0.01)
                return balance - amount, True
            return balance, False

        success = self._update_with_retry(op, amount)
        if success:
            print(f"Withdrew {amount}, balance {self.balance}")
        else:
            print(f"Withdrawal of {amount} failed")
        return success

    def deposit(self, amount):
        def op(balance, amount):
            time.sleep(0.01)
            return balance + amount, True

        success = self._update_with_retry(op, amount)
        if success:
            print(f"Deposited {amount}, balance {self.balance}")
        return success

def optimistic_lock_demo():
    account = BankAccountOptimistic()
  
    def customer_operations():
        for _ in range(5):
            account.withdraw(100)
            account.deposit(50)
  
    threads = []
    for i in range(3):
        t = threading.Thread(target=customer_operations, name=f"Customer-{i}")
        threads.append(t)
        t.start()
  
    for t in threads:
        t.join()
  
    print(f"Final balance: {account.balance}")

III. Deadlock Avoidance

The Four Necessary Conditions for Deadlock

  1. Mutual exclusion
  2. Hold and wait
  3. No preemption
  4. Circular wait

A Deadlock Example

python
import threading
import time

class DeadlockExample:
    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def method1(self):
        with self.lock_a:
            print(f"{threading.current_thread().name} acquired lock_a")
            time.sleep(0.1)  # raise the odds of deadlock
            with self.lock_b:
                print(f"{threading.current_thread().name} acquired lock_b")
                # do the work

    def method2(self):
        with self.lock_b:
            print(f"{threading.current_thread().name} acquired lock_b")
            time.sleep(0.1)
            with self.lock_a:
                print(f"{threading.current_thread().name} acquired lock_a")
                # do the work

def deadlock_demo():
    example = DeadlockExample()

    # daemon=True lets the program exit even though the deadlocked
    # threads will never finish on their own
    t1 = threading.Thread(target=example.method1, name="Thread-1", daemon=True)
    t2 = threading.Thread(target=example.method2, name="Thread-2", daemon=True)

    t1.start()
    t2.start()

    t1.join(timeout=2)
    t2.join(timeout=2)

    if t1.is_alive() or t2.is_alive():
        print("Deadlock detected!")

Deadlock Avoidance Strategies

python
import threading
import time
from contextlib import contextmanager

class DeadlockFreeResourceManager:
    """Avoid deadlock through lock ordering"""

    def __init__(self):
        self.locks = {
            'A': threading.Lock(),
            'B': threading.Lock(),
            'C': threading.Lock()
        }
        # Define the global acquisition order
        self.lock_order = ['A', 'B', 'C']

    @contextmanager
    def acquire_locks(self, *lock_names):
        """Acquire locks in the predefined order"""
        sorted_locks = sorted(
            lock_names,
            key=lambda x: self.lock_order.index(x)
        )

        acquired_locks = []
        try:
            for lock_name in sorted_locks:
                lock = self.locks[lock_name]
                lock.acquire()
                acquired_locks.append(lock)
                print(f"{threading.current_thread().name} acquired {lock_name}")
                time.sleep(0.05)

            yield  # run the critical section

        finally:
            # Release in reverse order of acquisition
            for lock in reversed(acquired_locks):
                lock.release()
                print(f"{threading.current_thread().name} released a lock")

    def transaction1(self):
        with self.acquire_locks('A', 'B'):
            print(f"{threading.current_thread().name} runs transaction 1")
            time.sleep(0.1)

    def transaction2(self):
        with self.acquire_locks('B', 'C'):
            print(f"{threading.current_thread().name} runs transaction 2")
            time.sleep(0.1)

    def transaction3(self):
        with self.acquire_locks('A', 'B', 'C'):
            print(f"{threading.current_thread().name} runs transaction 3")
            time.sleep(0.1)

def deadlock_avoidance_demo():
    manager = DeadlockFreeResourceManager()
  
    threads = [
        threading.Thread(target=manager.transaction1, name="T1"),
        threading.Thread(target=manager.transaction2, name="T2"),
        threading.Thread(target=manager.transaction3, name="T3")
    ]
  
    for t in threads:
        t.start()
  
    for t in threads:
        t.join()
  
    print("All transactions completed, no deadlock!")

Using Timeouts to Avoid Deadlock

python
import threading
import time

class TimeoutLockExample:
    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def safe_method(self):
        """Avoid deadlock with lock timeouts"""
        attempts = 0

        while attempts < 3:
            # Try to acquire the first lock
            if not self.lock_a.acquire(timeout=0.1):
                attempts += 1
                continue

            try:
                # Try to acquire the second lock (with a timeout)
                if not self.lock_b.acquire(timeout=0.1):
                    print(f"{threading.current_thread().name} timed out on lock_b, releasing lock_a and retrying")
                    # lock_a is released by the finally block below;
                    # releasing it here as well would raise RuntimeError
                    attempts += 1
                    time.sleep(0.01)
                    continue

                try:
                    # do the work
                    print(f"{threading.current_thread().name} acquired all locks")
                    time.sleep(0.05)
                    return True

                finally:
                    self.lock_b.release()

            finally:
                self.lock_a.release()

        print(f"{threading.current_thread().name} gave up after too many retries")
        return False

def timeout_lock_demo():
    example = TimeoutLockExample()
  
    threads = []
    for i in range(5):
        t = threading.Thread(
            target=example.safe_method,
            name=f"Thread-{i}"
        )
        threads.append(t)
        t.start()
  
    for t in threads:
        t.join()

IV. Coroutines

An Async I/O Example

python
import asyncio
import time
import aiohttp

# Basic coroutine example
async def simple_coroutine(name, delay):
    """A simple coroutine"""
    print(f"{name} started, waiting {delay} s")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name}-result"

async def basic_coroutine_demo():
    """Run several coroutines concurrently"""
    start_time = time.time()

    # Create the tasks
    task1 = asyncio.create_task(simple_coroutine("task 1", 2))
    task2 = asyncio.create_task(simple_coroutine("task 2", 1))
    task3 = asyncio.create_task(simple_coroutine("task 3", 3))

    # Wait for all of them to complete
    results = await asyncio.gather(task1, task2, task3)

    print(f"All tasks done, results: {results}")
    print(f"Total time: {time.time() - start_time:.2f}s")

# Coroutines vs. threads/processes
async def io_bound_coroutine(id, delay):
    """Simulate an I/O-bound task"""
    print(f"Coroutine {id}: starting I/O")
    await asyncio.sleep(delay)  # non-blocking wait
    print(f"Coroutine {id}: I/O done")
    return id

def io_bound_thread(id, delay):
    """Thread version of the I/O task"""
    print(f"Thread {id}: starting I/O")
    time.sleep(delay)  # blocking wait
    print(f"Thread {id}: I/O done")
    return id

async def compare_concurrency():
    """Compare coroutines with threads"""
    print("=== Coroutine concurrency ===")
    start = time.time()
    tasks = [io_bound_coroutine(i, 1) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Coroutines took: {time.time() - start:.2f}s")

    print("\n=== Thread concurrency ===")
    start = time.time()
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(io_bound_thread, i, 1) for i in range(5)]
        results = [f.result() for f in futures]
    print(f"Threads took: {time.time() - start:.2f}s")

# A practical application: concurrent HTTP requests
async def fetch_url(session, url, id):
    """Fetch a page concurrently"""
    try:
        print(f"Task {id}: fetching {url}")
        # aiohttp expects a ClientTimeout object rather than a bare number
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            text = await response.text()
            print(f"Task {id}: done, status {response.status}")
            return len(text)
    except Exception as e:
        print(f"Task {id}: error - {e}")
        return 0

async def concurrent_http_demo():
    """Concurrent HTTP request example"""
    urls = [
        "http://httpbin.org/delay/1",
        "http://httpbin.org/delay/2",
        "http://httpbin.org/delay/1",
        "https://api.github.com",
        "http://httpbin.org/get"
    ]

    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, url in enumerate(urls):
            task = fetch_url(session, url, i)
            tasks.append(task)

        results = await asyncio.gather(*tasks)

    print(f"\nFetched {len(urls)} pages, total characters: {sum(results)}")
    print(f"Total time: {time.time() - start_time:.2f}s")

# Coroutines and generators
def simple_generator():
    """Generator example"""
    print("start")
    yield 1
    print("continue")
    yield 2
    print("end")

async def coroutine_vs_generator():
    """The difference between a coroutine and a generator"""
    print("=== Generator ===")
    gen = simple_generator()
    print(next(gen))
    print(next(gen))

    print("\n=== Coroutine ===")
    async def simple_async():
        print("coroutine started")
        await asyncio.sleep(0.1)
        print("coroutine resumed")
        await asyncio.sleep(0.1)
        print("coroutine finished")

    await simple_async()

# An advanced coroutine pattern: producer-consumer
async def producer(queue, count):
    """Producer coroutine"""
    for i in range(count):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # shutdown signal

async def consumer(queue, id):
    """Consumer coroutine"""
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the signal on to the other consumers
            break
        print(f"Consumer {id} consumed: {item}")
        await asyncio.sleep(0.2)

async def producer_consumer_demo():
    """The producer-consumer pattern"""
    queue = asyncio.Queue(maxsize=5)

    # Create the producer and consumer tasks
    producer_task = asyncio.create_task(producer(queue, 10))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]

    # Wait for the producer to finish
    await producer_task

    # Wait for the consumers to drain the queue and exit via the sentinel.
    # (queue.join() would hang here, since task_done() is never called.)
    await asyncio.gather(*consumer_tasks)

# Run all the examples
async def main():
    print("=" * 50)
    print("Basic coroutine examples")
    print("=" * 50)
    await basic_coroutine_demo()

    print("\n" + "=" * 50)
    print("Coroutines vs. threads")
    print("=" * 50)
    await compare_concurrency()

    print("\n" + "=" * 50)
    print("Concurrent HTTP requests")
    print("=" * 50)
    await concurrent_http_demo()

    print("\n" + "=" * 50)
    print("Producer-consumer pattern")
    print("=" * 50)
    await producer_consumer_demo()

# Entry point
if __name__ == "__main__":
    print("=== Threads vs. processes ===")
    compare_process_vs_thread()

    print("\n=== Process memory isolation ===")
    process_memory_demo()

    print("\n=== Thread memory sharing ===")
    thread_memory_demo()

    print("\n=== Pessimistic locking ===")
    pessimistic_lock_demo()

    print("\n=== Optimistic locking ===")
    optimistic_lock_demo()

    print("\n=== Deadlock ===")
    deadlock_demo()

    print("\n=== Deadlock avoidance ===")
    deadlock_avoidance_demo()

    print("\n=== Timeout locks ===")
    timeout_lock_demo()

    print("\n=== Coroutines ===")
    asyncio.run(main())

V. Summary Comparison

Feature | Process | Thread | Coroutine
--- | --- | --- | ---
Creation cost | High | Low | Minimal
Memory space | Independent | Shared | Shared
Communication | IPC (complex) | Shared memory (simple) | Direct calls
Switching cost | High | Low | Minimal
Concurrency level | Process-level | Thread-level | User-level
Best for | CPU-bound work | I/O-bound work | High-concurrency I/O
Data safety | High (isolated by design) | Needs synchronization | Needs synchronization
Python module | multiprocessing | threading | asyncio

Selection guidelines:

  1. CPU-bound tasks → multiple processes
  2. I/O-bound tasks that need shared state → multiple threads
  3. High-concurrency I/O with many connections → coroutines
  4. Need to sidestep the GIL → multiple processes
  5. Simple parallelism over a few tasks → a thread pool
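The GIL point in guideline 4 can be observed directly: for a pure-Python CPU-bound function, threads run roughly serially because only one thread can execute bytecode at a time, while processes can use multiple cores. A sketch (exact timings depend on the machine):

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_task(n):
    # Pure-Python arithmetic: holds the GIL for its entire duration
    return sum(i * i for i in range(n))

def timed(executor_cls, n_jobs=4, n=2_000_000):
    start = time.time()
    with executor_cls(max_workers=n_jobs) as ex:
        results = list(ex.map(cpu_task, [n] * n_jobs))
    return time.time() - start, results

if __name__ == "__main__":
    t_threads, _ = timed(ThreadPoolExecutor)
    t_procs, _ = timed(ProcessPoolExecutor)
    print(f"threads: {t_threads:.2f}s  processes: {t_procs:.2f}s")
    # On a multi-core machine the process pool is typically much faster here,
    # despite its higher startup cost
```

Swap `cpu_task` for a blocking I/O call (as in the earlier examples) and the thread pool becomes competitive again, since the GIL is released while waiting on I/O.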

Best practices:

  1. Avoid premature optimization: get it correct first, then make it fast
  2. Pick the right tool: choose the concurrency model to match the workload
  3. Mind thread safety: shared data needs proper synchronization
  4. Avoid deadlock: use lock ordering, timeouts, and similar strategies
  5. Manage resources: release them promptly to avoid leaks

Together these examples cover the core concepts of concurrent programming and show how each technique applies in practice.
