Python多进程/多线程读写锁实现:高效管理并发读写共享资源


Python多进程/多线程读写锁实现:高效管理并发读写共享资源

本文深入探讨了在python多进程或多线程环境中,如何高效实现一个允许多个读者并发访问共享资源,同时确保单个作者独占且拥有优先权的读写锁机制。通过自定义`rwlock`类,利用`multiprocessing.joinablequeue`进行进程间同步与通信,解决了传统锁机制在读写并发场景下的局限性,并提供了详细的代码示例和多线程适配方案,旨在帮助开发者构建更健壮、高性能的并发应用。

引言:并发读写共享资源的挑战

在多进程或多线程编程中,管理对共享资源的访问是一个核心问题。当存在一个或多个写入者(Writer)和多个读取者(Reader)同时操作一个共享文件或内存数据时,需要一套严谨的同步机制来保证数据的一致性和程序的正确性。理想的场景是:

  1. 读操作并发性:允许多个读取者同时访问共享资源,以提高效率。
  2. 写操作独占性:当写入者需要修改资源时,必须独占访问,防止其他读者读取到不一致的数据,也防止其他写入者同时修改。
  3. 写入者优先权:当写入者请求访问时,应尽可能快地获得独占权,即使有读者正在进行读取。

传统的互斥锁(如multiprocessing.Lock或threading.Lock)可以保证独占访问,但它会限制所有操作的并发性,即使是读操作也无法并行。而multiprocessing.Condition虽然可以用于进程间的通知,但其设计更侧重于事件通知和等待,难以直接实现多读者并发、单作者独占且有优先权的复杂读写锁逻辑。

核心机制:自定义读写锁 RWLock

为了解决上述挑战,我们可以设计一个自定义的读写锁 RWLock。本方案的核心思想是利用 multiprocessing.JoinableQueue 作为读者与作者之间通信和同步的桥梁,实现一种合作式的读写控制。

RWLock 类设计理念

RWLock 的设计遵循以下原则:

  • 读者等待作者:读者在开始读取前,会等待作者发出“新数据已准备好”的信号。
  • 作者等待读者:作者在开始写入前,会等待所有读者完成对上一轮数据的读取。
  • 作者优先权:作者可以通过设置一个“停止”标志,请求正在读取的读者尽快结束当前操作并释放资源,以便作者能快速获得独占权。

RWLock 类详解

以下是 RWLock 类的详细实现,适用于多进程环境:

from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time

class RWLock:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写入者和多个读取者的读写锁。
        num_readers: 预期的读取者数量。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是一个正整数。')

        # _local_storage 用于为每个读者进程/线程分配一个唯一的队列
        self._local_storage = local() 
        self._num_readers = num_readers
        # _queue_count 用于在初始化时为读者分配队列
        self._queue_count = Value('i', 0) 
        # _stop 标志,作者用它来通知读者尽快停止读取
        self._stop = Value('i', 0) 
        # _lock 用于保护 _queue_count 的并发访问
        self._lock = Lock() 
        # 为每个读者创建一个 JoinableQueue
        self._queues = [JoinableQueue(1) for _ in range(self._num_readers)]

    def acquire_for_reading(self) -> None:
        """读者请求共享读取权限。"""
        # 为当前读者进程/线程分配一个队列(如果尚未分配)
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                # 使用 _queue_count 确保每个读者获得唯一的队列
                queue = self._queues[self._queue_count.value]
                self._queue_count.value += 1
            self._local_storage.queue = queue

        # 阻塞等待作者放入一个信号,表示有新数据可读
        queue.get() 

    def release_for_reading(self):
        """读者完成共享读取,释放权限。"""
        # 通知队列,读者已完成对当前数据的处理
        self._local_storage.queue.task_done()

    def acquire_for_writing(self, immediate=True):
        """
        作者请求独占写入权限。
        immediate: 如果为True,作者会设置停止标志,请求读者尽快释放。
        """
        if immediate:
            # 设置停止标志,通知读者尽快中断读取
            self._stop.value = 1 

        # 阻塞等待所有读者完成对上一轮数据的处理(通过 task_done)
        # 队列的 join() 方法会等待所有 put() 的任务被 task_done() 标记完成
        for queue in self._queues:
            queue.join()

    def release_for_writing(self) -> None:
        """作者完成独占写入,释放权限。"""
        # 重置停止标志
        self._stop.value = 0  

        # 向所有读者队列放入一个信号,唤醒等待的读者
        for queue in self._queues:
            queue.put(None)

    def is_stop_posted(self) -> bool:
        """
        读者周期性调用此函数,检查作者是否需要立即独占资源。
        """
        return True if self._stop.value else False

