cache2go-源码阅读

简介

cache2go 是非常简短的 go 开源项目了,很适合作为第一个读源码项目。

如果你有一定的 go 开发经验,读起来会感觉到比较容易。

如果你刚刚接触 go 语音,基础知识还不完全了解,希望阅读本文时,遇到一个不会的知识点,去攻克一个,带着为了看懂本文源码的目的去学习基础知识。比如:

  • time.Timer
  • defer
  • sync.RWMutex

作者这样介绍:Concurrency-safe golang caching library with expiration capabilities,简单来说就是具有过期功能的并发安全 golang 缓存库,因此它具有两大特性:

  • 并发安全
  • 自动过期

项目结构

该项目非常简单,全部逻辑由三个文件实现:

  • cache.go:缓存多个表。
  • cachetable.go:缓存一个表。
  • cacheitem.go:缓存表中的一个条目。

数据结构图:
cache2go-源码阅读

接下来会自下而上地分析源码。

cacheitem.go

该文件中包含两块重要内容:

  • 结构体 CacheItem,用来缓存表中的一个条目。
  • 函数 NewCacheItem,用来创建 CacheItem 实例。

CacheItem

CacheItem 用来缓存表中的一个条目,属性解释:

  • sync.RWMutex:读写锁,保证并发读写安全。
  • key:键。
  • value:值,即数据。
  • lifeSpan:该条目的存活周期,即过期时间。
  • createdOn:创建时间。
  • accessedOn:上次访问时间。
  • accessCount:访问次数。
  • aboutToExpire:从缓存中删除项目之前触发的回调方法,可以在删除之前做一些自定义操作。

源码如下:

// CacheItem is an individual cache item // Parameter data contains the user-set value in the cache. type CacheItem struct { 	sync.RWMutex  	// The item's key. 	key interface{} 	// The item's data. 	data interface{} 	// How long will the item live in the cache when not being accessed/kept alive. 	lifeSpan time.Duration  	// Creation timestamp. 	createdOn time.Time 	// Last access timestamp. 	accessedOn time.Time 	// How often the item was accessed. 	accessCount int64  	// Callback method triggered right before removing the item from the cache 	aboutToExpire []func(key interface{}) } 

Get 方法

下面是一些比较简单的 Get 方法,一些有写场景的属性会多两行获取锁与释放锁的代码。

// LifeSpan returns this item's expiration duration. func (item *CacheItem) LifeSpan() time.Duration { 	// immutable 	return item.lifeSpan }  // AccessedOn returns when this item was last accessed. func (item *CacheItem) AccessedOn() time.Time { 	item.RLock() 	defer item.RUnlock() 	return item.accessedOn }  // CreatedOn returns when this item was added to the cache. func (item *CacheItem) CreatedOn() time.Time { 	// immutable 	return item.createdOn }  // AccessCount returns how often this item has been accessed. func (item *CacheItem) AccessCount() int64 { 	item.RLock() 	defer item.RUnlock() 	return item.accessCount }  // Key returns the key of this cached item. func (item *CacheItem) Key() interface{} { 	// immutable 	return item.key }  // Data returns the value of this cached item. func (item *CacheItem) Data() interface{} { 	// immutable 	return item.data } 

KeepAlive

保活函数:

  • 前两行代码表示:加锁保证并发安全读写。
  • 后两行代码表示:当被访问时,更新访问时间,同时访问次数加 1。
// KeepAlive marks an item to be kept for another expireDuration period. func (item *CacheItem) KeepAlive() { 	item.Lock() 	defer item.Unlock() 	item.accessedOn = time.Now() 	item.accessCount++ } 

AddAboutToExpireCallback

新增回调函数,回调函数无返回值,仅有一个参数 interface{},即支持任意的参数。

// AddAboutToExpireCallback appends a new callback to the AboutToExpire queue func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) { 	item.Lock() 	defer item.Unlock() 	item.aboutToExpire = append(item.aboutToExpire, f) } 

SetAboutToExpireCallback

设置回调函数需要完全替代,不同于新增,需要先清空,再覆盖。

// SetAboutToExpireCallback configures a callback, which will be called right // before the item is about to be removed from the cache. func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) { 	if len(item.aboutToExpire) > 0 { 		item.RemoveAboutToExpireCallback() 	} 	item.Lock() 	defer item.Unlock() 	item.aboutToExpire = append(item.aboutToExpire, f) } 

RemoveAboutToExpireCallback

通过直接置空,删除所有的回调函数。

// RemoveAboutToExpireCallback empties the about to expire callback queue func (item *CacheItem) RemoveAboutToExpireCallback() { 	item.Lock() 	defer item.Unlock() 	item.aboutToExpire = nil } 

