领域驱动模型DDD(四)——Eventuate Tram Saga源码讲解

前言

虽然本人一直抱怨《微服务架构设计模式》中DDD模式下采用的Eventuate Tram Saga不算简单易用,但是为了更加深入了解原文作者的设计思路,还是花了点时间去阅读源码,并且为了自己日后自己返回来看的懂,就斗胆地对整个Eventuate Tram Saga从注册到执行的代码运行流程进行注释解读下,其中若是有什么错误疏漏以及需要改进的地方,希望各位在评论区指正。

源码讲解

1:Saga流程如何被记录注册

1-1 CreateOrderSaga类

  //1-1构建流程,为了便以后续讲解,我们将一个step到下一个step之间的所有方法概括成一个“步骤”   public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {       private SagaDefinition<CreateOrderSagaData> sagaDefinition =               //初始化起点,调用1-2的step               step()               //调用1-3 的invokeLocal方法,this::reject是一个输入CreateOrderSagaData类型参数返回CommandWithDestination的方法                 .invokeLocal(this::create)                 .withCompensation(this::reject)               //步骤一               //调用1-2step               .step()               //调用1-3 的invokeParticipant方法                 .invokeParticipant(this::reserveCredit)                 .onReply(CustomerNotFound.class, this::handleCustomerNotFound)                 .onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded)               //步骤二               //调用1-6step               .step()                 .invokeLocal(this::approve)               .build();   } 

1-2 step初始化方法(saga流程的起点)

  //1-2创建sagaDefinition的源码:因为实现了SimpleSaga,SimpleSaga默认方法   //此处的step对应1-1开头调用的step,1-1中后面的两个step并不调用此方法   default StepBuilder<Data> step() {     SimpleSagaDefinitionBuilder<Data> builder = new SimpleSagaDefinitionBuilder<>();     return new StepBuilder<>(builder);   } 

1-3 StepBuilder类

  //1-3StepBuilder内部方法:   public class StepBuilder<Data>{       private final SimpleSagaDefinitionBuilder<Data> parent;       //初始化时,这里parent被赋值为一个new SimpleSagaDefinitionBuilder;之后会用上第一次创建SimpleSagaDefinitionBuilder       public StepBuilder(SimpleSagaDefinitionBuilder<Data> builder) {         this.parent = builder;       }      //此处第一次时传入1-1中的this::create       public LocalStepBuilder<Data> invokeLocal(Consumer<Data> localFunction) {         return new LocalStepBuilder<>(parent, localFunction);       }       //此处传入1-1中的this::reserveCredit方法       public InvokeParticipantStepBuilder<Data> invokeParticipant(Function<Data, CommandWithDestination> action) {         //调用1-6的withAction方法         return new InvokeParticipantStepBuilder<>(parent).withAction(Optional.empty(), action);       }   } 

1-4 LocalStepBuilder类

  //1-4根据CreateOrderSaga 中的流程可得知,后续invokeLocal方法:   public class LocalStepBuilder<Data>  {      private final SimpleSagaDefinitionBuilder<Data> parent;      private final Consumer<Data> localFunction;      private Optional<Consumer<Data>> compensation = Optional.empty();      //设置父节点,设置执行方法      public LocalStepBuilder(SimpleSagaDefinitionBuilder<Data> parent, Consumer<Data> localFunction) {         this.parent = parent;         this.localFunction = localFunction;      }      //设置补偿方法,传入1-1中的this::reject      public LocalStepBuilder<Data> withCompensation(Consumer<Data> localCompensation) {         this.compensation = Optional.of(localCompensation);         return this;      }      //再次调用step方法,此处的step方法对应的是1-1中后面的两个step方法          public StepBuilder<Data> step() {         //调用1-5中的方法,将step之前接收的每个“步骤”放入到SimpleSagaDefinitionBuilder的List中         parent.addStep(makeLocalStep());         //创建一个新的“步骤”并承接之前的“步骤”         return new StepBuilder<>(parent);      }      //创建一个“步骤”      private LocalStep<Data> makeLocalStep() {         return new LocalStep<>(localFunction, compensation, localExceptionSavers, rollbackExceptions);     }     //对用1-1中的build方法,将最后一个“步骤”放入流程列表中,调用1-5中的方法创建一个SimpleSagaDefinition对象      public SagaDefinition<Data> build() {         //添加最后一个“步骤”并调用1-5中build()方法         parent.addStep(makeLocalStep());         return parent.build();         }   } 