工作流程解析

  1. 初始化 (__init__)

    • 为每个预期读者创建一个 JoinableQueue。这些队列初始为空。
    • _queue_count 用于确保每个读者进程/线程在首次调用 acquire_for_reading 时能获得一个唯一的队列。
    • _stop 是一个共享的 Value,作为作者请求读者立即停止的标志。
  2. 读者获取锁 (acquire_for_reading)

    • 每个读者首次调用时,会从 _queues 列表中分配一个专属的 JoinableQueue。
    • 读者调用 queue.get(),由于队列初始为空,读者进程会在此处阻塞,等待作者放入数据。
  3. 读者释放锁 (release_for_reading)

    • 读者完成读取后,调用 queue.task_done()。这会通知其专属队列,之前由作者放入的那个任务(即唤醒信号)已经处理完毕。
  4. 作者获取锁 (acquire_for_writing)

    AI建筑知识问答 AI建筑知识问答

    用人工智能ChatGPT帮你解答所有建筑问题

    AI建筑知识问答 172 查看详情 AI建筑知识问答
    • 作者遍历所有读者的 JoinableQueue,并调用 queue.join()。
    • queue.join() 会阻塞,直到队列中所有通过 put() 添加的任务都被 task_done() 标记为完成。这意味着作者会等待所有读者完成对上一轮数据的读取。
    • 如果 immediate 参数为 True,作者会设置 _stop.value = 1,向读者发出立即停止的信号。
  5. 作者释放锁 (release_for_writing)

    • 作者完成写入后,首先重置 _stop.value = 0。
    • 然后,作者向每个读者的 JoinableQueue 中 put(None)。这会解除之前 acquire_for_reading 中读者 queue.get() 的阻塞状态,允许读者开始读取新数据。
  6. 作者优先权 (is_stop_posted)

    • 为了实现作者的“立即”优先权,读者在进行长时间读取操作时,需要周期性地调用 is_stop_posted()。如果检测到 _stop 标志被设置,读者应立即中断当前读取并调用 release_for_reading() 释放锁。

示例代码:多进程读写场景

以下是一个使用 RWLock 实现多进程读写共享数据的完整示例:

from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time

# RWLock 类定义如上所示...
class RWLock:
    def __init__(self, num_readers: int):
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是一个正整数。')
        self._local_storage = local()
        self._num_readers = num_readers
        self._queue_count = Value('i', 0)
        self._stop = Value('i', 0)
        self._lock = Lock()
        self._queues = [JoinableQueue(1) for _ in range(self._num_readers)]

    def acquire_for_reading(self) -> None:
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count.value]
                self._queue_count.value += 1
            self._local_storage.queue = queue
        queue.get()

    def release_for_reading(self):
        self._local_storage.queue.task_done()

    def acquire_for_writing(self, immediate=True):
        if immediate:
            self._stop.value = 1;

        for queue in self._queues:
            queue.join()

    def release_for_writing(self) -> None:
        self._stop.value = 0

        for queue in self._queues:
            queue.put(None)

    def is_stop_posted(self) -> bool:
        return True if self._stop.value else False

# 共享数据类,使用 multiprocessing.Value 实现进程间共享
class SharedData:
    def __init__(self):
        self.value = Value('i', 0, lock=False) # lock=False 表示不使用内部锁,由 RWLock 管理

def reader(rw_lock, id, shared_data):
    while True:
        rw_lock.acquire_for_reading()
        # 模拟长时间读取任务
        # 在读取过程中周期性检查作者是否需要独占
        sleep_time = id / 10 # 不同的读者有不同的模拟读取时间
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'读者 {id} 收到停止信号,中断读取。', flush=True)
                break # 中断当前读取循环

        print(f'读者 {id} 完成处理数据: {shared_data.value}', flush=True)
        rw_lock.release_for_reading()
        time.sleep(0.1) # 短暂休眠,避免忙等待

def writer(rw_lock, shared_data):
    while True:
        # 当 shared_data.value 为 3 时,作者请求立即独占
        rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
        shared_data.value.value += 1 # 修改共享数据
        print(f'作者写入数据: {shared_data.value.value} (时间: {time.time()})', flush=True)
        rw_lock.release_for_writing()
        time.sleep(0.5) # 模拟作者完成写入后的其他工作

def main():
    rw_lock = RWLock(3) # 实例化读写锁,有3个读者
    shared_data = SharedData() # 共享数据

    # 启动读者进程
    for id in range(1, 4):
        Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()

    # 启动作者进程
    Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()

    input('按 Enter 键终止程序:\n')

if __name__ == '__main__':
    main()

