Seata源码—5.全局事务的创建与返回处理

大纲

1.Seata开启分布式事务的流程总结

2.Seata生成全局事务ID的雪花算法源码

3.生成xid以及对全局事务会话进行持久化的源码

4.全局事务会话数据持久化的实现源码

5.Seata Server创建全局事务与返回xid的源码

6.Client获取Server的响应与处理的源码

7.Seata与Dubbo整合的过滤器源码

 

1.Seata开启分布式事务的流程总结

(1)Seata分布式事务执行流程

(2)开启一个全局事务的流程

 

(1)Seata分布式事务执行流程

Seata Client在执行添加了全局事务注解@GlobalTransactional的方法时,实际执行的是根据全局事务拦截器创建该方法所在Bean的动态代理方法,于是会执行GlobalTransactionalInterceptor的invoke()方法。此时,添加了全局事务注解@GlobalTransactional的方法就会被全局事务拦截器拦截了。

 

GlobalTransactionalInterceptor全局事务拦截器拦截目标方法的调用后,会由事务执行模版TransactionalTemplate的excute()方法来执行目标方法。

 

在事务执行模版TransactionalTemplate的excute()方法中,首先会判断Propagation全局事务传播级别,然后开启一个全局事务(也就是打开一个全局事务),接着才执行具体的业务目标方法。

 

执行具体的业务目标方法时,会通过Dubbo的RPC调用来传递全局事务的xid给其他的Seata Client。其他的Seata Client通过Dubbo过滤器获取到RPC调用中的xid后,会将xid放入线程本地变量副本中。之后执行SQL时就会获取数据库连接代理来对SQL进行拦截,数据库连接代理就可以从线程本地变量副本中获取xid,然后开启分支事务。

 

各个分支事务都执行完毕后,开启全局事务的Seata Client就会提交事务、处理全局锁、资源清理。

 

(2)开启一个全局事务的流程

Seata Server收到Seata Client发送过来的RpcMessage对象消息后,RpcMessage对象消息首先会由ServerOnRequestProcessor的process()方法处理,然后会由DefaultCoordinator的onRequest()方法进行处理,接着会由GlobalBeginRequest的handle()方法进行处理,然后会由DefaultCoordinator的doGlobalBegin()方法来处理,最后给到DefaultCore的begin()方法来进行处理。

 

在DefaultCore的begin()方法中,首先就会创建一个全局事务会话,然后将全局事务会话的xid通过MDC放入线程本地变量副本中,接着对该全局事务会话添加一个全局事务会话的生命周期监听器,最后打开该全局事务会话、发布会话开启事件并返回全局事务会话的xid。

 

在创建一个全局事务会话GlobalSession时,首先会由uuid生成组件UUIDGenerator来生成全局事务id(transactionId),然后根据生成的全局事务id(transactionId)来继续生成xid。

 

2.Seata生成全局事务ID的雪花算法源码

(1)通过UUIDGenerator生成全局事务ID

(2)IdWorker实现的雪花算法生成的ID的组成

(3)IdWorker实现的雪花算法对时钟回拨的处理

 

(1)通过UUIDGenerator生成全局事务ID

Seata在创建全局事务会话时会通过UUIDGenerator来生成全局事务ID,UUIDGenerator在生成ID时是通过Seata自己实现的雪花算法来生成的。