1-5 SimpleSagaDefinitionBuilder类

  //1-5   public class SimpleSagaDefinitionBuilder<Data> {       //存放“步骤”列表       private List<SagaStep<Data>> sagaSteps = new LinkedList<>();       //添加“步骤节点”到列表中       public void addStep(SagaStep<Data> sagaStep) {         sagaSteps.add(sagaStep);       }       //将所有步骤节点整合构建整个大“流程业务”       public SagaDefinition<Data> build() {     //将整个“步骤”节点列表传入2-3List<Step> steps内         return new SimpleSagaDefinition<>(sagaSteps);       }   } 

1-6 InvokeParticipantStepBuilder类

   //1-6    public class InvokeParticipantStepBuilder<Data> implements WithCompensationBuilder<Data> {       private Optional<ParticipantInvocation<Data>> action = Optional.empty();            public InvokeParticipantStepBuilder(SimpleSagaDefinitionBuilder<Data> parent) {         this.parent = parent;       }       //调用1-7的构造函数,1-1的this::reserveCredit方法传给ParticipantInvocationImpl构建出上面Optional<ParticipantInvocation<Data>> action       InvokeParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> participantInvocationPredicate, Function<Data, CommandWithDestination> action) {         this.action = Optional.of(new ParticipantInvocationImpl<>(participantInvocationPredicate, action));         return this;       }              public StepBuilder<Data> step() {          addStep();          return new StepBuilder<>(parent);       }       //本例中因为只使用了withAction方法,所以compensation, actionReplyHandlers, compensationReplyHandlers都为bull       //调用2-7的中ParticipantInvocationStep的构造方法,将action赋予participantInvocation       private void addStep() {          parent.addStep(new ParticipantInvocationStep<>(action, compensation, actionReplyHandlers, compensationReplyHandlers));       }    }  

1-7 ParticipantInvocationImpl类

    //1-7 AbstractParticipantInvocation实现ParticipantInvocation接口     public class ParticipantInvocationImpl<Data, C extends Command> extends AbstractParticipantInvocation<Data> {       private final boolean notification;       //commandBuilder对应的就是1-6中withAction方法action参数       private final Function<Data, CommandWithDestination> commandBuilder;        public ParticipantInvocationImpl(Optional<Predicate<Data>> invocablePredicate, Function<Data, CommandWithDestination> commandBuilder) {         this(invocablePredicate, commandBuilder, false);       }        @Override       public boolean isSuccessfulReply(Message message) {         return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));       }        //被2-7调用,执行Function<Data, CommandWithDestination> commandBuilder;并传入值为false的notification       @Override       public CommandWithDestinationAndType makeCommandToSend(Data data) {         return new CommandWithDestinationAndType(commandBuilder.apply(data), notification);       }     } 

至此所有Saga步骤注册完整的业务流程完毕。

2、Saga实例化工厂如何运作

2-0 sagaInstanceFactory创建createOrderSaga

    sagaInstanceFactory.create(createOrderSaga, data); 

2-1 SagaInstanceFactory类

 //2-1Saga实例化工厂  public class SagaInstanceFactory {   private Logger logger = LoggerFactory.getLogger(this.getClass());    private ConcurrentMap<Saga<?>, SagaManager<?>> sagaManagers = new ConcurrentHashMap<>();    public <SagaData> SagaInstance create(Saga<SagaData> saga, SagaData data) {     SagaManager<SagaData>  sagaManager = (SagaManager<SagaData>)sagaManagers.get(saga);     if (sagaManager == null)       throw new RuntimeException(("No SagaManager for " + saga));     //调用2-2的create方法     return sagaManager.create(data);   }    private <SagaData> SagaManager<SagaData> makeSagaManager(SagaManagerFactory sagaManagerFactory, Saga<SagaData> saga) {     SagaManagerImpl<SagaData> sagaManager = sagaManagerFactory.make(saga);     sagaManager.subscribeToReplyChannel();     return sagaManager;   } } 

