workflow-core自带事件的局限性
众所周知,workflow-core只能从外向内抛事件
比如在api接口中引发事件,在工作流中等待事件完成
//第一步,开启流程 public async Task<IActionResult> Add([FromBody] Dto parm) { //启动一个新的工作流 //由 Workflow-Core 自动生成(默认是 GUID 格式,唯一标识一个正在运行的工作流实例 //通过该ID可以控制工作流生命周期 var workflowId = await _workflowHost.StartWorkflow("xxxFlow", 1, parm); return SUCCESS(1); } //第二步,审批,引发事件 public async Task<IActionResult> Audit([FromBody] Dto parm) { var modal = parm.Adapt<Dto>().ToUpdate(HttpContext); //通知工作流继续执行 await _workflowHost.PublishEvent( "audit", // 必须与WaitForEvent的事件名称匹配 parm.WorkflowId, // 必须与WaitForEvent的事件Key匹配 parm//提供的数据 ); return SUCCESS(1); }
但是通常来说,我们在前端调用一个接口,要获得返回数据。比如Add后要刷新界面,看到表格中新添加的一行记录。
然而由于我们缺乏这种手段来从流程中获得反馈,而且是等待式await的反馈。所以Add就直接俄返回了,界面上刷新时,流程可能还没跑到插入数据库那一步,于是界面上也看不到新数据。
事件扩展
我研究了一天后,利用C#的扩展方法语法,给workflow-core扩展了外抛事件,这样可以在接口中等待流程中某个步骤完成。
设想
设想调用方法应该这样
//第一步,开启流程 public async Task<IActionResult> Add([FromBody] Dto parm) { //启动一个新的工作流 //由 Workflow-Core 自动生成(默认是 GUID 格式,唯一标识一个正在运行的工作流实例 //通过该ID可以控制工作流生命周期 var workflowId = await _workflowHost.StartWorkflow("xxxFlow", 1, parm); //等待瞬时事件,SubmitProblemStep步骤执行完成 var response = await _workflowHost.WaitEvent<Dto>("SubmitStep", workflowId); return SUCCESS(response); }
而流程中这样引发事件比较优雅
Action<WaitFor, Dto> outputAction = (edata, data) => { (edata.EventData as Dto).Adapt(data);//保证引用对象data不丢失的情况下更新对象data }; //这样引发事件 builder // 提交记录 .StartWith<SubmitStep>() .Input(step=>step.Entity, data=>data) .Output((step,data)=> data=step.Entity)//将WorkflowId赋值给流程,便于WaitFor监听事件 //RaiseEvent扩展的向外抛事件,嵌入原来fluentApi中引发事件 //执行到这里,表示记录已在SubmitStep中插入到数据库了 .RaiseEvent("SubmitStep", data=>data.WorkflowId) //workflow-core自带的向内抛事件 .WaitFor("AuditStep", data => data.WorkflowId, date => DateTime.Now)
实现
我观察了已有的.Then,.Input这些方法,终于搞清楚实现哪个接口,扩展我们的方法。按照设想,需要暴露两个方法
RaiseEvent给内部流程用WaitEvent给外部接口用
/// <summary> /// 工作流从内->外抛出事件扩展 /// </summary> public static class WorkFlowEventExtensions { /// <summary> /// 瞬时事件集合 /// </summary> public static List<EventSource> events = new List<EventSource>(); /// <summary> /// 抛出事件扩展 /// </summary> /// <typeparam name="TData"></typeparam> /// <typeparam name="TStepBody"></typeparam> /// <param name="builder"></param> /// <param name="eventName"></param> /// <param name="eventKey"></param> /// <returns></returns> public static IStepBuilder<TData, ActionStepBody> RaiseEvent<TData, TStepBody>(this IWorkflowModifier<TData, TStepBody> builder, string eventName, Func<TData, string> eventKey) where TStepBody : IStepBody { return builder .Delay(d => TimeSpan.FromMilliseconds(100))//延迟流程小会儿,等待事件events先添加成功 .Then(ctx => { var key = eventKey.Invoke((TData)ctx.Workflow.Data); //释放互斥锁,让WaitEvent继续执行下去 var mu = events.FirstOrDefault(x => x.EventName == eventName && x.EventKey == key); if (mu != null) { mu.Data = ctx.Workflow.Data; //完成任务 mu.Cts.TrySetResult(true); } }); } /// <summary> /// 等待内部事件扩展 /// </summary> /// <typeparam name="TData">可以是workflow的实体类,比如IWorkflow<TData>中的TData类</typeparam> /// <param name="host"></param> /// <param name="eventName">事件名</param> /// <param name="eventKey">事件key,一般是WorkflowId,流程实例Id</param> /// <returns></returns> public static async Task<TData> WaitEvent<TData>(this IWorkflowHost host, string eventName, string eventKey) { TData data = default(TData); //等待任务完成,说明步骤已执行,可以得到需要的数据 await Task.Run(async () => { //向任务集合添加一个任务 EventSource eventCts = new EventSource { EventName = eventName, EventKey = eventKey, Cts = new TaskCompletionSource<bool>(), }; events.Add(eventCts); //等待任务完成 await eventCts.Cts.Task; //退出等待 if (eventCts.Data != null && eventCts.Data is TData) { //取得数据 data = (TData)eventCts.Data; } events.Remove(eventCts); }); return data; } } /// <summary> /// 事件源 /// </summary> public class EventSource { public string EventName { get; set; } public string EventKey { get; set; } /// <summary> /// 任务完成源 /// </summary> public TaskCompletionSource<bool> Cts { get; set; } public object Data { get; set; } }
结果
curd了一下,发现当然是达到了想要的结果😁。
其中的关键断点命中如下
- 第一步

- 第二步

- 第三步