public class GlobalSession implements SessionLifecycle, SessionStorable {     ...     //创建全局事务会话     public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {         GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);         return session;     }          public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {         //全局事务id是通过UUIDGenerator来生成的         this.transactionId = UUIDGenerator.generateUUID();         this.status = GlobalStatus.Begin;         this.lazyLoadBranch = lazyLoadBranch;         if (!lazyLoadBranch) {             this.branchSessions = new ArrayList<>();         }         this.applicationId = applicationId;         this.transactionServiceGroup = transactionServiceGroup;         this.transactionName = transactionName;         this.timeout = timeout;         //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid         this.xid = XID.generateXID(transactionId);     }     ... }  public class UUIDGenerator {     private static volatile IdWorker idWorker;          //generate UUID using snowflake algorithm     public static long generateUUID() {         //Double Check + volatile,实现并发场景下只创建一次idWorker对象         if (idWorker == null) {             synchronized (UUIDGenerator.class) {                 if (idWorker == null) {                     init(null);                 }             }         }         //正常情况下,每次都会通过idWorker生成一个id         return idWorker.nextId();     }          //init IdWorker     public static void init(Long serverNode) {         idWorker = new IdWorker(serverNode);     } }

(2)IdWorker实现的雪花算法生成的ID的组成

IdWorker就是Seata自己实现的基于雪花算法的ID生成器。IdWorker的nextId()方法通过雪花算法生成的transactionId一共是64位,用64个bit拼接出一个唯一的ID。

 

一.最高位始终是0,占1个bit

 

二.接着的10个bit是workerId

一台机器就是一个worker,每个worker都会有一个自己的workerId。生成workerId时,是基于本机网络地址里的Mac地址来生成的。

 

三.接着的41个bit是时间戳

表示可以为某台机器的每一毫秒,分配一个自增长的ID。毫秒时间戳有13位数,转换为2进制需要2的41次方。

 

四.最后的12个bit是序列号

如果一台机器在一毫秒内需要为很多线程生成ID,就可以通过自增长的12个bit的Sequence为每个线程分配ID。

 

(3)IdWorker实现的雪花算法对时钟回拨的处理

在执行IdWorker的nextId()方法时,会对包含序列号和时间戳的timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence序列号进行累加。

 

如果出现大量的线程并发获取ID,此时可能会导致timestampAndSequence中某一个毫秒内的Sequence序列号快速累加,并且将代表Sequence序列号的12个bit全部累加完毕,最后便会导致包含序列号和时间戳的timestampAndSequence中的毫秒时间戳也进行累加。

 

但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,出现时钟回拨问题,于是就需要调用waitIfNecessary()方法进行处理。

 

所以,在IdWorker的waitIfNecessary()方法中,如果获取ID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽,那么就需要阻塞当前线程5毫秒。

//IdWorker就是Seata自己实现的基于雪花算法的ID生成器 public class IdWorker {     private final long twepoch = 1588435200000L;//Start time cut (2020-05-03)     private final int workerIdBits = 10;//The number of bits occupied by workerId     private final int timestampBits = 41;//The number of bits occupied by timestamp     private final int sequenceBits = 12;//The number of bits occupied by sequence     private final int maxWorkerId = ~(-1 << workerIdBits);//Maximum supported machine id, the result is 1023      //business meaning: machine ID (0 ~ 1023)     //actual layout in memory:     //highest 1 bit: 0     //middle 10 bit: workerId     //lowest 53 bit: all 0     private long workerId;      //timestampAndSequence是64位的、支持CAS操作的Long型的、包含了Sequence序列号的时间戳     //它的最高位是11个bit,没有使用     //中间有41个bit,是时间戳     //最低位有12个bit,是序列号     //timestampAndSequence可以认为是把时间戳和序列号混合在了一个long型数字里          //timestamp and sequence mix in one Long     //highest 11 bit: not used     //middle  41 bit: timestamp     //lowest  12 bit: sequence     private AtomicLong timestampAndSequence;      //mask that help to extract timestamp and sequence from a long     //可以帮忙从一个long数字里提取出一个包含Sequence序列号的时间戳     private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));      //instantiate an IdWorker using given workerId     public IdWorker(Long workerId) {         //初始化timestampAndSequence         initTimestampAndSequence();         //初始化workerId         initWorkerId(workerId);     }      //init first timestamp and sequence immediately     private void initTimestampAndSequence() {         //获取相对于twepoch的最新时间戳         long timestamp = getNewestTimestamp();         //将最新时间戳和sequenceBits进行位运算(左移),从而得到一个混合了sequence的时间戳         long timestampWithSequence = timestamp << sequenceBits;         //把混合了sequence的时间戳,赋值给timestampAndSequence         this.timestampAndSequence = new AtomicLong(timestampWithSequence);     }      //init workerId     private void initWorkerId(Long workerId) {         if (workerId == null) {             workerId = generateWorkerId();         }         if (workerId > maxWorkerId || workerId < 0) {             String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);             throw new IllegalArgumentException(message);         }         //将workerId与timestampBits+sequenceBits的和进行位运算(左移),获取一个workerId         this.workerId = workerId << (timestampBits + sequenceBits);     }      //通过snowflake雪花算法来生成transactionId     //一共是64位,用64个bit拼接出一个唯一的ID,最高位始终是0,占1个bit     //接着的10个bit是workerId,一台机器就是一个worker,每个worker都会有一个自己的workerId     //接着的41个bit是时间戳,表示可以为某台机器的每一毫秒,分配一个自增长的id,毫秒时间戳有13位数,转换为2进制就需要2的41次方,2的20次方是一个7位数的数字     //最后的12个bit是序列号,如果一台机器在一毫秒内需要为很多线程生成id,就可以通过自增长的12个bit的Sequence为每个线程分配id          //get next UUID(base on snowflake algorithm), which look like:     //highest 1 bit: always 0     //next   10 bit: workerId     //next   41 bit: timestamp     //lowest 12 bit: sequence     public long nextId() {         waitIfNecessary();         //对包含Sequence序列号的时间戳timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence进行累加         //如果出现大量的线程并发获取id,此时可能会导致timestampAndSequence的某一个毫秒内的Sequence快速累加,并且将12个bit全部累加完毕         //最终导致timestampAndSequence的毫秒时间戳也进行累加了         //但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,于是就需要waitIfNecessary()进行处理         long next = timestampAndSequence.incrementAndGet();          //把最新的包含Sequence序列号的时间戳next与timestampAndSequenceMask进行位运算,获取真正的包含Sequence序列号的时间戳timestampWithSequence         long timestampWithSequence = next & timestampAndSequenceMask;         //对包含Sequence序列号的时间戳与workerId通过位运算拼接在一起         return workerId | timestampWithSequence;     }      //block current thread if the QPS of acquiring UUID is too high that current sequence space is exhausted     //如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒     private void waitIfNecessary() {         //先获取包含Sequence序列号的当前时间戳         long currentWithSequence = timestampAndSequence.get();         //将currentWithSequence与sequenceBits进行位运算(右移),获取到当前时间戳         long current = currentWithSequence >>> sequenceBits;         //获取相对于twepoch的最新时间戳         long newest = getNewestTimestamp();         //如果当前的时间戳大于最新的时间戳,说明获取UUID的QPS过高,导致timestampAndSequence增长太快了(出现时钟回拨问题)         if (current >= newest) {             try {                 //如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒                 Thread.sleep(5);             } catch (InterruptedException ignore) {                 //don't care             }         }     }      //get newest timestamp relative to twepoch     private long getNewestTimestamp() {         //通过当前毫秒单位的时间戳 减去 一个固定的时间twepoch,得到的就是相对于twepoch的最新时间戳         return System.currentTimeMillis() - twepoch;     }      //auto generate workerId, try using mac first, if failed, then randomly generate one     private long generateWorkerId() {         try {             //生成一个workerId,默认是基于网络的Mac地址来生成的             return generateWorkerIdBaseOnMac();         } catch (Exception e) {             return generateRandomWorkerId();         }     }      //use lowest 10 bit of available MAC as workerId     private long generateWorkerIdBaseOnMac() throws Exception {         //获取所有的网络接口         Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();         //遍历每一个网络接口         while (all.hasMoreElements()) {             NetworkInterface networkInterface = all.nextElement();             boolean isLoopback = networkInterface.isLoopback();             boolean isVirtual = networkInterface.isVirtual();             //如果是虚拟的、回环的地址,那么这个地址就跳过,不能使用             if (isLoopback || isVirtual) {                 continue;             }             //获取本机网络地址里的Mac地址,基于Mac地址来生成一个workerid             byte[] mac = networkInterface.getHardwareAddress();             return ((mac[4] & 0B11) << 8) | (mac[5] & 0xFF);         }         throw new RuntimeException("no available mac found");     }      //randomly generate one as workerId     private long generateRandomWorkerId() {         return new Random().nextInt(maxWorkerId + 1);     } }

 

3.生成xid以及对全局事务会话进行持久化的源码

(1)根据全局事务ID生成xid

(2)全局事务会话的持久化

 

(1)根据全局事务ID生成xid

xid是通过ip:port:transactionId拼接出来的。

public class XID {     private static int port;     private static String ipAddress;     ...     //Generate xid string.     public static String generateXID(long tranId) {         //首先获取当前机器的IP地址         //然后拼接上一个冒号、接着拼接一个端口号、再拼接一个冒号         //最后再拼接事务id,以此来生成xid         //所以xid是通过ip:port:transactionId拼接出来的         return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();     }     ... }

(2)全局事务会话的持久化

public class DefaultCore implements Core {     ...     @Override     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {         //创建一个全局事务会话         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);                //通过slf4j的MDC把xid放入线程本地变量副本里去         MDC.put(RootContext.MDC_KEY_XID, session.getXid());                 //添加一个全局事务会话的生命周期监听器         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());                //打开Session,其中会对全局事务会话进行持久化         session.begin();          //transaction start event,发布会话开启事件         MetricsPublisher.postSessionDoingEvent(session, false);          //返回全局事务会话的xid         return session.getXid();     }     ... }  public class GlobalSession implements SessionLifecycle, SessionStorable {     ...     @Override     public void begin() throws TransactionException {         this.status = GlobalStatus.Begin;         this.beginTime = System.currentTimeMillis();         this.active = true;         for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {             lifecycleListener.onBegin(this);         }     }     ... }  public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {     ...     @Override     public void onBegin(GlobalSession globalSession) throws TransactionException {         addGlobalSession(globalSession);     }          @Override     public void addGlobalSession(GlobalSession session) throws TransactionException {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);         }         writeSession(LogOperation.GLOBAL_ADD, session);     }          private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {         //transactionStoreManager.writeSession()会对全局事务会话进行持久化         if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {             if (LogOperation.GLOBAL_ADD.equals(logOperation)) {                 throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store global session");             } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {                 throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update global session");             } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {                 throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove global session");             } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store branch session");             } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update branch session");             } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove branch session");             } else {                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Unknown LogOperation:" + logOperation.name());             }         }     }     ... }

 

4.全局事务会话数据持久化的实现源码

(1)全局事务会话数据的持久化流程

(2)将全局事务会话持久化到MySQL数据库的实现

(3)将全局事务会话持久化到File文件的实现

(4)将全局事务会话持久化到Redis存储的实现

 

(1)全局事务会话数据的持久化流程

创建全局事务会话时,会通过雪花算法生成全局事务ID即transactionId,然后通过transactionId按照"ip:port:transactionId"格式生成xid。

 

创建完全局事务会话之后,就会添加一个全局事务会话的生命周期监听器,然后就会调用GlobalSession的begin()方法开启会话。

 

在GlobalSession的begin()方法中,会调用全局事务会话生命周期监听器的onBegin()方法,也就是调用SessionLifecycleListener的onBegin()方法。

 

接着就会由AbstractSessionManager对全局事务会话进行管理,将GlobalSession添加到SessionManager会话管理器中,也就是调用transactionStoreManager的writeSession()方法,对全局事务会话进行持久化。

 

默认情况下,会通过数据库进行持久化,也就是调用DataBaseTransactionStoreManager数据库事务存储管理器的writeSession()方法,将全局事务会话存储到数据库中。

 

当然Seata提供了三种方式来对全局事务会话进行持久化,分别是数据库存储、文件存储和Redis存储。

Seata源码—5.全局事务的创建与返回处理

(2)将全局事务会话持久化到MySQL数据库的实现

//The type Database transaction store manager. public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {     private static volatile DataBaseTransactionStoreManager instance;     protected LogStore logStore;     ...     //Get the instance.     public static DataBaseTransactionStoreManager getInstance() {         if (instance == null) {             synchronized (DataBaseTransactionStoreManager.class) {                 if (instance == null) {                     instance = new DataBaseTransactionStoreManager();                 }             }         }         return instance;     }          //Instantiates a new Database transaction store manager.     private DataBaseTransactionStoreManager() {         logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);         String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);         //init dataSource,通过SPI机制加载DataSourceProvider         DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();         logStore = new LogStoreDataBaseDAO(logStoreDataSource);     }          @Override     public boolean writeSession(LogOperation logOperation, SessionStorable session) {         if (LogOperation.GLOBAL_ADD.equals(logOperation)) {             return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));         } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {             return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));         } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {             return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));         } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {             return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));         } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {             return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));         } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {             return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));         } else {             throw new StoreException("Unknown LogOperation:" + logOperation.name());         }     }     ... }  public class LogStoreDataBaseDAO implements LogStore {     protected DataSource logStoreDataSource = null;     protected String globalTable;     protected String branchTable;     private String dbType;     ...     public LogStoreDataBaseDAO(DataSource logStoreDataSource) {         this.logStoreDataSource = logStoreDataSource;         globalTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_GLOBAL_TABLE, DEFAULT_STORE_DB_GLOBAL_TABLE);         branchTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_BRANCH_TABLE, DEFAULT_STORE_DB_BRANCH_TABLE);         dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);         if (StringUtils.isBlank(dbType)) {             throw new StoreException("there must be db type.");         }         if (logStoreDataSource == null) {             throw new StoreException("there must be logStoreDataSource.");         }         //init transaction_name size         initTransactionNameSize();     }          @Override     public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {         String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);         Connection conn = null;         PreparedStatement ps = null;         try {             int index = 1;             conn = logStoreDataSource.getConnection();             conn.setAutoCommit(true);             ps = conn.prepareStatement(sql);             ps.setString(index++, globalTransactionDO.getXid());             ps.setLong(index++, globalTransactionDO.getTransactionId());             ps.setInt(index++, globalTransactionDO.getStatus());             ps.setString(index++, globalTransactionDO.getApplicationId());             ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());             String transactionName = globalTransactionDO.getTransactionName();             transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0, transactionNameColumnSize) : transactionName;             ps.setString(index++, transactionName);             ps.setInt(index++, globalTransactionDO.getTimeout());             ps.setLong(index++, globalTransactionDO.getBeginTime());             ps.setString(index++, globalTransactionDO.getApplicationData());             return ps.executeUpdate() > 0;         } catch (SQLException e) {             throw new StoreException(e);         } finally {             IOUtil.close(ps, conn);         }     }     ... }

(3)将全局事务会话持久化到File文件的实现

public class FileTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager, ReloadableStore {     private ReentrantLock writeSessionLock = new ReentrantLock();     ...     @Override     public boolean writeSession(LogOperation logOperation, SessionStorable session) {         long curFileTrxNum;         writeSessionLock.lock();         try {             if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {                 return false;             }             lastModifiedTime = System.currentTimeMillis();             curFileTrxNum = FILE_TRX_NUM.incrementAndGet();             if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {                 return saveHistory();             }         } catch (Exception exx) {             LOGGER.error("writeSession error, {}", exx.getMessage(), exx);             return false;         } finally {             writeSessionLock.unlock();         }         flushDisk(curFileTrxNum, currFileChannel);         return true;     }          private boolean writeDataFile(byte[] bs) {         if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {             return false;         }         if (!writeDataFrame(bs)) {             return false;         }         return flushWriteBuffer(writeBuffer);     }          private boolean writeDataFrame(byte[] data) {         if (data == null || data.length <= 0) {             return true;         }         int dataLength = data.length;         int bufferRemainingSize = writeBuffer.remaining();         if (bufferRemainingSize <= INT_BYTE_SIZE) {             if (!flushWriteBuffer(writeBuffer)) {                 return false;             }         }         bufferRemainingSize = writeBuffer.remaining();         if (bufferRemainingSize <= INT_BYTE_SIZE) {             throw new IllegalStateException(String.format("Write buffer remaining size %d was too small", bufferRemainingSize));         }         writeBuffer.putInt(dataLength);         bufferRemainingSize = writeBuffer.remaining();         int dataPos = 0;         while (dataPos < dataLength) {             int dataLengthToWrite = dataLength - dataPos;             dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize);             writeBuffer.put(data, dataPos, dataLengthToWrite);             bufferRemainingSize = writeBuffer.remaining();             if (bufferRemainingSize == 0) {                 if (!flushWriteBuffer(writeBuffer)) {                     return false;                 }                 bufferRemainingSize = writeBuffer.remaining();             }             dataPos += dataLengthToWrite;         }         return true;     }          private boolean flushWriteBuffer(ByteBuffer writeBuffer) {         writeBuffer.flip();         if (!writeDataFileByBuffer(writeBuffer)) {             return false;         }         writeBuffer.clear();         return true;     }          private void flushDisk(long curFileNum, FileChannel currFileChannel) {         if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {             SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);             writeDataFileRunnable.putRequest(syncFlushRequest);             syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);         } else {             writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));         }     }     ... }  public class TransactionWriteStore implements SessionStorable {     private SessionStorable sessionRequest;     private LogOperation operate;          public TransactionWriteStore(SessionStorable sessionRequest, LogOperation operate) {         this.sessionRequest = sessionRequest;         this.operate = operate;     }          @Override     public byte[] encode() {         byte[] bySessionRequest = this.sessionRequest.encode();         byte byOpCode = this.getOperate().getCode();         int len = bySessionRequest.length + 1;         byte[] byResult = new byte[len];         ByteBuffer byteBuffer = ByteBuffer.wrap(byResult);         byteBuffer.put(bySessionRequest);         byteBuffer.put(byOpCode);         return byResult;     }     ... }

