
| """ 实现基于mmap的内存共享机制的锁机制 file -> memory
example: instance = MemoryShareLock() with instance as f: doing something
上下文中操作都是原子性的, 因此勿将大段代码放到上下文环境中, 只考虑并发过程可能会导致数据不一致的情况下。
***注意: 1.开发中, 尽量不要让一个进程同时拥有多把锁, 如果非要同时具备多把锁, 请自行处理好解锁时机, 以防出现死锁! 2.加锁和解锁过程请务必在同一个指令中完成! ***
"""
import functools import typing as t import os import mmap from util.fileimp import mmap_path from filelock import FileLock
def file_lock(func: t.Callable): """ 基于共享内存的文件锁装饰器, 锁函数体, 增加加锁后统一预处理, 解锁前统一后处理。 1.如果能某进程拿到针对指定文件描述符的锁, 则往下执行; 没拿到排它锁的进程将阻塞, 直到拿到锁, 向下执行。 2.锁住被修饰的函数整体。 3.基于共享内存实现文件读写, 因此定义prefix和suffix的函数中第一个参数必须看做一个mmap对象。 对文件的操作基于mmap进行,而不是普通的文件对象。
用法: share_lock = MemoryShareLock(filename='test.txt', length=10, tag_name='共享内存涉及的说明')
1.first example: @file_lock def func(instance): doing something...
func(share_lock)
2.second example: def prefix(mmap, *args, **kwargs): print('加锁后, 预处理函数')
def suffix(mmap, *args, **kwargs): print('解锁前, 后处理函数')
@file_lock def func(instance): doing something...
func(share_lock, prefix=prefix, suffix=prefix, prefix_args=('name', 'age'), suffix_args=('name', 'age'), prefix_kwargs={"hobby":"swim"}, suffix_kwargs={"love":"person"})
"""
@functools.wraps(func) def inner( instance: t.Any, prefix: t.Optional[t.Callable] = None, suffix: t.Optional[t.Callable] = None, prefix_args: t.Optional[t.Tuple] = None, suffix_args: t.Optional[t.Tuple] = None, prefix_kwargs: t.Optional[t.Dict] = None, suffix_kwargs: t.Optional[t.Dict] = None ): file_obj = open(instance.filename, 'r+', encoding='utf-8') mmap_obj = mmap.mmap(file_obj.fileno(), instance.length, tagname=instance.tag_name) lock = FileLock(f'{instance.filename}.lock', timeout=instance.timeout) with lock.acquire(): if prefix: prefix(mmap_obj, *prefix_args, **prefix_kwargs) func(instance) if suffix: suffix(mmap_obj, *suffix_args, **suffix_kwargs)
return inner
def inject_dependence(func): """ 依赖注入装饰器 """
@functools.wraps(func) def inner(instance, *args, **kwargs): dependence: str = func.__name__ func_name = dependence.split('_')[0] if dependence.split('_')[0]: _func = getattr(instance, f'{func_name}') _args = getattr(instance, f'{func_name}_args') or () _kwargs = getattr(instance, f'{func_name}_kwargs') or {} if _func: _func(*_args, **_kwargs) func(instance, *args, **kwargs)
return inner
class MemoryShareLock(object): """ 内存共享类 1.使用__slots__减少因多次读取共享内存而带来过多额外实例对象的属性空间的内存靠小。 2.不能仅仅对打开文件加锁, 而应该对内存读写 + 刷到磁盘的这一过程加锁, 确保内存中的数据不被多个进程同时污染。 3.操作同一块 (文件-> 内存的映射) 确保为单例。 4.生成锁请在此文件末尾生成。 5.继承MemoryShareLock的锁类, 应当提供自定义的pre_treatment, after_treatment函数, 并使用 依赖注入装饰器装饰。 6.实现加锁后预处理和解锁前后处理的依赖注入。 7.允许手动添加依赖, *注意: 由于实例全局唯一, 因此属于该实例的依赖也全局唯一。
技术说明: 相比于直接采用文件的排斥锁来说, 使用基于mmap的内存共享 + 排斥锁, 读写文件只需一次数据拷贝, 磁盘 -> 内存 and 内存 -> 磁盘。而普通的读写文件时会加入提高读写效率, 保护磁盘的页缓存机制, 缺点会 导致读写文件各两次数据拷贝。
"""
__slots__ = ( 'filename', 'length', 'tag_name', 'mmap_obj', 'file_obj', 'timeout', 'lock', 'prefix', 'suffix', 'prefix_args', 'suffix_args', 'prefix_kwargs', 'suffix_kwargs', )
map_instance = {}
def __enter__(self): file_obj = open(self.filename, 'w+', encoding='utf-8') mmap_obj = mmap.mmap(file_obj.fileno(), self.length, tagname=self.tag_name) lock = FileLock(f'{self.filename}.lock', timeout=self.timeout) setattr(self, 'mmap_obj', mmap_obj) setattr(self, 'lock', lock) lock.acquire() self.prefix_treatment() return self
def __exit__(self, exc_type, exc_val, exc_tb): mmap_obj = getattr(self, 'mmap_obj') lock = getattr(self, 'lock') self.suffix_treatment() mmap_obj.flush() mmap_obj.close() lock.release()
def __new__( cls, filename: str, *args, **kwargs ): if not cls.map_instance.setdefault(filename, None): instance = super().__new__(cls) cls.map_instance[filename] = instance return cls.map_instance.get(filename)
def __init__( self, filename: str, length: int, tag_name: str, timeout: int = -1, prefix: t.Optional[t.Callable] = None, suffix: t.Optional[t.Callable] = None, prefix_args: t.Optional[t.Tuple] = None, suffix_args: t.Optional[t.Tuple] = None, prefix_kwargs: t.Optional[t.Dict] = None, suffix_kwargs: t.Optional[t.Dict] = None ) -> None: """ 初始化对象 """ self.filename = os.path.join(mmap_path, filename) self.length = length self.tag_name = tag_name self.timeout = timeout self.prefix = prefix self.suffix = suffix self.prefix_args = prefix_args self.suffix_args = suffix_args self.prefix_kwargs = prefix_kwargs self.suffix_kwargs = suffix_kwargs
@inject_dependence def prefix_treatment(self): """ 预处理 读写 多进程共享的内存块, 进/线程安全的操作 """ mmap_obj: mmap.mmap = getattr(self, 'mmap_obj') mmap_obj.seek(0) mmap_obj.write("1".encode(encoding='utf-8'))
@inject_dependence def suffix_treatment(self): """ 后处理 读写 多进程共享的内存块, 进/线程安全的操作 """ mmap_obj: mmap.mmap = getattr(self, 'mmap_obj') mmap_obj.seek(0) mmap_obj.write("0".encode(encoding='utf-8'))
def add_suffix_dependence(self, suffix, *args, **kwargs): """ 增加后处理依赖 """ self.suffix = suffix self.suffix_args = args self.suffix_kwargs = kwargs
def add_prefix_dependence(self, prefix, *args, **kwargs): """ 增加预处理依赖 """ self.prefix = prefix self.prefix_args = args self.prefix_kwargs = kwargs
class FileWriteLock(MemoryShareLock): """ 自定义文件锁 """
@inject_dependence def pre_treatment(self): pass
@inject_dependence def after_treatment(self): pass
switch_window_lock = MemoryShareLock('switch_window.txt', 10, tag_name='切换窗口锁') auto_gui_lock = MemoryShareLock('mouse_move.txt', 10, tag_name='鼠标移动锁') file_write_lock = FileWriteLock('test_write.json', 1000, tag_name='文件写锁') input_lock = MemoryShareLock('input.txt', 100, tag_name='输入框锁') js_execute_lock = MemoryShareLock('js.txt', 100, tag_name='js执行锁')
|