运行上述代码,你将观察到:

  • 作者会写入数据,然后读者会并发地读取这些数据。
  • 读者会根据其 id 模拟不同的读取时间。
  • 当 shared_data.value 达到 3 时,作者会以 immediate=True 的方式获取锁。此时,正在读取的读者会检查 is_stop_posted() 标志,如果发现被设置,会尽快中断读取并释放锁,从而让作者更快地获得独占权。

多线程适配

上述 RWLock 类是为 multiprocessing 设计的,但其核心逻辑同样适用于多线程环境。只需要将 multiprocessing 相关的原语替换为 threading 和 queue 模块中的对应实现即可。

主要替换点:

  • multiprocessing.Process -> threading.Thread
  • multiprocessing.Lock -> threading.Lock
  • multiprocessing.Value -> 普通 Python int 变量(因为线程共享内存,不需要特殊的共享值类型)
  • multiprocessing.JoinableQueue -> queue.Queue

以下是多线程版本的 RWLockMultiThreading 实现:

from threading import Thread, Lock, local
from queue import Queue
import time

class RWLockMultiThreading:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写入者和多个读取者(多线程)的读写锁。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是一个正整数。')

        self._local_storage = local()
        self._num_readers = num_readers
        self._queue_count = 0 # 普通int变量,因为线程共享内存
        self._stop = 0        # 普通int变量
        self._lock = Lock()   # threading.Lock
        self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue

    def acquire_for_reading(self) -> None:
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count]
                self._queue_count += 1
            self._local_storage.queue = queue
        queue.get()

    def release_for_reading(self):
        self._local_storage.queue.task_done()

    def acquire_for_writing(self, immediate=True):
        if immediate:
            self._stop = 1;

        for queue in self._queues:
            queue.join()

    def release_for_writing(self) -> None:
        self._stop = 0

        for queue in self._queues:
            queue.put(None)

    def is_stop_posted(self) -> bool:
        return True if self._stop else False

# 共享数据类,普通Python对象即可
class SharedValue:
    def __init__(self):
        self.value = 0

def reader_thread(rw_lock, id, shared_data):
    while True:
        rw_lock.acquire_for_reading()
        sleep_time = id / 10
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'读者线程 {id} 收到停止信号,中断读取。', flush=True)
                break
        print(f'读者线程 {id} 完成处理数据: {shared_data.value}', flush=True)
        rw_lock.release_for_reading()
        time.sleep(0.1)

def writer_thread(rw_lock, shared_data):
    while True:
        rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
        shared_data.value += 1
        print(f'作者线程写入数据: {shared_data.value} (时间: {time.time()})', flush=True)
        rw_lock.release_for_writing()
        time.sleep(0.5)

def main_thread():
    rw_lock = RWLockMultiThreading(3)
    shared_data = SharedValue()
    for id in range(1, 4):
        Thread(target=reader_thread, args=(rw_lock, id, shared_data), daemon=True).start()
    Thread(target=writer_thread, args=(rw_lock, shared_data), daemon=True).start()
    input('按 Enter 键终止程序:\n')

if __name__ == '__main__':
    main_thread()

注意事项与最佳实践

  1. 读者协作性:作者的“立即”优先权依赖于读者进程/线程的协作。如果读者在执行长时间任务时不检查 is_stop_posted() 标志,作者将无法强制其立即释放资源,只能等待读者自然完成。因此,在实际应用中,读者任务应设计成可中断的,并在关键点检查此标志。
  2. 数据一致性:此 RWLock 确保了写入时的独占性,即当作者写入时,没有读者正在读取,也没有其他作者(本设计只支持一个作者)正在写入。同时,它也保证了读者总是读取到作者完整写入后的数据,避免了读取到不一致的中间状态。
  3. 资源管理:务必确保 acquire_for_reading() 和 acquire_for_writing() 之后,都对应地调用 release_for_reading() 和 release_for_writing(),即使在异常情况下也应如此(例如使用 try...finally 结构),以避免死锁或资源泄露。 4

以上就是Python多进程/多线程读写锁实现:高效管理并发读写共享资源的详细内容,更多请关注其它相关文章!


# 首次  # 网站推广容易做吗现在  # 网站建设怎样容易  # 资深seo待遇  # SEO计费系统的源码  # 华人博学网站建设方案  # 赣州seo讲文兄  # 合肥网站导航栏优化  # 工业品推广营销广告  # 面膜推广策划营销  # 知名的seo电话  # 适用于  # 浮点  # python  # 知识问答  # 自定义  # 长时间  # 创建一个  # 多个  # 多线程  # 是一个  # red  # 同步机制  # 并发访问  # ai 


