Skip to content
On this page

什么是死锁?

死锁 是指两个或多个线程/进程在执行过程中,因争夺资源而造成的一种相互等待的现象,若无外力干涉,它们都将无法继续执行下去。

死锁的必要条件(Coffman条件)

死锁必须同时满足以下四个条件:

  1. 互斥条件:资源不能被共享,只能被一个进程独占
  2. 占有且等待:进程已占有部分资源,同时请求新资源
  3. 不可抢占:资源不能被强制从持有进程手中剥夺
  4. 循环等待:存在一个进程-资源的循环等待链

Python经典死锁场景举例

场景1:哲学家就餐问题(最经典)

python
import threading
import time

# 经典的死锁场景:哲学家就餐问题
class PhilosopherDining:
    def __init__(self, num_philosophers=5):
        self.forks = [threading.Lock() for _ in range(num_philosophers)]
        self.philosophers = []
  
    def philosopher(self, id):
        """哲学家线程:思考 -> 拿左边叉子 -> 拿右边叉子 -> 吃饭"""
        left_fork = self.forks[id]
        right_fork = self.forks[(id + 1) % len(self.forks)]
  
        while True:
            # 思考
            print(f"哲学家 {id} 正在思考...")
            time.sleep(0.1)
      
            # 尝试拿叉子吃饭 - 这种顺序可能导致死锁!
            print(f"哲学家 {id} 尝试拿左边叉子...")
            left_fork.acquire()
            print(f"哲学家 {id} 拿到了左边叉子")
      
            time.sleep(0.1)  # 增加死锁概率
      
            print(f"哲学家 {id} 尝试拿右边叉子...")
            right_fork.acquire()
            print(f"哲学家 {id} 拿到了右边叉子,开始吃饭")
      
            # 吃饭
            time.sleep(0.1)
            print(f"哲学家 {id} 吃完了,放下叉子")
      
            # 释放叉子
            right_fork.release()
            left_fork.release()

def test_philosopher_deadlock():
    """测试哲学家就餐问题的死锁"""
    print("=== 哲学家就餐问题(可能产生死锁)===")
    dining = PhilosopherDining(5)
  
    # 创建5个哲学家线程
    threads = []
    for i in range(5):
        t = threading.Thread(target=dining.philosopher, args=(i,))
        t.daemon = True  # 设置为守护线程,便于程序退出
        threads.append(t)
  
    for t in threads:
        t.start()
  
    # 运行一段时间观察死锁
    time.sleep(3)
    print("\n⚠️ 如果程序卡住,说明发生了死锁!")

场景2:银行家算法问题(资源竞争)

python
import threading
import time

class BankDeadlock:
    """模拟银行账户转账死锁"""
    def __init__(self):
        self.account_a = threading.Lock()
        self.account_b = threading.Lock()
        self.balance_a = 1000
        self.balance_b = 1000
  
    def transfer_a_to_b(self, amount):
        """从A转账到B - 错误的顺序"""
        print(f"线程1: 尝试锁定账户A...")
        self.account_a.acquire()
        print(f"线程1: 账户A已锁定,余额: {self.balance_a}")
        time.sleep(0.1)  # 模拟处理延迟
  
        print(f"线程1: 尝试锁定账户B...")
        self.account_b.acquire()
        print(f"线程1: 账户B已锁定")
  
        # 执行转账
        if self.balance_a >= amount:
            self.balance_a -= amount
            self.balance_b += amount
            print(f"线程1: 转账成功! A余额: {self.balance_a}, B余额: {self.balance_b}")
  
        self.account_b.release()
        self.account_a.release()
  
    def transfer_b_to_a(self, amount):
        """从B转账到A - 错误的顺序"""
        print(f"线程2: 尝试锁定账户B...")
        self.account_b.acquire()
        print(f"线程2: 账户B已锁定,余额: {self.balance_b}")
        time.sleep(0.1)  # 模拟处理延迟
  
        print(f"线程2: 尝试锁定账户A...")
        self.account_a.acquire()
        print(f"线程2: 账户A已锁定")
  
        # 执行转账
        if self.balance_b >= amount:
            self.balance_b -= amount
            self.balance_a += amount
            print(f"线程2: 转账成功! A余额: {self.balance_a}, B余额: {self.balance_b}")
  
        self.account_a.release()
        self.account_b.release()