2-2 SagaManagerImpl类

 //2-2  public class SagaManagerImpl<Data> implements SagaManager<Data> {   @Override   public SagaInstance create(Data sagaData, Optional<String> resource) {      SagaInstance sagaInstance = new SagaInstance(getSagaType(),null,"????",null,SagaDataSerde.serializeSagaData(sagaData), new HashSet<>());     //使用数据库保存saga实例     sagaInstanceRepository.save(sagaInstance);      String sagaId = sagaInstance.getId();     //此步骤没什么意义     saga.onStarting(sagaId, sagaData);      resource.ifPresent(r -> {       if (!sagaLockManager.claimLock(getSagaType(), sagaId, r)) {         throw new RuntimeException("Cannot claim lock for resource");       }     });     //getStateDefinition()获取SagaDefinition,即1-1中。     //调用2-4start方法,启动saga流程,构建第一个“步骤”的SagaActions     SagaActions<Data> actions = getStateDefinition().start(sagaData);      actions.getLocalException().ifPresent(e -> {       throw e;     });     //过程操作,传入参数:saga.getSagaType()是获取sagaData(操作数据)的类名,比CreateOrderSagaData并修改成固定格式     //sagaId:唯一sagaId     //sagaInstance:saga实例化     //sagaData:要操作的数据     //actions:第一个“步骤”的SagaActions     //调用下面的processActions方法     processActions(saga.getSagaType(), sagaId, sagaInstance, sagaData, actions);      return sagaInstance;       }      private void processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions<Data> actions) {       //进行循环       while (true) {       //如果传入的actions存在执行报错则信息执行if内的方法         if (actions.getLocalException().isPresent()) {            actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder                   .withPayload("{}")                   .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name())                   .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName())                   .build());          } else {           // only do this if successful           //如果成功,通过消息队列发送给接收方。           //第一次进来时:因为1-1中构建的流程内,“步骤一”是调用本地方法,因此不需要发送消息           //第二次进来时:由于1-1中构建的流程内,“步骤二”是调用参与者方法,因此需要发送消息给参与者           String lastRequestId = sagaCommandProducer.sendCommands(this.getSagaType(), sagaId, actions.getCommands(), this.makeSagaReplyChannel());           //第一次进来时:lastRequestId第一个步骤时为null           //第二次进来时:返回一个请求Id           sagaInstance.setLastRequestId(lastRequestId);           //第一次进来时:更新“步骤一”sagaInstance实例状态信息:更新是否是最后节点(布尔值),更新是否需要补偿(布尔值),更新是否报错,更新更新的状态(对应2-5中executeStep方法的newState)           //第二次进来时:更新“步骤二”sagaInstance实例状态           updateState(sagaInstance, actions);            sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData)));            //执行第一个步骤时,并不是最后一个步骤节点所以不进入if中           if (actions.isEndState()) {             performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), actions.isFailed(), sagaData);           }           //使用数据库更新sagaInstance实例状态           sagaInstanceRepository.update(sagaInstance);           //public boolean isReplyExpected() {return (commands.isEmpty() || commands.stream().anyMatch(CommandWithDestinationAndType::isCommand)) && !local;}           //第一次进来时:在2-5的step.makeStepOutcome过程中因为将local设置为true,所以执行第一个步骤时actions.isReplyExpected()为false           if (actions.isReplyExpected()) {             break;           } else {             //模拟成功回复本地动作或通知,调用下面的simulateSuccessfulReplyToLocalActionOrNotification方法             //第一次进来时:传入“步骤一”的SagaActions,返回“步骤二”的SagaActions,继续循环             //第二次进来时:传入“步骤二”的SagaActions,返回“步骤三”的SagaActions             actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions);           }          }       }     }       //模拟成功回复本地动作或通知       private SagaActions<Data> simulateSuccessfulReplyToLocalActionOrNotification(String sagaType, String sagaId, SagaActions<Data> actions) {         //获取1-1中的整个业务流程后,调用2-4中的handleReply方法,并设置REPLY_OUTCOME和REPLY_TYPE头为success,最后返回“步骤二”SagaActions,重新进入上面的while循环中         return getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder             .withPayload("{}")             .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name())             .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName())             .build());       }   } 

