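Seata Source Code, Part 6: Data Source Proxying in Seata AT Mode

Outline

1. Source Code of Seata's Resource Interface

2. Implementation of Seata's Data Source Connection Pool Proxy

3. How the Client Initiates RM Registration with the Server

4. Client-Server Interaction During RM Registration

5. Initialization of the Connection Proxy and the SQL Statement Proxy

6. How Seata Executes SQL Through the Statement Proxy

7. Disabling Auto-Commit Before Executing SQL

8. Building Data Images Before and After Executing SQL

9. Building the Global Lock Keys and the UndoLog Data

10. How the Seata Client Initiates Branch Transaction Registration

11. How the Seata Server Handles Branch Transaction Registration Requests

12. Writing the UndoLog to the Database and Committing the Transaction

13. Committing the Transaction Through the Global Lock Retry Policy Component

14. Entry Point for Acquiring the Global Lock During Branch Registration

15. How the Seata Server Acquires the Global Lock

16. Summary of Global Locks, Branch Transactions, and Local Transactions

17. Committing the Global Transaction and Its Branch Transactions

18. The Global Transaction Rollback Process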

 

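1. Source Code of Seata's Resource Interface

The data source proxy DataSourceProxy implements Seata's Resource interface and also extends the abstract class AbstractDataSourceProxy, which in turn implements the SeataDataSourceProxy interface.

Because SeataDataSourceProxy extends the JDK's DataSource interface, a connection can be obtained from the proxy through the standard DataSource methods.

Note: in this article, "data source" means the database.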

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    ...
}

public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {
    ...
}

public interface SeataDataSourceProxy extends DataSource {
    ...
}

public interface DataSource extends CommonDataSource, Wrapper {
    //Get a connection from the data source
    Connection getConnection() throws SQLException;
    Connection getConnection(String username, String password) throws SQLException;
}

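Seata's Resource interface has three methods:

I. getResourceGroupId() returns the resource group

For example, a master node and its slave nodes belong to the same group.

II. getResourceId() returns the data source ID

For example, the connection URL of a data source can serve as its ID.

III. getBranchType() returns the branch transaction type

The available types are AT, TCC, SAGA, and XA.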

//Resource that can be managed by Resource Manager and involved into global transaction.
//Resources are managed by the RM (Resource Manager) component,
//which enlists each resource in a global transaction.
//For example, the RM can manage a database resource and enlist a local database transaction in a global transaction.
public interface Resource {
    //Get the resource group id.
    //e.g. master and slave data-source should be with the same resource group id.
    //For example, in a MySQL master-slave deployment the master and the slave are two data sources that share one group ID.
    String getResourceGroupId();

    //Get the resource id.
    //e.g. url of a data-source could be the id of the db data-source resource.
    String getResourceId();

    //get resource type, AT, TCC, SAGA and XA
    //branchType is the branch transaction type: AT, TCC, SAGA, or XA
    BranchType getBranchType();
}
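To make the three methods concrete, here is a minimal sketch of a resource that uses its JDBC URL as the resource ID. The BranchType and Resource types are redeclared as stand-ins so the example compiles without the Seata dependency, and JdbcUrlResource, the group name, and the URLs are hypothetical names chosen for illustration.

```java
import java.util.Objects;

// Stand-ins that mirror the shape of Seata's BranchType and Resource.
// They are redeclared here only so the sketch compiles without Seata on the classpath.
enum BranchType { AT, TCC, SAGA, XA }

interface Resource {
    String getResourceGroupId();
    String getResourceId();
    BranchType getBranchType();
}

// Hypothetical AT-mode resource identified by its JDBC URL.
final class JdbcUrlResource implements Resource {
    private final String resourceGroupId;
    private final String jdbcUrl;

    JdbcUrlResource(String resourceGroupId, String jdbcUrl) {
        this.resourceGroupId = Objects.requireNonNull(resourceGroupId, "resourceGroupId");
        this.jdbcUrl = Objects.requireNonNull(jdbcUrl, "jdbcUrl");
    }

    @Override
    public String getResourceGroupId() {
        return resourceGroupId;
    }

    @Override
    public String getResourceId() {
        return jdbcUrl;
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }
}

final class ResourceSketch {
    public static void main(String[] args) {
        // A master and a slave are two resources that share one group ID.
        Resource master = new JdbcUrlResource("orders-group", "jdbc:mysql://db-master:3306/orders");
        Resource slave = new JdbcUrlResource("orders-group", "jdbc:mysql://db-slave:3306/orders");
        System.out.println(master.getResourceGroupId().equals(slave.getResourceGroupId()));
        System.out.println(master.getResourceId().equals(slave.getResourceId()));
    }
}
```

The master and the slave report the same group ID but different resource IDs, which is the relationship the interface comments describe.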

 

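2. Implementation of Seata's Data Source Connection Pool Proxy

(1) The connection pool proxy interface SeataDataSourceProxy

(2) The connection pool proxy abstract class AbstractDataSourceProxy

(3) Fields and initialization of the connection pool proxy DataSourceProxy

(1) The connection pool proxy interface SeataDataSourceProxy

SeataDataSourceProxy extends DataSource and adds two methods: one returns the target data source (connection pool) being proxied, and the other returns the branch transaction type associated with that target.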

public interface SeataDataSourceProxy extends DataSource {
    //Gets target data source.
    //Returns the target data source (connection pool) being proxied
    DataSource getTargetDataSource();

    //Gets branch type.
    //Returns the branch transaction type of the proxied target data source
    BranchType getBranchType();
}

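(2) The connection pool proxy abstract class AbstractDataSourceProxy

The main job of the abstract class AbstractDataSourceProxy is to wrap the proxied target connection pool, targetDataSource, and delegate the standard DataSource methods to it.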

//The type Abstract data source proxy.
//The main job of AbstractDataSourceProxy is to wrap the proxied target connection pool, targetDataSource
public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {
    //The Target data source.
    //The proxied connection pool; connections are obtained through targetDataSource
    protected DataSource targetDataSource;

    //Instantiates a new Abstract data source proxy.
    public AbstractDataSourceProxy() {
    }

    //Instantiates a new Abstract data source proxy.
    public AbstractDataSourceProxy(DataSource targetDataSource) {
        this.targetDataSource = targetDataSource;
    }

    //Gets target data source.
    @Override
    public DataSource getTargetDataSource() {
        return targetDataSource;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return targetDataSource.unwrap(iface);
    }

    //Checks whether the target connection pool targetDataSource wraps the given interface iface
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return targetDataSource.isWrapperFor(iface);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return targetDataSource.getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        targetDataSource.setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        targetDataSource.setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return targetDataSource.getLoginTimeout();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return targetDataSource.getParentLogger();
    }
}

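(3) Fields and initialization of the connection pool proxy DataSourceProxy

DataSourceProxy is initialized in five steps. First, a database connection is obtained from the target connection pool dataSource. Second, jdbcUrl and dbType are initialized from that Connection. Third, resourceId is derived from jdbcUrl. Fourth, the DataSourceProxy registers itself as a resource with the default RM, DefaultResourceManager. Finally, the default branch transaction type is set in RootContext, Seata's thread-local based context.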

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxy.class);
    //Default resource group ID
    private static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";
    //Enable the table meta checker; disabled by default
    private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);
    //Table meta checker interval; 60s by default
    private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);
    //Resource group ID; e.g. a MySQL master and slave are two data sources that share one group ID
    private String resourceGroupId;
    //Connection URL of the proxied target data source; it can also serve as the resourceId
    private String jdbcUrl;
    //Data source ID; e.g. the database connection URL can be used as the data source ID
    private String resourceId;
    //Database type
    private String dbType;
    //User name of the data source connection
    private String userName;
    //Scheduled thread pool that periodically checks the table metadata
    private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("tableMetaChecker", 1, true));

    //Instantiates a new Data source proxy.
    public DataSourceProxy(DataSource targetDataSource) {
        this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
    }

    //Instantiates a new Data source proxy.
    //@param targetDataSource the target data source
    //@param resourceGroupId  the resource group id
    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        init(targetDataSource, resourceGroupId);
    }

    //Initializes the connection pool proxy DataSourceProxy
    private void init(DataSource dataSource, String resourceGroupId) {
        //Resource group ID
        this.resourceGroupId = resourceGroupId;

        //Obtain a database connection from the target connection pool dataSource
        try (Connection connection = dataSource.getConnection()) {
            //Read the connection URL from the connection's metadata
            jdbcUrl = connection.getMetaData().getURL();
            //Derive the database type from the connection URL
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                //For Oracle, also read the user name from the connection's metadata
                userName = connection.getMetaData().getUserName();
            } else if (JdbcConstants.MARIADB.equals(dbType)) {
                //For MariaDB, treat the database type as MySQL
                dbType = JdbcConstants.MYSQL;
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }

        //Initialize the resource ID from the database connection URL
        initResourceId();

        //Register this connection pool proxy as a resource with the default RM, DefaultResourceManager
        DefaultResourceManager.get().registerResource(this);

        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExecutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

    private void initResourceId() {
        if (JdbcConstants.POSTGRESQL.equals(dbType)) {
            initPGResourceId();
        } else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {
            initDefaultResourceId();
            resourceId = resourceId + "/" + userName;
        } else if (JdbcConstants.MYSQL.equals(dbType)) {
            initMysqlResourceId();
        } else {
            initDefaultResourceId();
        }
    }

    private void initMysqlResourceId() {
        String startsWith = "jdbc:mysql:loadbalance://";
        if (jdbcUrl.startsWith(startsWith)) {
            String url;
            if (jdbcUrl.contains("?")) {
                url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
            } else {
                url = jdbcUrl;
            }
            resourceId = url.replace(",", "|");
        } else {
            initDefaultResourceId();
        }
    }
    ...
}
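The resource ID rules in initResourceId() are plain string manipulation, so they can be tried out in isolation. The sketch below mirrors the MySQL and Oracle branches shown above. The class ResourceIdSketch and the sample URLs are hypothetical, and the body of initDefaultResourceId() is elided in the listing, so the default rule used here (the JDBC URL with its query string removed) is an assumption.

```java
final class ResourceIdSketch {
    private static final String MYSQL_LOAD_BALANCE_PREFIX = "jdbc:mysql:loadbalance://";

    // Removes the query string ("?..."), if there is one.
    static String stripQuery(String jdbcUrl) {
        int queryStart = jdbcUrl.indexOf('?');
        return queryStart >= 0 ? jdbcUrl.substring(0, queryStart) : jdbcUrl;
    }

    // ASSUMPTION: the default rule is "URL without query string";
    // the listing above does not show initDefaultResourceId().
    static String defaultResourceId(String jdbcUrl) {
        return stripQuery(jdbcUrl);
    }

    // Mirrors initMysqlResourceId(): for load-balanced URLs, drop the query string
    // and replace the commas between hosts with '|'.
    static String mysqlResourceId(String jdbcUrl) {
        if (jdbcUrl.startsWith(MYSQL_LOAD_BALANCE_PREFIX)) {
            return stripQuery(jdbcUrl).replace(",", "|");
        }
        return defaultResourceId(jdbcUrl);
    }

    // Mirrors the Oracle branch of initResourceId(): append "/" + userName when it is known.
    static String oracleResourceId(String jdbcUrl, String userName) {
        String base = defaultResourceId(jdbcUrl);
        return userName != null ? base + "/" + userName : base;
    }

    public static void main(String[] args) {
        System.out.println(mysqlResourceId("jdbc:mysql:loadbalance://h1:3306,h2:3306/orders?useSSL=false"));
        System.out.println(oracleResourceId("jdbc:oracle:thin:@db:1521:orcl", "SCOTT"));
    }
}
```

Stripping the query string first means that connection parameters never end up in the ID for load-balanced MySQL URLs, and replacing the commas yields a single identifier that still names every host.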

 

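3. How the Client Initiates RM Registration with the Server

When the connection pool proxy DataSourceProxy is initialized, it registers itself as a resource with the resource manager DefaultResourceManager.

When DefaultResourceManager is initialized, it loads all ResourceManager implementations through the SPI mechanism and indexes them by branch transaction type.

Therefore, when DataSourceProxy's init() method runs, its call to DefaultResourceManager's registerResource() method is dispatched to DataSourceManager's registerResource() method, because DataSourceManager is the resource manager for AT mode.

DataSourceManager's registerResource() method first caches the DataSourceProxy in a Map. It then uses RmNettyRemotingClient to build an RM registration request, which registers the DataSourceProxy as a resource with the Seata Server.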

public class DefaultResourceManager implements ResourceManager {
    //all resource managers
    protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();

    private static class SingletonHolder {
        private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    }

    //Get resource manager.
    public static DefaultResourceManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultResourceManager() {
        initResourceManagers();
    }

    protected void initResourceManagers() {
        //init all resource managers
        //Load every ResourceManager through SPI,
        //e.g. DataSourceManager, TCCResourceManager, SagaResourceManager, ResourceManagerXA
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }

    @Override
    public void registerResource(Resource resource) {
        getResourceManager(resource.getBranchType()).registerResource(resource);
    }

    public ResourceManager getResourceManager(BranchType branchType) {
        ResourceManager rm = resourceManagers.get(branchType);
        if (rm == null) {
            throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
        }
        return rm;
    }
    ...
}

//The type Data source manager.
//DataSourceManager is the resource manager for AT mode
public class DataSourceManager extends AbstractResourceManager {
    //Asynchronous worker
    private final AsyncWorker asyncWorker = new AsyncWorker(this);
    //The resources managed by this RM
    private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
    ...

    @Override
    public void registerResource(Resource resource) {
        DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
        //Cache the connection pool proxy DataSourceProxy in the map, keyed by resource ID
        dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
        super.registerResource(dataSourceProxy);
    }
    ...
}

public abstract class AbstractResourceManager implements ResourceManager {
    ...
    @Override
    public void registerResource(Resource resource) {
        //Register the RM with the Seata Server through RmNettyRemotingClient
        RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
    }
    ...
}
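The dispatch step can be seen in isolation in the following sketch: a registry keyed by branch type forwards each resource to the manager responsible for that type. All types here are simplified stand-ins rather than Seata's classes, and the list of managers is passed in by hand where Seata loads it through SPI. The names DispatchingResourceManager, CachingResourceManager, and SimpleResource are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for Seata's types, declared so the sketch is self-contained.
enum BranchType { AT, TCC }

interface Resource {
    String getResourceId();
    BranchType getBranchType();
}

interface ResourceManager {
    BranchType getBranchType();
    void registerResource(Resource resource);
}

final class SimpleResource implements Resource {
    private final String resourceId;
    private final BranchType branchType;

    SimpleResource(String resourceId, BranchType branchType) {
        this.resourceId = resourceId;
        this.branchType = branchType;
    }

    @Override
    public String getResourceId() { return resourceId; }

    @Override
    public BranchType getBranchType() { return branchType; }
}

// Hypothetical manager that only caches resources; the real one also notifies the server.
final class CachingResourceManager implements ResourceManager {
    private final BranchType branchType;
    private final Map<String, Resource> cache = new ConcurrentHashMap<>();

    CachingResourceManager(BranchType branchType) { this.branchType = branchType; }

    @Override
    public BranchType getBranchType() { return branchType; }

    @Override
    public void registerResource(Resource resource) {
        cache.put(resource.getResourceId(), resource);
    }

    boolean isRegistered(String resourceId) { return cache.containsKey(resourceId); }
}

// Routes each resource to the manager that handles its branch type.
final class DispatchingResourceManager {
    private final Map<BranchType, ResourceManager> managers = new ConcurrentHashMap<>();

    DispatchingResourceManager(List<? extends ResourceManager> allManagers) {
        for (ResourceManager rm : allManagers) {
            managers.put(rm.getBranchType(), rm);
        }
    }

    void registerResource(Resource resource) {
        ResourceManager rm = managers.get(resource.getBranchType());
        if (rm == null) {
            throw new IllegalStateException("No ResourceManager for BranchType:" + resource.getBranchType());
        }
        rm.registerResource(resource);
    }
}
```

With only an AT manager installed, registering an AT resource lands in that manager's cache, while registering a TCC resource fails fast, which matches the exception path in getResourceManager().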

 

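4. Client-Server Interaction During RM Registration

(1) The Client asynchronously sends the RM registration request to the Server

(2) The Server processes the RM registration request and responds asynchronously

(1) The Client asynchronously sends the RM registration request to the Server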

public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
    ...
    //Register new db key.
    public void registerResource(String resourceGroupId, String resourceId) {
        //Resource registration cannot be performed until the RM client is initialized
        if (StringUtils.isBlank(transactionServiceGroup)) {
            return;
        }

        if (getClientChannelManager().getChannels().isEmpty()) {
            getClientChannelManager().reconnect(transactionServiceGroup);
            return;
        }

        synchronized (getClientChannelManager().getChannels()) {
            //Send a registration request to every Server
            for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {
                String serverAddress = entry.getKey();
                Channel rmChannel = entry.getValue();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("will register resourceId:{}", resourceId);
                }
                sendRegisterMessage(serverAddress, rmChannel, resourceId);
            }
        }
    }

    public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
        RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
        message.setResourceIds(resourceId);
        try {
            //Send the RM registration request asynchronously
            super.sendAsyncRequest(channel, message);
        } catch (FrameworkException e) {
            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
                getClientChannelManager().releaseChannel(channel, serverAddress);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove not writable channel:{}", channel);
                }
            } else {
                LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
            }
        }
    }
    ...
}

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            return;
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
        if (rpcMessage.getBody() instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
        }
        super.sendAsync(channel, rpcMessage);
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //rpc async request.
    protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
        channelWritableCheck(channel, rpcMessage.getBody());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                destroyChannel(future.channel());
            }
        });
    }
    ...
}

(2) The Server processes the RM registration request and responds asynchronously

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //Hand the decoded RpcMessage to processMessage()
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Rpc message processing.
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //Look up a Pair by message type; it holds the request processor and the thread pool that runs it
            //The entries of processorTable are put there through registerProcessor() when NettyRemotingServer is initialized
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class RegRmProcessor implements RemotingProcessor {
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        onRegRmMessage(ctx, rpcMessage);
    }

    private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
        //Address the request was sent from
        String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        boolean isSuccess = false;
        String errorInfo = StringUtils.EMPTY;
        try {
            if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
                //Register the RM network connection through the channel management component ChannelManager
                ChannelManager.registerRMChannel(message, ctx.channel());
                Version.putChannelVersion(ctx.channel(), message.getVersion());
                isSuccess = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
                }
            } else {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
                }
            }
        } catch (Exception exx) {
            isSuccess = false;
            errorInfo = exx.getMessage();
            LOGGER.error("RM register fail, error message:{}", errorInfo);
        }
        RegisterRMResponse response = new RegisterRMResponse(isSuccess);
        if (StringUtils.isNotEmpty(errorInfo)) {
            response.setMsg(errorInfo);
        }
        //Send the response back to the client
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
        if (isSuccess && LOGGER.isInfoEnabled()) {
            LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(), message.getVersion());
        }
    }
    ...
}

public class ChannelManager {
    ...
    public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel) throws IncompatibleVersionException {
        Version.checkVersion(resourceManagerRequest.getVersion());
        Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
        RpcContext rpcContext;
        if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
            rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
                resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
                resourceManagerRequest.getResourceIds(), channel);
            rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
        } else {
            rpcContext = IDENTIFIED_CHANNELS.get(channel);
            rpcContext.addResources(dbkeySet);
        }
        if (dbkeySet == null || dbkeySet.isEmpty()) {
            return;
        }
        for (String resourceId : dbkeySet) {
            String clientIp;
            ConcurrentMap<Integer, RpcContext> portMap =
                CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
                .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());

            rpcContext.holdInResourceManagerChannels(resourceId, portMap);
            updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
        }
    }
    ...
}

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @Override
    public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
        Channel clientChannel = channel;
        if (!(msg instanceof HeartbeatMessage)) {
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if (clientChannel != null) {
            RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
                ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE);
            super.sendAsync(clientChannel, rpcMsg);
        } else {
            throw new RuntimeException("channel is error.");
        }
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //rpc async request.
    protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
        channelWritableCheck(channel, rpcMessage.getBody());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                destroyChannel(future.channel());
            }
        });
    }
    ...
}
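The nested computeIfAbsent() chain in registerRMChannel() builds a four-level index: resource ID, then application ID, then client IP, then a final map. The sketch below reproduces that shape with plain strings. RmChannelIndexSketch is a hypothetical class, a String stands in for RpcContext, and keying the innermost map by client port is an assumption based on the variable name portMap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class RmChannelIndexSketch {
    // resourceId -> applicationId -> clientIp -> port -> channel description.
    // ASSUMPTION: the innermost key is the client port (inferred from the name "portMap").
    private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>>>
        index = new ConcurrentHashMap<>();

    // Creates the intermediate maps on demand, then stores the channel under its port.
    void register(String resourceId, String applicationId, String clientIp, int port, String channel) {
        index.computeIfAbsent(resourceId, key -> new ConcurrentHashMap<>())
            .computeIfAbsent(applicationId, key -> new ConcurrentHashMap<>())
            .computeIfAbsent(clientIp, key -> new ConcurrentHashMap<>())
            .put(port, channel);
    }

    // Walks the index level by level; returns null as soon as one level is missing.
    String lookup(String resourceId, String applicationId, String clientIp, int port) {
        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, String>>> byApplication = index.get(resourceId);
        if (byApplication == null) {
            return null;
        }
        ConcurrentMap<String, ConcurrentMap<Integer, String>> byIp = byApplication.get(applicationId);
        if (byIp == null) {
            return null;
        }
        ConcurrentMap<Integer, String> byPort = byIp.get(clientIp);
        return byPort == null ? null : byPort.get(port);
    }
}
```

Indexing by resource ID first lets the server find, for a given database, every client connection that is able to commit or roll back branch transactions on it.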

 

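5. Initialization of the Connection Proxy and the SQL Statement Proxy

(1) The three essentials of a database operation: connection, statement, and result

(2) Initialization of the connection proxy

(3) How the connection proxy precompiles SQL

(4) Initialization of the SQL statement proxy

(5) How the SQL statement proxy executes SQL

(1) The three essentials of a database operation: connection, statement, and result

A database operation in the Seata Client or the Seata Server roughly follows the flow below: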

public class LogStoreDataBaseDAO implements LogStore {
    //The Log store data source (connection pool).
    protected DataSource logStoreDataSource = null;
    ...
    @Override
    public GlobalTransactionDO queryGlobalTransactionDO(long transactionId) {
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByTransactionId(globalTable);
        Connection conn = null;//connection
        PreparedStatement ps = null;//statement
        ResultSet rs = null;//result
        try {
            //1. Obtain a connection from the connection pool
            conn = logStoreDataSource.getConnection();
            conn.setAutoCommit(true);
            //2. Precompile the SQL statement
            ps = conn.prepareStatement(sql);
            ps.setLong(1, transactionId);
            //3. Execute the SQL statement
            rs = ps.executeQuery();
            if (rs.next()) {
                return convertGlobalTransactionDO(rs);
            } else {
                return null;
            }
        } catch (SQLException e) {
            throw new DataAccessException(e);
        } finally {
            IOUtil.close(rs, ps, conn);
        }
    }
    ...
}

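(2) Initialization of the connection proxy

When the Seata Client or the Seata Server performs a database operation, it first obtains a connection through the connection pool proxy DataSourceProxy. That is, DataSourceProxy's getConnection() method takes a Connection from the target connection pool and wraps it in a new connection proxy, ConnectionProxy, which is what the caller receives.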

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    ...
    @Override
    public ConnectionProxy getConnection() throws SQLException {
        //Take a connection from the target connection pool, wrap it in a ConnectionProxy, and return it
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        //Take a connection from the target connection pool, wrap it in a ConnectionProxy, and return it
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }
    ...
}

public class ConnectionProxy extends AbstractConnectionProxy {
    //Instantiates a new Connection proxy.
    public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
        super(dataSourceProxy, targetConnection);
    }
    ...
}

public abstract class AbstractConnectionProxy implements Connection {
    //The Data source proxy (connection pool proxy).
    protected DataSourceProxy dataSourceProxy;
    //The Target connection.
    protected Connection targetConnection;

    //Instantiates a new Abstract connection proxy.
    public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
        this.dataSourceProxy = dataSourceProxy;
        this.targetConnection = targetConnection;
    }
    ...
}

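(3) How the connection proxy precompiles SQL

Once the caller holds a ConnectionProxy, the next step of a database operation is to precompile the SQL to be executed, which calls AbstractConnectionProxy's prepareStatement() method. In AT mode, if the SQL is a single INSERT statement, the target statement is prepared with the table's primary key column names; in every case the method returns a PreparedStatementProxy that wraps the target statement.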

public abstract class AbstractConnectionProxy implements Connection {
    ...
    //Precompile the SQL
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        //support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        //In AT mode
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(
                        getTargetConnection(),
                        sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()
                    );
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql, pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        //Return a SQL statement proxy
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
    ...
}

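(4) Initialization of the SQL statement proxy

Initializing the SQL statement proxy PreparedStatementProxy mainly means storing three things: the target SQL, the target statement, and the connection proxy.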

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
    //Instantiates a new Prepared statement proxy.
    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
    }
    ...
}

public abstract class AbstractPreparedStatementProxy extends StatementProxy<PreparedStatement> implements PreparedStatement {
    protected Map<Integer, ArrayList<Object>> parameters;

    private void initParameterHolder() {
        this.parameters = new HashMap<>();
    }

    //Instantiates a new Abstract prepared statement proxy.
    public AbstractPreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
        super(connectionProxy, targetStatement, targetSQL);
        initParameterHolder();
    }
    ...
}

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
    //Instantiates a new Statement proxy.
    public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL) throws SQLException {
        super(connectionWrapper, targetStatement, targetSQL);
    }
    ...
}

public abstract class AbstractStatementProxy<T extends Statement> implements Statement {
    //The Connection proxy.
    protected AbstractConnectionProxy connectionProxy;
    //The Target statement.
    protected T targetStatement;
    //The Target sql.
    protected String targetSQL;
    ...
    //Instantiates a new Abstract statement proxy.
    public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL) throws SQLException {
        this.connectionProxy = connectionProxy;
        this.targetStatement = targetStatement;
        this.targetSQL = targetSQL;
    }
    ...
}

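(5) How the SQL statement proxy executes SQL

After a connection has been obtained from the connection pool and the SQL statement has been precompiled, the SQL can be executed by calling methods such as executeQuery() on the SQL statement proxy PreparedStatementProxy.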
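The wrap-and-delegate idea behind ConnectionProxy and PreparedStatementProxy can be shown in a few lines. Seata writes its proxy classes by hand; the sketch below swaps in a JDK dynamic proxy instead, purely to keep the example short. ConnectionWrapSketch, its stub connection, and the sample SQL are hypothetical: the wrapper records every SQL string passed to prepareStatement() and then delegates to the target connection.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class ConnectionWrapSketch {

    // Wraps a Connection: SQL passed to prepareStatement(...) is recorded first,
    // then the call is forwarded unchanged to the target connection.
    static Connection wrap(Connection target, List<String> preparedSql) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("prepareStatement".equals(method.getName()) && args != null && args[0] instanceof String) {
                preparedSql.add((String) args[0]);
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return newConnectionProxy(handler);
    }

    // Hypothetical stand-in for a driver connection: every call returns null,
    // so it is only safe for methods whose return type is an object.
    static Connection stubConnection() {
        return newConnectionProxy((proxy, method, args) -> null);
    }

    private static Connection newConnectionProxy(InvocationHandler handler) {
        return (Connection) Proxy.newProxyInstance(
            ConnectionWrapSketch.class.getClassLoader(), new Class<?>[] {Connection.class}, handler);
    }

    // Prepares one statement through the wrapper and returns what the wrapper recorded.
    static List<String> demo() {
        List<String> preparedSql = new ArrayList<>();
        Connection connection = wrap(stubConnection(), preparedSql);
        try {
            connection.prepareStatement("update account set money = money - ? where id = ?");
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return preparedSql;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the caller only ever sees the Connection interface, it cannot tell that the SQL was intercepted. Seata relies on the same property to parse the SQL and wrap the statement before the database sees it.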

 

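6. How Seata Executes SQL Through the Statement Proxy

(1) The three essentials behind Spring's JdbcTemplate

(2) How SQL is executed through the SQL statement proxy

(1) The three essentials behind Spring's JdbcTemplate

JdbcTemplate works with the same three objects: connection, statement, and result.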

@Disabled
public class LocalTransactionWithGlobalLockDataSourceBasicTest {
    private static ClassPathXmlApplicationContext context;
    private static JdbcTemplate jdbcTemplate;

    @BeforeAll
    public static void before() {
        context = new ClassPathXmlApplicationContext("basic-test-context.xml");
        jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");
    }

    @Test
    public void testInsert() {
        RootContext.bindGlobalLockFlag();
        jdbcTemplate.update("insert into user0 (id, name, gmt) values (?, ?, ?)", new Object[]{2, "xxx", new Date()});
    }
    ...
}

public class JdbcTemplate extends JdbcAccessor implements JdbcOperations {
    ...
    @Override
    public int update(String sql, @Nullable Object... args) throws DataAccessException {
        return update(sql, newArgPreparedStatementSetter(args));
    }

    @Override
    public int update(String sql, @Nullable PreparedStatementSetter pss) throws DataAccessException {
        return update(new SimplePreparedStatementCreator(sql), pss);
    }

    protected int update(final PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss) throws DataAccessException {
        logger.debug("Executing prepared SQL update");
        return updateCount(execute(psc, ps -> {
            try {
                if (pss != null) {
                    pss.setValues(ps);
                }
                //The PreparedStatement executes the SQL
                int rows = ps.executeUpdate();
                if (logger.isTraceEnabled()) {
                    logger.trace("SQL update affected " + rows + " rows");
                }
                return rows;
            } finally {
                if (pss instanceof ParameterDisposer) {
                    ((ParameterDisposer) pss).cleanupParameters();
                }
            }
        }, true));
    }

    @Nullable
    private <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources) throws DataAccessException {
        Assert.notNull(psc, "PreparedStatementCreator must not be null");
        Assert.notNull(action, "Callback object must not be null");
        if (logger.isDebugEnabled()) {
            String sql = getSql(psc);
            logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
        }
        //1. Obtain the connection
        Connection con = DataSourceUtils.getConnection(obtainDataSource());
        PreparedStatement ps = null;
        try {
            //2. Create the statement
            ps = psc.createPreparedStatement(con);
            applyStatementSettings(ps);
            //3. Execute the SQL and get the result
            T result = action.doInPreparedStatement(ps);
            handleWarnings(ps);
            return result;
        } catch (SQLException ex) {
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer) psc).cleanupParameters();
            }
            String sql = getSql(psc);
            psc = null;
            JdbcUtils.closeStatement(ps);
            ps = null;
            DataSourceUtils.releaseConnection(con, getDataSource());
            con = null;
            throw translateException("PreparedStatementCallback", sql, ex);
        } finally {
            if (closeResources) {
                if (psc instanceof ParameterDisposer) {
                    ((ParameterDisposer) psc).cleanupParameters();
                }
                JdbcUtils.closeStatement(ps);
                DataSourceUtils.releaseConnection(con, getDataSource());
            }
        }
    }
    ...
}

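(2) How SQL is executed through the SQL statement proxy

When the SQL statement proxy PreparedStatementProxy calls execute() to run SQL, it delegates to the execute() method of the ExecuteTemplate execution template.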

 

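In ExecuteTemplate.execute(), if no global lock is required and the current thread is not in an AT-mode global transaction, the SQL simply runs as an ordinary local transaction on the target statement. Otherwise an executor is selected according to the SQL type, and the call eventually reaches the execute() method of BaseTransactionalExecutor.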

 

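BaseTransactionalExecutor.execute() first reads the xid from the thread-local RootContext and binds it to the connection proxy, then executes the SQL through doExecute().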

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {     ...     @Override     public boolean execute() throws SQLException {         return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());     }          @Override     public ResultSet executeQuery() throws SQLException {         return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());     }          @Override     public int executeUpdate() throws SQLException {         return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());     }     ... }  public class ExecuteTemplate {     ...     public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {         return execute(null, statementProxy, statementCallback, args);     }          public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... 
args) throws SQLException {         //如果发现不需要全局锁,而且没有开启AT模式下的全局事务,那么就普通执行本地事务         if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {             //Just work as original statement             return statementCallback.execute(statementProxy.getTargetStatement(), args);         }          //获取到DB的类型         String dbType = statementProxy.getConnectionProxy().getDbType();         if (CollectionUtils.isEmpty(sqlRecognizers)) {             sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);         }         Executor<T> executor;         if (CollectionUtils.isEmpty(sqlRecognizers)) {             executor = new PlainExecutor<>(statementProxy, statementCallback);         } else {             if (sqlRecognizers.size() == 1) {                 SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);                 switch (sqlRecognizer.getSQLType()) {                     case INSERT:                         //通过SPI机制加载InsertExecutor                         executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});                         break;                     case UPDATE:                         executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);                         break;                     case DELETE:                         executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);                         break;                     case SELECT_FOR_UPDATE:                         executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);                         break;                     case INSERT_ON_DUPLICATE_UPDATE:                         switch (dbType) {                             case JdbcConstants.MYSQL:                             case 
JdbcConstants.MARIADB:                                 executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);                                 break;                             default:                                 throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");                         }                         break;                     default:                         executor = new PlainExecutor<>(statementProxy, statementCallback);                         break;                 }             } else {                 executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);             }         }         T rs;         try {             //比如下面最终会调用BaseTransactionalExecutor.excute()方法             rs = executor.execute(args);         } catch (Throwable ex) {             if (!(ex instanceof SQLException)) {                 // Turn other exception into SQLException                 ex = new SQLException(ex);             }             throw (SQLException) ex;         }         return rs;     }     ... }  @LoadLevel(name = JdbcConstants.MYSQL, scope = Scope.PROTOTYPE) public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultable {     ...     //Instantiates a new Abstract dml base executor.     public MySQLInsertExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {         super(statementProxy, statementCallback, sqlRecognizer);     }     ... }  public abstract class BaseInsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> implements InsertExecutor<T> {     ...     public BaseInsertExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {         super(statementProxy, statementCallback, sqlRecognizer);     }     ... 
}  public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {     ...     public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {         super(statementProxy, statementCallback, sqlRecognizer);     }          @Override     public T doExecute(Object... args) throws Throwable {         AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();         //判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交         if (connectionProxy.getAutoCommit()) {             return executeAutoCommitTrue(args);         } else {             return executeAutoCommitFalse(args);         }     }     ... }  public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {     //The Statement proxy.     protected StatementProxy<S> statementProxy;     //The Statement callback.     protected StatementCallback<T, S> statementCallback;     //The Sql recognizer.     protected SQLRecognizer sqlRecognizer;     ...     public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,         SQLRecognizer sqlRecognizer) {         this.statementProxy = statementProxy;         this.statementCallback = statementCallback;         this.sqlRecognizer = sqlRecognizer;     }     ...     @Override     public T execute(Object... args) throws Throwable {         //获取xid         String xid = RootContext.getXID();         if (xid != null) {             statementProxy.getConnectionProxy().bind(xid);         }         statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());         return doExecute(args);     }          //Do execute object.     protected abstract T doExecute(Object... args) throws Throwable;     ... }
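The routing decision made by ExecuteTemplate can be summarized in a small standalone sketch. This is not Seata code: the class ExecuteTemplateSketch, the enum SqlType, and the method chooseExecutor() are hypothetical names used only to illustrate the branching, and the returned strings merely name the executor that the real code would instantiate.

```java
class ExecuteTemplateSketch {
    // Hypothetical stand-in for the SQL types that the recognizer can report.
    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

    // Returns the name of the executor a statement would be routed to.
    static String chooseExecutor(boolean requireGlobalLock, boolean inAtGlobalTx, SqlType type) {
        // No global lock and no AT-mode global transaction: behave like the original statement.
        if (!requireGlobalLock && !inAtGlobalTx) {
            return "original statement";
        }
        switch (type) {
            case INSERT:
                return "InsertExecutor";
            case UPDATE:
                return "UpdateExecutor";
            case DELETE:
                return "DeleteExecutor";
            case SELECT_FOR_UPDATE:
                return "SelectForUpdateExecutor";
            default:
                return "PlainExecutor";
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseExecutor(false, false, SqlType.UPDATE));
        System.out.println(chooseExecutor(false, true, SqlType.UPDATE));
        System.out.println(chooseExecutor(true, false, SqlType.OTHER));
    }
}
```

The sketch leaves out the multi-statement case (MultiExecutor) and the database-specific INSERT ... ON DUPLICATE KEY UPDATE branch that the real method also handles.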

 

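7. Source code for disabling auto-commit before the SQL statement is executed

The execute() method of the ExecuteTemplate execution template eventually calls the execute() method of the base transactional executor BaseTransactionalExecutor.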

 

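BaseTransactionalExecutor.execute() in turn calls AbstractDMLBaseExecutor.doExecute(). That method checks whether the target database connection auto-commits local transactions, which is the default. If it does, executeAutoCommitTrue() disables auto-commit, runs the SQL, commits manually, and finally restores auto-commit.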

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {     //The Statement proxy.     protected StatementProxy<S> statementProxy;     //The Statement callback.     protected StatementCallback<T, S> statementCallback;     //The Sql recognizer.     protected SQLRecognizer sqlRecognizer;     ...     public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,         SQLRecognizer sqlRecognizer) {         this.statementProxy = statementProxy;         this.statementCallback = statementCallback;         this.sqlRecognizer = sqlRecognizer;     }     ...     @Override     public T execute(Object... args) throws Throwable {         //获取xid         String xid = RootContext.getXID();         if (xid != null) {             statementProxy.getConnectionProxy().bind(xid);         }         statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());         return doExecute(args);     }          //Do execute object.     protected abstract T doExecute(Object... args) throws Throwable;     ... }  public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {     ...     public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {         super(statementProxy, statementCallback, sqlRecognizer);     }          @Override     public T doExecute(Object... args) throws Throwable {         AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();         //判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交         if (connectionProxy.getAutoCommit()) {             return executeAutoCommitTrue(args);         } else {             return executeAutoCommitFalse(args);         }     }     ... }  public abstract class AbstractConnectionProxy implements Connection {     ...     
@Override     public boolean getAutoCommit() throws SQLException {         //判断目标数据库连接是否是自动提交,默认情况是都是自动提交的         return targetConnection.getAutoCommit();     }     ... }  public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {     ...     //Execute auto commit true t.     protected T executeAutoCommitTrue(Object[] args) throws Throwable {         ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();         try {             //修改自动提交事务的设置,此时需要阻止自动提交事务             connectionProxy.changeAutoCommit();             return new LockRetryPolicy(connectionProxy).execute(() -> {                 T result = executeAutoCommitFalse(args);//执行SQL语句                 connectionProxy.commit();//手动提交本地事务                 return result;             });         } catch (Exception e) {             //when exception occur in finally,this exception will lost, so just print it here             LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);             if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {                 connectionProxy.getTargetConnection().rollback();             }             throw e;         } finally {             connectionProxy.getContext().reset();             connectionProxy.setAutoCommit(true);         }     }     ... }  public class ConnectionProxy extends AbstractConnectionProxy {     private final ConnectionContext context = new ConnectionContext();     ...     //change connection autoCommit to false by seata     public void changeAutoCommit() throws SQLException {         getContext().setAutoCommitChanged(true);         setAutoCommit(false);     }          //Gets context.     
public ConnectionContext getContext() {         return context;     }          @Override     public void setAutoCommit(boolean autoCommit) throws SQLException {         if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {             //change autocommit from false to true, we should commit() first according to JDBC spec.             doCommit();         }         //把目标数据源连接的自动提交事务设置为false         targetConnection.setAutoCommit(autoCommit);     }     ... }
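The pattern in executeAutoCommitTrue() (disable auto-commit, run the work, commit manually, restore auto-commit) can be tried without a database. The sketch below is an illustration under assumptions, not Seata code: AutoCommitSketch, fakeConnection(), runInLocalTransaction(), and demo() are hypothetical names, and the connection is a java.lang.reflect.Proxy that only tracks the auto-commit flag and records calls.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class AutoCommitSketch {
    // Builds a fake java.sql.Connection that tracks the auto-commit flag
    // and records setAutoCommit/commit/rollback calls in the given log.
    static Connection fakeConnection(List<String> log) {
        boolean[] autoCommit = {true}; // JDBC connections start in auto-commit mode
        return (Connection) Proxy.newProxyInstance(
            AutoCommitSketch.class.getClassLoader(),
            new Class<?>[]{Connection.class},
            (proxy, method, args) -> {
                switch (method.getName()) {
                    case "getAutoCommit":
                        return autoCommit[0];
                    case "setAutoCommit":
                        autoCommit[0] = (Boolean) args[0];
                        log.add("setAutoCommit(" + args[0] + ")");
                        return null;
                    case "commit":
                    case "rollback":
                        log.add(method.getName());
                        return null;
                    default:
                        throw new UnsupportedOperationException(method.getName());
                }
            });
    }

    // Disables auto-commit if it was on, runs the work, commits manually,
    // rolls back on failure, and restores auto-commit at the end.
    static <T> T runInLocalTransaction(Connection con, Callable<T> work) throws Exception {
        boolean changed = con.getAutoCommit();
        if (changed) {
            con.setAutoCommit(false);
        }
        try {
            T result = work.call();
            con.commit();
            return result;
        } catch (Exception e) {
            con.rollback();
            throw e;
        } finally {
            if (changed) {
                con.setAutoCommit(true);
            }
        }
    }

    // Runs one successful unit of work and returns the recorded call sequence.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        try {
            runInLocalTransaction(fakeConnection(log), () -> 1);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [setAutoCommit(false), commit, setAutoCommit(true)]
    }
}
```

Unlike this sketch, the real method wraps the work in a LockRetryPolicy and only rolls back on the target connection when the retry policy is not configured to roll back the branch on conflict.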

 

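8. Source code for building data images before and after the SQL statement is executed

(1) Execution flow of AbstractDMLBaseExecutor.doExecute()

(2) Building the before and after images, using UpdateExecutor as an example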

 

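(1) Execution flow of AbstractDMLBaseExecutor.doExecute()

Step 1: Disable auto-commit on the data source connection.

Step 2: Build the before image (beforeImage) from the target SQL statement.

Step 3: Execute the target SQL statement, without committing its transaction yet.

Step 4: Build the after image (afterImage) based on the before image.

Step 5: Build the UndoLog data from the before image and the after image.

Step 6: Manually commit the transaction on the connection proxy.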

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    //Execute auto commit true t.
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            //Change the auto-commit setting of the connection: auto-commit must be disabled here
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);//execute the SQL statement
                connectionProxy.commit();//manually commit the local transaction
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

    //Execute auto commit false t.
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //Build beforeImage from the target SQL: the data image before the target SQL runs
        TableRecords beforeImage = beforeImage();
        //Now actually execute the SQL statement; the local transaction is not committed yet
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        int updateCount = statementProxy.getUpdateCount();
        if (updateCount > 0) {
            //Build afterImage from beforeImage: the data image after the target SQL has run
            TableRecords afterImage = afterImage(beforeImage);
            //Prepare the undoLog data from beforeImage and afterImage and store it in the connection proxy
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }
    ...
}
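To make the before-image and after-image idea concrete, here is a minimal in-memory sketch. It is an assumption-laden illustration rather than Seata's implementation: a HashMap stands in for the table, ImageSketch, UndoItem, updateWithImages(), and rollback() are hypothetical names, and the "SQL" is a single-column update by primary key.

```java
import java.util.HashMap;
import java.util.Map;

class ImageSketch {
    // Hypothetical undo record: the row state before and after one update.
    static final class UndoItem {
        final Map<String, Object> beforeImage;
        final Map<String, Object> afterImage;

        UndoItem(Map<String, Object> beforeImage, Map<String, Object> afterImage) {
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    // table maps primary key -> row, where a row maps column name -> value.
    static UndoItem updateWithImages(Map<Integer, Map<String, Object>> table,
                                     int id, String column, Object newValue) {
        Map<String, Object> before = new HashMap<>(table.get(id)); // snapshot before the update
        table.get(id).put(column, newValue);                        // the "target SQL"
        Map<String, Object> after = new HashMap<>(table.get(id));   // re-read by the same primary key
        return new UndoItem(before, after);
    }

    // Compensation: put the before image back in place of the current row.
    static void rollback(Map<Integer, Map<String, Object>> table, int id, UndoItem undo) {
        table.put(id, new HashMap<>(undo.beforeImage));
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Object>> table = new HashMap<>();
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "old");
        table.put(1, row);

        UndoItem undo = updateWithImages(table, 1, "name", "new");
        System.out.println(undo.beforeImage.get("name") + " -> " + undo.afterImage.get("name"));
        rollback(table, 1, undo);
        System.out.println(table.get(1).get("name"));
    }
}
```

The after image is read using the primary keys captured in the before image, which mirrors why afterImage() takes beforeImage as its argument.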

(2) Building the before and after images, using UpdateExecutor as an example

public class TableRecords implements java.io.Serializable {     //表的元数据     private transient TableMeta tableMeta;     //表的名称     private String tableName;     //表的多行数据     private List<Row> rows = new ArrayList<Row>();     ... }  public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {     private static final Configuration CONFIG = ConfigurationFactory.getInstance();     private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);          //Instantiates a new Update executor.     public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {         super(statementProxy, statementCallback, sqlRecognizer);     }      @Override     protected TableRecords beforeImage() throws SQLException {         ArrayList<List<Object>> paramAppenderList = new ArrayList<>();         TableMeta tmeta = getTableMeta();         //根据主键ID值拼接一个SQL语句,查询这条数据更新前的镜像         String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);         return buildTableRecords(tmeta, selectSQL, paramAppenderList);     }      private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {         SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;         List<String> updateColumns = recognizer.getUpdateColumns();         StringBuilder prefix = new StringBuilder("SELECT ");         StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());         String whereCondition = buildWhereCondition(recognizer, paramAppenderList);         String orderByCondition = buildOrderCondition(recognizer, paramAppenderList);         String limitCondition = buildLimitCondition(recognizer, paramAppenderList);         if (StringUtils.isNotBlank(whereCondition)) {             suffix.append(WHERE).append(whereCondition);      
   }         if (StringUtils.isNotBlank(orderByCondition)) {             suffix.append(" ").append(orderByCondition);         }         if (StringUtils.isNotBlank(limitCondition)) {             suffix.append(" ").append(limitCondition);         }         suffix.append(" FOR UPDATE");         StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());         if (ONLY_CARE_UPDATE_COLUMNS) {             if (!containsPK(updateColumns)) {                 selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));             }             for (String columnName : updateColumns) {                 selectSQLJoin.add(columnName);             }              //The on update xxx columns will be auto update by db, so it's also the actually updated columns             List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();             onUpdateColumns.removeAll(updateColumns);             for (String onUpdateColumn : onUpdateColumns) {                 selectSQLJoin.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));             }         } else {             for (String columnName : tableMeta.getAllColumns().keySet()) {                 selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));             }         }         return selectSQLJoin.toString();     }      @Override     protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {         TableMeta tmeta = getTableMeta();         if (beforeImage == null || beforeImage.size() == 0) {             return TableRecords.empty(getTableMeta());         }         String selectSQL = buildAfterImageSQL(tmeta, beforeImage);         ResultSet rs = null;         try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {             SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);             rs = pst.executeQuery();             return 
TableRecords.buildRecords(tmeta, rs);         } finally {             IOUtil.close(rs);         }     }      private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {         StringBuilder prefix = new StringBuilder("SELECT ");         String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());         String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;         StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);         if (ONLY_CARE_UPDATE_COLUMNS) {             SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;             List<String> updateColumns = recognizer.getUpdateColumns();             if (!containsPK(updateColumns)) {                 selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));             }             for (String columnName : updateColumns) {                 selectSQLJoiner.add(columnName);             }              //The on update xxx columns will be auto update by db, so it's also the actually updated columns             List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();             onUpdateColumns.removeAll(updateColumns);             for (String onUpdateColumn : onUpdateColumns) {                 selectSQLJoiner.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));             }         } else {             for (String columnName : tableMeta.getAllColumns().keySet()) {                 selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));             }         }         return selectSQLJoiner.toString();     } }

 

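9. Source code for building the global lock keys and the UndoLog data

(1) prepareUndoLog() builds the global lock keys and the UndoLog data

(2) Source code for building the global lock keys

(3) Source code for building the UndoLog data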

 

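(1) prepareUndoLog() builds the global lock keys and the UndoLog data

In prepareUndoLog() of the base transactional executor BaseTransactionalExecutor, the global lock keys and the UndoLog data are built and then stored in the connection proxy ConnectionProxy.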

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    ...
    //prepare undo log.
    //@param beforeImage the before image
    //@param afterImage  the after image
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;

        //Build the global lock key
        //For example, if a batch of rows was updated, the lock key is built from the primary key values of those rows
        String lockKeys = buildLockKey(lockKeyRecords);

        if (null != lockKeys) {
            //Store the global lock key in the connection proxy ConnectionProxy
            connectionProxy.appendLockKey(lockKeys);
            //Build the UndoLog
            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            //Store the UndoLog in the connection proxy ConnectionProxy
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }
    ...
}

(2) Source code for building the global lock keys

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    ...
    //build lockKey
    //@param rowsIncludingPK the records
    //@return the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
    protected String buildLockKey(TableRecords rowsIncludingPK) {
        if (rowsIncludingPK.size() == 0) {
            return null;
        }

        //The global lock key has the form: table_name:pkValue, e.g. table_name:11001
        StringBuilder sb = new StringBuilder();
        sb.append(rowsIncludingPK.getTableMeta().getTableName());
        sb.append(":");
        int filedSequence = 0;
        //pksRows holds, for every affected row, the primary key fields and their values
        List<Map<String, Field>> pksRows = rowsIncludingPK.pkRows();
        //Primary key column names; a composite primary key has more than one column
        List<String> primaryKeysOnlyName = getTableMeta().getPrimaryKeyOnlyName();
        //rowMap is one row: the key is the column name, the value is the field
        for (Map<String, Field> rowMap : pksRows) {
            int pkSplitIndex = 0;
            //Iterate over the primary key columns of this row
            for (String pkName : primaryKeysOnlyName) {
                if (pkSplitIndex > 0) {
                    sb.append("_");
                }
                //Append the value of each primary key column, joined by "_"
                sb.append(rowMap.get(pkName).getValue());
                pkSplitIndex++;
            }
            filedSequence++;
            if (filedSequence < pksRows.size()) {
                sb.append(",");
            }
        }

        //The table name is written once, followed by the rows joined by ",", e.g. table_name:1101_aadd,xxxx_xxx
        return sb.toString();
    }
    ...
}
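The key format is easy to check with a standalone sketch that follows the same concatenation rules: the table name once, rows separated by ",", and composite key values joined by "_". LockKeySketch and its helper row() are hypothetical, and plain maps replace Seata's TableRecords and Field types.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LockKeySketch {
    // Builds "table:pk1_pk2,pk1_pk2,..." from the primary key values of each row.
    static String buildLockKey(String tableName, List<String> pkNames, List<Map<String, Object>> pkRows) {
        if (pkRows.isEmpty()) {
            return null; // nothing was touched, so there is nothing to lock
        }
        StringBuilder sb = new StringBuilder(tableName).append(":");
        for (int i = 0; i < pkRows.size(); i++) {
            if (i > 0) {
                sb.append(","); // separator between rows
            }
            Map<String, Object> row = pkRows.get(i);
            for (int j = 0; j < pkNames.size(); j++) {
                if (j > 0) {
                    sb.append("_"); // separator between columns of a composite key
                }
                sb.append(row.get(pkNames.get(j)));
            }
        }
        return sb.toString();
    }

    // Hypothetical helper: one row with the primary key columns "id" and "code".
    static Map<String, Object> row(Object id, Object code) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("id", id);
        m.put("code", code);
        return m;
    }

    public static void main(String[] args) {
        List<String> pkNames = Arrays.asList("id", "code");
        List<Map<String, Object>> rows = Arrays.asList(row(1, "a"), row(2, "b"));
        System.out.println(buildLockKey("t_user", pkNames, rows)); // t_user:1_a,2_b
    }
}
```

With a single-column primary key the "_" separator never appears, so two updated rows with ids 7 and 8 would yield a key such as t_user:7,8.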

(3) Source code for building the UndoLog data

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
    ...
    //build a SQLUndoLog
    //@param beforeImage the before image
    //@param afterImage  the after image
    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
        SQLType sqlType = sqlRecognizer.getSQLType();
        String tableName = sqlRecognizer.getTableName();

        SQLUndoLog sqlUndoLog = new SQLUndoLog();
        sqlUndoLog.setSqlType(sqlType);//the SQL type may be insert, update, or delete
        sqlUndoLog.setTableName(tableName);//table name
        sqlUndoLog.setBeforeImage(beforeImage);//data image before the SQL runs
        sqlUndoLog.setAfterImage(afterImage);//data image after the SQL has run
        return sqlUndoLog;
    }
    ...
}

public class SQLUndoLog implements java.io.Serializable {
    private SQLType sqlType;
    private String tableName;
    private TableRecords beforeImage;
    private TableRecords afterImage;
    ...
}

 

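10. Source code for the Seata Client initiating branch transaction registration

(1) ConnectionProxy.commit() commits the transaction

(2) ConnectionProxy.register() registers the branch transaction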

 

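(1) ConnectionProxy.commit() commits the transaction

When commit() of the connection proxy ConnectionProxy is called inside a global transaction, it first calls ConnectionProxy.register() to register the branch transaction, then flushes the UndoLog and commits on the target connection.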

public class ConnectionProxy extends AbstractConnectionProxy {     private final ConnectionContext context = new ConnectionContext();     ...     @Override     public void commit() throws SQLException {         try {             //通过全局锁重试策略组件来执行本地事务的提交             lockRetryPolicy.execute(() -> {                 doCommit();                 return null;             });         } catch (SQLException e) {             if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {                 rollback();             }             throw e;         } catch (Exception e) {             throw new SQLException(e);         }     }          private void doCommit() throws SQLException {         if (context.inGlobalTransaction()) {             processGlobalTransactionCommit();         } else if (context.isGlobalLockRequire()) {             processLocalCommitWithGlobalLocks();         } else {             targetConnection.commit();         }     }      private void processLocalCommitWithGlobalLocks() throws SQLException {         //检查全局锁keys         checkLock(context.buildLockKeys());         try {             //目标数据源连接提交事务             targetConnection.commit();         } catch (Throwable ex) {             throw new SQLException(ex);         }         context.reset();     }      private void processGlobalTransactionCommit() throws SQLException {         try {             //注册分支事务             register();         } catch (TransactionException e) {             recognizeLockKeyConflictException(e, context.buildLockKeys());         }         try {             UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);             //目标数据源连接提交事务             targetConnection.commit();         } catch (Throwable ex) {             LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);             report(false);             throw new SQLException(ex);         }         if (IS_REPORT_SUCCESS_ENABLE) {             
report(true);         }         context.reset();     }     ... }

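(2) ConnectionProxy.register() registers the branch transaction

When register() of the connection proxy ConnectionProxy registers a branch transaction, it calls branchRegister() on the resource manager DefaultResourceManager, which looks up the ResourceManager for the branch type and delegates to AbstractResourceManager.branchRegister().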

 

AbstractResourceManager.branchRegister() first builds a branch registration request and then sends it to the Seata Server synchronously through RmNettyRemotingClient.

//The type Connection proxy. //数据源连接代理 public class ConnectionProxy extends AbstractConnectionProxy {     private final ConnectionContext context = new ConnectionContext();     ...     private void register() throws TransactionException {         if (!context.hasUndoLog() || !context.hasLockKey()) {             return;         }         //分支事务注册         Long branchId = DefaultResourceManager.get().branchRegister(             BranchType.AT,//事务类型             getDataSourceProxy().getResourceId(),//资源id,资源是已经注册过了的             null,             context.getXid(),             context.getApplicationData(),             context.buildLockKeys()//注册分支事物时带上全局锁keys         );         context.setBranchId(branchId);     }     ... }  public class DefaultResourceManager implements ResourceManager {     protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();          private static class SingletonHolder {         private static DefaultResourceManager INSTANCE = new DefaultResourceManager();     }          public static DefaultResourceManager get() {         return SingletonHolder.INSTANCE;     }          private DefaultResourceManager() {         initResourceManagers();     }          protected void initResourceManagers() {         //通过SPI加载所有的ResourceManager资源管理器         //比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXA         List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);         if (CollectionUtils.isNotEmpty(allResourceManagers)) {             for (ResourceManager rm : allResourceManagers) {                 resourceManagers.put(rm.getBranchType(), rm);             }         }     }          //注册分支事务     @Override     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {         return getResourceManager(branchType).branchRegister(branchType, 
resourceId, clientId, xid, applicationData, lockKeys);     }          public ResourceManager getResourceManager(BranchType branchType) {         ResourceManager rm = resourceManagers.get(branchType);         if (rm == null) {             throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());         }         return rm;     }     ... }  public abstract class AbstractResourceManager implements ResourceManager {     ...     @Override     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {         try {             BranchRegisterRequest request = new BranchRegisterRequest();             request.setXid(xid);//xid是全局事务id             request.setLockKey(lockKeys);//这次分支事务要更新数据全局锁key             request.setResourceId(resourceId);//分支事务对应的资源id             request.setBranchType(branchType);//分支事务类型             request.setApplicationData(applicationData);//应用数据              BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);             if (response.getResultCode() == ResultCode.Failed) {                 throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));             }             return response.getBranchId();         } catch (TimeoutException toe) {             throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);         } catch (RuntimeException rex) {             throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);         }     }     ... }
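The lookup-and-delegate step in DefaultResourceManager can be illustrated with a small registry keyed by branch type. The sketch is hypothetical throughout: ResourceManagerRegistrySketch, its nested BranchType and ResourceManager, and atOnlyRegistry() are illustration-only names, the fake manager returns a fixed branch id instead of calling a server, and managers are registered by hand rather than loaded through SPI.

```java
import java.util.EnumMap;
import java.util.Map;

class ResourceManagerRegistrySketch {
    enum BranchType { AT, TCC, SAGA, XA }

    // Hypothetical, trimmed-down resource manager contract.
    interface ResourceManager {
        BranchType getBranchType();

        long branchRegister(String resourceId, String xid, String lockKeys);
    }

    private final Map<BranchType, ResourceManager> managers = new EnumMap<>(BranchType.class);

    // Each manager is stored under the branch type it reports.
    void register(ResourceManager rm) {
        managers.put(rm.getBranchType(), rm);
    }

    // Looks up the manager for the branch type and delegates the registration to it.
    long branchRegister(BranchType type, String resourceId, String xid, String lockKeys) {
        ResourceManager rm = managers.get(type);
        if (rm == null) {
            throw new IllegalStateException("No ResourceManager for BranchType:" + type);
        }
        return rm.branchRegister(resourceId, xid, lockKeys);
    }

    // Builds a registry that only knows a fake AT manager returning a fixed branch id.
    static ResourceManagerRegistrySketch atOnlyRegistry() {
        ResourceManagerRegistrySketch registry = new ResourceManagerRegistrySketch();
        registry.register(new ResourceManager() {
            @Override
            public BranchType getBranchType() {
                return BranchType.AT;
            }

            @Override
            public long branchRegister(String resourceId, String xid, String lockKeys) {
                return 42L; // a real manager would send a request to the server here
            }
        });
        return registry;
    }

    public static void main(String[] args) {
        ResourceManagerRegistrySketch registry = atOnlyRegistry();
        long branchId = registry.branchRegister(BranchType.AT, "jdbc:mysql://localhost:3306/demo", "xid-1", "t_user:1");
        System.out.println("branchId=" + branchId);
    }
}
```

Keying the registry by branch type is what lets the same ConnectionProxy.register() call reach the AT-specific manager without knowing its concrete class.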

 

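11. Source code for how Seata Server handles a branch transaction registration request

(1) What Seata Server does after receiving a branch transaction registration request

(2) Processing in BranchRegisterRequest.handle()

(3) Processing in DefaultCore.branchRegister()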

 

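(1) What Seata Server does after receiving a branch transaction registration request

When Seata Server receives a branch transaction registration request from a Seata Client, the request is first handed to the process() method of ServerOnRequestProcessor, which then passes it to the onRequest() method of DefaultCoordinator.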

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //Next, processMessage() handles the decoded RpcMessage
            processMessage(ctx, (RpcMessage) msg);
        }
    }
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //Look up a Pair by message type; the Pair holds the request processor and its thread pool
            //processorTable is filled when NettyRemotingServer initializes, through registerProcessor()
            //So the code below ends up in ServerOnRequestProcessor.process()
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    private final RemotingServer remotingServer;
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }

    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        //The RpcContext associated with this channel
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        //the batch send request message
        if (message instanceof MergedWarpMessage) {
            ...
        } else {
            //the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            //Ends up in DefaultCoordinator.onRequest(), which handles the message
            //Here msg is the BranchRegisterRequest object that the client sent
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            //Send the response back to the client
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        //Here request is the BranchRegisterRequest object that the client sent
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}
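The core of processMessage() is a table lookup: the message type code selects a processor plus an optional executor, and the processor runs either on that executor or on the calling thread. A minimal sketch of that dispatch shape follows, assuming String message bodies and made-up type codes. ProcessorTable and its methods are hypothetical names, not part of Seata or Netty.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical dispatch table: type code -> (processor, optional executor).
final class ProcessorTable {
    private static final class Entry {
        final Consumer<String> processor;
        final Executor executor; // null means "run on the calling thread"

        Entry(Consumer<String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();

    void register(int typeCode, Consumer<String> processor, Executor executor) {
        table.put(typeCode, new Entry(processor, executor));
    }

    // Returns false when no processor is registered for the type code.
    boolean dispatch(int typeCode, String body) {
        Entry entry = table.get(typeCode);
        if (entry == null) {
            return false;
        }
        if (entry.executor != null) {
            // Hand the work to the processor's own executor.
            entry.executor.execute(() -> entry.processor.accept(body));
        } else {
            // No executor configured: process inline.
            entry.processor.accept(body);
        }
        return true;
    }
}
```

Pairing each processor with its own executor lets slow request types be isolated on a separate pool without changing the dispatch code.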

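(2) Processing in BranchRegisterRequest.handle()

Inside DefaultCoordinator's onRequest() method, the handle() method of BranchRegisterRequest is called to process the branch registration request. That handle() method calls back into the handler's handle(BranchRegisterRequest, RpcContext) overload in AbstractTCInboundHandler, which calls DefaultCoordinator's doBranchRegister() method. In the end, DefaultCore's branchRegister() method does the actual work of registering the branch transaction.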

public class BranchRegisterRequest extends AbstractTransactionRequestToTC  {
    ...
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
    ...
}

public interface TCInboundHandler {
    ...
    //Handle branch register response.
    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
}

public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    ...
    @Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response) throws TransactionException {
                try {
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }

    //Do branch register.
    protected abstract void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException;
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    private final DefaultCore core;
    ...
    @Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        //Call DefaultCore.branchRegister() to handle the branch registration request
        response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), request.getXid(), request.getApplicationData(), request.getLockKey()));
    }
    ...
}
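The call chain above is a double dispatch: the coordinator only knows it holds some request, so it asks the request to handle itself, and the concrete request class then picks the matching handle() overload on the handler. The sketch below isolates that mechanism. All type names ending in Sketch are hypothetical, and the returned strings are placeholders for the real response objects.

```java
// Hypothetical handler interface with one overload per concrete request type.
interface InboundHandlerSketch {
    String handle(RegisterRequestSketch request);
    String handle(CommitRequestSketch request);
}

// Base request: each subclass decides which overload to call.
abstract class RequestSketch {
    abstract String handle(InboundHandlerSketch handler);
}

final class RegisterRequestSketch extends RequestSketch {
    @Override
    String handle(InboundHandlerSketch handler) {
        // "this" has static type RegisterRequestSketch, so the register overload is chosen.
        return handler.handle(this);
    }
}

final class CommitRequestSketch extends RequestSketch {
    @Override
    String handle(InboundHandlerSketch handler) {
        return handler.handle(this);
    }
}

// Plays the role of the coordinator: receives an untyped request and lets it dispatch.
final class CoordinatorSketch implements InboundHandlerSketch {
    String onRequest(RequestSketch request) {
        return request.handle(this);
    }

    @Override
    public String handle(RegisterRequestSketch request) {
        return "branchRegister";
    }

    @Override
    public String handle(CommitRequestSketch request) {
        return "globalCommit";
    }
}
```

With this shape, onRequest() needs no instanceof chain or switch over request types; adding a request type means adding one subclass and one overload.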

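(3) Processing in DefaultCore.branchRegister()

DefaultCore's branchRegister() method looks up the AbstractCore implementation registered for the branch type (ATCore for AT mode) and delegates to its branchRegister() method, which is implemented in the abstract class AbstractCore. The steps are:

Step 1. Look up the global transaction session by xid

Step 2. Create a branch transaction session from the global transaction session

Step 3. Store the branch transaction ID in MDC, the thread-local logging context

Step 4. Acquire the global lock, which must succeed before the branch can be registered

Step 5. Add the branch transaction session to the global transaction session and persist it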

public class DefaultCore implements Core {
    private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();

    public DefaultCore(RemotingServer remotingServer) {
        List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class, new Class[] {RemotingServer.class}, new Object[] {remotingServer});
        if (CollectionUtils.isNotEmpty(allCore)) {
            for (AbstractCore core : allCore) {
                coreMap.put(core.getHandleBranchType(), core);
            }
        }
    }

    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
    }

    public AbstractCore getCore(BranchType branchType) {
        AbstractCore core = coreMap.get(branchType);
        if (core == null) {
            throw new NotSupportYetException("unsupported type:" + branchType.name());
        }
        return core;
    }
    ...
}

public abstract class AbstractCore implements Core {
    protected RemotingServer remotingServer;

    public AbstractCore(RemotingServer remotingServer) {
        if (remotingServer == null) {
            throw new IllegalArgumentException("remotingServer must be not null");
        }
        this.remotingServer = remotingServer;
    }

    //Register a branch transaction
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        //1. Look up the GlobalSession by xid
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            globalSessionStatusCheck(globalSession);
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

            //2. Create the BranchSession, i.e. open a branch transaction under the global transaction
            //Arguments in order: global session, branch type, resource ID, application data, global lock keys, client ID
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);

            //3. Store the branch ID in MDC (thread-local logging context)
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));

            //4. Acquire the global lock as part of branch registration
            //branchSession tries to acquire the global lock; on failure an exception is thrown and registration fails
            branchSessionLock(globalSession, branchSession);

            try {
                //5. Add the branch session to the global session
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            return branchSession.getBranchId();
        });
    }

    private GlobalSession assertGlobalSessionNotNull(String xid, boolean withBranchSessions) throws TransactionException {
        //Find the GlobalSession by xid
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid, withBranchSessions);
        if (globalSession == null) {
            throw new GlobalTransactionException(TransactionExceptionCode.GlobalTransactionNotExist, String.format("Could not found global transaction xid = %s, may be has finished.", xid));
        }
        return globalSession;
    }

    //Acquire the global lock; subclasses throw an exception when acquisition fails
    protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    }
    ...
}

public class SessionHolder {
    ...
    //Find the GlobalSession by xid
    public static GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
        return getRootSessionManager().findGlobalSession(xid, withBranchSessions);
    }
    ...
}

@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize {
    ...
    //Find the GlobalSession by xid
    @Override
    public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
        return transactionStoreManager.readSession(xid, withBranchSessions);
    }
    ...
}

public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
    ...
    //Find the GlobalSession by xid
    @Override
    public GlobalSession readSession(String xid, boolean withBranchSessions) {
        //global transaction
        GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
        if (globalTransactionDO == null) {
            return null;
        }
        //branch transactions
        List<BranchTransactionDO> branchTransactionDOs = null;
        //reduce rpc with db when branchRegister and getGlobalStatus
        if (withBranchSessions) {
            branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
        }
        return getGlobalSession(globalTransactionDO, branchTransactionDOs);
    }
    ...
}

public class SessionHelper {
    ...
    //Create a branch session
    public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId, String applicationData, String lockKeys, String clientId) {
        BranchSession branchSession = new BranchSession();
        branchSession.setXid(globalSession.getXid());
        branchSession.setTransactionId(globalSession.getTransactionId());
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);
        branchSession.setApplicationData(applicationData);
        return branchSession;
    }
    ...
}

public class GlobalSession implements SessionLifecycle, SessionStorable {
    private List<BranchSession> branchSessions;
    ...
    //Add the branch session to the global session
    @Override
    public void addBranch(BranchSession branchSession) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onAddBranch(this, branchSession);
        }
        branchSession.setStatus(BranchStatus.Registered);
        add(branchSession);
    }

    //Add the branch session to the in-memory list of the global session
    public boolean add(BranchSession branchSession) {
        if (null != branchSessions) {
            return branchSessions.add(branchSession);
        } else {
            //db and redis no need to deal with
            return true;
        }
    }
    ...
}

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
    ...
    @Override
    public void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        addBranchSession(globalSession, branchSession);
    }

    @Override
    public void addBranchSession(GlobalSession session, BranchSession branchSession) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, branchSession, LogOperation.BRANCH_ADD);
        }
        writeSession(LogOperation.BRANCH_ADD, branchSession);
    }

    //Persist the session (here, the newly added branch session)
    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
        //transactionStoreManager.writeSession() persists the session passed in
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
            ...
        }
    }
    ...
}
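One detail in AbstractCore.branchRegister() is easy to miss: the lock is taken before the branch is stored, and if storing fails the lock is released again, so a failed registration leaves nothing behind. The following sketch shows just that acquire, store, release-on-failure ordering with in-memory collections. BranchRegistrationSketch, the failOnStore switch, and the single-string lock key are all assumptions made for illustration, not Seata's data structures.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of "lock first, then store, unlock if the store fails".
final class BranchRegistrationSketch {
    private final Set<String> heldLocks = new HashSet<>();
    private final List<Long> storedBranches = new ArrayList<>();
    private long nextBranchId = 1L;

    // Test hook (assumption): simulates a failure while persisting the branch.
    boolean failOnStore = false;

    long register(String lockKey) {
        long branchId = nextBranchId++;
        // Acquire the lock; a conflict means the registration fails immediately.
        if (!heldLocks.add(lockKey)) {
            throw new IllegalStateException("lock conflict on " + lockKey);
        }
        try {
            // Store the branch under the global transaction.
            if (failOnStore) {
                throw new IllegalStateException("store failed");
            }
            storedBranches.add(branchId);
        } catch (RuntimeException ex) {
            // Compensate: release the lock taken above before reporting the failure.
            heldLocks.remove(lockKey);
            throw ex;
        }
        return branchId;
    }

    boolean isLocked(String lockKey) {
        return heldLocks.contains(lockKey);
    }

    int branchCount() {
        return storedBranches.size();
    }
}
```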

 

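12. Source code for writing the UndoLog to the database and committing the transaction

The processGlobalTransactionCommit() method of the data source connection proxy ConnectionProxy works in three steps:

Step 1. It first registers the branch transaction

Step 2. It then writes the UndoLog to the database

Step 3. Only after that does it commit the transaction on the target data source connection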

//Data source connection proxy
public class ConnectionProxy extends AbstractConnectionProxy {
    private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
    ...
    @Override
    public void commit() throws SQLException {
        try {
            //Commit the local transaction through the global lock retry policy LockRetryPolicy
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
        //Check the global lock keys
        checkLock(context.buildLockKeys());
        try {
            //Commit the transaction on the target connection
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            //1. Register the branch transaction
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //2. Write the UndoLog to the database
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //3. Commit the transaction on the target connection
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    ...
}

public class UndoLogManagerFactory {
    private static final Map<String, UndoLogManager> UNDO_LOG_MANAGER_MAP = new ConcurrentHashMap<>();
    //Get the UndoLog manager for the given database type
    public static UndoLogManager getUndoLogManager(String dbType) {
        return CollectionUtils.computeIfAbsent(UNDO_LOG_MANAGER_MAP, dbType,
            key -> EnhancedServiceLoader.load(UndoLogManager.class, dbType));
    }
}

public abstract class AbstractUndoLogManager implements UndoLogManager {
    ...
    @Override
    public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        ConnectionContext connectionContext = cp.getContext();
        if (!connectionContext.hasUndoLog()) {
            return;
        }

        String xid = connectionContext.getXid();
        long branchId = connectionContext.getBranchId();

        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchId);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

        UndoLogParser parser = UndoLogParserFactory.getInstance();
        byte[] undoLogContent = parser.encode(branchUndoLog);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
        }

        CompressorType compressorType = CompressorType.NONE;
        if (needCompress(undoLogContent)) {
            compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
            undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
        }

        //Insert the UndoLog into the database through the target connection
        insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
    }

    //insert undo log when normal
    protected abstract void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException;
    ...
}

@LoadLevel(name = JdbcConstants.MYSQL)
public class MySQLUndoLogManager extends AbstractUndoLogManager {
    ...
    @Override
    protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException {
        insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
    }

    private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, State state, Connection conn) throws SQLException {
        try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
            pst.setLong(1, branchId);
            pst.setString(2, xid);
            pst.setString(3, rollbackCtx);
            pst.setBytes(4, undoLogContent);
            pst.setInt(5, state.getValue());
            pst.executeUpdate();
        } catch (Exception e) {
            if (!(e instanceof SQLException)) {
                e = new SQLException(e);
            }
            throw (SQLException) e;
        }
    }
    ...
}
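Note that flushUndoLogs() inserts the undo log through cp.getTargetConnection(), the same connection that ran the business SQL, so both become durable in one local commit or not at all. The sketch below models that with a fake connection that buffers writes until commit. BufferedConnectionSketch and GlobalCommitSketch are hypothetical, no real JDBC is involved, and the rollback-on-failure is simplified compared with the conditions checked in ConnectionProxy.commit().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical connection: writes stay pending until commit() makes them durable.
final class BufferedConnectionSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> durable = new ArrayList<>();

    void write(String row) {
        pending.add(row);
    }

    void commit() {
        durable.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }

    List<String> durableRows() {
        return new ArrayList<>(durable);
    }
}

final class GlobalCommitSketch {
    // branchRegistrar stands in for the remote branch registration call (assumption).
    static void commit(BufferedConnectionSketch conn, LongSupplier branchRegistrar) {
        try {
            // 1. Register the branch; this is where a global lock conflict would surface.
            long branchId = branchRegistrar.getAsLong();
            // 2. Write the undo log on the SAME connection as the business SQL.
            conn.write("undo_log(branchId=" + branchId + ")");
            // 3. One local commit covers business rows and undo log together.
            conn.commit();
        } catch (RuntimeException ex) {
            // Simplification: always roll back pending writes on failure.
            conn.rollback();
            throw ex;
        }
    }
}
```

If registration fails, neither the business change nor the undo log is committed, which is what keeps the undo log consistent with the data it describes.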

 

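13. Committing the transaction through the global lock retry policy

After auto-commit has been disabled, the before image built, the SQL executed, and the after image built, execution reaches the commit() method of the data source connection proxy ConnectionProxy. That method commits the local transaction through the global lock retry policy LockRetryPolicy.

 

LockRetryPolicy makes sure the local transaction is committed only after the global lock has been acquired: if acquiring the lock fails with a lock conflict, it retries. Recall also that a branch transaction is registered successfully only if the global lock is acquired, which is why the retried unit is doCommit(), the method that registers the branch.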

public class ConnectionProxy extends AbstractConnectionProxy {
    private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
    ...
    @Override
    public void commit() throws SQLException {
        try {
            //Commit the local transaction through the global lock retry policy LockRetryPolicy
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    ...
    public static class LockRetryPolicy {
        protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory.getInstance().
            getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT);
        protected final ConnectionProxy connection;

        public LockRetryPolicy(ConnectionProxy connection) {
            this.connection = connection;
        }

        public <T> T execute(Callable<T> callable) throws Exception {
            //the only case that not need to retry acquire lock here is
            //LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && connection#autoCommit == true
            //because it has retry acquire lock when AbstractDMLBaseExecutor#executeAutoCommitTrue
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && connection.getContext().isAutoCommitChanged()) {
                //No retry needed
                return callable.call();
            } else {
                //LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == false
                //or LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && autoCommit == false
                return doRetryOnLockConflict(callable);
            }
        }

        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    //AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
                    if (connection.getContext().isAutoCommitChanged() && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                        lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
                    }
                    //Sleep for a while, then retry
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }

        //Callback on exception in doLockRetryOnConflict.
        protected void onException(Exception e) throws Exception {
        }
    }
    ...
}

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    ...
    private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
        LockRetryPolicy(final ConnectionProxy connection) {
            super(connection);
        }

        @Override
        public <T> T execute(Callable<T> callable) throws Exception {
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
                return doRetryOnLockConflict(callable);
            } else {
                return callable.call();
            }
        }

        @Override
        protected void onException(Exception e) throws Exception {
            ConnectionContext context = connection.getContext();
            //UndoItems can't use the Set collection class to prevent ABA
            context.removeSavepoint(null);
            //Roll back the SQL already executed on the target connection
            connection.getTargetConnection().rollback();
        }

        public static boolean isLockRetryPolicyBranchRollbackOnConflict() {
            return LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT;
        }
    }
    ...
}
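The loop in doRetryOnLockConflict() has a simple shape: run the action, and if it fails with a lock conflict, wait and run it again, while any other exception is rethrown at once. Here is a minimal sketch of that shape. The excerpt does not show the internals of LockRetryController, so the bounded retry count and fixed sleep below are assumptions of this sketch, and LockConflictSketch and RetryOnConflictSketch are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a lock conflict exception.
final class LockConflictSketch extends RuntimeException {
    LockConflictSketch(String message) {
        super(message);
    }
}

final class RetryOnConflictSketch {
    // Retries only on LockConflictSketch; every other exception propagates immediately.
    static <T> T execute(Supplier<T> action, int maxRetries, long sleepMillis) {
        int retries = 0;
        while (true) {
            try {
                return action.get();
            } catch (LockConflictSketch conflict) {
                if (retries >= maxRetries) {
                    // Assumption: give up after maxRetries retries and surface the conflict.
                    throw conflict;
                }
                retries++;
                try {
                    // Back off briefly before trying again.
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

A caller would wrap the commit step, for example `RetryOnConflictSketch.execute(() -> commitOnce(), 30, 10L)`, where commitOnce is a placeholder for whatever performs one registration-and-commit attempt.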

 

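14. Entry point for acquiring the global lock during branch transaction registration

In Seata Server, a branch transaction can be registered only after the global lock has been acquired. AbstractCore's branchRegister() method calls ATCore's branchSessionLock() method to acquire the lock. ATCore's branchSessionLock() calls BranchSession.lock(), which ultimately relies on AbstractLockManager's acquireLock() method to try to acquire the global lock. If the lock cannot be acquired, an exception is thrown and the branch registration fails.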

public abstract class AbstractCore implements Core {     ...     //注册分支事务     @Override     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {         //1.根据xid获取全局事务会话GlobalSession         GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);         return SessionHolder.lockAndExecute(globalSession, () -> {             globalSessionStatusCheck(globalSession);             globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());              //2.创建分支事务会话,根据全局事务开启一个分支事务             //传入的参数依次是:全局事务会话、事务类型、资源ID、应用数据、全局锁keys、客户端ID             BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);              //3.把分支事务的ID存放到线程本地变量副本中,也就是MDC中             MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));              //4.注册分支事务时会加全局锁             //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败             branchSessionLock(globalSession, branchSession);              try {                 //5.把分支事务会话加入到全局事务会话中                 globalSession.addBranch(branchSession);             } catch (RuntimeException ex) {                 branchSessionUnlock(branchSession);                 throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);             }             if (LOGGER.isInfoEnabled()) {                 LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);             }             return branchSession.getBranchId();         });     }     ... }  public class ATCore extends AbstractCore {     ...     
@Override     protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {         //从应用程序数据里提取出一些属性进行属性赋值         String applicationData = branchSession.getApplicationData();         boolean autoCommit = true;         boolean skipCheckLock = false;          if (StringUtils.isNotBlank(applicationData)) {             if (objectMapper == null) {                 objectMapper = new ObjectMapper();             }             try {                 //ObjectMapper是一个对象映射框架,它可以把ApplicationData对象里的属性值读取出来,然后写入到HashMap里                 Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);                 Object clientAutoCommit = data.get(AUTO_COMMIT);                 if (clientAutoCommit != null && !(boolean)clientAutoCommit) {                     autoCommit = (boolean)clientAutoCommit;                 }                 Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);                 if (clientSkipCheckLock instanceof Boolean) {                     skipCheckLock = (boolean)clientSkipCheckLock;                 }             } catch (IOException e) {                 LOGGER.error("failed to get application data: {}", e.getMessage(), e);             }         }          try {             //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败             if (!branchSession.lock(autoCommit, skipCheckLock)) {                 throw new BranchTransactionException(LockKeyConflict, String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()));             }         } catch (StoreException e) {             if (e.getCause() instanceof BranchTransactionException) {                 throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(), String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()));             }             throw e;         }     }   
  ... }  public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {     ...     public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {         if (this.getBranchType().equals(BranchType.AT)) {             //尝试获取全局锁             return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);         }         return true;     }     ... }  public class LockerManagerFactory {     private static final Configuration CONFIG = ConfigurationFactory.getInstance();     private static volatile LockManager LOCK_MANAGER;          public static LockManager getLockManager() {         if (LOCK_MANAGER == null) {             init();         }         return LOCK_MANAGER;     }          public static void init() {         init(null);     }          public static void init(String lockMode) {         if (LOCK_MANAGER == null) {             synchronized (LockerManagerFactory.class) {                 if (LOCK_MANAGER == null) {                     if (StringUtils.isBlank(lockMode)) {                         lockMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));                     }                     if (StoreMode.contains(lockMode)) {                         LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode);                     }                 }             }         }     } }  public abstract class AbstractLockManager implements LockManager {     ...     
@Override     public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {         if (branchSession == null) {             throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");         }         String lockKey = branchSession.getLockKey();         if (StringUtils.isNullOrEmpty(lockKey)) {             //no lock             return true;         }         //get locks of branch         //获取到分支事务里需要的所有行锁         List<RowLock> locks = collectRowLocks(branchSession);         if (CollectionUtils.isEmpty(locks)) {             //no lock             return true;         }         //具体进行获取锁         return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);     }          @Override     public List<RowLock> collectRowLocks(BranchSession branchSession) {         if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) {             return Collections.emptyList();         }          String lockKey = branchSession.getLockKey();         String resourceId = branchSession.getResourceId();         String xid = branchSession.getXid();         long transactionId = branchSession.getTransactionId();         long branchId = branchSession.getBranchId();          return collectRowLocks(lockKey, resourceId, xid, transactionId, branchId);     }          protected List<RowLock> collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId, Long branchID) {         List<RowLock> locks = new ArrayList<>();         String[] tableGroupedLockKeys = lockKey.split(";");         for (String tableGroupedLockKey : tableGroupedLockKeys) {             int idx = tableGroupedLockKey.indexOf(":");             if (idx < 0) {                 return locks;             }             String tableName = tableGroupedLockKey.substring(0, idx);             String mergedPKs = tableGroupedLockKey.substring(idx + 1);             if 
(StringUtils.isBlank(mergedPKs)) {                 return locks;             }             String[] pks = mergedPKs.split(",");             if (pks == null || pks.length == 0) {                 return locks;             }             for (String pk : pks) {                 if (StringUtils.isNotBlank(pk)) {                     RowLock rowLock = new RowLock();                     rowLock.setXid(xid);                     rowLock.setTransactionId(transactionId);                     rowLock.setBranchId(branchID);                     rowLock.setTableName(tableName);                     rowLock.setPk(pk);                     rowLock.setResourceId(resourceId);                     locks.add(rowLock);                 }             }         }         return locks;     }     ... }  public class RowLock {     private String xid;//全局事务xid     private Long transactionId;//全局事务ID         private Long branchId;//分支事务ID         private String resourceId;//资源ID     private String tableName;//表名称     private String pk;//主键         private String rowKey;//行键     private String feature;//功能特性     ... }
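To make the lock key format concrete, here is a minimal sketch of the parsing that collectRowLocks() performs. It assumes a lock key shaped like `table:pk1,pk2;table2:pk3`, as implied by the split(";"), indexOf(":") and split(",") calls above. The class name LockKeyParseSketch and the sample table names are hypothetical, and each row lock is reduced to a plain `table:pk` string instead of a full RowLock object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for AbstractLockManager.collectRowLocks().
final class LockKeyParseSketch {

    // Splits "table:pk1,pk2;table2:pk3" into one "table:pk" entry per row.
    static List<String> parse(String lockKey) {
        List<String> rows = new ArrayList<>();
        if (lockKey == null || lockKey.trim().isEmpty()) {
            return rows;
        }
        for (String group : lockKey.split(";")) {
            int idx = group.indexOf(':');
            if (idx < 0) {
                return rows; // same early return as the original method
            }
            String table = group.substring(0, idx);
            String mergedPks = group.substring(idx + 1);
            if (mergedPks.trim().isEmpty()) {
                return rows;
            }
            for (String pk : mergedPks.split(",")) {
                if (!pk.trim().isEmpty()) {
                    rows.add(table + ":" + pk); // one row lock per primary key
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // prints [stock_tbl:1, stock_tbl:2, order_tbl:7]
        System.out.println(parse("stock_tbl:1,2;order_tbl:7"));
    }
}
```

One branch transaction can therefore request several row locks at once, which is why the server-side locker always works on a list of locks.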

 

15.Source code of how Seata Server acquires the global lock

When AbstractLockManager.acquireLock() is called to acquire the global lock (with the db lock store), the call is delegated to DataBaseLocker.acquireLock(), which in turn calls LockStoreDataBaseDAO.acquireLock().

 

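LockStoreDataBaseDAO.acquireLock() first queries the lock table for existing records of the requested global locks. It then compares the xid stored in each record with the current xid to decide whether the lock is already held by the current global transaction (this is the core step).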

 

If not, another global transaction acquired the requested lock first, and the current transaction fails to acquire the global lock.

 

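If so, the locks already held by the current transaction are filtered out, and the lock records that the current branch transaction still needs are inserted into the database. If the insert succeeds, the current branch transaction has acquired the global lock. If the insert fails, another transaction has acquired the lock in the meantime.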
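The decision logic just described can be sketched without a database. This is only an illustration under stated assumptions: an in-memory map from row key to owning xid stands in for the lock table, `synchronized` stands in for whatever atomicity the database transaction provides, and the class name GlobalLockDecisionSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the check-then-insert logic in acquireLock().
final class GlobalLockDecisionSketch {
    // rowKey -> xid of the global transaction holding the lock
    private final Map<String, String> lockTable = new HashMap<>();

    synchronized boolean acquire(String currentXid, List<String> rowKeys) {
        List<String> toInsert = new ArrayList<>();
        // De-duplicate the requested row keys, like distinctByKey(LockDO::getRowKey).
        for (String rowKey : new LinkedHashSet<>(rowKeys)) {
            String holderXid = lockTable.get(rowKey);
            if (holderXid == null) {
                toInsert.add(rowKey);          // free: must be inserted
            } else if (!holderXid.equals(currentXid)) {
                return false;                  // held by another global transaction
            }
            // held by the same xid: already owned, nothing to insert
        }
        for (String rowKey : toInsert) {
            lockTable.put(rowKey, currentXid); // "insert" the missing lock records
        }
        return true;
    }

    public static void main(String[] args) {
        GlobalLockDecisionSketch locks = new GlobalLockDecisionSketch();
        List<String> first = new ArrayList<>();
        first.add("t:1");
        first.add("t:2");
        System.out.println(locks.acquire("xid-1", first)); // true
        System.out.println(locks.acquire("xid-2", first)); // false
    }
}
```

Note that a failed attempt inserts nothing, which mirrors the conn.rollback() calls on every failure path of the real method.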

@LoadLevel(name = "db")
public class DataBaseLockManager extends AbstractLockManager implements Initialize {
    private Locker locker;

    @Override
    public void init() {
        //init dataSource
        String datasourceType = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
        DataSource lockStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
        locker = new DataBaseLocker(lockStoreDataSource);
    }

    @Override
    public Locker getLocker(BranchSession branchSession) {
        return locker;
    }
    ...
}

public class DataBaseLocker extends AbstractLocker {
    private LockStore lockStore;

    public DataBaseLocker(DataSource logStoreDataSource) {
        lockStore = new LockStoreDataBaseDAO(logStoreDataSource);
    }
    ...

    @Override
    public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {
        if (CollectionUtils.isEmpty(locks)) {
            //no lock
            return true;
        }
        try {
            //Acquire the global lock by executing SQL against the lock-store database
            return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);
        } catch (StoreException e) {
            throw e;
        } catch (Exception t) {
            LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t);
            return false;
        }
    }
    ...
}

public class LockStoreDataBaseDAO implements LockStore {
    ...
@Override     public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {         //数据库操作三剑客:连接、句柄、结果         Connection conn = null;         PreparedStatement ps = null;         ResultSet rs = null;          Set<String> dbExistedRowKeys = new HashSet<>();         boolean originalAutoCommit = true;         if (lockDOs.size() > 1) {             lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());         }         try {             //从全局锁数据源里获取到一个连接             conn = lockStoreDataSource.getConnection();             //关闭自动提交事务             if (originalAutoCommit = conn.getAutoCommit()) {                 conn.setAutoCommit(false);             }             //需要获取的锁,有可能多个             List<LockDO> unrepeatedLockDOs = lockDOs;              //check lock             if (!skipCheckLock) {                 boolean canLock = true;                  //query,针对全局锁表查询某个数据加了全局锁的全局事务xid                 //LockStoreSqlFactory是全局锁存储的SQL工厂                 String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());                 ps = conn.prepareStatement(checkLockSQL);                 for (int i = 0; i < lockDOs.size(); i++) {                     ps.setString(i + 1, lockDOs.get(i).getRowKey());                 }                 //执行查询                 rs = ps.executeQuery();                  //获取到当前要加全局锁的事务xid                 String currentXID = lockDOs.get(0).getXid();                 boolean failFast = false;                  //如果查询到的结果rs是空,则表示当前全局锁没有被事务获取占用                 while (rs.next()) {                     String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);                      //如果获取到全局锁的是别的全局事务xid,那么获取全局锁失败,设置canLock为false                     if (!StringUtils.equals(dbXID, currentXID)) {                         if (LOGGER.isInfoEnabled()) {                             String dbPk = 
rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);                             String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);                             long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);                             LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);                         }                         if (!autoCommit) {                             int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);                             if (status == LockStatus.Rollbacking.getCode()) {                                 failFast = true;                             }                         }                         canLock = false;                         break;                     }                      dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));                 }                  if (!canLock) {                     conn.rollback();                     if (failFast) {                         throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));                     }                     return false;                 }                  //If the lock has been exists in db, remove it from the lockDOs                 if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {                     //过滤当前事务已经获取过的全局锁                     unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())).collect(Collectors.toList());                 }                 if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {                     conn.rollback();                     return true;                 }             }              //lock             if (unrepeatedLockDOs.size() == 1) {                 LockDO lockDO = unrepeatedLockDOs.get(0);                 //尝试加锁,表示全局锁被当前的分支事务获取了                 if (!doAcquireLock(conn, lockDO)) {          
           if (LOGGER.isInfoEnabled()) {                         LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());                     }                     conn.rollback();                     return false;                 }             } else {                 if (!doAcquireLocks(conn, unrepeatedLockDOs)) {                     if (LOGGER.isInfoEnabled()) {                         LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));                     }                     conn.rollback();                     return false;                 }             }             conn.commit();             return true;         } catch (SQLException e) {             throw new StoreException(e);         } finally {             IOUtil.close(rs, ps);             if (conn != null) {                 try {                     if (originalAutoCommit) {                         conn.setAutoCommit(true);                     }                     conn.close();                 } catch (SQLException e) {                 }             }         }     }          protected boolean doAcquireLock(Connection conn, LockDO lockDO) {         PreparedStatement ps = null;         try {             //insert             String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);             ps = conn.prepareStatement(insertLockSQL);             ps.setString(1, lockDO.getXid());//全局事务xid             ps.setLong(2, lockDO.getTransactionId());//全局事务id             ps.setLong(3, lockDO.getBranchId());//分支事务id             ps.setString(4, lockDO.getResourceId());//资源id             ps.setString(5, lockDO.getTableName());//表名称             ps.setString(6, lockDO.getPk());//主键             ps.setString(7, 
lockDO.getRowKey());//rowkey             ps.setInt(8, LockStatus.Locked.getCode());//locked             return ps.executeUpdate() > 0;         } catch (SQLException e) {             throw new StoreException(e);         } finally {             IOUtil.close(ps);         }     }     ... }

 

16.Summary of global locks, branch transactions, and local transactions

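A branch transaction can be registered only after the global lock is acquired; otherwise LockRetryPolicy retries. Likewise, the local transaction can be committed only after the global lock is acquired; otherwise LockRetryPolicy retries.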

 

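The current transaction (xid) can acquire the global lock only if no other transaction (xid) holds it. Acquiring the global lock writes the lock records requested by the current branch transaction into the database.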
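The "acquire or retry" rule in this summary can be illustrated with a small retry loop. This is a sketch of the general idea only, not the actual LockRetryPolicy code: the class LockRetrySketch, the method runWithRetry() and its retryTimes / retryIntervalMillis parameters are all hypothetical names chosen for the example.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: retry an action that needs the global lock until it succeeds
// or the retry budget is used up.
final class LockRetrySketch {

    // tryAcquire returns true when the lock was acquired (e.g. branch registration succeeded).
    static boolean runWithRetry(BooleanSupplier tryAcquire, int retryTimes, long retryIntervalMillis) {
        for (int attempt = 0; attempt <= retryTimes; attempt++) {
            if (tryAcquire.getAsBoolean()) {
                return true;
            }
            if (attempt < retryTimes) {
                try {
                    Thread.sleep(retryIntervalMillis); // back off before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false; // lock conflict persisted: the local transaction must not commit
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The simulated lock becomes free on the third attempt.
        boolean ok = runWithRetry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```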

 

17.Source code for committing the global transaction and its branch transactions

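(1)Seata Client sends the request to commit the global transaction

(2)Server sends branch commit requests to the Client

(3)Seata Client handles the branch commit request

(4)Committing a global transaction mainly means having each branch transaction delete its local UndoLog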

 

(1)Seata Client sends the request to commit the global transaction

-> TransactionalTemplate.execute() initiates the commit of the global transaction -> TransactionalTemplate.commitTransaction() -> DefaultGlobalTransaction.commit() -> DefaultTransactionManager.commit() -> DefaultTransactionManager.syncCall() -> TmNettyRemotingClient.sendSyncRequest() sends the global commit request GlobalCommitRequest to Seata Server for processing
//Template of executing business logic with a global transaction. 全局事务执行模版 public class TransactionalTemplate {     private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);          public Object execute(TransactionalExecutor business) throws Throwable {         //1.Get transactionInfo         TransactionInfo txInfo = business.getTransactionInfo();         if (txInfo == null) {             throw new ShouldNeverHappenException("transactionInfo does not exist");         }          //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.         //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务         //刚开始在开启一个全局事务的时候,是没有全局事务的         GlobalTransaction tx = GlobalTransactionContext.getCurrent();          //1.2 Handle the transaction propagation.         //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED         //也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;         Propagation propagation = txInfo.getPropagation();          //不同的全局事务传播级别,会采取不同的处理方式         //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid         //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别         SuspendedResourcesHolder suspendedResourcesHolder = null;         try {             switch (propagation) {                 ...             }              //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.             if (tx == null) {                 tx = GlobalTransactionContext.createNew();             }              //set current tx config to holder             GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);              try {                 //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,                 //else do nothing. Of course, the hooks will still be triggered.                 
//开启一个全局事务                 beginTransaction(txInfo, tx);                    Object rs;                 try {                     //Do Your Business                     //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务                     rs = business.execute();                 } catch (Throwable ex) {                     //3. The needed business exception to rollback.                     //发生异常时需要完成的事务                     completeTransactionAfterThrowing(txInfo, tx, ex);                     throw ex;                 }                  //4. everything is fine, commit.                 //如果一切执行正常就会在这里提交全局事务                 commitTransaction(tx);                  return rs;             } finally {                 //5. clear                 //执行一些全局事务完成后的回调,比如清理等工作                 resumeGlobalLockConfig(previousConfig);                 triggerAfterCompletion();                 cleanUp();             }         } finally {             //If the transaction is suspended, resume it.             if (suspendedResourcesHolder != null) {                 //如果之前挂起了一个全局事务,此时可以恢复这个全局事务                 tx.resume(suspendedResourcesHolder);             }         }     }          //提交事务     private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {         try {             triggerBeforeCommit();             tx.commit();             triggerAfterCommit();         } catch (TransactionException txe) {             // 4.1 Failed to commit             throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);         }     }     ... }  //The type Default global transaction. 
默认的全局事务 public class DefaultGlobalTransaction implements GlobalTransaction {     private TransactionManager transactionManager;          DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {         this.transactionManager = TransactionManagerHolder.get();//全局事务管理者         this.xid = xid;         this.status = status;         this.role = role;     }     ...     @Override     public void commit() throws TransactionException {         if (role == GlobalTransactionRole.Participant) {             //Participant has no responsibility of committing             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);             }             return;         }         assertXIDNotNull();         int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;         try {             while (retry > 0) {                 try {                     retry--;                     status = transactionManager.commit(xid);                     break;                 } catch (Throwable ex) {                     LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());                     if (retry == 0) {                         throw new TransactionException("Failed to report global commit", ex);                     }                 }             }         } finally {             if (xid.equals(RootContext.getXID())) {                 suspend();             }         }         if (LOGGER.isInfoEnabled()) {             LOGGER.info("[{}] commit status: {}", xid, status);         }     }     ... }  public class TransactionManagerHolder {     ...     
private TransactionManagerHolder() {     }          private static class SingletonHolder {         private static TransactionManager INSTANCE = null;         static {             try {                 INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);                 LOGGER.info("TransactionManager Singleton {}", INSTANCE);             } catch (Throwable anyEx) {                 LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);             }         }     }          //Get transaction manager.     public static TransactionManager get() {         if (SingletonHolder.INSTANCE == null) {             throw new ShouldNeverHappenException("TransactionManager is NOT ready!");         }         return SingletonHolder.INSTANCE;     }     ... }  public class DefaultTransactionManager implements TransactionManager {     ...     @Override     public GlobalStatus commit(String xid) throws TransactionException {         GlobalCommitRequest globalCommit = new GlobalCommitRequest();         globalCommit.setXid(xid);         GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);         return response.getGlobalStatus();     }          private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {         try {             //TMNettyRemotingClient会和Seata Server基于Netty建立长连接             return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);         } catch (TimeoutException toe) {             throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);         }     }     ... }

(2)Server sends branch commit requests to the Client

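ServerHandler.channelRead() passes the received request down through several layers. The request first goes through processMessage() and ServerOnRequestProcessor.process() to DefaultCoordinator.onRequest(), then to GlobalCommitRequest.handle(), then to AbstractTCInboundHandler.handle(), and finally to DefaultCoordinator.doGlobalCommit(), which calls DefaultCore.commit() to commit the global transaction.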
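The hand-off between DefaultCoordinator.onRequest() and GlobalCommitRequest.handle() is a double dispatch: the coordinator injects itself into the request, and the request calls back the handler overload that matches its own type. A minimal sketch of that pattern follows. All types here (DispatchSketch, InboundHandler, Request, CommitRequest, Coordinator) are hypothetical simplifications, and the String return value replaces the real response objects.

```java
// Hypothetical, stripped-down model of the onRequest() -> handle() -> doGlobalCommit() chain.
final class DispatchSketch {

    interface InboundHandler {
        String handle(CommitRequest request);
    }

    abstract static class Request {
        protected InboundHandler handler;

        void setHandler(InboundHandler handler) {
            this.handler = handler;
        }

        // Each concrete request picks the handler overload for its own type.
        abstract String handle();
    }

    static final class CommitRequest extends Request {
        final String xid;

        CommitRequest(String xid) {
            this.xid = xid;
        }

        @Override
        String handle() {
            return handler.handle(this);
        }
    }

    static final class Coordinator implements InboundHandler {
        // Entry point: knows only the abstract Request type.
        String onRequest(Request request) {
            request.setHandler(this);
            return request.handle();
        }

        @Override
        public String handle(CommitRequest request) {
            return "Committed:" + request.xid; // stands in for doGlobalCommit()
        }
    }

    public static void main(String[] args) {
        System.out.println(new Coordinator().onRequest(new CommitRequest("xid-1"))); // Committed:xid-1
    }
}
```

The benefit of this shape is that onRequest() needs no instanceof chain: adding a new request type means adding a new request class and a new handle() overload.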

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @ChannelHandler.Sharable     class ServerHandler extends ChannelDuplexHandler {         @Override         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {             if (!(msg instanceof RpcMessage)) {                 return;             }             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理             processMessage(ctx, (RpcMessage) msg);         }         ...     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));         }         Object body = rpcMessage.getBody();         if (body instanceof MessageTypeAware) {             MessageTypeAware messageTypeAware = (MessageTypeAware) body;             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());             if (pair != null) {                 if (pair.getSecond() != null) {                     try {                         pair.getSecond().execute(() -> {                             try {                                 pair.getFirst().process(ctx, rpcMessage);                             } catch (Throwable th) {                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                             } finally {                                 MDC.clear();                             }                         });                    
 } catch (RejectedExecutionException e) {                         ...                     }                 } else {                     try {                         pair.getFirst().process(ctx, rpcMessage);                     } catch (Throwable th) {                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                     }                 }             } else {                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());             }         } else {             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);         }     }     ... }  public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {     private final RemotingServer remotingServer;     ...     @Override     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (ChannelManager.isRegistered(ctx.channel())) {             onRequestMessage(ctx, rpcMessage);         } else {             try {                 if (LOGGER.isInfoEnabled()) {                     LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());                 }                 ctx.disconnect();                 ctx.close();             } catch (Exception exx) {                 LOGGER.error(exx.getMessage());             }             if (LOGGER.isInfoEnabled()) {                 LOGGER.info(String.format("close a unhandled connection! 
[%s]", ctx.channel().toString()));             }         }     }          private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {         Object message = rpcMessage.getBody();         //RpcContext线程本地变量副本         RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());         } else {             try {                 BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());             } catch (InterruptedException e) {                 LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);             }         }         if (!(message instanceof AbstractMessage)) {             return;         }         //the batch send request message         if (message instanceof MergedWarpMessage) {             ...         } else {             //the single send request message             final AbstractMessage msg = (AbstractMessage) message;             //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage             AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);             //返回响应给客户端             remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);         }     }     ... }  public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     ...     
@Override     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {         if (!(request instanceof AbstractTransactionRequestToTC)) {             throw new IllegalArgumentException();         }         //传入的request其实就是客户端发送请求时的GlobalCommitRequest         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;         transactionRequest.setTCInboundHandler(this);         return transactionRequest.handle(context);     }     ... }  public class GlobalCommitRequest extends AbstractGlobalEndRequest {     @Override     public short getTypeCode() {         return MessageType.TYPE_GLOBAL_COMMIT;     }          @Override     public AbstractTransactionResponse handle(RpcContext rpcContext) {         return handler.handle(this, rpcContext);     } }  public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {     ...     @Override     public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {         GlobalCommitResponse response = new GlobalCommitResponse();         response.setGlobalStatus(GlobalStatus.Committing);         exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {             @Override             public void execute(GlobalCommitRequest request, GlobalCommitResponse response) throws TransactionException {                 try {                     doGlobalCommit(request, response, rpcContext);                 } catch (StoreException e) {                     throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global commit request failed. 
xid=%s, msg=%s", request.getXid(), e.getMessage()), e);                 }             }                         @Override             public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response, TransactionException tex) {                 super.onTransactionException(request, response, tex);                 checkTransactionStatus(request, response);             }              @Override             public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {                 super.onException(request, response, rex);                 checkTransactionStatus(request, response);             }         }, request, response);         return response;     }          protected abstract void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException;     ... }  public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     private final DefaultCore core;     ...     @Override     protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException {         MDC.put(RootContext.MDC_KEY_XID, request.getXid());         //调用DefaultCore.commit()方法提交全局事务         response.setGlobalStatus(core.commit(request.getXid()));     }     ... }

DefaultCore.commit() calls DefaultCore.doGlobalCommit(), which fetches all branch transactions of the global transaction, iterates over them, and sends a branch commit request BranchCommitRequest to the Seata Client for each one.
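The iteration in doGlobalCommit() relies on a small convention: the per-branch callback returns CONTINUE to move on to the next branch, or a concrete Boolean to stop the loop with that result. The sketch below models this under the assumption that CONTINUE is simply null, which is consistent with the "Return if the result is not null" check in the real code. The class BranchCommitLoopSketch, the String branch ids and the sendCommit predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of the branch iteration inside DefaultCore.doGlobalCommit().
final class BranchCommitLoopSketch {
    static final Boolean CONTINUE = null; // assumption: "keep iterating" is encoded as null

    // Applies the handler to each branch; stops at the first non-null result.
    static Boolean forEach(List<String> branches, Function<String, Boolean> handler) {
        for (String branch : branches) {
            Boolean result = handler.apply(branch);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    // sendCommit stands in for sending BranchCommitRequest and reading the branch status.
    static boolean commitAll(List<String> branches, Predicate<String> sendCommit) {
        List<String> remaining = new ArrayList<>(branches);
        Boolean result = forEach(branches, branch -> {
            if (sendCommit.test(branch)) {
                remaining.remove(branch); // committed: drop the branch from the session
                return CONTINUE;
            }
            return false;                 // failed: stop and report failure
        });
        if (result != null) {
            return result;
        }
        return remaining.isEmpty();       // success only when no branch is left
    }

    public static void main(String[] args) {
        List<String> branches = new ArrayList<>();
        branches.add("b1");
        branches.add("b2");
        System.out.println(commitAll(branches, b -> true));            // true
        System.out.println(commitAll(branches, b -> !b.equals("b2"))); // false
    }
}
```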

public class DefaultCore implements Core {     ...     @Override     public GlobalStatus commit(String xid) throws TransactionException {         GlobalSession globalSession = SessionHolder.findGlobalSession(xid);         if (globalSession == null) {             return GlobalStatus.Finished;         }         globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());         //just lock changeStatus          boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {             if (globalSession.getStatus() == GlobalStatus.Begin) {                 //Highlight: Firstly, close the session, then no more branch can be registered.                 globalSession.closeAndClean();                  if (globalSession.canBeCommittedAsync()) {                     globalSession.asyncCommit();                     MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);                     return false;                 } else {                     globalSession.changeGlobalStatus(GlobalStatus.Committing);                     return true;                 }             }             return false;         });          if (shouldCommit) {             boolean success = doGlobalCommit(globalSession, false);             //If successful and all remaining branches can be committed asynchronously, do async commit.             if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {                 globalSession.asyncCommit();                 return GlobalStatus.Committed;             } else {                 return globalSession.getStatus();             }         } else {             return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? 
GlobalStatus.Committed : globalSession.getStatus();         }     }      @Override     public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {         boolean success = true;         //start committing event         MetricsPublisher.postSessionDoingEvent(globalSession, retrying);          if (globalSession.isSaga()) {             success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);         } else {             //获取到全局事务的所有分支事务,并进行遍历提交             Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {                 //if not retrying, skip the canBeCommittedAsync branches                 if (!retrying && branchSession.canBeCommittedAsync()) {                     return CONTINUE;                 }                  BranchStatus currentStatus = branchSession.getStatus();                 if (currentStatus == BranchStatus.PhaseOne_Failed) {                     SessionHelper.removeBranch(globalSession, branchSession, !retrying);                     return CONTINUE;                 }                 try {                     //发送请求给Seata Client提交分支事务                     BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);                     if (isXaerNotaTimeout(globalSession,branchStatus)) {                         LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());                         branchStatus = BranchStatus.PhaseTwo_Committed;                     }                     switch (branchStatus) {                         case PhaseTwo_Committed:                             SessionHelper.removeBranch(globalSession, branchSession, !retrying);                             return CONTINUE;                         case PhaseTwo_CommitFailed_Unretryable:                             //not at branch                             
SessionHelper.endCommitFailed(globalSession, retrying);                             LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());                             return false;                         default:                             if (!retrying) {                                 globalSession.queueToRetryCommit();                                 return false;                             }                             if (globalSession.canBeCommittedAsync()) {                                 LOGGER.error("Committing branch transaction[{}], status:{} and will retry later", branchSession.getBranchId(), branchStatus);                                 return CONTINUE;                             } else {                                 LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());                                 return false;                             }                     }                 } catch (Exception ex) {                     StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}", new String[] {branchSession.toString()});                     if (!retrying) {                         globalSession.queueToRetryCommit();                         throw new TransactionException(ex);                     }                 }                 return CONTINUE;             });             //Return if the result is not null             if (result != null) {                 return result;             }             //If has branch and not all remaining branches can be committed asynchronously,             //do print log and return false             if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {                 LOGGER.info("Committing global transaction is NOT done, xid = 
{}.", globalSession.getXid());                 return false;             }             if (!retrying) {                 //contains not AT branch                 globalSession.setStatus(GlobalStatus.Committed);             }         }         //if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is         //executed to improve concurrency performance, and the global transaction ends..         if (success && globalSession.getBranchSessions().isEmpty()) {             SessionHelper.endCommitted(globalSession, retrying);             LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());         }         return success;     }     ... }  public abstract class AbstractCore implements Core {     protected RemotingServer remotingServer;     ...     @Override     public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {         try {             BranchCommitRequest request = new BranchCommitRequest();             request.setXid(branchSession.getXid());             request.setBranchId(branchSession.getBranchId());             request.setResourceId(branchSession.getResourceId());             request.setApplicationData(branchSession.getApplicationData());             request.setBranchType(branchSession.getBranchType());             return branchCommitSend(request, globalSession, branchSession);         } catch (IOException | TimeoutException e) {             throw new BranchTransactionException(FailedToSendBranchCommitRequest, String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);         }     }          protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException {         BranchCommitResponse response = (BranchCommitResponse) 
remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);         return response.getBranchStatus();     }     ... }  public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @Override     public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {         if (channel == null) {             throw new RuntimeException("client is not connected");         }         RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);         return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {         if (timeoutMillis <= 0) {             throw new FrameworkException("timeout should more than 0ms");         }         if (channel == null) {             LOGGER.warn("sendSync nothing, caused by null channel.");             return null;         }          //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里         MessageFuture messageFuture = new MessageFuture();         messageFuture.setRequestMessage(rpcMessage);         messageFuture.setTimeout(timeoutMillis);         futures.put(rpcMessage.getId(), messageFuture);          channelWritableCheck(channel, rpcMessage.getBody());          //获取远程地址         String remoteAddr = ChannelUtil.getAddressFromChannel(channel);         doBeforeRpcHooks(remoteAddr, rpcMessage);          //异步化发送数据,同时对发送结果添加监听器         //如果发送失败,则会对网络连接Channel进行销毁处理         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {             if (!future.isSuccess()) {                 MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());                 if (messageFuture1 != null) {                     
messageFuture1.setResultMessage(future.cause());                 }                 destroyChannel(future.channel());             }         });          try {             //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应             Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);             doAfterRpcHooks(remoteAddr, rpcMessage, result);             return result;         } catch (Exception exx) {             LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());             if (exx instanceof TimeoutException) {                 throw (TimeoutException) exx;             } else {                 throw new RuntimeException(exx);             }         }     }     ... }
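The sendSync() method above turns an asynchronous channel write into a blocking call: it registers a future under the request id, writes the message, and waits on that future until the response arrives or the timeout expires. The following is a minimal sketch of that pattern only, not Seata's implementation. `SyncOverAsyncSketch`, `Transport`, `onResponse()` and `pendingCount()` are hypothetical names, `CompletableFuture` stands in for `MessageFuture`, and the `finally` cleanup is a simplification made for the sketch.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Seata code: CompletableFuture plays the role of MessageFuture.
class SyncOverAsyncSketch {

    // Hypothetical transport that writes a request without waiting for the reply.
    interface Transport {
        void write(int requestId, Object body);
    }

    // Pending requests keyed by request id, like the futures map used by sendSync().
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    Object sendSync(Transport transport, Object body, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeout should be more than 0ms");
        }
        int requestId = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future); // register before writing so a fast reply is not lost
        try {
            transport.write(requestId, body);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block the caller
        } catch (TimeoutException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            futures.remove(requestId); // sketch-only cleanup so the map cannot grow
        }
    }

    // Called from the I/O side when the response carrying the same id arrives.
    void onResponse(int requestId, Object result) {
        CompletableFuture<Object> future = futures.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    int pendingCount() {
        return futures.size();
    }

    public static void main(String[] args) throws Exception {
        SyncOverAsyncSketch rpc = new SyncOverAsyncSketch();
        // The fake peer answers on another thread, echoing the request body.
        Transport echo = (id, body) -> new Thread(() -> rpc.onResponse(id, "ack:" + body)).start();
        System.out.println(rpc.sendSync(echo, "branch-commit", 1000));
    }
}
```

The future is registered before the write so that a response arriving immediately still finds its entry in the map.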

(3) Seata Client handles the branch transaction commit request

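After ClientHandler's channelRead() method receives the branch commit request, processMessage() dispatches it to RmBranchCommitProcessor, whose handleBranchCommit() method handles it. From there the call chain is:

-> AbstractRMHandler.onRequest() -> BranchCommitRequest.handle() -> AbstractRMHandler.handle() -> AbstractRMHandler.doBranchCommit() -> DataSourceManager.branchCommit() -> AsyncWorker.branchCommit(), which commits the branch transaction asynchronously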
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //Look up a Pair by message type; the Pair holds the request processor and its thread pool
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class RmBranchCommitProcessor implements RemotingProcessor {
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }
    ...
}

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    ...
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

public class BranchCommitRequest extends AbstractBranchEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }
}

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }
    ...
}

//The type Data source manager. DataSourceManager is the resource manager for AT mode
public class DataSourceManager extends AbstractResourceManager {
    //Asynchronous worker
    private final AsyncWorker asyncWorker = new AsyncWorker(this);
    ...
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        //Commit the branch transaction asynchronously via asyncWorker
        return asyncWorker.branchCommit(xid, branchId, resourceId);
    }
    ...
}
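The dispatch step in processMessage() can be reduced to a lookup table from message type code to a processor and an optional thread pool. The sketch below illustrates that idea under stated assumptions: `ProcessorTableSketch`, `Processor`, `Entry` and the numeric type codes are hypothetical and chosen for illustration, not taken from Seata.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Seata code: a type-code -> (processor, thread pool) table.
class ProcessorTableSketch {

    interface Processor {
        void process(Object body) throws Exception;
    }

    // Stand-in for Pair<RemotingProcessor, ExecutorService>; executor may be null.
    static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new ConcurrentHashMap<>();

    void register(int typeCode, Processor processor, ExecutorService executor) {
        processorTable.put(typeCode, new Entry(processor, executor));
    }

    // Returns false when no processor is registered for the type code.
    boolean dispatch(int typeCode, Object body) {
        Entry entry = processorTable.get(typeCode);
        if (entry == null) {
            System.err.println("This message type [" + typeCode + "] has no processor.");
            return false;
        }
        if (entry.executor != null) {
            // Hand the work to the processor's own pool so the I/O thread is not blocked.
            entry.executor.execute(() -> runSafely(entry.processor, body));
        } else {
            // No pool configured: run on the calling thread.
            runSafely(entry.processor, body);
        }
        return true;
    }

    private static void runSafely(Processor processor, Object body) {
        try {
            processor.process(body);
        } catch (Throwable th) {
            System.err.println("dispatch failed: " + th.getMessage());
        }
    }

    public static void main(String[] args) throws Exception {
        final int branchCommitType = 1;   // illustrative value, not Seata's real type code
        final int branchRollbackType = 2; // illustrative value, not Seata's real type code
        ProcessorTableSketch table = new ProcessorTableSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        table.register(branchCommitType, body -> System.out.println("commit handled: " + body), pool);
        table.register(branchRollbackType, body -> System.out.println("rollback handled: " + body), null);
        table.dispatch(branchCommitType, "xid-1:branch-7");
        table.dispatch(branchRollbackType, "xid-1:branch-8");
        table.dispatch(99, "unknown");
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Keeping the thread pool next to the processor lets each message type choose between inline handling and pooled handling without changing the dispatch code.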

(4) Committing a global transaction mainly means having each branch transaction delete its local UndoLog

public class AsyncWorker {
    ...
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

    private void addToCommitQueue(Phase2Context context) {
        if (commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
    }

    void doBranchCommitSafely() {
        try {
            doBranchCommit();
        } catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }

    private void doBranchCommit() {
        if (commitQueue.isEmpty()) {
            return;
        }
        //transfer all context currently received to this list
        List<Phase2Context> allContexts = new LinkedList<>();
        commitQueue.drainTo(allContexts);
        //group context by their resourceId
        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
        groupedContexts.forEach(this::dealWithGroupedContexts);
    }

    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("failed to find resource for {} and requeue", resourceId);
            addAllToCommitQueue(contexts);
            return;
        }

        Connection conn = null;
        try {
            conn = dataSourceProxy.getPlainConnection();
            UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

            //split contexts into several lists, with each list contain no more element than limit size
            List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
            //Committing the global transaction only requires each branch transaction to delete its local undo logs
            for (List<Phase2Context> partition : splitByLimit) {
                deleteUndoLog(conn, undoLogManager, partition);
            }
        } catch (SQLException sqlExx) {
            addAllToCommitQueue(contexts);
            LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
        } finally {
            IOUtil.close(conn);
        }
    }
    ...
}
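The AsyncWorker code above follows a queue, drain, group, and batch pattern: branchCommit() only enqueues a context and reports success, and the UndoLog rows are deleted later in bounded batches per data source. The sketch below shows that pattern in isolation. It is an illustration under assumptions, not Seata's code: `AsyncCommitSketch`, `drainAndDelete()` and `deletedBatches` are hypothetical names, the status is a plain string, `subList` replaces Guava's `Lists.partition`, and the database delete is replaced by an in-memory record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not Seata code: queue, drain, group, then delete in bounded batches.
class AsyncCommitSketch {

    static final class Phase2Context {
        final String xid;
        final long branchId;
        final String resourceId;

        Phase2Context(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    private final BlockingQueue<Phase2Context> commitQueue = new LinkedBlockingQueue<>();
    private final int deleteLimit;                         // max undo logs per delete batch
    final List<String> deletedBatches = new ArrayList<>(); // demo-only record of each batch

    AsyncCommitSketch(int deleteLimit) {
        if (deleteLimit <= 0) {
            throw new IllegalArgumentException("deleteLimit must be positive");
        }
        this.deleteLimit = deleteLimit;
    }

    // Phase two "commit" only enqueues work and reports success immediately.
    String branchCommit(String xid, long branchId, String resourceId) {
        commitQueue.offer(new Phase2Context(xid, branchId, resourceId));
        return "PhaseTwo_Committed";
    }

    // Assumed to be invoked periodically by a scheduler; called by hand in this sketch.
    void drainAndDelete() {
        List<Phase2Context> all = new ArrayList<>();
        commitQueue.drainTo(all);
        // Group by resource id so each data source is visited once.
        Map<String, List<Phase2Context>> grouped = new LinkedHashMap<>();
        for (Phase2Context context : all) {
            grouped.computeIfAbsent(context.resourceId, key -> new ArrayList<>()).add(context);
        }
        grouped.forEach((resourceId, contexts) -> {
            // Split each group so that no batch exceeds deleteLimit entries.
            for (int from = 0; from < contexts.size(); from += deleteLimit) {
                int to = Math.min(from + deleteLimit, contexts.size());
                deleteUndoLogs(resourceId, contexts.subList(from, to));
            }
        });
    }

    // A real worker would run one batched delete against the undo log table here.
    private void deleteUndoLogs(String resourceId, List<Phase2Context> batch) {
        deletedBatches.add(resourceId + ":" + batch.size());
    }

    public static void main(String[] args) {
        AsyncCommitSketch worker = new AsyncCommitSketch(2);
        worker.branchCommit("xid-1", 1L, "db1");
        worker.branchCommit("xid-1", 2L, "db2");
        worker.branchCommit("xid-2", 3L, "db1");
        worker.branchCommit("xid-3", 4L, "db1");
        worker.drainAndDelete();
        System.out.println(worker.deletedBatches); // [db1:2, db1:1, db2:1]
    }
}
```

Because the branch's local transaction was already committed in phase one, returning success before the UndoLog is deleted is safe: the leftover rows are only cleanup work.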

 

18. Source code of the global transaction rollback process

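The rollback flow of a global transaction is almost identical to the commit flow:

I. Seata Client initiates the global transaction rollback request

II. Server sends a branch transaction rollback request to each Client

III. Seata Client handles the branch transaction rollback request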
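The three steps above can be sketched as a small in-memory model. This is only an illustration under assumptions, not Seata's code: it assumes that a branch handles rollback by restoring the before image recorded in its UndoLog and then removing that UndoLog entry. Every name here (`GlobalRollbackSketch`, `BranchResource`, `UndoLog`, `globalRollback()`) is hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Seata code: each branch restores its before image on rollback.
class GlobalRollbackSketch {

    static final class UndoLog {
        final String rowKey;
        final String beforeImage; // null means the row did not exist before the update

        UndoLog(String rowKey, String beforeImage) {
            this.rowKey = rowKey;
            this.beforeImage = beforeImage;
        }
    }

    // Stand-in for one data source that takes part in the global transaction.
    static final class BranchResource {
        final Map<String, String> table = new HashMap<>();
        final Map<Long, UndoLog> undoLogs = new HashMap<>();

        // Phase one: record the before image, then apply the change locally.
        void update(long branchId, String rowKey, String newValue) {
            undoLogs.put(branchId, new UndoLog(rowKey, table.get(rowKey)));
            table.put(rowKey, newValue);
        }

        // Phase two rollback: restore the before image and drop the undo log.
        boolean branchRollback(long branchId) {
            UndoLog log = undoLogs.remove(branchId);
            if (log == null) {
                return true; // nothing recorded, nothing to undo
            }
            if (log.beforeImage == null) {
                table.remove(log.rowKey);
            } else {
                table.put(log.rowKey, log.beforeImage);
            }
            return true;
        }
    }

    // Coordinator side: ask every registered branch to roll back; stop at the first failure.
    static boolean globalRollback(Map<Long, BranchResource> branches) {
        for (Map.Entry<Long, BranchResource> branch : branches.entrySet()) {
            if (!branch.getValue().branchRollback(branch.getKey())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BranchResource stockDb = new BranchResource();
        stockDb.table.put("stock:1001", "10");
        stockDb.update(1L, "stock:1001", "9"); // branch 1 changed the row in phase one

        Map<Long, BranchResource> branches = new LinkedHashMap<>();
        branches.put(1L, stockDb);
        System.out.println(globalRollback(branches));        // true
        System.out.println(stockDb.table.get("stock:1001")); // 10
    }
}
```

In this model the rollback differs from the commit path in one respect: the data has to be restored before success is reported, so the sketch performs it synchronously instead of queueing it.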

 
