GO实现Redis:GO实现Redis的AOF持久化(4)

  • 将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据
  • https://github.com/csgopher/go-redis
  • 本文涉及以下文件:
    redis.conf:配置文件
    aof:实现aof

redis.conf

appendonly yes
appendfilename appendonly.aof

aof/aof.go

type CmdLine = [][]byte  const (    aofQueueSize = 1 << 16 )  type payload struct {    cmdLine CmdLine    dbIndex int }  type AofHandler struct {    db          databaseface.Database    aofChan     chan *payload    aofFile     *os.File    aofFilename string    currentDB   int }  func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {    handler := &AofHandler{}    handler.aofFilename = config.Properties.AppendFilename    handler.db = db    handler.LoadAof()    aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)    if err != nil {       return nil, err    }    handler.aofFile = aofFile    handler.aofChan = make(chan *payload, aofQueueSize)    go func() {       handler.handleAof()    }()    return handler, nil }  func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {    if config.Properties.AppendOnly && handler.aofChan != nil {       handler.aofChan <- &payload{          cmdLine: cmdLine,          dbIndex: dbIndex,       }    } }  func (handler *AofHandler) handleAof() {    handler.currentDB = 0    for p := range handler.aofChan {       if p.dbIndex != handler.currentDB {          // select db          data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()          _, err := handler.aofFile.Write(data)          if err != nil {             logger.Warn(err)             continue          }          handler.currentDB = p.dbIndex       }       data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()       _, err := handler.aofFile.Write(data)       if err != nil {          logger.Warn(err)       }    } }  func (handler *AofHandler) LoadAof() {    file, err := os.Open(handler.aofFilename)    if err != nil {       logger.Warn(err)       return    }    defer file.Close()    ch := parser.ParseStream(file)    fakeConn := &connection.Connection{}    for p := range ch {       if p.Err != nil {          if p.Err == io.EOF {             break          }          logger.Error("parse 
error: " + p.Err.Error())          continue       }       if p.Data == nil {          logger.Error("empty payload")          continue       }       r, ok := p.Data.(*reply.MultiBulkReply)       if !ok {          logger.Error("require multi bulk reply")          continue       }       ret := handler.db.Exec(fakeConn, r.Args)       if reply.IsErrorReply(ret) {          logger.Error("exec err", err)       }    } } 

AofHandler:1.从管道中接收数据 2.写入AOF文件
AddAof:用户的指令包装成payload放入管道
handleAof:将管道中的payload写入磁盘
LoadAof:重启Redis后加载aof文件

database/database.go

type Database struct {    dbSet []*DB    aofHandler *aof.AofHandler }  func NewDatabase() *Database {    mdb := &Database{}    if config.Properties.Databases == 0 {       config.Properties.Databases = 16    }    mdb.dbSet = make([]*DB, config.Properties.Databases)    for i := range mdb.dbSet {       singleDB := makeDB()       singleDB.index = i       mdb.dbSet[i] = singleDB    }    if config.Properties.AppendOnly {       aofHandler, err := aof.NewAOFHandler(mdb)       if err != nil {          panic(err)       }       mdb.aofHandler = aofHandler       for _, db := range mdb.dbSet {          singleDB := db          singleDB.addAof = func(line CmdLine) {             mdb.aofHandler.AddAof(singleDB.index, line)          }       }    }    return mdb } 

将AOF加入到database里
使用singleDB的原因:在Go 1.22之前,range循环的迭代变量db在整个循环中复用同一个变量,闭包捕获的是这个变量本身而不是它当时的值;如果addAof闭包直接引用db,循环结束后所有闭包都会指向最后一个DB。因此需要在循环体内用 singleDB := db 为每次迭代创建独立的变量,再让闭包引用singleDB。

database/db.go

type DB struct {    index int    data   dict.Dict    addAof func(CmdLine) }  func makeDB() *DB { 	db := &DB{ 		data:   dict.MakeSyncDict(), 		addAof: func(line CmdLine) {}, 	} 	return db } 

由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof

database/keys.go

// execDel passes the "del" command line to the AOF hook when deleted > 0 and
// returns deleted as an integer reply.
// NOTE(review): the code that computes `deleted` is elided in this excerpt.
func execDel(db *DB, args [][]byte) resp.Reply {
	......
	if deleted > 0 {
		// Only a call that actually removed something is persisted.
		db.addAof(utils.ToCmdLine2("del", args...))
	}
	return reply.MakeIntReply(int64(deleted))
}

// execFlushDB calls db.Flush, passes the "flushdb" command line to the AOF
// hook and returns an OK reply.
func execFlushDB(db *DB, args [][]byte) resp.Reply {
	db.Flush()
	db.addAof(utils.ToCmdLine2("flushdb", args...))
	return &reply.OkReply{}
}

// execRename passes the "rename" command line to the AOF hook and returns an
// OK reply.
// NOTE(review): the rename logic itself is elided in this excerpt.
func execRename(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("rename", args...))
	return &reply.OkReply{}
}

// execRenameNx passes the "renamenx" command line to the AOF hook and
// returns the integer reply 1.
// NOTE(review): the rename logic and any earlier return paths are elided in
// this excerpt.
func execRenameNx(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("renamenx", args...))
	return reply.MakeIntReply(1)
}

database/string.go

// execSet passes the "set" command line to the AOF hook and returns an OK
// reply.
// NOTE(review): the code that stores the value is elided in this excerpt.
func execSet(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("set", args...))
	return &reply.OkReply{}
}

// execSetNX passes the "setnx" command line to the AOF hook and returns
// result as an integer reply.
// NOTE(review): the code that computes `result` is elided in this excerpt.
func execSetNX(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("setnx", args...))
	return reply.MakeIntReply(int64(result))
}

// execGetSet stores args[1] under the key args[0] and passes the "getset"
// command line to the AOF hook.
// NOTE(review): the code that builds the reply from the previous entity is
// elided in this excerpt.
func execGetSet(db *DB, args [][]byte) resp.Reply {
	key := string(args[0])
	value := args[1]

	// Look up the current entity before overwriting it.
	entity, exists := db.GetEntity(key)
	db.PutEntity(key, &database.DataEntity{Data: value})
	db.addAof(utils.ToCmdLine2("getset", args...))
	......
}

添加addAof方法

测试命令

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n

发表评论

评论已关闭。

相关文章