2-3 AbstractSimpleSagaDefinition类

   //2-3    public abstract class AbstractSimpleSagaDefinition<Data, Step extends ISagaStep<Data>,             ToExecute extends AbstractStepToExecute<Data, Step>,             Provider extends AbstractSagaActionsProvider<Data,?>> {          protected Logger logger = LoggerFactory.getLogger(this.getClass());         //steps接受1-5中的sagaSteps         protected List<Step> steps;          public AbstractSimpleSagaDefinition(List<Step> steps) {             this.steps = steps;         }         //被2-4start方法调用         protected Provider firstStepToExecute(Data data) {        //SagaExecutionState.startingState()返回SagaExecutionState(-1, false);        //初始化时步骤节点currentlyExecuting为-1(因为0为第一个“步骤”,所以设置-1为起点),是否补偿compensating为false        //开始执行下一个步骤节点(此处执行的是第一个“步骤”)             return nextStepToExecute(SagaExecutionState.startingState(), data);         }         //被2-4的handleReply方法调用         protected Provider sagaActionsForNextStep(String sagaType, String sagaId, Data sagaData, Message message,                                               SagaExecutionState state, Step currentStep, boolean compensating) {             //此处分两种情况:本地调方法用CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));【例如1-1中的“步骤一”】;远程调用参与者方法用getParticipantInvocation(compensating).get().isSuccessfulReply(message);【例如1-1中的“步骤二”】             //由于“步骤一”是本地方法且头中包含信息为success,直接调用下方的nextStepToExecute方法             if (currentStep.isSuccessfulReply(compensating, message)) {                 return nextStepToExecute(state, sagaData);             } else if (compensating) {                 return handleFailedCompensatingTransaction(sagaType, sagaId, state, message);             } else {                 return nextStepToExecute(state.startCompensating(), sagaData);             }         }          protected Provider nextStepToExecute(SagaExecutionState state, Data data) {             int skipped = 0;             //初始化compensating为false无需补偿             boolean compensating = state.isCompensating();             //初始化compensating为false,所以下个要执行的步骤节点+1,如果为true说明出错需要回滚,因此direction初始化为-1             int direction = compensating ? -1 : +1;             //第一次进入时:direction初始化为-1,所以i初始值为0,说明从第一个步骤开始执行;i必须小于节点长度;i根据direction来判断是否需要回滚到上一个节点还是进入下一个阶段             //第二次进入时:步骤一执行后state.getCurrentlyExecuting()变为0,所以i变为1,steps.get(i)获取“步骤二”。由于compensating依然为false,所以不进行回滚继续向下执行             for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < steps.size(); i = i + direction) {                //获取步骤节点                 Step step = steps.get(i);                 //每个步骤节点中有正常执行的方法事务,可能有补偿事物。                 //因此使用compensating进行判断。如果需要补偿且存在补偿事务,或则不需要补偿,以上两种情况则为true                 //step.hasCompensation(data)和step.hasAction(data)返回值都是布尔值。                 //step.hasCompensation(data)用来判断是否存在补偿事务,step.hasAction(data)直接返回true                 if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) {                     //makeStepToExecute指定执行步骤,调用SimpleSagaDefinition方法中的makeStepToExecute方法                     //传入参数:当前跳过的skipped计数,是否需要补偿,以及“步骤”                     //makeSagaActionsProvider调用2-4的makeStepmakeStepToExecuteToExecute方法,构建一个StepToExecute对象,                     ToExecute stepToExecute = makeStepToExecute(skipped, compensating, step);                     //makeSagaActionsProvider传入:执行节点,处理数据,节点初始状态,调用2-4中的makeSagaActionsProvider方法                     return makeSagaActionsProvider(stepToExecute, data, state);                 } else                     //如果需要补偿但没有补偿事务                     //跳过计数+1                     skipped++;             }             return makeSagaActionsProvider(makeEndStateSagaActions(state));         }          protected Provider handleFailedCompensatingTransaction(String sagaType, String sagaId, SagaExecutionState state, Message message) {             logger.error("Saga {} {} failed due to failed compensating transaction {}", sagaType, sagaId, message);             return makeSagaActionsProvider(SagaActions.<Data>builder()                     .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeFailedEndState()))                     .withIsEndState(true)                     .withIsCompensating(state.isCompensating())                     .withIsFailed(true)                     .build());         }          protected SagaActions<Data> makeEndStateSagaActions(SagaExecutionState state) {             return SagaActions.<Data>builder()                     .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState()))                     .withIsEndState(true)                     .withIsCompensating(state.isCompensating())                     .build();         }      } 