NewCacheItem

创建 CacheItem 实例

func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem { 	t := time.Now() 	return &CacheItem{ 		key:           key, 		lifeSpan:      lifeSpan, 		createdOn:     t, 		accessedOn:    t, 		accessCount:   0, 		aboutToExpire: nil, 		data:          data, 	} } 

cachetable.go

该文件中总共有 3 个类:CacheTable、CacheItemPair 和 CacheItemPairList。

下面由简单到复杂逐个分析。

CacheItemPair

CacheItemPair 用来记录缓存访问的次数。

// CacheItemPair maps key to access counter type CacheItemPair struct { 	Key         interface{} 	AccessCount int64 } 

CacheItemPairList

CacheItemPairList 是 CacheItemPair 的切片,通过实现方法 Swap、Len 和 Less 实现了 sort.Interface,支持排序。

需要注意方法 Less 的实现,是元素 i 大于元素 j,这种实现是为了降序排序。降序排序是为了方法 CacheTable.MostAccessed 返回访问次数最多的条目列表。

// CacheItemPairList is a slice of CacheItemPairs that implements sort. // Interface to sort by AccessCount. type CacheItemPairList []CacheItemPair  func (p CacheItemPairList) Swap(i, j int)      { p[i], p[j] = p[j], p[i] } func (p CacheItemPairList) Len() int           { return len(p) } func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount } 

CacheTable

CacheTable 用来缓存一个表,属性解释:

  • sync.RWMutex:读写锁,保证并发读写安全。
  • name:表名。
  • items:表中的条目列表。
  • cleanupTimer:过期清除定时器。
  • cleanupInterval:过期清除的时间。
  • logger:打印日志的对象。
  • loadData:读取不存在 key 的回调函数,可以用来做初始化缓存的逻辑。
  • addedItem:新增条目时的回调函数,增加灵活性。
  • aboutToDeleteItem:删除条目前的回调函数,增加灵活性。

源码如下:

// CacheTable is a table within the cache type CacheTable struct { 	sync.RWMutex  	// The table's name. 	name string 	// All cached items. 	items map[interface{}]*CacheItem  	// Timer responsible for triggering cleanup. 	cleanupTimer *time.Timer 	// Current timer duration. 	cleanupInterval time.Duration  	// The logger used for this table. 	logger *log.Logger  	// Callback method triggered when trying to load a non-existing key. 	loadData func(key interface{}, args ...interface{}) *CacheItem 	// Callback method triggered when adding a new item to the cache. 	addedItem []func(item *CacheItem) 	// Callback method triggered before deleting an item from the cache. 	aboutToDeleteItem []func(item *CacheItem) } 

下面会先介绍核心方法,再看简单的方法。

Add 新增条目

代码逻辑通过流程图描述了一下,其中的「过期检查」单独抽出来后面分析。

cache2go-源码阅读

NotFoundAdd 和 Add 核心逻辑是一样的,具体区别不做额外描述,源代码如下:

// Add adds a key/value pair to the cache. // Parameter key is the item's cache-key. // Parameter lifeSpan determines after which time period without an access the item // will get removed from the cache. // Parameter data is the item's value. func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem { 	item := NewCacheItem(key, lifeSpan, data)  	// Add item to cache. 	table.Lock() 	table.addInternal(item)  	return item }  func (table *CacheTable) addInternal(item *CacheItem) { 	// Careful: do not run this method unless the table-mutex is locked! 	// It will unlock it for the caller before running the callbacks and checks 	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name) 	table.items[item.key] = item  	// Cache values so we don't keep blocking the mutex. 	expDur := table.cleanupInterval 	addedItem := table.addedItem 	table.Unlock()  	// Trigger callback after adding an item to cache. 	if addedItem != nil { 		for _, callback := range addedItem { 			callback(item) 		} 	}  	// If we haven't set up any expiration check timer or found a more imminent item. 	if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) { 		table.expirationCheck() 	} }  // NotFoundAdd checks whether an item is not yet cached. Unlike the Exists // method this also adds data if the key could not be found. func (table *CacheTable) NotFoundAdd(key interface{}, lifeSpan time.Duration, data interface{}) bool { 	table.Lock()  	if _, ok := table.items[key]; ok { 		table.Unlock() 		return false 	}  	item := NewCacheItem(key, lifeSpan, data) 	table.addInternal(item)  	return true } 

expirationCheck 过期检查

过期检查的处理,是一个值得学习的点,这里并不是我们印象中用循环定期扫描哪些 key 过期了,也不是给每个条目分别定义一个定时器。

