Redis解决网络抖动问题
所谓网络抖动问题, 简单来说就是防止用户短暂的时间内对同一个接口多次点击访问
这里利用的是redis锁的原子性和with Statement上下文管理器实现, 另外该类还支持协程, 可使用async with 调用
1. 源码
FuncDefine.py
def clear_all_lock(PREFIX='lock'): keys = redis_operator.get_redis_keys_pattern(PREFIX + '*') for key in keys: if isinstance(key, bytes): kwl_py_write_log(key.decode(encoding='utf-8'), msgid='del_redis_key') redis_operator.delete_redis_key(key.decode(encoding='utf-8'), 1) def unlock(key): redis_operator.delete_redis_key(key, 1) class RedisLock: DEFAULT_VALUE = 1 PREFIX = 'lock' def __init__(self, key, lock_time=300): """ 初始化redis锁 :param key: 关键字 :param lock_time: 上锁时间 5min """ self._key = RedisLock.PREFIX + key self.lock_time = lock_time self.hold_lock = False @property def key(self): return self._key @key.setter def key(self, key): self._key = RedisLock.PREFIX + key def __enter__(self): self.hold_lock = self.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.hold_lock: self.release() return False async def __aenter__(self): self.hold_lock = await self.acquire_cas_lock() return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.hold_lock: self.release() return False async def acquire_cas_lock(self, lock_time=60): try: return await asyncio.wait_for(self.acquire_lock_until_succ(), lock_time) except asyncio.TimeoutError as e: return False async def acquire_lock_until_succ(self): while redis_operator.set_redis_key_ex_nx(self.key, self.DEFAULT_VALUE, self.lock_time) is not True: # redis return is None or other await asyncio.sleep(0.01) return True def acquire(self): """ 设置redis锁 :param key: :param lock_time: :return: """ try: return redis_operator.set_redis_key_ex_nx(self.key, self.DEFAULT_VALUE, self.lock_time) is True except Exception: return False def release(self): redis_operator.delete_redis_key(self.key, 1)
redis_operator.py
import redis from redisConfig import * # ------------------------------------------------------------------------------------------------------ # 主从redis,个数一一对应 g_main_redis_pool_list = [] g_slave_redis_pool_list = [] g_main_redis_is_ok = [] # 主redis是否可用True为主ok g_slave_redis_is_ok = [] # 从redis是否可用 for each_redis in g_main_redis_server: redis_pool = redis.ConnectionPool(host=each_redis[0], port=each_redis[1], password=each_redis[2], socket_timeout=8, socket_connect_timeout=5) g_main_redis_pool_list.append(redis_pool) g_main_redis_is_ok.append(True) for each_redis in g_slave_redis_server: redis_pool = redis.ConnectionPool(host=each_redis[0], port=each_redis[1], password=each_redis[2], socket_timeout=8, socket_connect_timeout=5) g_slave_redis_pool_list.append(redis_pool) g_slave_redis_is_ok.append(True) def get_redis_by_key(strkey, nums): return (ord(strkey[0]) + ord(strkey[-1])) % nums # 从redis取 def get_redis_key(key): # 根据key来分库 index = get_redis_by_key(key, len(g_main_redis_pool_list)) if g_main_redis_is_ok[index]: # 主ok try: return redis.Redis(connection_pool=g_main_redis_pool_list[index]).get(key) except Exception: # 主标记为挂 g_main_redis_is_ok[index] = False # 主挂了试试从能不能用 g_slave_redis_is_ok[index] = True if g_slave_redis_is_ok[index]: # 从ok try: return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).get(key) except Exception as e: # 从标记为挂 g_slave_redis_is_ok[index] = False # 从也挂了下回只能尝试使用主 g_main_redis_is_ok[index] = True # 抛出异常 raise Exception(repr(e)) # 按理不可能出现这种情况,主从皆False,全挂的情况也会至少打开一个 g_main_redis_is_ok[index] = Trueget_redis_by_key raise Exception('内部错误,get_redis_key运行异常') # redis存值且设置生命周期 def set_redis_key_ex(key, value, expire): # 根据key来分库 index = get_redis_by_key(key, len(g_main_redis_pool_list)) if g_main_redis_is_ok[index]: # 主ok try: if expire == 0: return redis.Redis(connection_pool=g_main_redis_pool_list[index]).set(key, value) return redis.Redis(connection_pool=g_main_redis_pool_list[index]).setex(key, value, expire) except Exception: # 主标记为挂 g_main_redis_is_ok[index] = False # 主挂了试试从能不能用 g_slave_redis_is_ok[index] = True if g_slave_redis_is_ok[index]: # 从ok try: if expire == 0: return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).set(key, value) return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).setex(key, value, expire) except Exception as e: # 从标记为挂 g_slave_redis_is_ok[index] = False # 从也挂了下回只能尝试使用主 g_main_redis_is_ok[index] = True # 抛出异常 raise Exception(repr(e)) # 按理不可能出现这种情况,主从皆False,全挂的情况也会至少打开一个 g_main_redis_is_ok[index] = True raise Exception('内部错误,set_redis_key_ex运行异常') # redis存值且设置生命周期 def expire_redis_key(key, expire): # 根据key来分库 index = get_redis_by_key(key, len(g_main_redis_pool_list)) if g_main_redis_is_ok[index]: # 主ok try: if expire == 0: return 0 return redis.Redis(connection_pool=g_main_redis_pool_list[index]).expire(key, expire) except Exception: # 主标记为挂 g_main_redis_is_ok[index] = False # 主挂了试试从能不能用 g_slave_redis_is_ok[index] = True if g_slave_redis_is_ok[index]: # 从ok try: if expire == 0: return 0 return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).expire(key, expire) except Exception as e: # 从标记为挂 g_slave_redis_is_ok[index] = False # 从也挂了下回只能尝试使用主 g_main_redis_is_ok[index] = True # 抛出异常 raise Exception(repr(e)) # 按理不可能出现这种情况,主从皆False,全挂的情况也会至少打开一个 g_main_redis_is_ok[index] = True raise Exception('内部错误,expire_redis_key运行异常') # redis删除key def delete_redis_key(key, expire): # 根据key来分库 index = get_redis_by_key(key, len(g_main_redis_pool_list)) if g_main_redis_is_ok[index]: # 主ok try: if expire == 0: return 0 return redis.Redis(connection_pool=g_main_redis_pool_list[index]).delete(key) except Exception: # 主标记为挂 g_main_redis_is_ok[index] = False # 主挂了试试从能不能用 g_slave_redis_is_ok[index] = True if g_slave_redis_is_ok[index]: # 从ok try: if expire == 0: return 0 return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).delete(key) except Exception as e: # 从标记为挂 g_slave_redis_is_ok[index] = False # 从也挂了下回只能尝试使用主 g_main_redis_is_ok[index] = True # 抛出异常 raise Exception(repr(e)) # 按理不可能出现这种情况,主从皆False,全挂的情况也会至少打开一个 g_main_redis_is_ok[index] = True raise Exception('内部错误,delete_redis_key运行异常') def set_redis_key_ex_nx(key, value, expire): """如果有键值则不设置""" # 根据key来分库 index = get_redis_by_key(key, len(g_main_redis_pool_list)) if g_main_redis_is_ok[index]: # 主ok try: if expire == 0: return 0 return redis.Redis(connection_pool=g_main_redis_pool_list[index]).set(key, value, ex=expire, nx=True) except Exception: # 主标记为挂 g_main_redis_is_ok[index] = False # 主挂了试试从能不能用 g_slave_redis_is_ok[index] = True if g_slave_redis_is_ok[index]: # 从ok try: if expire == 0: return 0 return redis.Redis(connection_pool=g_slave_redis_pool_list[index]).set(key, value, ex=expire, nx=True) except Exception as e: # 从标记为挂 g_slave_redis_is_ok[index] = False # 从也挂了下回只能尝试使用主 g_main_redis_is_ok[index] = True # 抛出异常 raise Exception(repr(e)) # 按理不可能出现这种情况,主从皆False,全挂的情况也会至少打开一个 g_main_redis_is_ok[index] = True raise Exception('内部错误,set_redis_key_nx_运行异常') def get_redis_keys_pattern(key_pattern): from builtins import enumerate key_set = set() # 主库找 for index, is_ok in enumerate(g_main_redis_is_ok): if is_ok: key_set.update(redis.Redis(connection_pool=g_main_redis_pool_list[index]).keys(key_pattern)) # 从库找 for index, is_ok in enumerate(g_slave_redis_is_ok): if is_ok: key_set.update(redis.Redis(connection_pool=g_slave_redis_pool_list[index]).keys(key_pattern)) return key_set if __name__ == "__main__": # set_redis_key_ex('ab','a',10) print(get_redis_key('ab').decode())
2. 使用方法
import FuncDefine with FuncDefine.RedisLock(rediskey) as lock: if not lock.hold_lock: return response(3, '商品添加中,请稍后~', '', [])
3. 源码分析
整体来看也就是接口访问过来的时候, 设置一个redis_key(nx=True, ex=300), 这样在五分钟之内就可以避免重复点击的情况
- 初始化redis, 上下文管理器会触发
__enter__()方法, 从而调用self.acquire()

- 设置redis的键, 如果不加nx=True, redis的set会直接覆盖之前key的值, 这里还存在一个主从redis, 感兴趣可以看看源码

- 当执行完with中的代码块, 会触发
__exit__()函数, 调用函数删除当前redis的key对应的值

- 剩下的一些函数都是封装的一些通用方法, 比如查看当前key值