Redis缓存一致性

Redis 缓存一致性指的是缓存数据与数据库数据保持同步,避免出现缓存数据过时、与数据库数据不匹配的情况。

策略 核心思想 一致性强度 性能影响 实现复杂度 适用场景
Cache-Aside 应用层主动管理缓存:读时延后加载,写时更新DB并删除缓存 最终一致性 读操作可能延迟 最常用,读多写少,如用户信息、商品信息
Write-Through 将缓存作为主要数据入口,所有写操作同步更新缓存和数据库 强一致性 写性能较低 写多读少,对一致性要求极高,如金融账户
Write-Behind 写操作先更新缓存,随后异步批量更新数据库 最终一致性 写性能高 写操作密集,可容忍数据丢失,如日志、统计
Read-Through 应用只读缓存,由缓存组件自身负责未命中时从数据库加载 取决于背后的写策略 读操作平均延迟低 希望简化应用逻辑,有相应缓存组件支持

1、Cache-Aside (旁路缓存) - 最常用模式

这是最广泛采用的模式,由应用程序显式地管理缓存和数据库的交互。

读流程:

  1. 接收读请求。
  2. 首先尝试从 Redis 中读取数据。
  3. 如果命中(Cache Hit),直接返回数据。
  4. 如果未命中(Cache Miss),则从数据库查询。
  5. 将数据库返回的数据写入 Redis(称为“回填”),然后返回响应。

写流程:

  1. 接收写请求。
  2. 更新数据库。
  3. 删除(而非更新)Redis 中对应的缓存键。

为何删除缓存,而不是更新它?
在并发场景下,如果采用更新缓存的方式,可能会因为操作时序问题导致缓存中被写入旧数据。删除缓存是一种更安全、更简单的策略,它确保后续的读请求能从数据库加载最新数据并重新回填缓存。

优点:实现简单,对业务代码侵入性低,缓存中只保留真正被请求的热点数据。

缺点:在极端的并发情况下,仍可能出现短时间的数据不一致(可通过“延迟双删”策略优化)。

代码示例

 import redis import pymysql import json import time  # 初始化Redis和数据库连接 r = redis.Redis(host='localhost', port=6379, db=0) db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')  def get_product(product_id):     """读取产品信息"""     cache_key = f"product:{product_id}"          # 1. 尝试从缓存获取     product_data = r.get(cache_key)     if product_data:         print(f"Cache hit for product {product_id}")         return json.loads(product_data)          print(f"Cache miss for product {product_id}")          # 2. 缓存未命中,查询数据库     cursor = db.cursor()     cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))     product = cursor.fetchone()     cursor.close()          if not product:         # 3. 数据库也没有,缓存空值防止穿透         r.setex(cache_key, 300, json.dumps({"status": "not_found"}))  # 5分钟空值缓存         return None          # 4. 数据库有数据,写入缓存     product_dict = {         'id': product[0],         'name': product[1],         'price': float(product[2]),         'stock': product[3]     }     r.setex(cache_key, 3600, json.dumps(product_dict))  # 缓存1小时          return product_dict  def update_product(product_id, new_data):     """更新产品信息"""     cache_key = f"product:{product_id}"          # 1. 更新数据库     cursor = db.cursor()     cursor.execute(         "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s",         (new_data['name'], new_data['price'], new_data['stock'], product_id)     )     db.commit()     cursor.close()          # 2. 删除缓存     r.delete(cache_key)     print(f"Cache invalidated for product {product_id}")          # 3. 可选:预热缓存     # get_product(product_id)  # 立即重新加载到缓存  

缓存双删代码示例

这第二次删除是为了清除在“更新数据库”到“第一次删除缓存”这个极短时间窗口内,可能被其他读请求回填到缓存中的旧数据。

 import threading  def update_product_with_double_delete(product_id, new_data):     """使用延迟双删策略更新产品"""     cache_key = f"product:{product_id}"          # 第一次删除缓存     r.delete(cache_key)     print(f"First cache deletion for product {product_id}")          # 更新数据库     cursor = db.cursor()     cursor.execute(         "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s",         (new_data['name'], new_data['price'], new_data['stock'], product_id)     )     db.commit()     cursor.close()          # 延迟第二次删除     def delayed_delete():         time.sleep(0.5)  # 延迟500ms         r.delete(cache_key)         print(f"Second cache deletion for product {product_id}")          threading.Thread(target=delayed_delete).start()  

2、Write-Through (写穿透) - 强一致性模式

在此模式下,缓存被视为主要的数据源。所有写操作都同步地经过缓存,由缓存组件负责同时更新缓存和数据库。

写流程:

  1. 应用将数据写入缓存。
  2. 缓存组件同步地将数据写入底层数据库。
  3. 写入成功后返回。

优点:保证了强一致性,读写操作都面对缓存,性能通常优于直接读数据库。

缺点:写延迟较高(因为需要等待两个写操作完成),且需要缓存组件或中间件支持此模式。