每次新增条目时,扫描得到最近过期条目的过期时间,仅定义一个定时器。该定时器触发时清除缓存,并生成下一个定时器,如此接力处理。

过期检查中会调用方法 table.deleteInternal 来清除过期的 key,这块儿在讲 Delete 方法时会再详细分析。

cache2go-源码阅读

// Expiration check loop, triggered by a self-adjusting timer. func (table *CacheTable) expirationCheck() { 	table.Lock() 	if table.cleanupTimer != nil { 		table.cleanupTimer.Stop() 	} 	if table.cleanupInterval > 0 { 		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name) 	} else { 		table.log("Expiration check installed for table", table.name) 	}  	// To be more accurate with timers, we would need to update 'now' on every 	// loop iteration. Not sure it's really efficient though. 	now := time.Now() 	smallestDuration := 0 * time.Second 	for key, item := range table.items { 		// Cache values so we don't keep blocking the mutex. 		item.RLock() 		lifeSpan := item.lifeSpan 		accessedOn := item.accessedOn 		item.RUnlock()  		if lifeSpan == 0 { 			continue 		} 		if now.Sub(accessedOn) >= lifeSpan { 			// Item has excessed its lifespan. 			table.deleteInternal(key) 		} else { 			// Find the item chronologically closest to its end-of-lifespan. 			if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration { 				smallestDuration = lifeSpan - now.Sub(accessedOn) 			} 		} 	}  	// Setup the interval for the next cleanup run. 	table.cleanupInterval = smallestDuration 	if smallestDuration > 0 { 		table.cleanupTimer = time.AfterFunc(smallestDuration, func() { 			go table.expirationCheck() 		}) 	} 	table.Unlock() } 

Delete 方法

从流程图可以看出,这块儿大部分逻辑是在加锁、释放锁,有这么多锁主要是有如下几个原因:

  • 一部分是表级别的,一部分是条目级别的;
  • 表级别锁出现两次获取与释放,这种实现主要是考虑到 deleteInternal 的复用性,同时支持 Delete 和 expirationCheck 的调用,做了一些锁回溯的逻辑。思考:假如 Mutex 是可重入锁,是不是不需要回溯处理了?

cache2go-源码阅读

// Delete an item from the cache. func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) { 	table.Lock() 	defer table.Unlock()  	return table.deleteInternal(key) }  func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) { 	r, ok := table.items[key] 	if !ok { 		return nil, ErrKeyNotFound 	}  	// Cache value so we don't keep blocking the mutex. 	aboutToDeleteItem := table.aboutToDeleteItem 	table.Unlock()  	// Trigger callbacks before deleting an item from cache. 	if aboutToDeleteItem != nil { 		for _, callback := range aboutToDeleteItem { 			callback(r) 		} 	}  	r.RLock() 	defer r.RUnlock() 	if r.aboutToExpire != nil { 		for _, callback := range r.aboutToExpire { 			callback(key) 		} 	}  	table.Lock() 	table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name) 	delete(table.items, key)  	return r, nil } 

Value 取值

取值本身是比较简单的,只不过这里要进行一些额外处理:

  • 取不值时,是否有自定义逻辑,比如降级查询后缓存进去。
  • 取到值时,更新访问时间,达到保活的目的。

cache2go-源码阅读

// Value returns an item from the cache and marks it to be kept alive. You can // pass additional arguments to your DataLoader callback function. func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) { 	table.RLock() 	r, ok := table.items[key] 	loadData := table.loadData 	table.RUnlock()  	if ok { 		// Update access counter and timestamp. 		r.KeepAlive() 		return r, nil 	}  	// Item doesn't exist in cache. Try and fetch it with a data-loader. 	if loadData != nil { 		item := loadData(key, args...) 		if item != nil { 			table.Add(key, item.lifeSpan, item.data) 			return item, nil 		}  		return nil, ErrKeyNotFoundOrLoadable 	}  	return nil, ErrKeyNotFound } 

MostAccessed 最常访问的条目

这个方法用到了前文提到的 CacheItemPair 和 CacheItemPairList。

  • 首先遍历条目,取出 key 和 accessCount 存储到 p 中,用来排序;
  • 接着用有序的 p 映射出所有条目的顺序,返回有序的条目。
// MostAccessed returns the most accessed items in this cache table func (table *CacheTable) MostAccessed(count int64) []*CacheItem { 	table.RLock() 	defer table.RUnlock()  	p := make(CacheItemPairList, len(table.items)) 	i := 0 	for k, v := range table.items { 		p[i] = CacheItemPair{k, v.accessCount} 		i++ 	} 	sort.Sort(p)  	var r []*CacheItem 	c := int64(0) 	for _, v := range p { 		if c >= count { 			break 		}  		item, ok := table.items[v.Key] 		if ok { 			r = append(r, item) 		} 		c++ 	}  	return r } 

