Appearance
什么是死锁?
死锁 是指两个或多个线程/进程在执行过程中,因争夺资源而造成的一种相互等待的现象,若无外力干涉,它们都将无法继续执行下去。
死锁的必要条件(Coffman条件)
死锁必须同时满足以下四个条件:
- 互斥条件:资源不能被共享,只能被一个进程独占
- 占有且等待:进程已占有部分资源,同时请求新资源
- 不可抢占:资源不能被强制从持有进程手中剥夺
- 循环等待:存在一个进程-资源的循环等待链
Python经典死锁场景举例
场景1:哲学家就餐问题(最经典)
python
import threading
import time
# 经典的死锁场景:哲学家就餐问题
class PhilosopherDining:
def __init__(self, num_philosophers=5):
self.forks = [threading.Lock() for _ in range(num_philosophers)]
self.philosophers = []
def philosopher(self, id):
"""哲学家线程:思考 -> 拿左边叉子 -> 拿右边叉子 -> 吃饭"""
left_fork = self.forks[id]
right_fork = self.forks[(id + 1) % len(self.forks)]
while True:
# 思考
print(f"哲学家 {id} 正在思考...")
time.sleep(0.1)
# 尝试拿叉子吃饭 - 这种顺序可能导致死锁!
print(f"哲学家 {id} 尝试拿左边叉子...")
left_fork.acquire()
print(f"哲学家 {id} 拿到了左边叉子")
time.sleep(0.1) # 增加死锁概率
print(f"哲学家 {id} 尝试拿右边叉子...")
right_fork.acquire()
print(f"哲学家 {id} 拿到了右边叉子,开始吃饭")
# 吃饭
time.sleep(0.1)
print(f"哲学家 {id} 吃完了,放下叉子")
# 释放叉子
right_fork.release()
left_fork.release()
def test_philosopher_deadlock():
"""测试哲学家就餐问题的死锁"""
print("=== 哲学家就餐问题(可能产生死锁)===")
dining = PhilosopherDining(5)
# 创建5个哲学家线程
threads = []
for i in range(5):
t = threading.Thread(target=dining.philosopher, args=(i,))
t.daemon = True # 设置为守护线程,便于程序退出
threads.append(t)
for t in threads:
t.start()
# 运行一段时间观察死锁
time.sleep(3)
print("\n⚠️ 如果程序卡住,说明发生了死锁!")
场景2:银行家算法问题(资源竞争)
python
import threading
import time
class BankDeadlock:
"""模拟银行账户转账死锁"""
def __init__(self):
self.account_a = threading.Lock()
self.account_b = threading.Lock()
self.balance_a = 1000
self.balance_b = 1000
def transfer_a_to_b(self, amount):
"""从A转账到B - 错误的顺序"""
print(f"线程1: 尝试锁定账户A...")
self.account_a.acquire()
print(f"线程1: 账户A已锁定,余额: {self.balance_a}")
time.sleep(0.1) # 模拟处理延迟
print(f"线程1: 尝试锁定账户B...")
self.account_b.acquire()
print(f"线程1: 账户B已锁定")
# 执行转账
if self.balance_a >= amount:
self.balance_a -= amount
self.balance_b += amount
print(f"线程1: 转账成功! A余额: {self.balance_a}, B余额: {self.balance_b}")
self.account_b.release()
self.account_a.release()
def transfer_b_to_a(self, amount):
"""从B转账到A - 错误的顺序"""
print(f"线程2: 尝试锁定账户B...")
self.account_b.acquire()
print(f"线程2: 账户B已锁定,余额: {self.balance_b}")
time.sleep(0.1) # 模拟处理延迟
print(f"线程2: 尝试锁定账户A...")
self.account_a.acquire()
print(f"线程2: 账户A已锁定")
# 执行转账
if self.balance_b >= amount:
self.balance_b -= amount
self.balance_a += amount
print(f"线程2: 转账成功! A余额: {self.balance_a}, B余额: {self.balance_b}")
self.account_a.release()
self.account_b.release()
def test_bank_transfer_deadlock():
"""测试银行转账死锁"""
print("\n=== 银行转账死锁示例 ===")
bank = BankDeadlock()
# 创建两个转账线程
thread1 = threading.Thread(target=bank.transfer_a_to_b, args=(100,))
thread2 = threading.Thread(target=bank.transfer_b_to_a, args=(150,))
thread1.start()
thread2.start()
thread1.join(timeout=2)
thread2.join(timeout=2)
if thread1.is_alive() or thread2.is_alive():
print("❌ 检测到死锁!线程无法完成执行")
场景3:嵌套锁(递归锁问题)
python
import threading
class NestedLockDeadlock:
"""嵌套锁导致的死锁"""
def __init__(self):
self.lock_a = threading.Lock()
self.lock_b = threading.Lock()
def method1(self):
"""方法1:先获取lock_a,再获取lock_b"""
print("方法1: 等待lock_a...")
self.lock_a.acquire()
print("方法1: 获得lock_a")
# 模拟一些处理
import time
time.sleep(0.1)
print("方法1: 等待lock_b...")
self.lock_b.acquire()
print("方法1: 获得lock_b,执行操作")
# 执行操作
time.sleep(0.1)
self.lock_b.release()
self.lock_a.release()
print("方法1: 完成")
def method2(self):
"""方法2:先获取lock_b,再获取lock_a"""
print("方法2: 等待lock_b...")
self.lock_b.acquire()
print("方法2: 获得lock_b")
# 模拟一些处理
import time
time.sleep(0.1)
print("方法2: 等待lock_a...")
self.lock_a.acquire()
print("方法2: 获得lock_a,执行操作")
# 执行操作
time.sleep(0.1)
self.lock_a.release()
self.lock_b.release()
print("方法2: 完成")
def test_nested_lock_deadlock():
"""测试嵌套锁死锁"""
print("\n=== 嵌套锁死锁示例 ===")
obj = NestedLockDeadlock()
thread1 = threading.Thread(target=obj.method1)
thread2 = threading.Thread(target=obj.method2)
thread1.start()
thread2.start()
# 等待线程完成(如果发生死锁则不会完成)
thread1.join(timeout=2)
thread2.join(timeout=2)
if thread1.is_alive() or thread2.is_alive():
print("❌ 检测到死锁!")
print("死锁原因: method1持有lock_a等待lock_b,method2持有lock_b等待lock_a")
死锁解决方案示例
方案1:使用超时机制
python
import threading
import time
class TimeoutSolution:
"""使用超时避免死锁"""
def __init__(self):
self.lock_a = threading.Lock()
self.lock_b = threading.Lock()
def safe_transfer(self, from_account, to_account, amount):
"""安全的转账方法:使用超时"""
acquired_a = False
acquired_b = False
try:
# 尝试获取第一个锁,设置超时
print(f"尝试获取锁 {from_account}...")
if from_account == 'A':
acquired_a = self.lock_a.acquire(timeout=1)
else:
acquired_b = self.lock_b.acquire(timeout=1)
if not (acquired_a or acquired_b):
print("获取第一个锁超时,放弃操作")
return False
# 获取第二个锁,设置超时
print(f"尝试获取锁 {to_account}...")
time.sleep(0.5) # 模拟延迟增加死锁可能性
if to_account == 'A':
acquired_a = self.lock_a.acquire(timeout=1)
else:
acquired_b = self.lock_b.acquire(timeout=1)
if not (acquired_a and acquired_b):
print("获取第二个锁超时,释放已持有的锁")
return False
# 执行转账
print(f"执行转账: {from_account} -> {to_account} 金额: {amount}")
return True
finally:
# 确保释放所有已获取的锁
if acquired_a:
self.lock_a.release()
if acquired_b:
self.lock_b.release()
方案2:固定锁顺序(最常用解决方案)
python
class OrderedLockSolution:
"""通过固定锁顺序避免死锁"""
def __init__(self):
self.account_a = threading.Lock()
self.account_b = threading.Lock()
self.balance_a = 1000
self.balance_b = 1000
def ordered_transfer(self, from_id, to_id, amount):
"""
使用固定顺序获取锁:
总是先获取编号小的锁,再获取编号大的锁
"""
# 确定锁的顺序
first_lock = self.account_a if from_id < to_id else self.account_b
second_lock = self.account_b if from_id < to_id else self.account_a
# 按固定顺序获取锁
print(f"按顺序获取锁: 先锁{from_id if from_id < to_id else to_id}")
first_lock.acquire()
try:
print(f"再锁{to_id if from_id < to_id else from_id}")
second_lock.acquire()
try:
# 执行转账
print(f"执行转账: 账户{from_id} -> 账户{to_id} 金额: {amount}")
return True
finally:
second_lock.release()
finally:
first_lock.release()
def test_ordered_solution():
"""测试有序锁解决方案"""
print("\n=== 有序锁解决方案 ===")
bank = OrderedLockSolution()
# 创建多个转账线程
threads = []
transfers = [(1, 2, 100), (2, 1, 150), (1, 2, 200)]
for from_id, to_id, amount in transfers:
t = threading.Thread(
target=bank.ordered_transfer,
args=(from_id, to_id, amount)
)
threads.append(t)
t.start()
for t in threads:
t.join()
print("✅ 所有转账完成,无死锁发生")
方案3:上下文管理器 + 统一锁管理器
python
import threading
class ContextManagerSolution:
"""使用上下文管理器避免死锁的解决方案"""
def __init__(self):
self.account_a = threading.Lock()
self.account_b = threading.Lock()
self.balance_a = 1000
self.balance_b = 1000
def transfer_with_context_manager(self, from_account, to_account, amount):
"""
方法1:使用上下文管理器 + 统一锁顺序
这是最安全和推荐的方式
"""
# 确定锁的顺序:总是先锁编号小的账户
if from_account == "A" and to_account == "B":
# A -> B: 先锁A,再锁B
with self.account_a:
with self.account_b:
# 执行转账
if self.balance_a >= amount:
self.balance_a -= amount
self.balance_b += amount
print(
f"✅ 转账成功: {from_account} -> {to_account}, 金额: {amount}"
)
print(
f" 账户A余额: {self.balance_a}, 账户B余额: {self.balance_b}"
)
return True
else:
print(f"❌ 转账失败: 账户{from_account}余额不足")
return False
elif from_account == "B" and to_account == "A":
# B -> A: 也先锁A,再锁B(统一顺序!)
with self.account_a:
with self.account_b:
# 执行转账
if self.balance_b >= amount:
self.balance_b -= amount
self.balance_a += amount
print(
f"✅ 转账成功: {from_account} -> {to_account}, 金额: {amount}"
)
print(
f" 账户A余额: {self.balance_a}, 账户B余额: {self.balance_b}"
)
return True
else:
print(f"❌ 转账失败: 账户{from_account}余额不足")
return False
def explain_context_manager():
"""详细解释上下文管理器的工作原理"""
print("=" * 60)
print("📚 上下文管理器 (Context Manager) 详解")
print("=" * 60)
print("\n1️⃣ 什么是上下文管理器?")
print(" Python的 `with` 语句用于管理资源,确保资源在使用后正确释放")
print(" threading.Lock() 支持上下文管理器协议")
print("\n2️⃣ 手动管理锁 vs 上下文管理器:")
print("\n ❌ 手动管理(容易出错):")
print(" lock.acquire()")
print(" try:")
print(" # 执行操作")
print(" finally:")
print(" lock.release() # 如果忘记释放,会导致死锁")
print("\n ✅ 上下文管理器(自动管理):")
print(" with lock:")
print(" # 执行操作")
print(" # 自动释放锁,即使发生异常也会释放")
print("\n3️⃣ 嵌套上下文管理器:")
print(" with lock_a:")
print(" with lock_b:")
print(" # 执行操作")
print(" # lock_b 自动释放")
print(" # lock_a 自动释放")
print("\n4️⃣ 上下文管理器的优势:")
print(" ✓ 自动释放:即使发生异常也会释放锁")
print(" ✓ 代码简洁:不需要 try-finally 块")
print(" ✓ 不易出错:不会忘记释放锁")
print(" ✓ 可读性强:代码结构清晰")
def test_context_manager_solution():
"""测试上下文管理器解决方案"""
print("\n" + "=" * 60)
print("🧪 测试:使用上下文管理器 + 统一锁顺序")
print("=" * 60)
bank = ContextManagerSolution()
def transfer_a_to_b():
print("\n[线程1] 开始执行: A -> B 转账 100")
bank.transfer_with_context_manager("A", "B", 100)
def transfer_b_to_a():
print("\n[线程2] 开始执行: B -> A 转账 150")
bank.transfer_with_context_manager("B", "A", 150)
# 创建两个线程同时执行
thread1 = threading.Thread(target=transfer_a_to_b)
thread2 = threading.Thread(target=transfer_b_to_a)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("\n✅ 所有转账完成,未发生死锁!")
print(f"最终余额 - 账户A: {bank.balance_a}, 账户B: {bank.balance_b}")
if __name__ == "__main__":
explain_context_manager()
test_context_manager_solution()
预防策略:
- 固定锁顺序:为所有资源定义全局的获取顺序
- 超时机制:设置获取锁的超时时间
- 避免嵌套锁:尽量减少锁的嵌套使用
- 使用更高级的同步原语:如
threading.RLock、threading.Semaphore
检测方法:
- 代码审查:检查锁的获取顺序
- 静态分析工具:使用pylint等工具检查
- 动态检测:使用线程分析器
- 超时监控:设置操作超时,超时则可能存在死锁
实际应用中的建议
python
# 实用模板:安全的资源管理
class SafeResourceManager:
def __init__(self):
self.locks = {}
def safe_operation(self, resource_ids):
"""安全地操作多个资源"""
# 1. 按固定顺序排序资源ID
sorted_ids = sorted(resource_ids)
# 2. 按顺序获取所有锁
acquired_locks = []
try:
for res_id in sorted_ids:
lock = self._get_lock(res_id)
lock.acquire(timeout=5.0) # 设置超时
acquired_locks.append(lock)
# 3. 执行操作
return self._do_operation(resource_ids)
finally:
# 4. 逆序释放锁(良好实践)
for lock in reversed(acquired_locks):
lock.release()
def _get_lock(self, res_id):
# 获取或创建锁
pass
def _do_operation(self, resource_ids):
# 执行实际操作
pass
总结
死锁是多线程编程中的常见问题,理解其产生条件和解决方案至关重要。在Python中:
- 经典死锁场景包括哲学家就餐、银行转账、嵌套锁
- 解决方案:固定锁顺序 > 超时机制 > 使用高级同步原语
- 最佳实践:总是按相同顺序获取锁,使用
with语句管理锁,设置合理的超时
通过合理的设计和预防,可以大大降低死锁发生的概率。