2-4 SimpleSagaDefinition类

    //2-4     public class SimpleSagaDefinition<Data>{        public SimpleSagaDefinition(List<SagaStep<Data>> steps) {         super(steps);       }        @Override       public SagaActions<Data> start(Data sagaData) {          //执行2-3中的firstStepToExecute方法,启动第一个流程         return toSagaActions(firstStepToExecute(sagaData));         //构建SagaActions完毕,返回并继续2-2中的create方法内getStateDefinition().start(sagaData)后的代码       }        //被2-2中的processActions方法调用       @Override       public SagaActions<Data> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) {          //将前一个“步骤”中的之前被JSON格式化(2-5的makeSagaActions把newState进行JSON格式化)的newState此处的currentState进行解码         SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState);          //state.getCurrentlyExecuting()的值为“0”,因为初始化currentlyExecuting的计数是-1,而在2-5 中执行currentState.nextState(size())已经把当前“步骤”计数+1.而,所以step.get(0)获取了整个业务流程中的“步骤一”         SagaStep<Data> currentStep = steps.get(state.getCurrentlyExecuting());         //获取前一个“步骤”是否需要回滚         boolean compensating = state.isCompensating();          currentStep.getReplyHandler(message, compensating).ifPresent(handler -> invokeReplyHandler(message, sagaData, (d, m) -> {               handler.accept(d, m);               return null;         }));         //sagaActionsForNextStep会根据是否需要补偿来判断是采用nextStepToExecute(2-3)方法,还是调用handleFailedCompensatingTransaction(2-3)方法。         SagaActionsProvider<Data> sap = sagaActionsForNextStep(sagaType, sagaId, sagaData, message, state, currentStep, compensating);         return toSagaActions(sap);        }         //被2-3的nextStepToExecute中调用       @Override         protected StepToExecute<Data> makeStepToExecute(int skipped, boolean compensating, SagaStep<Data> step) {           return new StepToExecute<>(step, skipped, compensating);       }           //被2-3的nextStepToExecute中调用       @Override       protected SagaActionsProvider<Data> makeSagaActionsProvider(StepToExecute<Data> stepToExecute, Data data, SagaExecutionState state) {         //调用2-5的executeStep方法         return new SagaActionsProvider<>(() -> stepToExecute.executeStep(data, state));       }     } 

2-5 executeStep方法

    //2-5  StepToExecute类中的方法,被2-4中的makeSagaActionsProvider方法调用     public SagaActions<Data> executeStep(Data data, SagaExecutionState currentState) {      //nextState方法执行:SagaExecutionState(compensating ? currentlyExecuting - size : currentlyExecuting + size, compensating)      //protected int size() {return 1 + skipped;}      //需要补偿:则下一个状态为当前节点currentlyExecuting-已跳过(没有补偿事务)的节点的长度,即回到最后(有补偿事务)执行的步骤节点。      //不需要补偿:则下一个状态为当前节点currentlyExecuting+需要跳过(没有补偿事务)的节点的长度      //计算完后将currentlyExecuting进行更新       SagaExecutionState newState = currentState.nextState(size());        SagaActions.Builder<Data> builder = SagaActions.builder();       //当前正执行是否需要补偿回滚       boolean compensating = currentState.isCompensating();       //调用2-6中的makeStepOutcome方法       //第一次进来时:因为步骤一执行的是本地方法,调用的2-6中的makeStepOutcome,作用在于判断当前“步骤”是该执行补偿事务还是正常的本地事务,如果执行出现错误则返回一个带有报错信息的StepOutcome对象       //第二次进来时:因为步骤二执行的是让参与方执行方法,调用的2-7中的makeStepOutcomemakeStepOutcome       //执行StepOutcome的visit方法:将StepOutcome的RuntimeException类型的localOutcome属性赋值给SagaActions中的RuntimeException类型localException属性,同时将SagaActions中的local属性设置为ture;       step.makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands);        //SagaActions的makeSagaActions方法做两件事:将当前节点的数据newState格式化成JSON数据,newState信息包括:当前执行步骤的计数(第几个步骤)、是否回滚(布尔值)、是否是最后一个步骤(布尔值),是否出现错误(布尔值)。然后调用buildActions方法构建返回一个新的SagaActions       //String state = encodeState(newState);       //builder.buildActions(data, compensating, state, newState.isEndState());       //public SagaActions<Data> buildActions(Data data, boolean compensating, String state, boolean endState) {       //return withUpdatedSagaData(data)       //       .withUpdatedState(state)       //       .withIsEndState(endState)       //       .withIsCompensating(compensating)       //       .build();       //}       return makeSagaActions(builder, data, newState, compensating);       //第一次进来时:构建SagaActions完毕,返回到2-4中的start方法       //第二次进来时:构建SagagAcions完毕,返回到2-2中的simulateSuccessfulReplyToLocalActionOrNotification方法     } 