相关栏目: 【 Google疑问12 】 【 Facebook疑问10 】 【 优化推广96088 】 【 技术知识133117 】 【 IDC资讯59369 】 【 网络运营7196 】 【 IT资讯61894


相关推荐: 猫眼电影app怎么查询电影院的营业时间_猫眼电影影院营业时间查询教程  5G和6G的连接密度有什么区别 6G每平方公里能连接多少设备  《原神》月之一版本新增书籍一览  电脑“无法访问指定设备、路径或文件”怎么办?五种权限设置方法  Windows 11怎么删除恢复分区_Windows 11使用Diskpart命令强行删除分区  Django模型动态关联检查:高效管理复杂关系  抖音如何进行蓝V认证 抖音企业号申请所需资料与流程  Golang中的rune与byte类型区别是什么_Golang字符与字节处理详解  《360浏览器》设置摄像头权限方法  喜茶GO更换登录账号方法  海棠书屋官方在线书籍入口 海棠书屋文学作品浏览官网链接  怎样让Windows 11的开始菜单恢复经典样式_Open-Shell工具使用指南【怀旧】  微星主板BIOS怎么调整内存时序_内存参数手动优化BIOS设置教程  win11怎么更改账户类型 Win11标准用户和管理员权限切换【教程】  Win11怎么设置分辨率 Win11显示设置调整分辨率及刷新率修改  Go语言中方法与接收器:指针和值类型的调用机制详解  疯狂小鸟微信小游戏入口 疯狂小鸟网页版秒玩  抖音号显示企业机构号是什么意思?企业机构号申请条件是什么?  B站怎么快速升级 B站用户等级提升攻略【详解】  PDF如何批量加注释_PDF多文件批注高亮操作教程  百度浏览器无法安装扩展程序_百度浏览器插件安装失败原因解析  C++如何使用CMake构建项目_C++ CMakeLists.txt编写入门教程  windows10怎么关闭自动安装应用_windows10禁止推广应用下载  《虎扑》关闭社区内容推荐方法  Python定时发送QQ消息  《procreate》绘制渐变效果教程  Lar*el怎么实现全文搜索_Lar*el Scout集成Algolia教程  解决C#跨线程访问XML对象的异常 安全的并发XML处理模式  极兔快递官网查询入口手机版 手机极兔快递登录查询入口官方  《万兴喵影》导出视频方法  小米civi如何设置锁屏时间  mysql镜像配置如何设置用户权限组_mysql镜像配置用户组与权限分级管理方法  SQLAlchemy 2.0 与 Pydantic 模型类型安全集成指南  抖音猜你想搜能说明对方搜过吗  Linux如何自动分析系统异常日志_Linux日志智能检测  在React中正确处理HTML input type="number"的数值类型  HTML与J*aScript实现下拉菜单驱动的动态表格:构建交互式维修表单  Animex动漫社正版在线入口 Animex动漫社动漫官方观看网  12306夜间购票失败? | 查看官方公布的暂停服务公告与应对方案  手机耗电快是什么原因 延长手机电池续航时间的设置方法【详解】  c++如何链接Boost库_c++准标准库的集成与使用  Python中处理嵌套字典与列表的数据提取与过滤教程  sublime如何撤销关闭的标签页_sublime重新打开已关闭文件技巧  iPhone12是否要更新ios16  电脑没有声音了怎么办 电脑声音问题的全面排查与修复指南【详解】  iSpring三分屏制作教程  附近酒吧怎么找?  J*aScript事件处理:优化键盘输入与表单提交的实践指南  C#中的Record类型有什么优势?C# 9新特性Record与Class的用法区别  多闪电脑版下载_多闪PC端模拟器使用 

 2025-10-27

了解您产品搜索量及市场趋势,制定营销计划

同行竞争及网站分析保障您的广告效果

点击免费数据支持

提交您的需求,1小时内享受我们的专业解答。

运城市盐湖区信雨科技有限公司


运城市盐湖区信雨科技有限公司

运城市盐湖区信雨科技有限公司是一家深耕海外推广领域十年的专业服务商,作为谷歌推广与Facebook广告全球合作伙伴,聚焦外贸企业出海痛点,以数字化营销为核心,提供一站式海外营销解决方案。公司凭借十年行业沉淀与平台官方资源加持,打破传统外贸获客壁垒,助力企业高效开拓全球市场,成为中小企业出海的可靠合作伙伴。

 8156699

 13765294890

 8156699@qq.com

Notice

We and selected third parties use cookies or similar technologies for technical purposes and, with your consent, for other purposes as specified in the cookie policy.
You can consent to the use of such technologies by closing this notice, by interacting with any link or button outside of this notice or by continuing to browse otherwise.