示例代码

 class WriteThroughCache:     def __init__(self):         self.redis = redis.Redis(host='localhost', port=6379, db=0)         self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')          def set(self, key, value, ttl=None):         """写入数据(同时更新缓存和数据库)"""         # 1. 更新数据库         self._update_db(key, value)                  # 2. 更新缓存         if ttl:             self.redis.setex(key, ttl, json.dumps(value))         else:             self.redis.set(key, json.dumps(value))          def get(self, key):         """读取数据"""         # 直接从缓存读取         data = self.redis.get(key)         if data:             return json.loads(data)         return None          def _update_db(self, key, value):         """更新数据库(根据key类型处理)"""         if key.startswith("product:"):             product_id = key.split(":")[1]             cursor = self.db.cursor()             cursor.execute(                 "UPDATE products SET name = %s, price = %s, stock = %s WHERE id = %s",                 (value['name'], value['price'], value['stock'], product_id)             )             self.db.commit()             cursor.close()  # 使用示例 cache = WriteThroughCache() product_data = {'id': 123, 'name': 'New Product', 'price': 29.99, 'stock': 100} cache.set("product:123", product_data, ttl=3600)  # 读取 cached_product = cache.get("product:123")  

3、Write-Behind (写回) - 高性能写模式

与 Write-Through 类似,写操作首先更新缓存。但不同的是,对数据库的更新是异步批量进行的。

写流程:

  1. 应用将数据写入缓存,写入成功后立即返回。
  2. 缓存组件在后台异步地、批量地将一段时间内累积的写操作刷新到数据库。

优点:写性能极高,非常适合写操作非常密集的场景。

缺点:有数据丢失的风险(如果缓存宕机,尚未持久化到数据库的数据就会丢失),只能保证最终一致性。实现复杂度最高。

示例代码

 class WriteBehindCache:     def __init__(self):         self.redis = redis.Redis(host='localhost', port=6379, db=0)         self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')         self.write_queue = []         self.batch_size = 100         self.batch_interval = 10  # 秒         self.running = True         self.lock = threading.Lock()                  # 启动后台批处理线程         self.batch_thread = threading.Thread(target=self._batch_writer)         self.batch_thread.daemon = True         self.batch_thread.start()          def set(self, key, value):         """写入数据(先更新缓存,异步更新数据库)"""         # 1. 立即更新缓存         self.redis.set(key, json.dumps(value))                  # 2. 将数据库更新加入队列         with self.lock:             self.write_queue.append((key, value))          def get(self, key):         """读取数据"""         data = self.redis.get(key)         if data:             return json.loads(data)         return None          def _batch_writer(self):         """后台批处理线程"""         while self.running:             time.sleep(self.batch_interval)                          if not self.write_queue:                 continue                              # 获取当前批量的数据             with self.lock:                 batch = self.write_queue[:self.batch_size]                 self.write_queue = self.write_queue[self.batch_size:]                          if not batch:                 continue                              try:                 # 批量更新数据库                 cursor = self.db.cursor()                 for key, value in batch:                     if key.startswith("product:"):                         product_id = key.split(":")[1]                         cursor.execute(                             "INSERT INTO products (id, name, price, stock) VALUES (%s, %s, %s, %s) "                             "ON DUPLICATE KEY UPDATE name=%s, price=%s, stock=%s",                             (product_id, value['name'], value['price'], value['stock'],                              value['name'], value['price'], value['stock'])                         )                 self.db.commit()                 cursor.close()                 print(f"Batch updated {len(batch)} items to database")             except Exception as e:                 print(f"Batch update failed: {e}")                 # 将失败的任务重新加入队列                 with self.lock:                     self.write_queue = batch + self.write_queue          def stop(self):         """停止服务"""         self.running = False         self.batch_thread.join()         # 处理剩余队列         self._batch_writer()  # 使用示例 cache = WriteBehindCache()  # 高频率写入 for i in range(1000):     product_data = {'id': i, 'name': f'Product {i}', 'price': i*1.1, 'stock': i%100}     cache.set(f"product:{i}", product_data)  # 停止服务时 cache.stop()  

4、 Read-Through (读穿透)

此模式是 Cache-Aside 的变体,旨在简化应用逻辑。应用只从缓存读数据,当缓存未命中时,由缓存组件自身(而不是应用程序)负责从数据库加载数据并回填,然后返回给应用。

读流程:

  1. 应用向缓存请求数据。
  2. 如果缓存命中,直接返回。
  3. 如果缓存未命中,缓存组件从数据库加载数据,存入缓存,然后返回。

优点:将缓存逻辑从应用中解耦,应用代码更简洁。
缺点:需要缓存组件(或智能客户端)支持此功能。

示例代码

 class ReadThroughCache:     def __init__(self):         self.redis = redis.Redis(host='localhost', port=6379, db=0)         self.db = pymysql.connect(host='localhost', user='user', password='pass', database='mydb')          def get(self, key):         """读取数据(缓存未命中时自动加载)"""         # 1. 尝试从缓存获取         data = self.redis.get(key)         if data:             return json.loads(data)                  # 2. 缓存未命中,从数据库加载         value = self._load_from_db(key)         if value is None:             # 缓存空值防止穿透             self.redis.setex(key, 300, json.dumps({"status": "not_found"}))             return None                  # 3. 写入缓存         self.redis.setex(key, 3600, json.dumps(value))         return value          def _load_from_db(self, key):         """从数据库加载数据"""         if key.startswith("product:"):             product_id = key.split(":")[1]             cursor = self.db.cursor()             cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))             product = cursor.fetchone()             cursor.close()                          if product:                 return {                     'id': product[0],                     'name': product[1],                     'price': float(product[2]),                     'stock': product[3]                 }         return None  # 使用示例 cache = ReadThroughCache() product = cache.get("product:123")  

发表评论

评论已关闭。

相关文章