def test_bank_transfer_deadlock():
    """测试银行转账死锁"""
    print("\n=== 银行转账死锁示例 ===")
    bank = BankDeadlock()
  
    # 创建两个转账线程
    thread1 = threading.Thread(target=bank.transfer_a_to_b, args=(100,))
    thread2 = threading.Thread(target=bank.transfer_b_to_a, args=(150,))
  
    thread1.start()
    thread2.start()
  
    thread1.join(timeout=2)
    thread2.join(timeout=2)
  
    if thread1.is_alive() or thread2.is_alive():
        print("❌ 检测到死锁!线程无法完成执行")

场景3:嵌套锁(递归锁问题)

python
import threading

class NestedLockDeadlock:
    """嵌套锁导致的死锁"""
    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()
  
    def method1(self):
        """方法1:先获取lock_a,再获取lock_b"""
        print("方法1: 等待lock_a...")
        self.lock_a.acquire()
        print("方法1: 获得lock_a")
  
        # 模拟一些处理
        import time
        time.sleep(0.1)
  
        print("方法1: 等待lock_b...")
        self.lock_b.acquire()
        print("方法1: 获得lock_b,执行操作")
  
        # 执行操作
        time.sleep(0.1)
  
        self.lock_b.release()
        self.lock_a.release()
        print("方法1: 完成")
  
    def method2(self):
        """方法2:先获取lock_b,再获取lock_a"""
        print("方法2: 等待lock_b...")
        self.lock_b.acquire()
        print("方法2: 获得lock_b")
  
        # 模拟一些处理
        import time
        time.sleep(0.1)
  
        print("方法2: 等待lock_a...")
        self.lock_a.acquire()
        print("方法2: 获得lock_a,执行操作")
  
        # 执行操作
        time.sleep(0.1)
  
        self.lock_a.release()
        self.lock_b.release()
        print("方法2: 完成")

def test_nested_lock_deadlock():
    """测试嵌套锁死锁"""
    print("\n=== 嵌套锁死锁示例 ===")
    obj = NestedLockDeadlock()
  
    thread1 = threading.Thread(target=obj.method1)
    thread2 = threading.Thread(target=obj.method2)
  
    thread1.start()
    thread2.start()
  
    # 等待线程完成(如果发生死锁则不会完成)
    thread1.join(timeout=2)
    thread2.join(timeout=2)
  
    if thread1.is_alive() or thread2.is_alive():
        print("❌ 检测到死锁!")
        print("死锁原因: method1持有lock_a等待lock_b,method2持有lock_b等待lock_a")

死锁解决方案示例

方案1:使用超时机制

python
import threading
import time

class TimeoutSolution:
    """使用超时避免死锁"""
    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()
  
    def safe_transfer(self, from_account, to_account, amount):
        """安全的转账方法:使用超时"""
        acquired_a = False
        acquired_b = False
  
        try:
            # 尝试获取第一个锁,设置超时
            print(f"尝试获取锁 {from_account}...")
            if from_account == 'A':
                acquired_a = self.lock_a.acquire(timeout=1)
            else:
                acquired_b = self.lock_b.acquire(timeout=1)
      
            if not (acquired_a or acquired_b):
                print("获取第一个锁超时,放弃操作")
                return False
      
            # 获取第二个锁,设置超时
            print(f"尝试获取锁 {to_account}...")
            time.sleep(0.5)  # 模拟延迟增加死锁可能性
      
            if to_account == 'A':
                acquired_a = self.lock_a.acquire(timeout=1)
            else:
                acquired_b = self.lock_b.acquire(timeout=1)
      
            if not (acquired_a and acquired_b):
                print("获取第二个锁超时,释放已持有的锁")
                return False
      
            # 执行转账
            print(f"执行转账: {from_account} -> {to_account} 金额: {amount}")
            return True
      
        finally:
            # 确保释放所有已获取的锁
            if acquired_a:
                self.lock_a.release()
            if acquired_b:
                self.lock_b.release()

