前言
虽然本人一直抱怨《微服务架构设计模式》中DDD模式下采用的Eventuate Tram Saga不算简单易用,但是为了更加深入了解原文作者的设计思路,还是花了点时间去阅读源码,并且为了自己日后自己返回来看的懂,就斗胆地对整个Eventuate Tram Saga从注册到执行的代码运行流程进行注释解读下,其中若是有什么错误疏漏以及需要改进的地方,希望各位在评论区指正。
源码讲解
1:Saga流程如何被记录注册
1-1 CreateOrderSaga类
//1-1构建流程,为了便以后续讲解,我们将一个step到下一个step之间的所有方法概括成一个“步骤” public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> { private SagaDefinition<CreateOrderSagaData> sagaDefinition = //初始化起点,调用1-2的step step() //调用1-3 的invokeLocal方法,this::reject是一个输入CreateOrderSagaData类型参数返回CommandWithDestination的方法 .invokeLocal(this::create) .withCompensation(this::reject) //步骤一 //调用1-2step .step() //调用1-3 的invokeParticipant方法 .invokeParticipant(this::reserveCredit) .onReply(CustomerNotFound.class, this::handleCustomerNotFound) .onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded) //步骤二 //调用1-6step .step() .invokeLocal(this::approve) .build(); }
1-2 step初始化方法(saga流程的起点)
//1-2创建sagaDefinition的源码:因为实现了SimpleSaga,SimpleSaga默认方法 //此处的step对应1-1开头调用的step,1-1中后面的两个step并不调用此方法 default StepBuilder<Data> step() { SimpleSagaDefinitionBuilder<Data> builder = new SimpleSagaDefinitionBuilder<>(); return new StepBuilder<>(builder); }
1-3 StepBuilder类
//1-3StepBuilder内部方法: public class StepBuilder<Data>{ private final SimpleSagaDefinitionBuilder<Data> parent; //初始化时,这里parent被赋值为一个new SimpleSagaDefinitionBuilder;之后会用上第一次创建SimpleSagaDefinitionBuilder public StepBuilder(SimpleSagaDefinitionBuilder<Data> builder) { this.parent = builder; } //此处第一次时传入1-1中的this::create public LocalStepBuilder<Data> invokeLocal(Consumer<Data> localFunction) { return new LocalStepBuilder<>(parent, localFunction); } //此处传入1-1中的this::reserveCredit方法 public InvokeParticipantStepBuilder<Data> invokeParticipant(Function<Data, CommandWithDestination> action) { //调用1-6的withAction方法 return new InvokeParticipantStepBuilder<>(parent).withAction(Optional.empty(), action); } }
1-4 LocalStepBuilder类
//1-4根据CreateOrderSaga 中的流程可得知,后续invokeLocal方法: public class LocalStepBuilder<Data> { private final SimpleSagaDefinitionBuilder<Data> parent; private final Consumer<Data> localFunction; private Optional<Consumer<Data>> compensation = Optional.empty(); //设置父节点,设置执行方法 public LocalStepBuilder(SimpleSagaDefinitionBuilder<Data> parent, Consumer<Data> localFunction) { this.parent = parent; this.localFunction = localFunction; } //设置补偿方法,传入1-1中的this::reject public LocalStepBuilder<Data> withCompensation(Consumer<Data> localCompensation) { this.compensation = Optional.of(localCompensation); return this; } //再次调用step方法,此处的step方法对应的是1-1中后面的两个step方法 public StepBuilder<Data> step() { //调用1-5中的方法,将step之前接收的每个“步骤”放入到SimpleSagaDefinitionBuilder的List中 parent.addStep(makeLocalStep()); //创建一个新的“步骤”并承接之前的“步骤” return new StepBuilder<>(parent); } //创建一个“步骤” private LocalStep<Data> makeLocalStep() { return new LocalStep<>(localFunction, compensation, localExceptionSavers, rollbackExceptions); } //对用1-1中的build方法,将最后一个“步骤”放入流程列表中,调用1-5中的方法创建一个SimpleSagaDefinition对象 public SagaDefinition<Data> build() { //添加最后一个“步骤”并调用1-5中build()方法 parent.addStep(makeLocalStep()); return parent.build(); } }
1-5 SimpleSagaDefinitionBuilder类
//1-5 public class SimpleSagaDefinitionBuilder<Data> { //存放“步骤”列表 private List<SagaStep<Data>> sagaSteps = new LinkedList<>(); //添加“步骤节点”到列表中 public void addStep(SagaStep<Data> sagaStep) { sagaSteps.add(sagaStep); } //将所有步骤节点整合构建整个大“流程业务” public SagaDefinition<Data> build() { //将整个“步骤”节点列表传入2-3List<Step> steps内 return new SimpleSagaDefinition<>(sagaSteps); } }
1-6 InvokeParticipantStepBuilder类
//1-6 public class InvokeParticipantStepBuilder<Data> implements WithCompensationBuilder<Data> { private Optional<ParticipantInvocation<Data>> action = Optional.empty(); public InvokeParticipantStepBuilder(SimpleSagaDefinitionBuilder<Data> parent) { this.parent = parent; } //调用1-7的构造函数,1-1的this::reserveCredit方法传给ParticipantInvocationImpl构建出上面Optional<ParticipantInvocation<Data>> action InvokeParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> participantInvocationPredicate, Function<Data, CommandWithDestination> action) { this.action = Optional.of(new ParticipantInvocationImpl<>(participantInvocationPredicate, action)); return this; } public StepBuilder<Data> step() { addStep(); return new StepBuilder<>(parent); } //本例中因为只使用了withAction方法,所以compensation, actionReplyHandlers, compensationReplyHandlers都为bull //调用2-7的中ParticipantInvocationStep的构造方法,将action赋予participantInvocation private void addStep() { parent.addStep(new ParticipantInvocationStep<>(action, compensation, actionReplyHandlers, compensationReplyHandlers)); } }
1-7 ParticipantInvocationImpl类
//1-7 AbstractParticipantInvocation实现ParticipantInvocation接口 public class ParticipantInvocationImpl<Data, C extends Command> extends AbstractParticipantInvocation<Data> { private final boolean notification; //commandBuilder对应的就是1-6中withAction方法action参数 private final Function<Data, CommandWithDestination> commandBuilder; public ParticipantInvocationImpl(Optional<Predicate<Data>> invocablePredicate, Function<Data, CommandWithDestination> commandBuilder) { this(invocablePredicate, commandBuilder, false); } @Override public boolean isSuccessfulReply(Message message) { return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME)); } //被2-7调用,执行Function<Data, CommandWithDestination> commandBuilder;并传入值为false的notification @Override public CommandWithDestinationAndType makeCommandToSend(Data data) { return new CommandWithDestinationAndType(commandBuilder.apply(data), notification); } }
至此所有Saga步骤注册完整的业务流程完毕。
2、Saga实例化工厂如何运作
2-0 sagaInstanceFactory创建createOrderSaga
sagaInstanceFactory.create(createOrderSaga, data);
2-1 SagaInstanceFactory类
//2-1Saga实例化工厂 public class SagaInstanceFactory { private Logger logger = LoggerFactory.getLogger(this.getClass()); private ConcurrentMap<Saga<?>, SagaManager<?>> sagaManagers = new ConcurrentHashMap<>(); public <SagaData> SagaInstance create(Saga<SagaData> saga, SagaData data) { SagaManager<SagaData> sagaManager = (SagaManager<SagaData>)sagaManagers.get(saga); if (sagaManager == null) throw new RuntimeException(("No SagaManager for " + saga)); //调用2-2的create方法 return sagaManager.create(data); } private <SagaData> SagaManager<SagaData> makeSagaManager(SagaManagerFactory sagaManagerFactory, Saga<SagaData> saga) { SagaManagerImpl<SagaData> sagaManager = sagaManagerFactory.make(saga); sagaManager.subscribeToReplyChannel(); return sagaManager; } }
2-2 SagaManagerImpl类
//2-2 public class SagaManagerImpl<Data> implements SagaManager<Data> { @Override public SagaInstance create(Data sagaData, Optional<String> resource) { SagaInstance sagaInstance = new SagaInstance(getSagaType(),null,"????",null,SagaDataSerde.serializeSagaData(sagaData), new HashSet<>()); //使用数据库保存saga实例 sagaInstanceRepository.save(sagaInstance); String sagaId = sagaInstance.getId(); //此步骤没什么意义 saga.onStarting(sagaId, sagaData); resource.ifPresent(r -> { if (!sagaLockManager.claimLock(getSagaType(), sagaId, r)) { throw new RuntimeException("Cannot claim lock for resource"); } }); //getStateDefinition()获取SagaDefinition,即1-1中。 //调用2-4start方法,启动saga流程,构建第一个“步骤”的SagaActions SagaActions<Data> actions = getStateDefinition().start(sagaData); actions.getLocalException().ifPresent(e -> { throw e; }); //过程操作,传入参数:saga.getSagaType()是获取sagaData(操作数据)的类名,比CreateOrderSagaData并修改成固定格式 //sagaId:唯一sagaId //sagaInstance:saga实例化 //sagaData:要操作的数据 //actions:第一个“步骤”的SagaActions //调用下面的processActions方法 processActions(saga.getSagaType(), sagaId, sagaInstance, sagaData, actions); return sagaInstance; } private void processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions<Data> actions) { //进行循环 while (true) { //如果传入的actions存在执行报错则信息执行if内的方法 if (actions.getLocalException().isPresent()) { actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder .withPayload("{}") .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name()) .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName()) .build()); } else { // only do this if successful //如果成功,通过消息队列发送给接收方。 //第一次进来时:因为1-1中构建的流程内,“步骤一”是调用本地方法,因此不需要发送消息 //第二次进来时:由于1-1中构建的流程内,“步骤二”是调用参与者方法,因此需要发送消息给参与者 String lastRequestId = sagaCommandProducer.sendCommands(this.getSagaType(), sagaId, actions.getCommands(), this.makeSagaReplyChannel()); //第一次进来时:lastRequestId第一个步骤时为null //第二次进来时:返回一个请求Id sagaInstance.setLastRequestId(lastRequestId); //第一次进来时:更新“步骤一”sagaInstance实例状态信息:更新是否是最后节点(布尔值),更新是否需要补偿(布尔值),更新是否报错,更新更新的状态(对应2-5中executeStep方法的newState) //第二次进来时:更新“步骤二”sagaInstance实例状态 updateState(sagaInstance, actions); sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData))); //执行第一个步骤时,并不是最后一个步骤节点所以不进入if中 if (actions.isEndState()) { performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), actions.isFailed(), sagaData); } //使用数据库更新sagaInstance实例状态 sagaInstanceRepository.update(sagaInstance); //public boolean isReplyExpected() {return (commands.isEmpty() || commands.stream().anyMatch(CommandWithDestinationAndType::isCommand)) && !local;} //第一次进来时:在2-5的step.makeStepOutcome过程中因为将local设置为true,所以执行第一个步骤时actions.isReplyExpected()为false if (actions.isReplyExpected()) { break; } else { //模拟成功回复本地动作或通知,调用下面的simulateSuccessfulReplyToLocalActionOrNotification方法 //第一次进来时:传入“步骤一”的SagaActions,返回“步骤二”的SagaActions,继续循环 //第二次进来时:传入“步骤二”的SagaActions,返回“步骤三”的SagaActions actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions); } } } } //模拟成功回复本地动作或通知 private SagaActions<Data> simulateSuccessfulReplyToLocalActionOrNotification(String sagaType, String sagaId, SagaActions<Data> actions) { //获取1-1中的整个业务流程后,调用2-4中的handleReply方法,并设置REPLY_OUTCOME和REPLY_TYPE头为success,最后返回“步骤二”SagaActions,重新进入上面的while循环中 return getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder .withPayload("{}") .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name()) .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) .build()); } }
2-3 AbstractSimpleSagaDefinition类
//2-3 public abstract class AbstractSimpleSagaDefinition<Data, Step extends ISagaStep<Data>, ToExecute extends AbstractStepToExecute<Data, Step>, Provider extends AbstractSagaActionsProvider<Data,?>> { protected Logger logger = LoggerFactory.getLogger(this.getClass()); //steps接受1-5中的sagaSteps protected List<Step> steps; public AbstractSimpleSagaDefinition(List<Step> steps) { this.steps = steps; } //被2-4start方法调用 protected Provider firstStepToExecute(Data data) { //SagaExecutionState.startingState()返回SagaExecutionState(-1, false); //初始化时步骤节点currentlyExecuting为-1(因为0为第一个“步骤”,所以设置-1为起点),是否补偿compensating为false //开始执行下一个步骤节点(此处执行的是第一个“步骤”) return nextStepToExecute(SagaExecutionState.startingState(), data); } //被2-4的handleReply方法调用 protected Provider sagaActionsForNextStep(String sagaType, String sagaId, Data sagaData, Message message, SagaExecutionState state, Step currentStep, boolean compensating) { //此处分两种情况:本地调方法用CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));【例如1-1中的“步骤一”】;远程调用参与者方法用getParticipantInvocation(compensating).get().isSuccessfulReply(message);【例如1-1中的“步骤二”】 //由于“步骤一”是本地方法且头中包含信息为success,直接调用下方的nextStepToExecute方法 if (currentStep.isSuccessfulReply(compensating, message)) { return nextStepToExecute(state, sagaData); } else if (compensating) { return handleFailedCompensatingTransaction(sagaType, sagaId, state, message); } else { return nextStepToExecute(state.startCompensating(), sagaData); } } protected Provider nextStepToExecute(SagaExecutionState state, Data data) { int skipped = 0; //初始化compensating为false无需补偿 boolean compensating = state.isCompensating(); //初始化compensating为false,所以下个要执行的步骤节点+1,如果为true说明出错需要回滚,因此direction初始化为-1 int direction = compensating ? -1 : +1; //第一次进入时:direction初始化为-1,所以i初始值为0,说明从第一个步骤开始执行;i必须小于节点长度;i根据direction来判断是否需要回滚到上一个节点还是进入下一个阶段 //第二次进入时:步骤一执行后state.getCurrentlyExecuting()变为0,所以i变为1,steps.get(i)获取“步骤二”。由于compensating依然为false,所以不进行回滚继续向下执行 for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < steps.size(); i = i + direction) { //获取步骤节点 Step step = steps.get(i); //每个步骤节点中有正常执行的方法事务,可能有补偿事物。 //因此使用compensating进行判断。如果需要补偿且存在补偿事务,或则不需要补偿,以上两种情况则为true //step.hasCompensation(data)和step.hasAction(data)返回值都是布尔值。 //step.hasCompensation(data)用来判断是否存在补偿事务,step.hasAction(data)直接返回true if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) { //makeStepToExecute指定执行步骤,调用SimpleSagaDefinition方法中的makeStepToExecute方法 //传入参数:当前跳过的skipped计数,是否需要补偿,以及“步骤” //makeSagaActionsProvider调用2-4的makeStepmakeStepToExecuteToExecute方法,构建一个StepToExecute对象, ToExecute stepToExecute = makeStepToExecute(skipped, compensating, step); //makeSagaActionsProvider传入:执行节点,处理数据,节点初始状态,调用2-4中的makeSagaActionsProvider方法 return makeSagaActionsProvider(stepToExecute, data, state); } else //如果需要补偿但没有补偿事务 //跳过计数+1 skipped++; } return makeSagaActionsProvider(makeEndStateSagaActions(state)); } protected Provider handleFailedCompensatingTransaction(String sagaType, String sagaId, SagaExecutionState state, Message message) { logger.error("Saga {} {} failed due to failed compensating transaction {}", sagaType, sagaId, message); return makeSagaActionsProvider(SagaActions.<Data>builder() .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeFailedEndState())) .withIsEndState(true) .withIsCompensating(state.isCompensating()) .withIsFailed(true) .build()); } protected SagaActions<Data> makeEndStateSagaActions(SagaExecutionState state) { return SagaActions.<Data>builder() .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState())) .withIsEndState(true) .withIsCompensating(state.isCompensating()) .build(); } }
2-4 SimpleSagaDefinition类
//2-4 public class SimpleSagaDefinition<Data>{ public SimpleSagaDefinition(List<SagaStep<Data>> steps) { super(steps); } @Override public SagaActions<Data> start(Data sagaData) { //执行2-3中的firstStepToExecute方法,启动第一个流程 return toSagaActions(firstStepToExecute(sagaData)); //构建SagaActions完毕,返回并继续2-2中的create方法内getStateDefinition().start(sagaData)后的代码 } //被2-2中的processActions方法调用 @Override public SagaActions<Data> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) { //将前一个“步骤”中的之前被JSON格式化(2-5的makeSagaActions把newState进行JSON格式化)的newState此处的currentState进行解码 SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState); //state.getCurrentlyExecuting()的值为“0”,因为初始化currentlyExecuting的计数是-1,而在2-5 中执行currentState.nextState(size())已经把当前“步骤”计数+1.而,所以step.get(0)获取了整个业务流程中的“步骤一” SagaStep<Data> currentStep = steps.get(state.getCurrentlyExecuting()); //获取前一个“步骤”是否需要回滚 boolean compensating = state.isCompensating(); currentStep.getReplyHandler(message, compensating).ifPresent(handler -> invokeReplyHandler(message, sagaData, (d, m) -> { handler.accept(d, m); return null; })); //sagaActionsForNextStep会根据是否需要补偿来判断是采用nextStepToExecute(2-3)方法,还是调用handleFailedCompensatingTransaction(2-3)方法。 SagaActionsProvider<Data> sap = sagaActionsForNextStep(sagaType, sagaId, sagaData, message, state, currentStep, compensating); return toSagaActions(sap); } //被2-3的nextStepToExecute中调用 @Override protected StepToExecute<Data> makeStepToExecute(int skipped, boolean compensating, SagaStep<Data> step) { return new StepToExecute<>(step, skipped, compensating); } //被2-3的nextStepToExecute中调用 @Override protected SagaActionsProvider<Data> makeSagaActionsProvider(StepToExecute<Data> stepToExecute, Data data, SagaExecutionState state) { //调用2-5的executeStep方法 return new SagaActionsProvider<>(() -> stepToExecute.executeStep(data, state)); } }
2-5 executeStep方法
//2-5 StepToExecute类中的方法,被2-4中的makeSagaActionsProvider方法调用 public SagaActions<Data> executeStep(Data data, SagaExecutionState currentState) { //nextState方法执行:SagaExecutionState(compensating ? currentlyExecuting - size : currentlyExecuting + size, compensating) //protected int size() {return 1 + skipped;} //需要补偿:则下一个状态为当前节点currentlyExecuting-已跳过(没有补偿事务)的节点的长度,即回到最后(有补偿事务)执行的步骤节点。 //不需要补偿:则下一个状态为当前节点currentlyExecuting+需要跳过(没有补偿事务)的节点的长度 //计算完后将currentlyExecuting进行更新 SagaExecutionState newState = currentState.nextState(size()); SagaActions.Builder<Data> builder = SagaActions.builder(); //当前正执行是否需要补偿回滚 boolean compensating = currentState.isCompensating(); //调用2-6中的makeStepOutcome方法 //第一次进来时:因为步骤一执行的是本地方法,调用的2-6中的makeStepOutcome,作用在于判断当前“步骤”是该执行补偿事务还是正常的本地事务,如果执行出现错误则返回一个带有报错信息的StepOutcome对象 //第二次进来时:因为步骤二执行的是让参与方执行方法,调用的2-7中的makeStepOutcomemakeStepOutcome //执行StepOutcome的visit方法:将StepOutcome的RuntimeException类型的localOutcome属性赋值给SagaActions中的RuntimeException类型localException属性,同时将SagaActions中的local属性设置为ture; step.makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands); //SagaActions的makeSagaActions方法做两件事:将当前节点的数据newState格式化成JSON数据,newState信息包括:当前执行步骤的计数(第几个步骤)、是否回滚(布尔值)、是否是最后一个步骤(布尔值),是否出现错误(布尔值)。然后调用buildActions方法构建返回一个新的SagaActions //String state = encodeState(newState); //builder.buildActions(data, compensating, state, newState.isEndState()); //public SagaActions<Data> buildActions(Data data, boolean compensating, String state, boolean endState) { //return withUpdatedSagaData(data) // .withUpdatedState(state) // .withIsEndState(endState) // .withIsCompensating(compensating) // .build(); //} return makeSagaActions(builder, data, newState, compensating); //第一次进来时:构建SagaActions完毕,返回到2-4中的start方法 //第二次进来时:构建SagagAcions完毕,返回到2-2中的simulateSuccessfulReplyToLocalActionOrNotification方法 }
2-6 LocalStep类
//2-6 public class LocalStep<Data> implements SagaStep<Data> { private final Consumer<Data> localFunction; private final Optional<Consumer<Data>> compensation; private final List<LocalExceptionSaver<Data>> localExceptionSavers; private final List<Class<RuntimeException>> rollbackExceptions; @Override public StepOutcome makeStepOutcome(Data data, boolean compensating) { try { //如果需要回滚,执行回滚方法,compensation在1-1时的.withCompensation已经传入 if (compensating) { //真正执行业务逻辑方法的地方 compensation.ifPresent(localStep -> localStep.accept(data)); } else { //如果不需要回滚,直接执行补偿方法,localFunction在1-1的.invokeLocal时已经传入 localFunction.accept(data); } return makeLocalOutcome(Optional.empty()); } catch (RuntimeException e) { localExceptionSavers.stream().filter(saver -> saver.shouldSave(e)).findFirst().ifPresent(saver -> saver.save(data, e)); if (rollbackExceptions.isEmpty() || rollbackExceptions.stream().anyMatch(c -> c.isInstance(e))) return makeLocalOutcome(Optional.of(e)); else throw e; } } }
2-7 ParticipantInvocationStep类
//2-7 public class ParticipantInvocationStep<Data> implements SagaStep<Data> { //participantInvocation被1-6的aaddStep方法传递action赋值 private Optional<ParticipantInvocation<Data>> participantInvocation; private Optional<ParticipantInvocation<Data>> compensation; public ParticipantInvocationStep(Optional<ParticipantInvocation<Data>> participantInvocation, Optional<ParticipantInvocation<Data>> compensation, Map<String, BiConsumer<Data, Object>> actionReplyHandlers, Map<String, BiConsumer<Data, Object>> compensationReplyHandlers) { this.actionReplyHandlers = actionReplyHandlers; this.compensationReplyHandlers = compensationReplyHandlers; this.participantInvocation = participantInvocation; this.compensation = compensation; } //判断是否需要回滚,如果需要回滚执行compensation方法,如果不需要执行participantInvocation方法 private Optional<ParticipantInvocation<Data>> getParticipantInvocation(boolean compensating) { return compensating ? compensation : participantInvocation; } @Override public boolean isSuccessfulReply(boolean compensating, Message message) { return getParticipantInvocation(compensating).get().isSuccessfulReply(message); } @Override public StepOutcome makeStepOutcome(Data data, boolean compensating) { //先调用getParticipantInvocation方法,此处假设不需要回滚所以返回participantInvocation //调用makeRemoteStepOutcome方法,传入List<CommandWithDestinationAndType> commandsToSend 返回RemoteStepOutcome类型的结果 return StepOutcome.makeRemoteStepOutcome(getParticipantInvocation(compensating) //调用1-7中的makeCommandToSend方法,执行消息发送方法 .map(pi -> pi.makeCommandToSend(data)) //将返回的CommandWithDestinationAndType包装成单元素列表 .map(Collections::singletonList) //如果上述返回为空则返回一个空列表 .orElseGet(Collections::emptyList)); } }
图解流程
有空再补....
简单案例
花了两天写了个以领域驱动为思想的Saga模式事务管理简陋框架,主要为了讲解:领域驱动模型DDD(三)——使用Saga管理事务 教学而设计的,目前只能在单体架构中使用,后续有时间会更新分布式情况下的新版本。请记住,领域驱动模型是一种思想,它不一定捆绑分布式微服务,只是领域驱动模型思想更有利于分布式情况下对微服务应用的划分。
项目框架地址:https://github.com/CG-Lin/mvn-lin