(4)将全局事务会话持久化到Redis存储的实现

这里的实现比较优雅,十分值得借鉴。

public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {     private static volatile RedisTransactionStoreManager instance;     //Map for LogOperation Global Operation     public static volatile ImmutableMap<LogOperation, Function<GlobalTransactionDO, Boolean>> globalMap;     //Map for LogOperation Branch Operation     public static volatile ImmutableMap<LogOperation, Function<BranchTransactionDO, Boolean>> branchMap;     ...     public static RedisTransactionStoreManager getInstance() {         if (instance == null) {             synchronized (RedisTransactionStoreManager.class) {                 if (instance == null) {                     instance = new RedisTransactionStoreManager();                 }             }         }         return instance;     }          public RedisTransactionStoreManager() {         super();         initGlobalMap();         initBranchMap();         logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);         if (logQueryLimit > DEFAULT_LOG_QUERY_LIMIT) {             logQueryLimit = DEFAULT_LOG_QUERY_LIMIT;         }     }          public void initGlobalMap() {         if (CollectionUtils.isEmpty(branchMap)) {             globalMap = ImmutableMap.<LogOperation, Function<GlobalTransactionDO, Boolean>>builder()                 .put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO)                 .put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO)                 .put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO)                 .build();         }     }         public void initBranchMap() {         if (CollectionUtils.isEmpty(branchMap)) {             branchMap = ImmutableMap.<LogOperation, Function<BranchTransactionDO, Boolean>>builder()                 .put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO)                 .put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO)                 .put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO)                 .build();         }     }          //Insert the global transaction.     private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {         String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());         try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {             Date now = new Date();             globalTransactionDO.setGmtCreate(now);             globalTransactionDO.setGmtModified(now);             pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));             pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid());             pipelined.sync();             return true;         } catch (Exception ex) {             throw new RedisException(ex);         }     }          //Insert branch transaction     private boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {         String branchKey = buildBranchKey(branchTransactionDO.getBranchId());         String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid());         try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {             Date now = new Date();             branchTransactionDO.setGmtCreate(now);             branchTransactionDO.setGmtModified(now);             pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO));             pipelined.rpush(branchListKey, branchKey);               pipelined.sync();             return true;         } catch (Exception ex) {             throw new RedisException(ex);         }     }          @Override     public boolean writeSession(LogOperation logOperation, SessionStorable session) {         if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) {             return globalMap.containsKey(logOperation) ?                 globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session)) :                 branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session));         } else {             throw new StoreException("Unknown LogOperation:" + logOperation.name());         }     }     ... }  public class SessionConverter {     ...     public static GlobalTransactionDO convertGlobalTransactionDO(SessionStorable session) {         if (session == null || !(session instanceof GlobalSession)) {             throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));         }         GlobalSession globalSession = (GlobalSession)session;          GlobalTransactionDO globalTransactionDO = new GlobalTransactionDO();         globalTransactionDO.setXid(globalSession.getXid());         globalTransactionDO.setStatus(globalSession.getStatus().getCode());         globalTransactionDO.setApplicationId(globalSession.getApplicationId());         globalTransactionDO.setBeginTime(globalSession.getBeginTime());         globalTransactionDO.setTimeout(globalSession.getTimeout());         globalTransactionDO.setTransactionId(globalSession.getTransactionId());         globalTransactionDO.setTransactionName(globalSession.getTransactionName());         globalTransactionDO.setTransactionServiceGroup(globalSession.getTransactionServiceGroup());         globalTransactionDO.setApplicationData(globalSession.getApplicationData());         return globalTransactionDO;     }          public static BranchTransactionDO convertBranchTransactionDO(SessionStorable session) {         if (session == null || !(session instanceof BranchSession)) {             throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));         }         BranchSession branchSession = (BranchSession)session;         BranchTransactionDO branchTransactionDO = new BranchTransactionDO();         branchTransactionDO.setXid(branchSession.getXid());         branchTransactionDO.setBranchId(branchSession.getBranchId());         branchTransactionDO.setBranchType(branchSession.getBranchType().name());         branchTransactionDO.setClientId(branchSession.getClientId());         branchTransactionDO.setResourceGroupId(branchSession.getResourceGroupId());         branchTransactionDO.setTransactionId(branchSession.getTransactionId());         branchTransactionDO.setApplicationData(branchSession.getApplicationData());         branchTransactionDO.setResourceId(branchSession.getResourceId());         branchTransactionDO.setStatus(branchSession.getStatus().getCode());         return branchTransactionDO;     }     ... }

 

5.Seata Server创建全局事务与返回xid的源码

-> ServerHandler.channelRead()接收Seata Client发送过来的请求; -> AbstractNettyRemoting.processMessage()处理RpcMessage消息; -> ServerOnRequestProcessor.process()处理RpcMessage消息; -> TransactionMessageHandler.onRequest()处理RpcMessage消息; -> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @ChannelHandler.Sharable     class ServerHandler extends ChannelDuplexHandler {         @Override         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {             if (!(msg instanceof RpcMessage)) {                 return;             }             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理             processMessage(ctx, (RpcMessage) msg);         }     } }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));         }         Object body = rpcMessage.getBody();         if (body instanceof MessageTypeAware) {             MessageTypeAware messageTypeAware = (MessageTypeAware) body;             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());             if (pair != null) {                 if (pair.getSecond() != null) {                     try {                         pair.getSecond().execute(() -> {                             try {                                 pair.getFirst().process(ctx, rpcMessage);                             } catch (Throwable th) {                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                             } finally {                                 MDC.clear();                             }                         });                     } catch (RejectedExecutionException e) {                         ...                     }                 } else {                     try {                         pair.getFirst().process(ctx, rpcMessage);                     } catch (Throwable th) {                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                     }                 }             } else {                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());             }         } else {             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);         }     }     ... }  public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {     private final RemotingServer remotingServer;     ...     @Override     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (ChannelManager.isRegistered(ctx.channel())) {             onRequestMessage(ctx, rpcMessage);         } else {             try {                 if (LOGGER.isInfoEnabled()) {                     LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());                 }                 ctx.disconnect();                 ctx.close();             } catch (Exception exx) {                 LOGGER.error(exx.getMessage());             }             if (LOGGER.isInfoEnabled()) {                 LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));             }         }     }          private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {         Object message = rpcMessage.getBody();         //RpcContext线程本地变量副本         RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());         } else {             try {                 BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());             } catch (InterruptedException e) {                 LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);             }         }         if (!(message instanceof AbstractMessage)) {             return;         }         // the batch send request message         if (message instanceof MergedWarpMessage) {             ...         } else {             // the single send request message             final AbstractMessage msg = (AbstractMessage) message;             //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage             AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);             //返回响应给客户端             remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);         }     }     ... }
-> TransactionMessageHandler.onRequest()处理RpcMessage消息; -> DefaultCoordinator.onRequest()处理RpcMessage消息; -> GlobalBeginRequest.handle()处理开启全局事务请求; -> AbstractTCInboundHandler.handle()开启全局事务返回全局事务; -> DefaultCoordinator.doGlobalBegin()开启全局事务; -> DefaultCore.begin()创建全局事务会话并开启; -> GlobalSession.createGlobalSession()创建全局事务会话; -> GlobalSession.begin()开启全局事务会话; -> AbstractSessionManager.onBegin() -> AbstractSessionManager.addGlobalSession() -> AbstractSessionManager.writeSession() -> TransactionStoreManager.writeSession()持久化全局事务会话;
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     ...     @Override     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {         if (!(request instanceof AbstractTransactionRequestToTC)) {             throw new IllegalArgumentException();         }         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;         transactionRequest.setTCInboundHandler(this);         return transactionRequest.handle(context);     }     ... }  public class GlobalBeginRequest extends AbstractTransactionRequestToTC {     ...     @Override     public AbstractTransactionResponse handle(RpcContext rpcContext) {         return handler.handle(this, rpcContext);     }     ... }  public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class);          @Override     public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {         GlobalBeginResponse response = new GlobalBeginResponse();         exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {             @Override             public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {                 try {                     //开启全局事务                     doGlobalBegin(request, response, rpcContext);                 } catch (StoreException e) {                     throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);                 }             }         }, request, response);         return response;     }     ... }  public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     private final DefaultCore core;     ...     @Override     protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {         //接下来才真正处理开启全局事务的业务逻辑         //其中会调用DefaultCore来真正开启一个全局事务,即拿到xid并设置到响应里去         response.setXid(core.begin(             rpcContext.getApplicationId(),//应用程序id             rpcContext.getTransactionServiceGroup(),//事务服务分组             request.getTransactionName(),//事务名称             request.getTimeout())//超时时间         );         if (LOGGER.isInfoEnabled()) {             LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());         }     }     ... }  public class DefaultCore implements Core {     ...     @Override     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {         //创建一个全局事务会话         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);          //通过slf4j的MDC把xid放入线程本地变量副本里去         MDC.put(RootContext.MDC_KEY_XID, session.getXid());          //添加一个全局事务会话的生命周期监听器         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());          //打开Session,其中会对全局事务会话进行持久化         session.begin();          //transaction start event,发布会话开启事件         MetricsPublisher.postSessionDoingEvent(session, false);          //返回全局事务会话的xid         return session.getXid();     }     ... }  public class GlobalSession implements SessionLifecycle, SessionStorable {     ...     public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {         GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);         return session;     }          public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {         //全局事务id是通过UUIDGenerator来生成的         this.transactionId = UUIDGenerator.generateUUID();         this.status = GlobalStatus.Begin;         this.lazyLoadBranch = lazyLoadBranch;         if (!lazyLoadBranch) {             this.branchSessions = new ArrayList<>();         }         this.applicationId = applicationId;         this.transactionServiceGroup = transactionServiceGroup;         this.transactionName = transactionName;         this.timeout = timeout;         //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid         this.xid = XID.generateXID(transactionId);     }          @Override     public void begin() throws TransactionException {         this.status = GlobalStatus.Begin;         this.beginTime = System.currentTimeMillis();         this.active = true;         for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {             lifecycleListener.onBegin(this);         }     }     ... }  public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {     ...     @Override     public void onBegin(GlobalSession globalSession) throws TransactionException {         addGlobalSession(globalSession);     }          @Override     public void addGlobalSession(GlobalSession session) throws TransactionException {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);         }         writeSession(LogOperation.GLOBAL_ADD, session);     }          private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {         //transactionStoreManager.writeSession()会对全局事务会话进行持久化         if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {             ...         }     }     ... }
-> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端; -> AbstractNettyRemotingServer.sendAsyncResponse()异步发送响应; -> AbstractNettyRemoting.buildResponseMessage()构造包含xid响应; -> AbstractNettyRemoting.sendAsync()异步发送响应; -> Netty的Channel.writeAndFlush()发送响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @Override     public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {         Channel clientChannel = channel;         if (!(msg instanceof HeartbeatMessage)) {             clientChannel = ChannelManager.getSameClientChannel(channel);         }         if (clientChannel != null) {             RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage                 ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE                 : ProtocolConstants.MSGTYPE_RESPONSE);             super.sendAsync(clientChannel, rpcMsg);         } else {             throw new RuntimeException("channel is error.");         }     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {         RpcMessage rpcMsg = new RpcMessage();         rpcMsg.setMessageType(messageType);         rpcMsg.setCodec(rpcMessage.getCodec()); // same with request         rpcMsg.setCompressor(rpcMessage.getCompressor());         rpcMsg.setBody(msg);         rpcMsg.setId(rpcMessage.getId());         return rpcMsg;     }          //rpc async request.     protected void sendAsync(Channel channel, RpcMessage rpcMessage) {         channelWritableCheck(channel, rpcMessage.getBody());         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());         }         doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {             if (!future.isSuccess()) {                 destroyChannel(future.channel());             }         });     }     ... }

 

6.Client获取Server的响应与处理的源码

-> ClientHandler.channelRead()接收Seata Server返回的响应; -> AbstractNettyRemoting.processMessage()处理RpcMessage消息; -> ClientOnResponseProcessor.process()会设置MessageFuture结果; -> MessageFuture.setResultMessage()设置MessageFuture结果; -> CompletableFuture.complete()唤醒阻塞的线程;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {     ...     @Sharable     class ClientHandler extends ChannelDuplexHandler {         @Override         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {             if (!(msg instanceof RpcMessage)) {                 return;             }             processMessage(ctx, (RpcMessage) msg);         }         ...     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));         }         Object body = rpcMessage.getBody();         if (body instanceof MessageTypeAware) {             MessageTypeAware messageTypeAware = (MessageTypeAware) body;             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());             if (pair != null) {                 if (pair.getSecond() != null) {                     try {                         pair.getSecond().execute(() -> {                             try {                                 pair.getFirst().process(ctx, rpcMessage);                             } catch (Throwable th) {                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                             } finally {                                 MDC.clear();                             }                         });                     } catch (RejectedExecutionException e) {                         ...                     }                 } else {                     try {                         pair.getFirst().process(ctx, rpcMessage);                     } catch (Throwable th) {                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                     }                 }             } else {                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());             }         } else {             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);         }     }     ... }  public class ClientOnResponseProcessor implements RemotingProcessor {     ...     @Override     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (rpcMessage.getBody() instanceof MergeResultMessage) {             ...         } else if (rpcMessage.getBody() instanceof BatchResultMessage) {             ...         } else {             //这里是对普通消息的处理             MessageFuture messageFuture = futures.remove(rpcMessage.getId());             if (messageFuture != null) {                 messageFuture.setResultMessage(rpcMessage.getBody());             } else {                 if (rpcMessage.getBody() instanceof AbstractResultMessage) {                     if (transactionMessageHandler != null) {                         transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);                     }                 }             }         }     }     ... }  public class MessageFuture {     private transient CompletableFuture<Object> origin = new CompletableFuture<>();     ...     //Sets result message.     public void setResultMessage(Object obj) {         origin.complete(obj);     }     ... }

由于Seata Client发送开启全局事务的请求给Seata Server时,会通过MessageFuture的get()方法同步等待Seata Server返回响应。所以当Seata Client获取Seata Server的响应并通过complete()方法设置MessageFuture已经完成后,原来同步等待Seata Server响应的线程便会继续往下处理。

 

即某线程执行CompletableFuture.complete()方法后,执行CompletableFuture.get()方法的线程就不会被阻塞而会被唤醒。

-> GlobalTransactionalInterceptor.invoke() -> GlobalTransactionalInterceptor.handleGlobalTransaction() -> TransactionalTemplate.execute() -> TransactionalTemplate.beginTransaction() -> DefaultGlobalTransaction.begin() -> DefaultTransactionManager.begin() -> DefaultTransactionManager.syncCall() -> TmNettyRemotingClient.sendSyncRequest() -> AbstractNettyRemotingClient.sendSyncRequest()发送请求; -> AbstractNettyRemoting.sendSync()发送同步请求; -> MessageFuture.get()会同步等待Seata Server的响应结果; -> CompletableFuture.get()阻塞当前线程进行等待唤醒;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {     ...     @Override     public Object sendSyncRequest(Object msg) throws TimeoutException {         //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡         String serverAddress = loadBalance(getTransactionServiceGroup(), msg);         //获取RPC调用的超时时间         long timeoutMillis = this.getRpcRequestTimeout();         //构建一个RPC消息         RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);          //send batch message         //put message into basketMap, @see MergedSendRunnable         //默认是不开启批量消息发送         if (this.isEnableClientBatchSendRequest()) {             ...         } else {             //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel             //然后通过网络连接Channel把RpcMessage发送出去             Channel channel = clientChannelManager.acquireChannel(serverAddress);             return super.sendSync(channel, rpcMessage, timeoutMillis);         }     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {         if (timeoutMillis <= 0) {             throw new FrameworkException("timeout should more than 0ms");         }         if (channel == null) {             LOGGER.warn("sendSync nothing, caused by null channel.");             return null;         }          //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里         MessageFuture messageFuture = new MessageFuture();         messageFuture.setRequestMessage(rpcMessage);         messageFuture.setTimeout(timeoutMillis);         futures.put(rpcMessage.getId(), messageFuture);          channelWritableCheck(channel, rpcMessage.getBody());          //获取远程地址         String remoteAddr = ChannelUtil.getAddressFromChannel(channel);         doBeforeRpcHooks(remoteAddr, rpcMessage);          //异步化发送数据,同时对发送结果添加监听器         //如果发送失败,则会对网络连接Channel进行销毁处理         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {             if (!future.isSuccess()) {                 MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());                 if (messageFuture1 != null) {                     messageFuture1.setResultMessage(future.cause());                 }                 destroyChannel(future.channel());             }         });          try {             //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应             Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);             doAfterRpcHooks(remoteAddr, rpcMessage, result);             return result;         } catch (Exception exx) {             LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());             if (exx instanceof TimeoutException) {                 throw (TimeoutException) exx;             } else {                 throw new RuntimeException(exx);             }         }     }     ... }  public class MessageFuture {     private transient CompletableFuture<Object> origin = new CompletableFuture<>();     ...     public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {         Object result = null;         try {             result = origin.get(timeout, unit);             if (result instanceof TimeoutException) {                 throw (TimeoutException)result;             }         } catch (ExecutionException e) {             throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);         } catch (TimeoutException e) {             throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));         }          if (result instanceof RuntimeException) {             throw (RuntimeException)result;         } else if (result instanceof Throwable) {             throw new RuntimeException((Throwable)result);         }          return result;     }     ... }

 

7.Seata与Dubbo整合的过滤器源码

(1)调用Dubbo过滤器的入口

(2)Seata与Dubbo整合的过滤器

 

(1)调用Dubbo过滤器的入口

-> GlobalTransactionalInterceptor.invoke()拦截添加了@GlobalTransactional注解的方法; -> GlobalTransactionalInterceptor.handleGlobalTransaction()进行全局事务的处理; -> TransactionalTemplate.execute()执行全局事务 -> TransactionalTemplate.beginTransaction()开启一个全局事务 -> handleGlobalTransaction().methodInvocation.proceed()真正执行目标方法 -> ApacheDubboTransactionPropagationFilter.invoke()经过Dubbo过滤器处理
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {     ...     //如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法     @Override     public Object invoke(final MethodInvocation methodInvocation) throws Throwable {         //methodInvocation是一次方法调用         //通过methodInvocation的getThis()方法可以获取到被调用方法的对象         //通过AopUtils.getTargetClass()方法可以获取到对象对应的Class         Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;         //通过反射,获取到目标class中被调用的method方法         Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);          //如果调用的目标method不为null         if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {             //尝试寻找桥接方法bridgeMethod             final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);             //通过反射,获取被调用的目标方法的@GlobalTransactional注解             final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);             //通过反射,获取被调用目标方法的@GlobalLock注解             final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);              //如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true             //localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了             boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);             //如果全局事务没有禁用             if (!localDisable) {                 //全局事务注解不为空,或者是AOP切面全局事务核心配置不为空                 if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {                     AspectTransactional transactional;                     if (globalTransactionalAnnotation != null) {                         //创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来                         transactional = new AspectTransactional(                             globalTransactionalAnnotation.timeoutMills(),                             globalTransactionalAnnotation.name(),                             globalTransactionalAnnotation.rollbackFor(),                             globalTransactionalAnnotation.noRollbackForClassName(),                             globalTransactionalAnnotation.noRollbackFor(),                             globalTransactionalAnnotation.noRollbackForClassName(),                             globalTransactionalAnnotation.propagation(),                             globalTransactionalAnnotation.lockRetryInterval(),                             globalTransactionalAnnotation.lockRetryTimes()                         );                     } else {                         transactional = this.aspectTransactional;                     }                     //真正处理全局事务的入口                     return handleGlobalTransaction(methodInvocation, transactional);                 } else if (globalLockAnnotation != null) {                     return handleGlobalLock(methodInvocation, globalLockAnnotation);                 }             }         }          //直接运行目标方法         return methodInvocation.proceed();     }          //真正进行全局事务的处理     Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {         boolean succeed = true;         try {             //基于全局事务执行模版TransactionalTemplate,来执行全局事务             return transactionalTemplate.execute(new TransactionalExecutor() {                 @Override                 public Object execute() throws Throwable {                     //真正执行目标方法                     return methodInvocation.proceed();                 }                 ...             });         } catch (TransactionalExecutor.ExecutionException e) {             ...         } finally {             if (degradeCheck) {                 EVENT_BUS.post(new DegradeCheckEvent(succeed));             }         }     }     ... }  public class TransactionalTemplate {     ...     public Object execute(TransactionalExecutor business) throws Throwable {         //1.Get transactionInfo         TransactionInfo txInfo = business.getTransactionInfo();         if (txInfo == null) {             throw new ShouldNeverHappenException("transactionInfo does not exist");         }          //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.         //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务         //刚开始在开启一个全局事务的时候,是没有全局事务的         GlobalTransaction tx = GlobalTransactionContext.getCurrent();          //1.2 Handle the transaction propagation.         //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED         //也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;         Propagation propagation = txInfo.getPropagation();          //不同的全局事务传播级别,会采取不同的处理方式         //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid         //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别         SuspendedResourcesHolder suspendedResourcesHolder = null;         try {             switch (propagation) {                 ...             }              //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.             if (tx == null) {                 tx = GlobalTransactionContext.createNew();             }              //set current tx config to holder             GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);              try {                 //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, else do nothing. Of course, the hooks will still be triggered.                 //开启一个全局事务                 beginTransaction(txInfo, tx);                  Object rs;                 try {                     //Do Your Business                     //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务                     rs = business.execute();                 } catch (Throwable ex) {                     //3. The needed business exception to rollback.                     //发生异常时需要完成的事务                     completeTransactionAfterThrowing(txInfo, tx, ex);                     throw ex;                 }                  //4. everything is fine, commit.                 //如果一切执行正常就会在这里提交全局事务                 commitTransaction(tx);                  return rs;             } finally {                 //5. clear                 //执行一些全局事务完成后的回调,比如清理等工作                 resumeGlobalLockConfig(previousConfig);                 triggerAfterCompletion();                 cleanUp();             }         } finally {             //If the transaction is suspended, resume it.             if (suspendedResourcesHolder != null) {                 //如果之前挂起了一个全局事务,此时可以恢复这个全局事务                 tx.resume(suspendedResourcesHolder);             }         }     }          //开启事务     private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {         try {             //开启全局事务之前有一个回调的一个钩子名为triggerBeforeBegin()             triggerBeforeBegin();             //真正去开启一个全局事务             tx.begin(txInfo.getTimeOut(), txInfo.getName());             //开启全局事务之后还有一个回调钩子名为triggerAfterBegin()             triggerAfterBegin();         } catch (TransactionException txe) {             throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);         }     }     ... }

(2)Seata与Dubbo整合的过滤器

如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形。如果线程本地变量副本里的xid为null,则对应于接收RPC调用的情形。

 

当RootContext的xid不为null时,需要设置RpcContext的xid。当RootContext的xid为null + RpcContext的xid不为null时,需要设置RootContext的xid。

@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100) public class ApacheDubboTransactionPropagationFilter implements Filter {     private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class);          @Override     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {         //发起Dubbo的RPC调用时,会先从线程本地变量副本里获取xid         String xid = RootContext.getXID();         //然后从线程本地变量副本里获取当前的分支事务类型,默认分支类型就是AT         BranchType branchType = RootContext.getBranchType();          //从RpcContext里获取attachments里的xid和分支类型         String rpcXid = getRpcXid();         String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);         }          boolean bind = false;         if (xid != null) {             //如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形             //则把线程本地变量副本里的xid和分支类型,设置到RpcContext上下文里             //RpcContext上下文里的attachment内容会随着RPC请求发送到其他系统中             RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);             RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());         } else {             //如果线程本地变量副本里的xid为null且RpcContext里的xid不为null,对应于接收RPC调用的情形             if (rpcXid != null) {                 //把RpcContext里的xid绑定到当前服务的线程本地变量副本里                 RootContext.bind(rpcXid);                  if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {                     RootContext.bindBranchType(BranchType.TCC);                 }                 bind = true;                 if (LOGGER.isDebugEnabled()) {                     LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);                 }             }         }         try {             return invoker.invoke(invocation);         } finally {             if (bind) {                 BranchType previousBranchType = RootContext.getBranchType();                 //对线程本地变量副本里的xid做解绑                 String unbindXid = RootContext.unbind();                  if (BranchType.TCC == previousBranchType) {                     RootContext.unbindBranchType();                 }                 if (LOGGER.isDebugEnabled()) {                     LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);                 }                 if (!rpcXid.equalsIgnoreCase(unbindXid)) {                     LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid, rpcBranchType != null ? rpcBranchType : "AT", previousBranchType);                     if (unbindXid != null) {                         RootContext.bind(unbindXid);                         LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);                         if (BranchType.TCC == previousBranchType) {                             RootContext.bindBranchType(BranchType.TCC);                             LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);                         }                     }                 }             }             //对RpcContext上下文里的东西进行解绑             RpcContext.getContext().removeAttachment(RootContext.KEY_XID);             RpcContext.getContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);             RpcContext.getServerContext().removeAttachment(RootContext.KEY_XID);             RpcContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);         }     }      private String getRpcXid() {         String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);         if (rpcXid == null) {             rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());         }         return rpcXid;     } }

 

发表评论

评论已关闭。

相关文章

  • 0