方案2:固定锁顺序(最常用解决方案)

python
class OrderedLockSolution:
    """通过固定锁顺序避免死锁"""
    def __init__(self):
        self.account_a = threading.Lock()
        self.account_b = threading.Lock()
        self.balance_a = 1000
        self.balance_b = 1000
  
    def ordered_transfer(self, from_id, to_id, amount):
        """
        使用固定顺序获取锁:
        总是先获取编号小的锁,再获取编号大的锁
        """
        # 确定锁的顺序
        first_lock = self.account_a if from_id < to_id else self.account_b
        second_lock = self.account_b if from_id < to_id else self.account_a
  
        # 按固定顺序获取锁
        print(f"按顺序获取锁: 先锁{from_id if from_id < to_id else to_id}")
        first_lock.acquire()
        try:
            print(f"再锁{to_id if from_id < to_id else from_id}")
            second_lock.acquire()
            try:
                # 执行转账
                print(f"执行转账: 账户{from_id} -> 账户{to_id} 金额: {amount}")
                return True
            finally:
                second_lock.release()
        finally:
            first_lock.release()

def test_ordered_solution():
    """测试有序锁解决方案"""
    print("\n=== 有序锁解决方案 ===")
    bank = OrderedLockSolution()
  
    # 创建多个转账线程
    threads = []
    transfers = [(1, 2, 100), (2, 1, 150), (1, 2, 200)]
  
    for from_id, to_id, amount in transfers:
        t = threading.Thread(
            target=bank.ordered_transfer,
            args=(from_id, to_id, amount)
        )
        threads.append(t)
        t.start()
  
    for t in threads:
        t.join()
  
    print("✅ 所有转账完成,无死锁发生")

方案3:上下文管理器 + 统一锁管理器

python
import threading


class ContextManagerSolution:
    """使用上下文管理器避免死锁的解决方案"""

    def __init__(self):
        self.account_a = threading.Lock()
        self.account_b = threading.Lock()
        self.balance_a = 1000
        self.balance_b = 1000

    def transfer_with_context_manager(self, from_account, to_account, amount):
        """
        方法1:使用上下文管理器 + 统一锁顺序
        这是最安全和推荐的方式
        """
        # 确定锁的顺序:总是先锁编号小的账户
        if from_account == "A" and to_account == "B":
            # A -> B: 先锁A,再锁B
            with self.account_a:
                with self.account_b:
                    # 执行转账
                    if self.balance_a >= amount:
                        self.balance_a -= amount
                        self.balance_b += amount
                        print(
                            f"✅ 转账成功: {from_account} -> {to_account}, 金额: {amount}"
                        )
                        print(
                            f"   账户A余额: {self.balance_a}, 账户B余额: {self.balance_b}"
                        )
                        return True
                    else:
                        print(f"❌ 转账失败: 账户{from_account}余额不足")
                        return False

        elif from_account == "B" and to_account == "A":
            # B -> A: 也先锁A,再锁B(统一顺序!)
            with self.account_a:
                with self.account_b:
                    # 执行转账
                    if self.balance_b >= amount:
                        self.balance_b -= amount
                        self.balance_a += amount
                        print(
                            f"✅ 转账成功: {from_account} -> {to_account}, 金额: {amount}"
                        )
                        print(
                            f"   账户A余额: {self.balance_a}, 账户B余额: {self.balance_b}"
                        )
                        return True
                    else:
                        print(f"❌ 转账失败: 账户{from_account}余额不足")
                        return False