2-6 LocalStep类

    //2-6     public class LocalStep<Data> implements SagaStep<Data> {       private final Consumer<Data> localFunction;       private final Optional<Consumer<Data>> compensation;       private final List<LocalExceptionSaver<Data>> localExceptionSavers;       private final List<Class<RuntimeException>> rollbackExceptions;        @Override       public StepOutcome makeStepOutcome(Data data, boolean compensating) {         try {           //如果需要回滚,执行回滚方法,compensation在1-1时的.withCompensation已经传入           if (compensating) {            //真正执行业务逻辑方法的地方             compensation.ifPresent(localStep -> localStep.accept(data));           } else {           //如果不需要回滚,直接执行补偿方法,localFunction在1-1的.invokeLocal时已经传入             localFunction.accept(data);           }           return makeLocalOutcome(Optional.empty());         } catch (RuntimeException e) {           localExceptionSavers.stream().filter(saver -> saver.shouldSave(e)).findFirst().ifPresent(saver -> saver.save(data, e));           if (rollbackExceptions.isEmpty() || rollbackExceptions.stream().anyMatch(c -> c.isInstance(e)))             return makeLocalOutcome(Optional.of(e));           else             throw e;         }       }      } 

2-7 ParticipantInvocationStep类

   //2-7    public class ParticipantInvocationStep<Data> implements SagaStep<Data> {       //participantInvocation被1-6的aaddStep方法传递action赋值       private Optional<ParticipantInvocation<Data>> participantInvocation;       private Optional<ParticipantInvocation<Data>> compensation;              public ParticipantInvocationStep(Optional<ParticipantInvocation<Data>> participantInvocation,                                    Optional<ParticipantInvocation<Data>> compensation,                                    Map<String, BiConsumer<Data, Object>> actionReplyHandlers,                                    Map<String, BiConsumer<Data, Object>> compensationReplyHandlers) {     	this.actionReplyHandlers = actionReplyHandlers;     	this.compensationReplyHandlers = compensationReplyHandlers;     	this.participantInvocation = participantInvocation;     	this.compensation = compensation;   	 }       //判断是否需要回滚,如果需要回滚执行compensation方法,如果不需要执行participantInvocation方法       private Optional<ParticipantInvocation<Data>> getParticipantInvocation(boolean compensating) {         return compensating ? compensation : participantInvocation;       }        @Override       public boolean isSuccessfulReply(boolean compensating, Message message) {         return getParticipantInvocation(compensating).get().isSuccessfulReply(message);       }        @Override       public StepOutcome makeStepOutcome(Data data, boolean compensating) {          //先调用getParticipantInvocation方法,此处假设不需要回滚所以返回participantInvocation          //调用makeRemoteStepOutcome方法,传入List<CommandWithDestinationAndType> commandsToSend 返回RemoteStepOutcome类型的结果         return StepOutcome.makeRemoteStepOutcome(getParticipantInvocation(compensating)                 //调用1-7中的makeCommandToSend方法,执行消息发送方法                 .map(pi -> pi.makeCommandToSend(data))                 //将返回的CommandWithDestinationAndType包装成单元素列表                  .map(Collections::singletonList)                 //如果上述返回为空则返回一个空列表                 .orElseGet(Collections::emptyList));       }     } 

图解流程

有空再补....

简单案例

花了两天写了个以领域驱动为思想的Saga模式事务管理简陋框架,主要为了讲解:领域驱动模型DDD(三)——使用Saga管理事务 教学而设计的,目前只能在单体架构中使用,后续有时间会更新分布式情况下的新版本。请记住,领域驱动模型是一种思想,它不一定捆绑分布式微服务,只是领域驱动模型思想更有利于分布式情况下对微服务应用的划分。
项目框架地址:https://github.com/CG-Lin/mvn-lin

发表评论

评论已关闭。

相关文章