Seata Source Code 4: Global Transaction Interception and Starting a Global Transaction

Outline

1. Source code of the Seata Server startup entry point

2. Source code of starting the Seata Server network server

3. Core fields of the global transaction interceptor

4. Source code of initializing the global transaction interceptor

5. The AOP interception method of the global transaction interceptor

6. Executing a global transaction through the transaction execution template

7. Obtaining the xid to build a global transaction instance, and global transaction propagation levels

8. How the transaction execution template runs the business logic according to the propagation level

9. How the transaction execution template begins, commits, and rolls back a transaction

10. Source code of the load-balancing mechanism for a Seata Server cluster

11. Source code of the Seata Client sending requests to the Seata Server

12. How the Client encodes an RpcMessage object into a byte array

13. How the Server decodes a byte array into an RpcMessage object

14. How the Server processes a decoded RpcMessage object

15. Source code of the Seata Server flow for starting a global transaction

 

1. Source code of the Seata Server startup entry point


The code is located in the seata-server module:

@SpringBootApplication(scanBasePackages = {"io.seata"}) public class ServerApplication {     public static void main(String[] args) throws IOException {         //run the spring-boot application         SpringApplication.run(ServerApplication.class, args);     } }  @Component public class ServerRunner implements CommandLineRunner, DisposableBean {     private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);     private boolean started = Boolean.FALSE;     private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();      public static void addDisposable(Disposable disposable) {         DISPOSABLE_LIST.add(disposable);     }      @Override     public void run(String... args) {         try {             long start = System.currentTimeMillis();             Server.start(args);             started = true;              long cost = System.currentTimeMillis() - start;             LOGGER.info("seata server started in {} millSeconds", cost);         } catch (Throwable e) {             started = Boolean.FALSE;             LOGGER.error("seata server start error: {} ", e.getMessage(), e);             System.exit(-1);         }     }      public boolean started() {         return started;     }      @Override     public void destroy() throws Exception {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("destoryAll starting");         }         for (Disposable disposable : DISPOSABLE_LIST) {             disposable.destroy();         }         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("destoryAll finish");         }     } }  public class Server {     //The entry point of application.     public static void start(String[] args) {         //create logger         final Logger logger = LoggerFactory.getLogger(Server.class);          //initialize the parameter parser         //Note that the parameter parser should always be the first line to execute.         //Because, here we need to parse the parameters needed for startup.         
ParameterParser parameterParser = new ParameterParser(args);          //initialize the metrics         //Seata Server是支持metric指标采集功能的         MetricsManager.get().init();          System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());          //Seata Server里的Netty服务器的IO线程池,最小50个,最大500个         ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(             NettyServerConfig.getMinServerPoolSize(),             NettyServerConfig.getMaxServerPoolSize(),             NettyServerConfig.getKeepAliveTime(),             TimeUnit.SECONDS,             new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),             new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),             new ThreadPoolExecutor.CallerRunsPolicy()         );          //创建一个Netty网络通信服务器         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);          UUIDGenerator.init(parameterParser.getServerNode());         //log store mode : file, db, redis         SessionHolder.init(parameterParser.getSessionStoreMode());         LockerManagerFactory.init(parameterParser.getLockStoreMode());                 //启动定时调度线程         DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);         coordinator.init();         nettyRemotingServer.setHandler(coordinator);          //let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028         ServerRunner.addDisposable(coordinator);          //127.0.0.1 and 0.0.0.0 are not valid here.         if (NetUtil.isValidIp(parameterParser.getHost(), false)) {             XID.setIpAddress(parameterParser.getHost());         } else {             String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);             if (StringUtils.isNotBlank(preferredNetworks)) {                 XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));             } else {                 XID.setIpAddress(NetUtil.getLocalIp());             }         }                 //初始化Netty服务器         nettyRemotingServer.init();     } }

 

2. Source code of starting the Seata Server network server


Creating and starting Seata's network server:

public class NettyRemotingServer extends AbstractNettyRemotingServer {     ...     //Instantiates a new Rpc remoting server. 创建Seata Server     public NettyRemotingServer(ThreadPoolExecutor messageExecutor) {         super(messageExecutor, new NettyServerConfig());     }          //启动Seata Server     @Override     public void init() {         //registry processor         registerProcessor();         if (initialized.compareAndSet(false, true)) {             super.init();         }     }          private void registerProcessor() {         //1.registry on request message processor         ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());         ShutdownHook.getInstance().addDisposable(onRequestProcessor);         super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);         //2.registry on response message processor         ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());         super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);         super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);         //3.registry rm message processor         RegRmProcessor regRmProcessor = new RegRmProcessor(this);         super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);         //4.registry tm message processor         RegTmProcessor regTmProcessor = new RegTmProcessor(this);         super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);         //5.registry heartbeat message processor         ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);         super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);     }     ... }  public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     private final NettyServerBootstrap serverBootstrap;     ...     public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {         super(messageExecutor);         //创建Netty Server         serverBootstrap = new NettyServerBootstrap(nettyServerConfig);         serverBootstrap.setChannelHandlers(new ServerHandler());     }          @Override     public void init() {         super.init();         //启动Netty Server         serverBootstrap.start();     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     //The Timer executor. 
由单个线程进行调度的线程池     protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));     //The Message executor.     protected final ThreadPoolExecutor messageExecutor;     ...          public void init() {         //启动一个定时任务,每隔3秒检查发送的请求是否响应超时         timerExecutor.scheduleAtFixedRate(new Runnable() {             @Override             public void run() {                 for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {                     MessageFuture future = entry.getValue();                     if (future.isTimeout()) {                         futures.remove(entry.getKey());                         RpcMessage rpcMessage = future.getRequestMessage();                         future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));                         if (LOGGER.isDebugEnabled()) {                             LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());                         }                     }                 }                 nowMills = System.currentTimeMillis();             }         }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);     }     ... }  public class NettyServerBootstrap implements RemotingBootstrap {     private final NettyServerConfig nettyServerConfig;     private final EventLoopGroup eventLoopGroupBoss;     private final EventLoopGroup eventLoopGroupWorker;     private final ServerBootstrap serverBootstrap = new ServerBootstrap();     private ChannelHandler[] channelHandlers;     private int listenPort;     private final AtomicBoolean initialized = new AtomicBoolean(false);      public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {         this.nettyServerConfig = nettyServerConfig;         if (NettyServerConfig.enableEpoll()) {             this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));             this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads()));         } else {             this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));             this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads()));         }     }          //Sets channel handlers.     protected void setChannelHandlers(final ChannelHandler... 
handlers) {         if (handlers != null) {             channelHandlers = handlers;         }     }          @Override     public void start() {         this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)             .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)             .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())             .option(ChannelOption.SO_REUSEADDR, true)             .childOption(ChannelOption.SO_KEEPALIVE, true)             .childOption(ChannelOption.TCP_NODELAY, true)             .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())             .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))             .localAddress(new InetSocketAddress(getListenPort()))             .childHandler(new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) {                     ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))                         .addLast(new ProtocolV1Decoder())                         .addLast(new ProtocolV1Encoder());                     if (channelHandlers != null) {                         addChannelPipelineLast(ch, channelHandlers);                     }                 }             }         );          try {             this.serverBootstrap.bind(getListenPort()).sync();             XID.setPort(getListenPort());             LOGGER.info("Server started, service listen port: {}", getListenPort());             RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));             initialized.set(true);         } catch (SocketException se) {             throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);         } catch (Exception exx) {             throw new RuntimeException("Server start failed", exx);         }     }     ... }  public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     private RemotingServer remotingServer;     private final DefaultCore core;     private static volatile DefaultCoordinator instance;     private final ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1));     private final ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_COMMITTING, 1));     private final ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(ASYNC_COMMITTING, 1));     private final ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(TX_TIMEOUT_CHECK, 1));     private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1));     ...          
public static DefaultCoordinator getInstance(RemotingServer remotingServer) {         if (null == instance) {             synchronized (DefaultCoordinator.class) {                 if (null == instance) {                     instance = new DefaultCoordinator(remotingServer);                 }             }         }         return instance;     }          private DefaultCoordinator(RemotingServer remotingServer) {         if (remotingServer == null) {             throw new IllegalArgumentException("RemotingServer not allowed be null.");         }         this.remotingServer = remotingServer;         this.core = new DefaultCore(remotingServer);     }          public void init() {         retryRollbacking.scheduleAtFixedRate(             () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking),              0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);          retryCommitting.scheduleAtFixedRate(             () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting),              0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);          asyncCommitting.scheduleAtFixedRate(             () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting),              0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);          timeoutCheck.scheduleAtFixedRate(             () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck),              0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);          undoLogDelete.scheduleAtFixedRate(             () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),             UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);     }     ... }

The ClientHandler on the Seata Client and the ServerHandler on the Seata Server:

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @ChannelHandler.Sharable     class ServerHandler extends ChannelDuplexHandler {         //Channel read.         @Override         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {             if (!(msg instanceof RpcMessage)) {                 return;             }             //此时会把解码完毕的RpcMessage来进行处理             processMessage(ctx, (RpcMessage) msg);         }          @Override         public void channelWritabilityChanged(ChannelHandlerContext ctx) {             synchronized (lock) {                 if (ctx.channel().isWritable()) {                     lock.notifyAll();                 }             }             ctx.fireChannelWritabilityChanged();         }          //Channel inactive.         @Override         public void channelInactive(ChannelHandlerContext ctx) throws Exception {             debugLog("inactive:{}", ctx);             if (messageExecutor.isShutdown()) {                 return;             }             handleDisconnect(ctx);             super.channelInactive(ctx);         }          private void handleDisconnect(ChannelHandlerContext ctx) {             final String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());             RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());             if (LOGGER.isInfoEnabled()) {                 LOGGER.info(ipAndPort + " to server channel inactive.");             }             if (rpcContext != null && rpcContext.getClientRole() != null) {                 rpcContext.release();                 if (LOGGER.isInfoEnabled()) {                     LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);                 }             } else {                 if (LOGGER.isInfoEnabled()) {                     LOGGER.info("remove unused channel:" + ctx.channel());                 }             }         }          //Exception caught.         @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {             try {                 if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) {                     return;                 }                 LOGGER.error("exceptionCaught:{}, channel:{}", cause.getMessage(), ctx.channel());                 super.exceptionCaught(ctx, cause);             } finally {                 ChannelManager.releaseRpcContext(ctx.channel());             }         }          //User event triggered.         
@Override         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {             if (evt instanceof IdleStateEvent) {                 debugLog("idle:{}", evt);                 IdleStateEvent idleStateEvent = (IdleStateEvent) evt;                 if (idleStateEvent.state() == IdleState.READER_IDLE) {                     if (LOGGER.isInfoEnabled()) {                         LOGGER.info("channel:" + ctx.channel() + " read idle.");                     }                     handleDisconnect(ctx);                     try {                         closeChannelHandlerContext(ctx);                     } catch (Exception e) {                         LOGGER.error(e.getMessage());                     }                 }             }         }          @Override         public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {             if (LOGGER.isInfoEnabled()) {                 LOGGER.info(ctx + " will closed");             }             super.close(ctx, future);         }     }     ... }  public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {     ...     @Sharable     class ClientHandler extends ChannelDuplexHandler {         @Override         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {             if (!(msg instanceof RpcMessage)) {                 return;             }             processMessage(ctx, (RpcMessage) msg);         }          @Override         public void channelWritabilityChanged(ChannelHandlerContext ctx) {             synchronized (lock) {                 if (ctx.channel().isWritable()) {                     lock.notifyAll();                 }             }             ctx.fireChannelWritabilityChanged();         }          @Override         public void channelInactive(ChannelHandlerContext ctx) throws Exception {             if (messageExecutor.isShutdown()) {                 return;             }             if (LOGGER.isInfoEnabled()) {                 LOGGER.info("channel inactive: {}", ctx.channel());             }             clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));             super.channelInactive(ctx);         }          @Override         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {             if (evt instanceof IdleStateEvent) {                 IdleStateEvent idleStateEvent = (IdleStateEvent) evt;                 if (idleStateEvent.state() == IdleState.READER_IDLE) {                     if (LOGGER.isInfoEnabled()) {                         LOGGER.info("channel {} read idle.", ctx.channel());                     }                     try {                         String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());                         clientChannelManager.invalidateObject(serverAddress, ctx.channel());                     } catch (Exception exx) {                         LOGGER.error(exx.getMessage());                     } finally {                         clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));                     }                 }                 if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {                     try {                         if (LOGGER.isDebugEnabled()) {                             LOGGER.debug("will send ping msg,channel {}", ctx.channel());                         }                         
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);                     } catch (Throwable throwable) {                         LOGGER.error("send request error: {}", throwable.getMessage(), throwable);                     }                 }             }         }          @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {             LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);             clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));             if (LOGGER.isInfoEnabled()) {                 LOGGER.info("remove exception rm channel:{}", ctx.channel());             }             super.exceptionCaught(ctx, cause);         }          @Override         public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {             if (LOGGER.isInfoEnabled()) {                 LOGGER.info(ctx + " will closed");             }             super.close(ctx, future);         }     }     ... }

 

3. Core fields of the global transaction interceptor

In the wrapIfNecessary() method of the global transaction annotation scanner GlobalTransactionScanner, whenever a Spring bean is found to carry one of Seata's annotations, a dynamic proxy is created for that bean.

 

For example, if a Spring bean is annotated with @GlobalTransactional, then when GlobalTransactionScanner creates the dynamic proxy for this bean, it builds the proxy with the global transaction interceptor GlobalTransactionalInterceptor.

 

As a result, subsequent calls to the methods of this Spring bean first go through the GlobalTransactionalInterceptor.
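To make this concrete, here is a minimal usage sketch (the service class, method, and transaction name are hypothetical, not taken from the source): a bean declared like this is the one GlobalTransactionScanner wraps, so a call to purchase() enters GlobalTransactionalInterceptor.invoke() before the business logic runs.

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

//Hypothetical business service, used only to show where the annotation goes
@Service
public class OrderService {

    //Seata's annotation marks the global transaction boundary; the proxy built by
    //GlobalTransactionScanner intercepts this call with GlobalTransactionalInterceptor
    @GlobalTransactional(name = "purchase-tx", timeoutMills = 60000, rollbackFor = Exception.class)
    public void purchase(String userId, String commodityCode, int count) {
        //local database operations and remote RPC calls made here become branch transactions
        //that report to the Seata Server under the xid of this global transaction
    }
}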

 

The core fields of GlobalTransactionalInterceptor, the global transaction annotation interceptor, are as follows:

(1) TransactionalTemplate: the global transaction execution template

(2) GlobalLockTemplate: the global lock management template

(3) FailureHandler: the global transaction failure handler

//全局事务注解拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {     private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);      //默认的全局事务异常处理组件     //如果全局事务出现开启、回滚、提交、重试异常时,就可以回调这个DefaultFailureHandlerImpl进行异常处理     private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();          //全局事务执行模版,用来管理全局事务的执行     private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();          //全局锁执行模版,用来实现不同全局事务间的写隔离     private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();      //真正的全局事务异常处理组件     private final FailureHandler failureHandler;      //是否禁用全局事务     private volatile boolean disable;      //全局事务拦截器的顺序     private int order;      //AOP切面全局事务核心配置,来自于全局事务注解     protected AspectTransactional aspectTransactional;      //全局事务降级检查的时间周期     private static int degradeCheckPeriod;      //是否开启全局事务的降级检查     private static volatile boolean degradeCheck;      //降级检查允许时间     private static int degradeCheckAllowTimes;      //降级次数     private static volatile Integer degradeNum = 0;          //reach达标次数     private static volatile Integer reachNum = 0;      //Guava提供的事件总线     private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);      //定时调度线程池     private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));          //默认的全局事务超时时间     private static int defaultGlobalTransactionTimeout = 0;          ...     }

 

4. Source code of initializing the global transaction interceptor

When the global transaction interceptor GlobalTransactionalInterceptor is initialized, it sets the failure handler for global transactions and sets the default global transaction timeout to 60 seconds.
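The FailureHandler passed into this constructor is supplied by GlobalTransactionScanner (the failureHandlerHook seen in the code below); if it is null, DEFAULT_FAIL_HANDLER is used. As a hedged sketch of plugging in a custom handler (the class name and alerting logic are hypothetical, and the exact set of FailureHandler callbacks can vary across Seata versions), an application can extend DefaultFailureHandlerImpl and override only what it needs:

import io.seata.tm.api.DefaultFailureHandlerImpl;
import io.seata.tm.api.GlobalTransaction;

//Hypothetical handler: extends the default implementation so only one callback is overridden
public class AlertingFailureHandler extends DefaultFailureHandlerImpl {

    @Override
    public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
        //keep the default behavior, then add custom alerting
        super.onBeginFailure(tx, cause);
        System.err.println("global transaction begin failed, xid=" + tx.getXid());
    }
}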

//全局事务注解扫描器 public class GlobalTransactionScanner extends AbstractAutoProxyCreator         implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {     ...         //Spring AOP里对方法进行拦截的拦截器     private MethodInterceptor interceptor;          @Override     protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {         if (!doCheckers(bean, beanName)) {             return bean;         }         try {             synchronized (PROXYED_SET) {                 if (PROXYED_SET.contains(beanName)) {                     return bean;                 }                 interceptor = null;                  if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {                     //init tcc fence clean task if enable useTccFence                     TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);                     //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC                     interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));                     ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);                 } else {                     //获取目标class的接口                     Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);                     Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);                     //existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解                     if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {                         return bean;                     }                     if (globalTransactionalInterceptor == null) {                         //创建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器                         globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);                         ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);                     }                     interceptor = globalTransactionalInterceptor;                 }                 LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());                 if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理                     //接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理                     //这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器                     bean = super.wrapIfNecessary(bean, beanName, cacheKey);                 } else {                     AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);                     Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));                     int pos;                     for (Advisor avr : advisor) {                         // Find the position based on the advisor's order, and add to advisors by pos                         pos = findAddSeataAdvisorPosition(advised, avr);                         advised.addAdvisor(pos, avr);                     }                 }                 PROXYED_SET.add(beanName);                 return bean;             }         } catch (Exception exx) {             throw new RuntimeException(exx);         }     }     ... 
}  //全局事务拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {     //真正的全局事务异常处理组件     private final FailureHandler failureHandler;     //是否禁用全局事务     private volatile boolean disable;     //全局事务拦截器的顺序     private int order;     //是否开启全局事务的降级检查     private static volatile boolean degradeCheck;     //全局事务降级检查的时间周期     private static int degradeCheckPeriod;     //降级检查允许时间     private static int degradeCheckAllowTimes;     //默认的全局事务超时时间     private static int defaultGlobalTransactionTimeout = 0;     //Guava提供的事件总线     private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);     ...          //Instantiates a new Global transactional interceptor.     //实例化一个新的全局事务拦截器     public GlobalTransactionalInterceptor(FailureHandler failureHandler) {         this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;         this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);         this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);         degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);         if (degradeCheck) {             ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);             degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);             degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);             EVENT_BUS.register(this);             if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {                 startDegradeCheck();             }         }         this.initDefaultGlobalTransactionTimeout();     }          //初始化默认的全局事务超时时间,60s=1min     private void initDefaultGlobalTransactionTimeout() {         if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {             int defaultGlobalTransactionTimeout;             try {                 defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);             } catch (Exception e) {                 LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());                 defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;             }             if (defaultGlobalTransactionTimeout <= 0) {                 LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);                 defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;             }             GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;         }     }     ... }

 

5. The AOP interception method of the global transaction interceptor

When a method annotated with @GlobalTransactional is called, the invoke() method of GlobalTransactionalInterceptor is executed.

//全局事务拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {     //是否禁用全局事务     private volatile boolean disable;         //是否开启全局事务的降级检查     private static volatile boolean degradeCheck;     //降级次数     private static volatile Integer degradeNum = 0;     //降级检查允许时间     private static int degradeCheckAllowTimes;     //AOP切面全局事务核心配置,来自于全局事务注解     protected AspectTransactional aspectTransactional;     ...          //如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法     @Override     public Object invoke(final MethodInvocation methodInvocation) throws Throwable {         //methodInvocation是一次方法调用         //通过methodInvocation的getThis()方法可以获取到被调用方法的对象         //通过AopUtils.getTargetClass()方法可以获取到对象对应的Class         Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;         //通过反射,获取到目标class中被调用的method方法         Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);          //如果调用的目标method不为null         if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {             //尝试寻找桥接方法bridgeMethod             final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);             //通过反射,获取被调用的目标方法的@GlobalTransactional注解             final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);             //通过反射,获取被调用目标方法的@GlobalLock注解             final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);              //如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true             //localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了             boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);             //如果全局事务没有禁用             if (!localDisable) {                 //全局事务注解不为空,或者是AOP切面全局事务核心配置不为空                 if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {                     AspectTransactional transactional;                     if (globalTransactionalAnnotation != null) {                         //创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来                         transactional = new AspectTransactional(                             globalTransactionalAnnotation.timeoutMills(),                             globalTransactionalAnnotation.name(),                             globalTransactionalAnnotation.rollbackFor(),                             globalTransactionalAnnotation.noRollbackForClassName(),                             globalTransactionalAnnotation.noRollbackFor(),                             globalTransactionalAnnotation.noRollbackForClassName(),                             globalTransactionalAnnotation.propagation(),                             globalTransactionalAnnotation.lockRetryInterval(),                             globalTransactionalAnnotation.lockRetryTimes()                         );                     } else {                         transactional = this.aspectTransactional;                     }                     //真正处理全局事务的入口                     return handleGlobalTransaction(methodInvocation, transactional);                 } else if (globalLockAnnotation != null) {                     return handleGlobalLock(methodInvocation, globalLockAnnotation);                 }             }         }          //直接运行目标方法         return methodInvocation.proceed();     }         
 //获取注解     public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {         return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass)).orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));     }     ... }

 

6. Executing a global transaction through the transaction execution template

The global transaction interceptor GlobalTransactionalInterceptor holds an instance field of type TransactionalTemplate, the transaction execution template, and this TransactionalTemplate instance is what actually executes the global transaction. When a global transaction is executed, TransactionalTemplate's execute() method is called.

//全局事务拦截器 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {     //全局事务执行模版,用来管理全局事务的执行     private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();     ...     //真正进行全局事务的处理     Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {         boolean succeed = true;         try {             //基于全局事务执行模版TransactionalTemplate,来执行全局事务             return transactionalTemplate.execute(new TransactionalExecutor() {                 //真正执行目标方法                 @Override                 public Object execute() throws Throwable {                     return methodInvocation.proceed();                 }                                  //根据全局事务注解可以获取到一个name,可以对目标方法进行格式化                 public String name() {                     String name = aspectTransactional.getName();                     if (!StringUtils.isNullOrEmpty(name)) {                         return name;                     }                     return formatMethod(methodInvocation.getMethod());                 }                                  //获取全局事务的信息                 @Override                 public TransactionInfo getTransactionInfo() {                     //reset the value of timeout                     int timeout = aspectTransactional.getTimeoutMills();                     if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {                         timeout = defaultGlobalTransactionTimeout;                     }                     //封装一个全局事务信息实例TransactionInfo                     TransactionInfo transactionInfo = new TransactionInfo();                     transactionInfo.setTimeOut(timeout);//全局事务超时时间                     transactionInfo.setName(name());//全局事务名称                     transactionInfo.setPropagation(aspectTransactional.getPropagation());//全局事务传播级别                     transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//全局锁获取重试间隔                     transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//全局锁重试次数                     //全局事务回滚规则                     Set<RollbackRule> rollbackRules = new LinkedHashSet<>();                     for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {                         rollbackRules.add(new RollbackRule(rbRule));                     }                     for (String rbRule : aspectTransactional.getRollbackForClassName()) {                         rollbackRules.add(new RollbackRule(rbRule));                     }                     for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {                         rollbackRules.add(new NoRollbackRule(rbRule));                     }                     for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {                         rollbackRules.add(new NoRollbackRule(rbRule));                     }                     transactionInfo.setRollbackRules(rollbackRules);                     return transactionInfo;                 }             });         } catch (TransactionalExecutor.ExecutionException e) {             ...         } finally {             if (degradeCheck) {                 EVENT_BUS.post(new DegradeCheckEvent(succeed));             }         }     }     ... }

 

7. Obtaining the xid to build a global transaction instance, and global transaction propagation levels

(1) Obtaining the xid from RootContext to build a global transaction instance

(2) Propagation levels of global transactions

 

(1) Obtaining the xid from RootContext to build a global transaction instance

RootContext loads a ContextCore instance through the SPI mechanism, for example a FastThreadLocalContextCore or a ThreadLocalContextCore instance.

 

The xid is put() into the ContextCore instance via RootContext's bind() method; that is, the xid ends up in a ThreadLocal&lt;Map&lt;String, Object&gt;&gt; or a FastThreadLocal&lt;Map&lt;String, Object&gt;&gt;. Therefore, RootContext's getXID() method can read the current thread's xid back out of the ContextCore instance.
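To make the storage model concrete, here is a minimal sketch of a ThreadLocal-backed context holder (a simplification for illustration only, not Seata's actual ThreadLocalContextCore implementation):

import java.util.HashMap;
import java.util.Map;

//Simplified sketch of a ThreadLocal-backed context holder; Seata's real ThreadLocalContextCore
//and FastThreadLocalContextCore implement the ContextCore SPI in a similar spirit
public class SimpleThreadLocalContextCore {

    //each thread gets its own Map, so the xid bound on one thread is invisible to other threads
    private final ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(HashMap::new);

    public Object put(String key, Object value) {
        return threadLocal.get().put(key, value);
    }

    public Object get(String key) {
        return threadLocal.get().get(key);
    }

    public Object remove(String key) {
        return threadLocal.get().remove(key);
    }
}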

//全局事务执行模版 public class TransactionalTemplate {     private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);          //Execute object.     public Object execute(TransactionalExecutor business) throws Throwable {         //1.Get transactionInfo         TransactionInfo txInfo = business.getTransactionInfo();         if (txInfo == null) {             throw new ShouldNeverHappenException("transactionInfo does not exist");         }          //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.         GlobalTransaction tx = GlobalTransactionContext.getCurrent();                //1.2 Handle the transaction propagation.         Propagation propagation = txInfo.getPropagation();         ...     }     ... }  //全局事务上下文 public class GlobalTransactionContext {     private GlobalTransactionContext() {     }          //Get GlobalTransaction instance bind on current thread.     public static GlobalTransaction getCurrent() {         String xid = RootContext.getXID();         if (xid == null) {             return null;         }         return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);     }     ... }  public class RootContext {     //通过SPI机制加载ContextCore实例,比如FastThreadLocalContextCore、ThreadLocalContextCore     //所以可以认为,xid是存放在ThreadLocal<Map<String, Object>>中的     private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();     ...     private RootContext() {     }          //Gets xid.     @Nullable     public static String getXID() {         return (String) CONTEXT_HOLDER.get(KEY_XID);     }          //Bind xid.     public static void bind(@Nonnull String xid) {         if (StringUtils.isBlank(xid)) {             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("xid is blank, switch to unbind operation!");             }             unbind();         } else {             MDC.put(MDC_KEY_XID, xid);             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("bind {}", xid);             }             CONTEXT_HOLDER.put(KEY_XID, xid);         }     }     ... }

(2) Propagation levels of global transactions

The propagation levels of global transactions are: REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, NEVER, SUPPORTS, and MANDATORY.

//Propagation level of global transactions. //全局事务的传播级别 public enum Propagation {     //如果全局事务已经存在,此时会直接在当前的全局事务里继续去运行下去,后续运行的都是全局事务里的分支事务     //如果全局事务此时还不存在,就会开启一个新的全局事务来运行     //这种全局事务传播级别,就是REQUIRED     //The logic is similar to the following code:     //     if (tx == null) {     //         try {     //             tx = beginNewTransaction(); // begin new transaction, is not existing     //             Object rs = business.execute(); // execute with new transaction     //             commitTransaction(tx);     //             return rs;     //         } catch (Exception ex) {     //             rollbackTransaction(tx);     //             throw ex;     //         }     //     } else {     //         return business.execute(); // execute with current transaction     //     }     REQUIRED,      //如果全局事务已经存在,则先暂停该事务,然后开启一个新的全局事务来执行业务     //The logic is similar to the following code:     //     try {     //         if (tx != null) {     //             suspendedResource = suspendTransaction(tx); // suspend current transaction     //         }     //         try {     //             tx = beginNewTransaction(); // begin new transaction     //             Object rs = business.execute(); // execute with new transaction     //             commitTransaction(tx);     //             return rs;     //         } catch (Exception ex) {     //             rollbackTransaction(tx);     //             throw ex;     //         }     //     } finally {     //         if (suspendedResource != null) {     //             resumeTransaction(suspendedResource); // resume transaction     //         }     //     }     REQUIRES_NEW,      //如果全局事务已经存在,则先暂停该事务,然后不要使用全局事务来执行业务     //The logic is similar to the following code:     //     try {     //         if (tx != null) {     //             suspendedResource = suspendTransaction(tx); // suspend current transaction     //         }     //         return business.execute(); // execute without transaction     //     } finally {     //         if (suspendedResource != null) {     //             resumeTransaction(suspendedResource); // resume transaction     //         }     //     }     NOT_SUPPORTED,      //如果全局事务不存在,则不要使用全局事务来执行业务     //如果全局事务存在,则使用全局事务来执行业务     //The logic is similar to the following code:     //     if (tx != null) {     //         return business.execute(); // execute with current transaction     //     } else {     //         return business.execute(); // execute without transaction     //     }     SUPPORTS,      //如果全局事务存在,则抛异常     //如果全局事务不存在,则执行业务     //The logic is similar to the following code:     //     if (tx != null) {     //         throw new TransactionException("existing transaction");     //     }     //     return business.execute(); // execute without transaction     NEVER,      //如果全局事务不存在,则抛异常     //如果全局事务存在,则使用全局事务去执行业务     //The logic is similar to the following code:     //     if (tx == null) {     //         throw new TransactionException("not existing transaction");     //     }     //     return business.execute(); // execute with current transaction     MANDATORY }
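To illustrate how a propagation level is chosen in practice, here is a hedged usage sketch (the two service classes are hypothetical, and the Propagation import path is assumed to be io.seata.tm.api.transaction): the outer method opens a global transaction with the default REQUIRED behavior, while the inner method declares REQUIRES_NEW, so the outer transaction is suspended and a separate one is begun for the inner call.

import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.transaction.Propagation;
import org.springframework.stereotype.Service;

//Hypothetical services, used only to illustrate propagation behavior
@Service
public class OuterService {

    private final InnerService innerService;

    public OuterService(InnerService innerService) {
        this.innerService = innerService;
    }

    //opens a global transaction; the default propagation is REQUIRED
    @GlobalTransactional(name = "outer-tx")
    public void outer() {
        innerService.auditLog();
    }
}

@Service
class InnerService {

    //REQUIRES_NEW: the caller's global transaction is suspended (its xid is unbound),
    //a separate global transaction is begun for this method, and the outer one is resumed afterwards
    @GlobalTransactional(name = "inner-tx", propagation = Propagation.REQUIRES_NEW)
    public void auditLog() {
        //business logic that commits or rolls back independently of the outer transaction
    }
}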

 

8. How the transaction execution template runs the business logic according to the propagation level

//全局事务执行模版 public class TransactionalTemplate {     ...     //Execute object.     //通过全局事务生命周期管理组件执行全局事务     public Object execute(TransactionalExecutor business) throws Throwable {         //1.Get transactionInfo         TransactionInfo txInfo = business.getTransactionInfo();         if (txInfo == null) {             throw new ShouldNeverHappenException("transactionInfo does not exist");         }          //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.         //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务         //刚开始在开启一个全局事务的时候,是没有全局事务的         GlobalTransaction tx = GlobalTransactionContext.getCurrent();          //1.2 Handle the transaction propagation.         //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED         //也就是如果存在一个全局事务,就直接执行业务;         //如果不存在一个全局事务,就开启一个新的全局事务;         Propagation propagation = txInfo.getPropagation();          //不同的全局事务传播级别,会采取不同的处理方式         //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid         //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别         SuspendedResourcesHolder suspendedResourcesHolder = null;         try {             switch (propagation) {                 case NOT_SUPPORTED:                     //If transaction is existing, suspend it.                     if (existingTransaction(tx)) {                         suspendedResourcesHolder = tx.suspend();                     }                     //Execute without transaction and return.                     return business.execute();                 case REQUIRES_NEW:                     //If transaction is existing, suspend it, and then begin new transaction.                     if (existingTransaction(tx)) {                         suspendedResourcesHolder = tx.suspend();                         tx = GlobalTransactionContext.createNew();                     }                     //Continue and execute with new transaction                     break;                 case SUPPORTS:                     //If transaction is not existing, execute without transaction.                     if (notExistingTransaction(tx)) {                         return business.execute();                     }                     //Continue and execute with new transaction                     break;                 case REQUIRED:                     //If current transaction is existing, execute with current transaction, else continue and execute with new transaction.                     break;                 case NEVER:                     //If transaction is existing, throw exception.                     if (existingTransaction(tx)) {                         throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));                     } else {                         //Execute without transaction and return.                         return business.execute();                     }                 case MANDATORY:                     //If transaction is not existing, throw exception.                     if (notExistingTransaction(tx)) {                         throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");                     }                     //Continue and execute with current transaction.                     
break;                 default:                     throw new TransactionException("Not Supported Propagation:" + propagation);             }              //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.             if (tx == null) {                 //如果xid为null,则会创建一个新的全局事务                 tx = GlobalTransactionContext.createNew();             }              //set current tx config to holder             GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);              try {                 //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,                 //else do nothing. Of course, the hooks will still be triggered.                 //开启一个全局事务                 beginTransaction(txInfo, tx);                  Object rs;                 try {                     //Do Your Business                     //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并执行一个一个分支事务                     rs = business.execute();                 } catch (Throwable ex) {                     //3. The needed business exception to rollback.                     //发生异常时需要完成的事务                     completeTransactionAfterThrowing(txInfo, tx, ex);                     throw ex;                 }                  //4. everything is fine, commit.                 //如果一切执行正常就会在这里提交全局事务                 commitTransaction(tx);                  return rs;             } finally {                 //5. clear                 //执行一些全局事务完成后的回调,比如清理等工作                 resumeGlobalLockConfig(previousConfig);                 triggerAfterCompletion();                 cleanUp();             }         } finally {             //If the transaction is suspended, resume it.             if (suspendedResourcesHolder != null) {                 //如果之前挂起了一个全局事务,此时可以恢复这个全局事务                 tx.resume(suspendedResourcesHolder);             }         }     }     ... }

 

9. How the transaction execution template begins, commits, and rolls back a transaction

(1) How the transaction execution template begins, commits, and rolls back a transaction

(2) How the default global transaction and the default transaction manager handle begin, commit, and rollback


(1) How the transaction execution template begins, commits, and rolls back a transaction

When the transaction execution template TransactionalTemplate begins, commits, or rolls back a transaction, it does so through the default global transaction, DefaultGlobalTransaction.

//全局事务上下文 public class GlobalTransactionContext {     private GlobalTransactionContext() {     }          //Try to create a new GlobalTransaction.     //如果xid为null,则会创建一个新的全局事务     public static GlobalTransaction createNew() {         return new DefaultGlobalTransaction();     }     ... }  //默认的全局事务 public class DefaultGlobalTransaction implements GlobalTransaction {     private TransactionManager transactionManager;     private String xid;     private GlobalStatus status;     private GlobalTransactionRole role;     ...              //Instantiates a new Default global transaction.     DefaultGlobalTransaction() {         //全局事务角色是全局事务发起者         this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);     }          //Instantiates a new Default global transaction.     DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {         this.transactionManager = TransactionManagerHolder.get();//全局事务管理者         this.xid = xid;         this.status = status;         this.role = role;     }     ... }  //全局事务执行模版 public class TransactionalTemplate {     ...     //开启事务     private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {         try {             //开启全局事务之前有一个回调的一个钩子名为triggerBeforeBegin()             triggerBeforeBegin();             //真正去开启一个全局事务             tx.begin(txInfo.getTimeOut(), txInfo.getName());             //开启全局事务之后还有一个回调钩子名为triggerAfterBegin()             triggerAfterBegin();         } catch (TransactionException txe) {             throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);         }     }          private void triggerBeforeBegin() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.beforeBegin();             } catch (Exception e) {                 LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);             }         }     }          private void triggerAfterBegin() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.afterBegin();             } catch (Exception e) {                 LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e);             }         }     }     ...          
//提交事务     private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {         try {             triggerBeforeCommit();             tx.commit();             triggerAfterCommit();         } catch (TransactionException txe) {             // 4.1 Failed to commit             throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);         }     }          private void triggerBeforeCommit() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.beforeCommit();             } catch (Exception e) {                 LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);             }         }     }          private void triggerAfterCommit() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.afterCommit();             } catch (Exception e) {                 LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);             }         }     }          private void triggerAfterCompletion() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.afterCompletion();             } catch (Exception e) {                 LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);             }         }     }     ...          //回滚事务     private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {         //roll back         if (txInfo != null && txInfo.rollbackOn(originalException)) {             try {                 rollbackTransaction(tx, originalException);             } catch (TransactionException txe) {                 //Failed to rollback                 throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);             }         } else {             //not roll back on this exception, so commit             commitTransaction(tx);         }     }          private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {         triggerBeforeRollback();         tx.rollback();         triggerAfterRollback();         //3.1 Successfully rolled back         throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus()) ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);     }          private void triggerBeforeRollback() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.beforeRollback();             } catch (Exception e) {                 LOGGER.error("Failed execute beforeRollback in hook {}", e.getMessage(), e);             }         }     }          private void triggerAfterRollback() {         for (TransactionHook hook : getCurrentHooks()) {             try {                 hook.afterRollback();             } catch (Exception e) {                 LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);             }         }     }     ... }
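The trigger*() methods above invoke whatever TransactionHook instances getCurrentHooks() returns. As a hedged sketch (assuming the TransactionHookManager and TransactionHookAdapter types in io.seata.tm.api.transaction; the timing hook itself is made up), a hook can be registered for the current thread before the transactional method runs, and TransactionalTemplate drops the registered hooks again in its cleanUp() step:

import io.seata.tm.api.transaction.TransactionHookAdapter;
import io.seata.tm.api.transaction.TransactionHookManager;

//Hypothetical hook that measures how long the global transaction takes
public class TimingTransactionHook extends TransactionHookAdapter {

    private long beginTime;

    @Override
    public void afterBegin() {
        beginTime = System.currentTimeMillis();
    }

    @Override
    public void afterCompletion() {
        System.out.println("global transaction took " + (System.currentTimeMillis() - beginTime) + " ms");
    }
}

//Registration is per thread, e.g. before entering the @GlobalTransactional method:
//TransactionHookManager.registerHook(new TimingTransactionHook());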

(2)How the default global transaction and the default transaction manager begin, commit and roll back a transaction

When the default global transaction, DefaultGlobalTransaction, begins, commits or rolls back a transaction, it delegates the actual begin, commit and rollback to the default transaction manager, DefaultTransactionManager.

 

In turn, whenever DefaultTransactionManager begins, commits or rolls back a transaction, it ultimately invokes its syncCall() method to issue a synchronous call, i.e. it sends a Netty request to the Seata Server through TmNettyRemotingClient.

//默认的全局事务 public class DefaultGlobalTransaction implements GlobalTransaction {     private TransactionManager transactionManager;     private String xid;     private GlobalStatus status;     private GlobalTransactionRole role;     ...           @Override     public void begin() throws TransactionException {         begin(DEFAULT_GLOBAL_TX_TIMEOUT);     }          @Override     public void begin(int timeout) throws TransactionException {         begin(timeout, DEFAULT_GLOBAL_TX_NAME);     }          @Override     public void begin(int timeout, String name) throws TransactionException {         if (role != GlobalTransactionRole.Launcher) {             assertXIDNotNull();             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);             }             return;         }         assertXIDNull();         String currentXid = RootContext.getXID();         if (currentXid != null) {             throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid);         }         //通过全局事务管理器去真正开启全局事务,一旦开启成功,就可以获取到一个xid         xid = transactionManager.begin(null, null, name, timeout);         status = GlobalStatus.Begin;         //把xid绑定到RootContext的线程本地变量副本里去         RootContext.bind(xid);         if (LOGGER.isInfoEnabled()) {             LOGGER.info("Begin new global transaction [{}]", xid);         }     }          @Override     public void commit() throws TransactionException {         if (role == GlobalTransactionRole.Participant) {             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);             }             return;         }         assertXIDNotNull();         int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;         try {             while (retry > 0) {                 try {                     retry--;                     status = transactionManager.commit(xid);                     break;                 } catch (Throwable ex) {                     LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());                     if (retry == 0) {                         throw new TransactionException("Failed to report global commit", ex);                     }                 }             }         } finally {             if (xid.equals(RootContext.getXID())) {                 suspend();             }         }         if (LOGGER.isInfoEnabled()) {             LOGGER.info("[{}] commit status: {}", xid, status);         }     }          @Override     public void rollback() throws TransactionException {         if (role == GlobalTransactionRole.Participant) {             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);             }             return;         }         assertXIDNotNull();         int retry = ROLLBACK_RETRY_COUNT <= 0 ? 
DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;         try {             while (retry > 0) {                 try {                     retry--;                     status = transactionManager.rollback(xid);                     break;                 } catch (Throwable ex) {                     LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());                     if (retry == 0) {                         throw new TransactionException("Failed to report global rollback", ex);                     }                 }             }         } finally {             if (xid.equals(RootContext.getXID())) {                 suspend();             }         }         if (LOGGER.isInfoEnabled()) {             LOGGER.info("[{}] rollback status: {}", xid, status);         }     }     ... }  public class RootContext {     private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();     public static final String KEY_XID = "TX_XID";     ...          //Gets xid.     @Nullable     public static String getXID() {         return (String) CONTEXT_HOLDER.get(KEY_XID);     }          //Bind xid.     public static void bind(@Nonnull String xid) {         if (StringUtils.isBlank(xid)) {             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("xid is blank, switch to unbind operation!");             }             unbind();         } else {             MDC.put(MDC_KEY_XID, xid);             if (LOGGER.isDebugEnabled()) {                 LOGGER.debug("bind {}", xid);             }             CONTEXT_HOLDER.put(KEY_XID, xid);         }     }     ... }  //默认的全局事务管理器 public class DefaultTransactionManager implements TransactionManager {     @Override     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {         //构建一个全局事务开启请求GlobalBeginRequest         GlobalBeginRequest request = new GlobalBeginRequest();         request.setTransactionName(name);         request.setTimeout(timeout);         //发起一个同步调用         GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);         if (response.getResultCode() == ResultCode.Failed) {             throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());         }         return response.getXid();     }          @Override     public GlobalStatus commit(String xid) throws TransactionException {         GlobalCommitRequest globalCommit = new GlobalCommitRequest();         globalCommit.setXid(xid);         GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);         return response.getGlobalStatus();     }          @Override     public GlobalStatus rollback(String xid) throws TransactionException {         GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();         globalRollback.setXid(xid);         GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);         return response.getGlobalStatus();     }     ...     
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {         try {             //TMNettyRemotingClient会和Seata Server基于Netty建立长连接             return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);         } catch (TimeoutException toe) {             throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);         }     } }  //GlobalBeginRequest会根据Seata的通信协议序列化成字节数组,然后通过Netty被发送到Seata Server中去 public class GlobalBeginRequest extends AbstractTransactionRequestToTC {     private int timeout = 60000;     private String transactionName;     ...          @Override     public short getTypeCode() {         return MessageType.TYPE_GLOBAL_BEGIN;     }          @Override     public AbstractTransactionResponse handle(RpcContext rpcContext) {         return handler.handle(this, rpcContext);     }     ... }
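
Putting the client-side pieces together, the begin → business → commit/rollback sequence that TransactionalTemplate drives through DefaultGlobalTransaction and DefaultTransactionManager can be sketched as below. This is a simplified illustration only (the transaction name and timeout are arbitrary, package names are assumed from the Seata TM module); real application code normally relies on the @GlobalTransactional annotation and the global transaction interceptor instead of calling this API directly.

import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;

//Simplified sketch of the flow driven by TransactionalTemplate (illustration only).
public class GlobalTransactionFlowSketch {
    public static void runInGlobalTransaction(Runnable business) throws Throwable {
        GlobalTransaction tx = GlobalTransactionContext.createNew();
        //sends a GlobalBeginRequest to the Seata Server; the returned xid is bound to RootContext
        tx.begin(60000, "sketch-business-tx");
        try {
            //business logic runs here; branch transactions register under the bound xid
            business.run();
            //sends a GlobalCommitRequest to the Seata Server
            tx.commit();
        } catch (Throwable e) {
            //sends a GlobalRollbackRequest to the Seata Server
            tx.rollback();
            throw e;
        }
    }
}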

 

10.Source code of the load balancing mechanism for a Seata Server cluster

(1)Selecting a Seata Server node via load balancing

(2)The load balancing algorithms provided by Seata

 

(1)Selecting a Seata Server node via load balancing

As noted above, whenever DefaultTransactionManager begins, commits or rolls back a transaction, it ultimately invokes its syncCall() method to issue a synchronous call, i.e. it sends a Netty request to the Seata Server through TmNettyRemotingClient.

 

When syncCall() sends the request by calling sendSyncRequest() on the TmNettyRemotingClient instance, it is in fact calling the sendSyncRequest() method of TmNettyRemotingClient's abstract parent class, AbstractNettyRemotingClient.

 

Inside sendSyncRequest(), the first step is load balancing: AbstractNettyRemotingClient's loadBalance() method is called, which in turn calls AbstractNettyRemotingClient.doSelect().

 

doSelect() first obtains a LoadBalance instance through the LoadBalanceFactory factory plus the SPI mechanism, and then calls select() on that LoadBalance instance to perform load balancing.

 

Load balancing here simply means picking one node out of the available Seata Server nodes to send the request to.

//默认的全局事务管理器 public class DefaultTransactionManager implements TransactionManager {     ...     private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {         try {             //TMNettyRemotingClient会和Seata Server基于Netty建立长连接             return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);         } catch (TimeoutException toe) {             throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);         }     }     ... }  public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {     ...     @Override     public Object sendSyncRequest(Object msg) throws TimeoutException {         //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡         String serverAddress = loadBalance(getTransactionServiceGroup(), msg);         //获取RPC调用的超时时间         long timeoutMillis = this.getRpcRequestTimeout();         //构建一个RPC消息         RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);          //send batch message         //put message into basketMap, @see MergedSendRunnable         //默认是不开启批量消息发送         if (this.isEnableClientBatchSendRequest()) {             ...         } else {             //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel             //然后通过网络连接Channel把RpcMessage发送给Seata Server             Channel channel = clientChannelManager.acquireChannel(serverAddress);             return super.sendSync(channel, rpcMessage, timeoutMillis);         }     }          protected String loadBalance(String transactionServiceGroup, Object msg) {         InetSocketAddress address = null;         try {             @SuppressWarnings("unchecked")             List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);             address = this.doSelect(inetSocketAddressList, msg);         } catch (Exception ex) {             LOGGER.error(ex.getMessage());         }         if (address == null) {             throw new FrameworkException(NoAvailableService);         }         return NetUtil.toStringAddress(address);     }          protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception {         if (CollectionUtils.isNotEmpty(list)) {             if (list.size() > 1) {                 return LoadBalanceFactory.getInstance().select(list, getXid(msg));             } else {                 return list.get(0);             }         }         return null;     }     ... }  public class LoadBalanceFactory {     ...     public static LoadBalance getInstance() {         //根据SPI机制获取LoadBalance实例         String config = ConfigurationFactory.getInstance().getConfig(LOAD_BALANCE_TYPE, DEFAULT_LOAD_BALANCE);         return EnhancedServiceLoader.load(LoadBalance.class, config);     } }

(2)The load balancing algorithms provided by Seata

Round-robin selection, random selection, least-active selection, and consistent hashing.

 

一.The round-robin selection algorithm

@LoadLevel(name = ROUND_ROBIN_LOAD_BALANCE) public class RoundRobinLoadBalance implements LoadBalance {     private final AtomicInteger sequence = new AtomicInteger();          @Override     public <T> T select(List<T> invokers, String xid) {         int length = invokers.size();         //通过轮询选择Seata Server的节点         return invokers.get(getPositiveSequence() % length);     }          private int getPositiveSequence() {         for (;;) {             int current = sequence.get();             int next = current >= Integer.MAX_VALUE ? 0 : current + 1;             if (sequence.compareAndSet(current, next)) {                 return current;             }         }     } }

二.The random selection algorithm

@LoadLevel(name = RANDOM_LOAD_BALANCE) public class RandomLoadBalance implements LoadBalance {     @Override     public <T> T select(List<T> invokers, String xid) {         int length = invokers.size();         return invokers.get(ThreadLocalRandom.current().nextInt(length));     } }

三.The least-active selection algorithm

@LoadLevel(name = LEAST_ACTIVE_LOAD_BALANCE) public class LeastActiveLoadBalance implements LoadBalance {     @Override     public <T> T select(List<T> invokers, String xid) {         int length = invokers.size();         long leastActive = -1;         int leastCount = 0;         int[] leastIndexes = new int[length];         for (int i = 0; i < length; i++) {             long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive();             if (leastActive == -1 || active < leastActive) {                 leastActive = active;                 leastCount = 1;                 leastIndexes[0] = i;             } else if (active == leastActive) {                 leastIndexes[leastCount++] = i;             }         }         if (leastCount == 1) {             return invokers.get(leastIndexes[0]);         }         return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);     } }

四.The consistent-hash selection algorithm

@LoadLevel(name = CONSISTENT_HASH_LOAD_BALANCE) public class ConsistentHashLoadBalance implements LoadBalance {     public static final String LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES = LOAD_BALANCE_PREFIX + "visualNodes";     private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES, VIRTUAL_NODES_DEFAULT);          @Override     public <T> T select(List<T> invokers, String xid) {         //通过一致性哈希选择节点         return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid);     }          private static final class ConsistentHashSelector<T> {         private final SortedMap<Long, T> virtualInvokers = new TreeMap<>();         private final HashFunction hashFunction = new MD5Hash();                  ConsistentHashSelector(List<T> invokers, int virtualNodes) {             for (T invoker : invokers) {                 for (int i = 0; i < virtualNodes; i++) {                     virtualInvokers.put(hashFunction.hash(invoker.toString() + i), invoker);                 }             }         }                 public T select(String objectKey) {             SortedMap<Long, T> tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey));             Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey();             return virtualInvokers.get(nodeHashVal);         }     }          @SuppressWarnings("lgtm[java/weak-cryptographic-algorithm]")     private static class MD5Hash implements HashFunction {         MessageDigest instance;         public MD5Hash() {             try {                 instance = MessageDigest.getInstance("MD5");             } catch (NoSuchAlgorithmException e) {                 throw new IllegalStateException(e.getMessage(), e);             }         }                  @Override         public long hash(String key) {             instance.reset();             instance.update(key.getBytes());             byte[] digest = instance.digest();             long h = 0;             for (int i = 0; i < 4; i++) {                 h <<= 8;                 h |= ((int) digest[i]) & 0xFF;             }             return h;         }     }          public interface HashFunction {         long hash(String key);     } }
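
Since LoadBalanceFactory resolves the implementation by name through the SPI mechanism, a custom strategy can in principle be plugged in the same way as the built-in ones above. The sketch below is illustrative only; the package names, the SPI file location and the configuration key for switching strategies are assumptions to verify against the Seata version in use.

import io.seata.common.loader.LoadLevel;
import io.seata.discovery.loadbalance.LoadBalance;
import java.util.List;

//Illustrative sketch: a LoadBalance that always picks the first node (not a sensible production strategy).
@LoadLevel(name = "FirstNodeLoadBalance")
public class FirstNodeLoadBalance implements LoadBalance {
    @Override
    public <T> T select(List<T> invokers, String xid) {
        return invokers.get(0);
    }
}

//Assumed wiring (verify against your Seata version):
//1. register the class in META-INF/services/io.seata.discovery.loadbalance.LoadBalance
//2. configure the client to use it, e.g. client.loadBalance.type = FirstNodeLoadBalance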

 

11.Source code of the Seata Client sending requests to the Seata Server

First, the Seata Client obtains, via the connection manager ClientChannelManager, the Channel established with the chosen Seata Server.

 

It then sends the RpcMessage request to the Seata Server over the Netty Channel, i.e. it calls the Channel's writeAndFlush() method to send the RpcMessage asynchronously.

 

In addition, the Seata Client wraps the outgoing request in a MessageFuture instance and waits on that MessageFuture synchronously for the Seata Server's response. Internally, MessageFuture implements this synchronous wait on top of a CompletableFuture.

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {     ...     @Override     public Object sendSyncRequest(Object msg) throws TimeoutException {         //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡         String serverAddress = loadBalance(getTransactionServiceGroup(), msg);         //获取RPC调用的超时时间         long timeoutMillis = this.getRpcRequestTimeout();         //构建一个RPC消息         RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);          //send batch message         //put message into basketMap, @see MergedSendRunnable         //默认是不开启批量消息发送         if (this.isEnableClientBatchSendRequest()) {             ...         } else {             //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel             //然后通过网络连接Channel把RpcMessage发送给Seata Server             Channel channel = clientChannelManager.acquireChannel(serverAddress);             return super.sendSync(channel, rpcMessage, timeoutMillis);         }     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {         if (timeoutMillis <= 0) {             throw new FrameworkException("timeout should more than 0ms");         }         if (channel == null) {             LOGGER.warn("sendSync nothing, caused by null channel.");             return null;         }          //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里         MessageFuture messageFuture = new MessageFuture();         messageFuture.setRequestMessage(rpcMessage);         messageFuture.setTimeout(timeoutMillis);         futures.put(rpcMessage.getId(), messageFuture);          channelWritableCheck(channel, rpcMessage.getBody());          //获取远程地址         String remoteAddr = ChannelUtil.getAddressFromChannel(channel);         doBeforeRpcHooks(remoteAddr, rpcMessage);          //通过Netty的Channel异步化发送数据,同时对发送结果添加监听器         //如果发送失败,则会对网络连接Channel进行销毁处理         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {             if (!future.isSuccess()) {                 MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());                 if (messageFuture1 != null) {                     messageFuture1.setResultMessage(future.cause());                 }                 destroyChannel(future.channel());             }         });          try {             //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应             Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);             doAfterRpcHooks(remoteAddr, rpcMessage, result);             return result;         } catch (Exception exx) {             LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());             if (exx instanceof TimeoutException) {                 throw (TimeoutException) exx;             } else {                 throw new RuntimeException(exx);             }         }     }     ... }  public class MessageFuture {     private transient CompletableFuture<Object> origin = new CompletableFuture<>();     ...     
public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {         Object result = null;         try {             result = origin.get(timeout, unit);             if (result instanceof TimeoutException) {                 throw (TimeoutException)result;             }         } catch (ExecutionException e) {             throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);         } catch (TimeoutException e) {             throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));         }          if (result instanceof RuntimeException) {             throw (RuntimeException)result;         } else if (result instanceof Throwable) {             throw new RuntimeException((Throwable)result);         }          return result;     }     ... }
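
What the code above does not show is the response path: when the Seata Server's response arrives, the client-side response handling looks the pending MessageFuture up in the futures map by message id and completes it, which unblocks the thread waiting in messageFuture.get(). A minimal sketch of that correlation pattern follows; names are simplified, types are the ones shown above with imports assumed from the Seata core module, and in Seata itself this happens in the client's response processor (e.g. ClientOnResponseProcessor).

import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.RpcMessage;
import java.util.concurrent.ConcurrentHashMap;

//Minimal sketch of the request/response correlation behind MessageFuture (illustration only).
public class ResponseCorrelationSketch {
    //the same kind of map that sendSync() puts the MessageFuture into, keyed by rpcMessage.getId()
    private final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();

    //called when a response RpcMessage has been decoded on the client side
    public void onResponseReceived(RpcMessage response) {
        MessageFuture pending = futures.remove(response.getId());   //correlate the response with its request by id
        if (pending != null) {
            //completes the underlying CompletableFuture and wakes the thread blocked in messageFuture.get()
            pending.setResultMessage(response.getBody());
        }
    }
}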

 

12.The Client encodes the RpcMessage object into a byte array

When the Seata Client calls writeAndFlush() on the Channel to send the RpcMessage to the Seata Server, the RpcMessage first passes through the ChannelPipeline configured in NettyClientBootstrap, where ProtocolV1Encoder encodes it into a byte array.

public class NettyClientBootstrap implements RemotingBootstrap {     ...     @Override     public void start() {         if (this.defaultEventExecutorGroup == null) {             this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));         }          //基于Netty API构建一个Bootstrap         //设置好对应的NioEventLoopGroup线程池组,默认1个线程就够了         this.bootstrap.group(this.eventLoopGroupWorker)             .channel(nettyClientConfig.getClientChannelClazz())             .option(ChannelOption.TCP_NODELAY, true)             .option(ChannelOption.SO_KEEPALIVE, true)             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())             .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())             .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());          if (nettyClientConfig.enableNative()) {             if (PlatformDependent.isOsx()) {                 if (LOGGER.isInfoEnabled()) {                     LOGGER.info("client run on macOS");                 }             } else {                 bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);             }         }          //对Netty网络通信数据处理组件pipeline进行初始化         bootstrap.handler(             new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) {                     ChannelPipeline pipeline = ch.pipeline();                     //IdleStateHandler,空闲状态检查Handler                     //如果有数据通过就记录一下时间                     //如果超过很长时间没有数据通过,即处于空闲状态,那么就会触发一个user triggered event出去给ClientHandler来进行处理                     pipeline.addLast(new IdleStateHandler(                         nettyClientConfig.getChannelMaxReadIdleSeconds(),                         nettyClientConfig.getChannelMaxWriteIdleSeconds(),                         nettyClientConfig.getChannelMaxAllIdleSeconds()                     ))                     .addLast(new ProtocolV1Decoder())//基于Seata通信协议的解码器                     .addLast(new ProtocolV1Encoder());//基于Seata通信协议的编码器                     if (channelHandlers != null) {                         addChannelPipelineLast(ch, channelHandlers);                     }                 }             }         );          if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {             LOGGER.info("NettyClientBootstrap has started");         }     }     ...
}  public class ProtocolV1Encoder extends MessageToByteEncoder {     private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Encoder.class);          @Override     public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {         try {             if (msg instanceof RpcMessage) {                 RpcMessage rpcMessage = (RpcMessage) msg;                               //完整的消息长度                 int fullLength = ProtocolConstants.V1_HEAD_LENGTH;                  //消息头的长度                 int headLength = ProtocolConstants.V1_HEAD_LENGTH;                  //获取消息类型                 byte messageType = rpcMessage.getMessageType();                  //先写入魔数MagicNumber,通过魔数代表一条消息的开始                 out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);                  //然后写入版本号                 out.writeByte(ProtocolConstants.VERSION);                  //full Length(4B) and head length(2B) will fix in the end.                 //接着标记写入index的位置:当前写入的字节数 + 6,就是标记的writerIndex                 //可以理解为直接让writerIndex跳过了6字节,这6个字节的内容先空出来不写                 //最后写完具体的消息后,再把这6个字节代表的消息长度和消息头长度补回来                 //空出来的6个字节 = 4个字节的消息长度 + 2个字节的消息头长度                 out.writerIndex(out.writerIndex() + 6);                  //此时消息长度和消息头长度,还没统计出来,所以先跳过6个字节                 //也就是从版本号之后的第6个字节开始写:消息类型、codec、compressor                 out.writeByte(messageType);                 out.writeByte(rpcMessage.getCodec());                 out.writeByte(rpcMessage.getCompressor());                  //接着写入4个字节的消息ID                 out.writeInt(rpcMessage.getId());                  //direct write head with zero-copy                 //获取消息头                 Map<String, String> headMap = rpcMessage.getHeadMap();                 if (headMap != null && !headMap.isEmpty()) {                     //对消息头进行编码,把Map转换为字节数据写入到out里面,此时才是在写消息头                     //写完消息头之后,便可以获取到消息头长度headLength了                     int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);                     headLength += headMapBytesLength;                     fullLength += headMapBytesLength;                 }                  byte[] bodyBytes = null;                 //根据消息类型对消息体进行序列化                 if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {                     //heartbeat has no body                     //根据RpcMessage对象的codec属性通过SPI机制获取serializer序列化组件                     Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));                     //通过serializer对消息体进行序列化                     bodyBytes = serializer.serialize(rpcMessage.getBody());                      //根据RpcMessage对象的compressor属性通过SPI机制获取compressor压缩组件                     Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());                     //通过compressor对字节数组进行压缩                     bodyBytes = compressor.compress(bodyBytes);                     fullLength += bodyBytes.length;                 }                  if (bodyBytes != null) {                     out.writeBytes(bodyBytes);                 }                  //fix fullLength and headLength                 int writeIndex = out.writerIndex();                 //skip magic code(2B) + version(1B)                 out.writerIndex(writeIndex - fullLength + 3);                 out.writeInt(fullLength);                 out.writeShort(headLength);                 out.writerIndex(writeIndex);             } else {                 throw new 
UnsupportedOperationException("Not support this class:" + msg.getClass());             }         } catch (Throwable e) {             LOGGER.error("Encode request error!", e);         }     } }

 

13.The Server decodes the byte array into an RpcMessage object

When the Seata Server receives the byte array sent by the Seata Client, the bytes first pass through the ChannelPipeline configured in NettyServerBootstrap, where ProtocolV1Decoder decodes them into an RpcMessage object.

public class NettyServerBootstrap implements RemotingBootstrap {     ...     @Override     public void start() {         this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)             .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)             .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())             .option(ChannelOption.SO_REUSEADDR, true)             .childOption(ChannelOption.SO_KEEPALIVE, true)             .childOption(ChannelOption.TCP_NODELAY, true)             .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())             .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))             .localAddress(new InetSocketAddress(getListenPort()))             .childHandler(new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) {                     ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))                         .addLast(new ProtocolV1Decoder())                         .addLast(new ProtocolV1Encoder());                     if (channelHandlers != null) {                         addChannelPipelineLast(ch, channelHandlers);                     }                 }             }         );         try {             this.serverBootstrap.bind(getListenPort()).sync();             XID.setPort(getListenPort());             LOGGER.info("Server started, service listen port: {}", getListenPort());             RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));             initialized.set(true);         } catch (SocketException se) {             throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);         } catch (Exception exx) {             throw new RuntimeException("Server start failed", exx);         }     }     ... 
}  public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder {     private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class);          //为了解决粘包和拆包的问题,这里基于LengthFieldBasedFrameDecoder按照整帧来进行解码     public ProtocolV1Decoder() {         // default is 8M         this(ProtocolConstants.MAX_FRAME_LENGTH);     }          public ProtocolV1Decoder(int maxFrameLength) {         //最大的帧长度是8M,所以一个消息数据不能超过8M         //开头是2个字节的魔数、1个字节的版本号、然后第4个字节开始是4个字节的FullLength         super(maxFrameLength, 3, 4, -7, 0);     }          //每一个整帧解出来之后,就可以通过decode()方法,把字节数组转为RpcMessage对象     @Override     protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {         Object decoded;         try {             //调用decode()方法进行解帧             decoded = super.decode(ctx, in);             if (decoded instanceof ByteBuf) {                 ByteBuf frame = (ByteBuf)decoded;                 try {                     return decodeFrame(frame);                 } finally {                     frame.release();                 }             }         } catch (Exception exx) {             LOGGER.error("Decode frame error, cause: {}", exx.getMessage());             throw new DecodeException(exx);         }         return decoded;     }          public Object decodeFrame(ByteBuf frame) {         //开头两个byte是魔数         byte b0 = frame.readByte();         byte b1 = frame.readByte();         if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {             throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);         }          //获取到version版本号         byte version = frame.readByte();         int fullLength = frame.readInt();         short headLength = frame.readShort();         byte messageType = frame.readByte();         byte codecType = frame.readByte();         byte compressorType = frame.readByte();         int requestId = frame.readInt();          RpcMessage rpcMessage = new RpcMessage();         rpcMessage.setCodec(codecType);         rpcMessage.setId(requestId);         rpcMessage.setCompressor(compressorType);         rpcMessage.setMessageType(messageType);          //direct read head with zero-copy         int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;         if (headMapLength > 0) {             Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);             rpcMessage.getHeadMap().putAll(map);         }          //read body         if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {             rpcMessage.setBody(HeartbeatMessage.PING);         } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {             rpcMessage.setBody(HeartbeatMessage.PONG);         } else {             int bodyLength = fullLength - headLength;             if (bodyLength > 0) {                 byte[] bs = new byte[bodyLength];                 frame.readBytes(bs);                 //先获取到压缩组件,对消息体字节数组进行解压缩                 Compressor compressor = CompressorFactory.getCompressor(compressorType);                 bs = compressor.decompress(bs);                 //然后对解压缩完的数据,根据序列化类型进行反序列化,获取到消息体对象                 Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));                 rpcMessage.setBody(serializer.deserialize(bs));             }         }         return rpcMessage;     } }
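
The constructor call super(maxFrameLength, 3, 4, -7, 0) maps onto the frame layout shown in the previous section as follows, using Netty's LengthFieldBasedFrameDecoder parameter semantics:

//super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip)
//
//  lengthFieldOffset   = 3    skip magic code (2B) + version (1B) to reach the fullLength field
//  lengthFieldLength   = 4    fullLength is a 4-byte int
//  lengthAdjustment    = -7   fullLength already counts the 7 bytes of magic + version + length field,
//                             so the bytes remaining after the length field are fullLength - 7
//  initialBytesToStrip = 0    nothing is stripped, so decodeFrame() re-reads magic and version itself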

 

14.How the Server processes the decoded RpcMessage object

After the Seata Server has decoded the incoming request bytes into an RpcMessage object, it hands the RpcMessage to the ServerHandler configured in NettyServerBootstrap, i.e. to ServerHandler's channelRead() method.

 

channelRead() calls AbstractNettyRemoting's processMessage() method, which dispatches the RpcMessage to ServerOnRequestProcessor's process() method.

 

During ServerOnRequestProcessor.process(), the RpcMessage is handled by calling TransactionMessageHandler's onRequest() method.

 

Since Server.start() sets DefaultCoordinator as the TransactionMessageHandler when it initializes NettyRemotingServer, the RpcMessage is ultimately processed by DefaultCoordinator's onRequest() method.

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @ChannelHandler.Sharable     class ServerHandler extends ChannelDuplexHandler {         //Channel read.         @Override         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {             if (!(msg instanceof RpcMessage)) {                 return;             }             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理             processMessage(ctx, (RpcMessage) msg);         }         ...     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (LOGGER.isDebugEnabled()) {             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));         }         Object body = rpcMessage.getBody();         if (body instanceof MessageTypeAware) {             MessageTypeAware messageTypeAware = (MessageTypeAware) body;             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());             if (pair != null) {                 if (pair.getSecond() != null) {                     try {                         pair.getSecond().execute(() -> {                             try {                                 pair.getFirst().process(ctx, rpcMessage);                             } catch (Throwable th) {                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                             } finally {                                 MDC.clear();                             }                         });                     } catch (RejectedExecutionException e) {                         LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());                         if (allowDumpStack) {                             String name = ManagementFactory.getRuntimeMXBean().getName();                             String pid = name.split("@")[0];                             long idx = System.currentTimeMillis();                             try {                                 String jstackFile = idx + ".log";                                 LOGGER.info("jstack command will dump to " + jstackFile);                                 Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));                             } catch (IOException exx) {                                 LOGGER.error(exx.getMessage());                             }                             allowDumpStack = false;                         }                     }                 } else {                     try {                         pair.getFirst().process(ctx, rpcMessage);                     } catch (Throwable th) {                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);                     }                 }             } else {                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());             }         } else {             LOGGER.error("This 
rpcMessage body[{}] is not MessageTypeAware type.", body);         }     }     ... }  public class NettyRemotingServer extends AbstractNettyRemotingServer {     ...     private void registerProcessor() {         //1. registry on request message processor         ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());         ShutdownHook.getInstance().addDisposable(onRequestProcessor);         super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);         super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);         //2. registry on response message processor         ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());         super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);         super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);         //3. registry rm message processor         RegRmProcessor regRmProcessor = new RegRmProcessor(this);         super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);         //4. registry tm message processor         RegTmProcessor regTmProcessor = new RegTmProcessor(this);         super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);         //5. registry heartbeat message processor         ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);         super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);     }     ... }  public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {     ...     @Override     public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {         Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);         this.processorTable.put(messageType, pair);     }     ... }  public abstract class AbstractNettyRemoting implements Disposable {     ...     //This container holds all processors.     protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);     ... }  public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {     private final TransactionMessageHandler transactionMessageHandler;     ...     
@Override     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {         if (ChannelManager.isRegistered(ctx.channel())) {             onRequestMessage(ctx, rpcMessage);         } else {             try {                 if (LOGGER.isInfoEnabled()) {                     LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());                 }                 ctx.disconnect();                 ctx.close();             } catch (Exception exx) {                 LOGGER.error(exx.getMessage());             }             if (LOGGER.isInfoEnabled()) {                 LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));             }         }     }          private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {         Object message = rpcMessage.getBody();         //RpcContext线程本地变量副本         RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());         if (LOGGER.isDebugEnabled()) {             LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());         } else {             try {                 BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());             } catch (InterruptedException e) {                 LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);             }         }         if (!(message instanceof AbstractMessage)) {             return;         }         //the batch send request message         if (message instanceof MergedWarpMessage) {             ...         } else {             //the single send request message             final AbstractMessage msg = (AbstractMessage) message;             //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage             AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);             remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);         }     }     ... }  //Server端的全局事务处理逻辑组件 //其中包含了:开启很多后台线程、处理开启全局事务、处理提交全局事务、处理回滚全局事务、处理全局事务状态的上报、处理分支事务的注册、 //本地检查、超时检查、重试回滚、重试提交、异步提交、Undo Log的删除 public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     ...     @Override     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {         if (!(request instanceof AbstractTransactionRequestToTC)) {             throw new IllegalArgumentException();         }         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;         transactionRequest.setTCInboundHandler(this);         return transactionRequest.handle(context);     }     ... }
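
Given the processorTable dispatch above, handling an additional message type amounts to implementing RemotingProcessor and registering it for the corresponding type code. The sketch below is illustrative only; the type code, the executor and the registration call site are placeholders mirroring registerProcessor() above, and the package names are assumed from the Seata core module.

import io.netty.channel.ChannelHandlerContext;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

//Illustrative sketch: a RemotingProcessor that only logs what it receives.
public class LoggingProcessor implements RemotingProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingProcessor.class);

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        LOGGER.info("received message, type: {}, id: {}", rpcMessage.getMessageType(), rpcMessage.getId());
    }
}

//Assumed registration, mirroring NettyRemotingServer.registerProcessor() above:
//super.registerProcessor(someMessageTypeCode, new LoggingProcessor(), messageExecutor);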

 

15.Source code of the Seata Server opening a global transaction

Note: after the global transaction session is created, the xid is put into the thread-local context via slf4j's MDC.

//Server端的全局事务处理逻辑组件 //其中包含了:开启很多后台线程、处理开启全局事务、处理提交全局事务、处理回滚全局事务、处理全局事务状态的上报、处理分支事务的注册、 //本地检查、超时检查、重试回滚、重试提交、异步提交、Undo Log的删除 public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     ...     @Override     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {         if (!(request instanceof AbstractTransactionRequestToTC)) {             throw new IllegalArgumentException();         }         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;         transactionRequest.setTCInboundHandler(this);         return transactionRequest.handle(context);     }     ... }  public class GlobalBeginRequest extends AbstractTransactionRequestToTC {     ...     @Override     public AbstractTransactionResponse handle(RpcContext rpcContext) {         return handler.handle(this, rpcContext);     }     ... }  //The type Abstract tc inbound handler. public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {     ...     @Override     public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {         GlobalBeginResponse response = new GlobalBeginResponse();         exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {             @Override             public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {                 try {                     //开启全局事务                     doGlobalBegin(request, response, rpcContext);                 } catch (StoreException e) {                     throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);                 }             }         }, request, response);         return response;     }          //Do global begin.     protected abstract void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException;     ... }  public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {     private final DefaultCore core;     ...     @Override     protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {         //接下来才真正处理开启全局事务的业务逻辑         //其中会调用DefaultCore来真正开启一个全局事务,即拿到xid并设置到响应里去         response.setXid(core.begin(             rpcContext.getApplicationId(),//应用程序id             rpcContext.getTransactionServiceGroup(),//事务服务分组             request.getTransactionName(),//事务名称             request.getTimeout())//超时时间         );         if (LOGGER.isInfoEnabled()) {             LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());         }     }     ... }  public class DefaultCore implements Core {     ...     
@Override     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {         //创建一个全局事务会话         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);         //通过slf4j的MDC把xid放入线程本地变量副本里去         MDC.put(RootContext.MDC_KEY_XID, session.getXid());         //添加一个全局事务会话的生命周期监听器         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());         //打开Session         session.begin();          //transaction start event,发布会话开启事件         MetricsPublisher.postSessionDoingEvent(session, false);                 //返回全局事务会话的xid         return session.getXid();     }     ... }  public class GlobalSession implements SessionLifecycle, SessionStorable {     ...     public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {         GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);         return session;     }          public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {         //全局事务id是通过UUIDGenerator来生成的         this.transactionId = UUIDGenerator.generateUUID();         this.status = GlobalStatus.Begin;         this.lazyLoadBranch = lazyLoadBranch;         if (!lazyLoadBranch) {             this.branchSessions = new ArrayList<>();         }         this.applicationId = applicationId;         this.transactionServiceGroup = transactionServiceGroup;         this.transactionName = transactionName;         this.timeout = timeout;         //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid         this.xid = XID.generateXID(transactionId);     }     ... }
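
For reference, the xid handed back to the client combines the server's address with the UUIDGenerator-generated transactionId. Treat the exact format as an assumption to verify against the Seata version in use; roughly, XID.generateXID(transactionId) builds something like the following (the address and id in the example are made up):

//Assumed xid format (verify against your Seata version): ip:port:transactionId
String xid = XID.getIpAddress() + ":" + XID.getPort() + ":" + transactionId;
//e.g. "192.168.1.10:8091:2612136924489388032"
//The client then binds this xid to RootContext and propagates it to branch transactions.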

 