def explain_context_manager():
    """详细解释上下文管理器的工作原理"""
    print("=" * 60)
    print("📚 上下文管理器 (Context Manager) 详解")
    print("=" * 60)

    print("\n1️⃣ 什么是上下文管理器?")
    print("   Python的 `with` 语句用于管理资源,确保资源在使用后正确释放")
    print("   threading.Lock() 支持上下文管理器协议")

    print("\n2️⃣ 手动管理锁 vs 上下文管理器:")
    print("\n   ❌ 手动管理(容易出错):")
    print("      lock.acquire()")
    print("      try:")
    print("          # 执行操作")
    print("      finally:")
    print("          lock.release()  # 如果忘记释放,会导致死锁")

    print("\n   ✅ 上下文管理器(自动管理):")
    print("      with lock:")
    print("          # 执行操作")
    print("      # 自动释放锁,即使发生异常也会释放")

    print("\n3️⃣ 嵌套上下文管理器:")
    print("   with lock_a:")
    print("       with lock_b:")
    print("           # 执行操作")
    print("       # lock_b 自动释放")
    print("   # lock_a 自动释放")

    print("\n4️⃣ 上下文管理器的优势:")
    print("   ✓ 自动释放:即使发生异常也会释放锁")
    print("   ✓ 代码简洁:不需要 try-finally 块")
    print("   ✓ 不易出错:不会忘记释放锁")
    print("   ✓ 可读性强:代码结构清晰")


def test_context_manager_solution():
    """测试上下文管理器解决方案"""
    print("\n" + "=" * 60)
    print("🧪 测试:使用上下文管理器 + 统一锁顺序")
    print("=" * 60)

    bank = ContextManagerSolution()

    def transfer_a_to_b():
        print("\n[线程1] 开始执行: A -> B 转账 100")
        bank.transfer_with_context_manager("A", "B", 100)

    def transfer_b_to_a():
        print("\n[线程2] 开始执行: B -> A 转账 150")
        bank.transfer_with_context_manager("B", "A", 150)

    # 创建两个线程同时执行
    thread1 = threading.Thread(target=transfer_a_to_b)
    thread2 = threading.Thread(target=transfer_b_to_a)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    print("\n✅ 所有转账完成,未发生死锁!")
    print(f"最终余额 - 账户A: {bank.balance_a}, 账户B: {bank.balance_b}")


if __name__ == "__main__":
    explain_context_manager()
    test_context_manager_solution()

预防策略:

  1. 固定锁顺序:为所有资源定义全局的获取顺序
  2. 超时机制:设置获取锁的超时时间
  3. 避免嵌套锁:尽量减少锁的嵌套使用
  4. 使用更高级的同步原语:如 threading.RLockthreading.Semaphore

检测方法:

  1. 代码审查:检查锁的获取顺序
  2. 静态分析工具:使用pylint等工具检查
  3. 动态检测:使用线程分析器
  4. 超时监控:设置操作超时,超时则可能存在死锁

实际应用中的建议

python
# 实用模板:安全的资源管理
class SafeResourceManager:
    def __init__(self):
        self.locks = {}
  
    def safe_operation(self, resource_ids):
        """安全地操作多个资源"""
        # 1. 按固定顺序排序资源ID
        sorted_ids = sorted(resource_ids)
  
        # 2. 按顺序获取所有锁
        acquired_locks = []
        try:
            for res_id in sorted_ids:
                lock = self._get_lock(res_id)
                lock.acquire(timeout=5.0)  # 设置超时
                acquired_locks.append(lock)
      
            # 3. 执行操作
            return self._do_operation(resource_ids)
      
        finally:
            # 4. 逆序释放锁(良好实践)
            for lock in reversed(acquired_locks):
                lock.release()
  
    def _get_lock(self, res_id):
        # 获取或创建锁
        pass
  
    def _do_operation(self, resource_ids):
        # 执行实际操作
        pass

总结

死锁是多线程编程中的常见问题,理解其产生条件和解决方案至关重要。在Python中:

  1. 经典死锁场景包括哲学家就餐、银行转账、嵌套锁
  2. 解决方案:固定锁顺序 > 超时机制 > 使用高级同步原语
  3. 最佳实践:总是按相同顺序获取锁,使用 with语句管理锁,设置合理的超时

通过合理的设计和预防,可以大大降低死锁发生的概率。

技术文档集合