Foreach 方法

为开发者提供更加丰富的自定义操作。

// Foreach all items func (table *CacheTable) Foreach(trans func(key interface{}, item *CacheItem)) { 	table.RLock() 	defer table.RUnlock()  	for k, v := range table.items { 		trans(k, v) 	} } 

清空缓存

清空缓存的方法比较简单,一方面是数据的清空,另一方面是定时器的清空。

// Flush deletes all items from this cache table. func (table *CacheTable) Flush() { 	table.Lock() 	defer table.Unlock()  	table.log("Flushing table", table.name)  	table.items = make(map[interface{}]*CacheItem) 	table.cleanupInterval = 0 	if table.cleanupTimer != nil { 		table.cleanupTimer.Stop() 	} } 

查询相关方法

Count 和 Exists 方法是比较简单的,不用多说。

// Count returns how many items are currently stored in the cache. func (table *CacheTable) Count() int { 	table.RLock() 	defer table.RUnlock() 	return len(table.items) }  // Exists returns whether an item exists in the cache. Unlike the Value method // Exists neither tries to fetch data via the loadData callback nor does it // keep the item alive in the cache. func (table *CacheTable) Exists(key interface{}) bool { 	table.RLock() 	defer table.RUnlock() 	_, ok := table.items[key]  	return ok } 

Set 相关方法

下面这些 Set 方法比较简单,也不多做赘述。

// SetDataLoader configures a data-loader callback, which will be called when // trying to access a non-existing key. The key and 0...n additional arguments // are passed to the callback function. func (table *CacheTable) SetDataLoader(f func(interface{}, ...interface{}) *CacheItem) { 	table.Lock() 	defer table.Unlock() 	table.loadData = f }  // SetAddedItemCallback configures a callback, which will be called every time // a new item is added to the cache. func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) { 	if len(table.addedItem) > 0 { 		table.RemoveAddedItemCallbacks() 	} 	table.Lock() 	defer table.Unlock() 	table.addedItem = append(table.addedItem, f) }  //AddAddedItemCallback appends a new callback to the addedItem queue func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) { 	table.Lock() 	defer table.Unlock() 	table.addedItem = append(table.addedItem, f) }  // SetAboutToDeleteItemCallback configures a callback, which will be called // every time an item is about to be removed from the cache. func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) { 	if len(table.aboutToDeleteItem) > 0 { 		table.RemoveAboutToDeleteItemCallback() 	} 	table.Lock() 	defer table.Unlock() 	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f) }  // AddAboutToDeleteItemCallback appends a new callback to the AboutToDeleteItem queue func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) { 	table.Lock() 	defer table.Unlock() 	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f) }  // SetLogger sets the logger to be used by this cache table. func (table *CacheTable) SetLogger(logger *log.Logger) { 	table.Lock() 	defer table.Unlock() 	table.logger = logger } 

删除相关方法

过于简单,不做赘述

// RemoveAddedItemCallbacks empties the added item callback queue func (table *CacheTable) RemoveAddedItemCallbacks() { 	table.Lock() 	defer table.Unlock() 	table.addedItem = nil }  // RemoveAboutToDeleteItemCallback empties the about to delete item callback queue func (table *CacheTable) RemoveAboutToDeleteItemCallback() { 	table.Lock() 	defer table.Unlock() 	table.aboutToDeleteItem = nil } 

cache.go

Cache 函数是该缓存库的入口函数,该函数存在一段双检逻辑,需要特别了解下原因:

  • mutex.Lock() 获取锁过程中,可能另一协程已经完成了初始化。因此,需要再次校验。
// Cache returns the existing cache table with given name or creates a new one // if the table does not exist yet. func Cache(table string) *CacheTable { 	mutex.RLock() 	t, ok := cache[table] 	mutex.RUnlock()  	if !ok { 		mutex.Lock() 		t, ok = cache[table] 		// Double check whether the table exists or not. 		if !ok { 			t = &CacheTable{ 				name:  table, 				items: make(map[interface{}]*CacheItem), 			} 			cache[table] = t 		} 		mutex.Unlock() 	}  	return t } 

examples

样例也比较简单,读者可以自行阅读下。

引用

  1. https://github.com/muesli/cache2go
  2. https://mp.weixin.qq.com/s/6JjL0KVccW7nAQiKuDAl-w
  3. https://mp.weixin.qq.com/s/gIvNjn7GdOQUwg1pKtDTEQ
  4. https://mp.weixin.qq.com/s/898HtDyFTykvMu2-vvMy-A
发表评论

相关文章