[Kogel.Subscribe.Mssql]SQL Server增量订阅,数据库变更监听

此框架是SQL Server增量订阅,用来监听增删改数据库数据变更

目前仅支持SQL Server,Nuget上可以下载安装

或者使用Nuget命令添加包

dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1

 

(一)定义需要监听表的实体类 

 /// <summary>     ///      /// </summary>     [Display(Rename = "t_oms_order_detail")]     [ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")]     public class OmsOrderDetail : IBaseEntity<OmsOrderDetail, int>     {         /// <summary>         ///          /// </summary>         [Identity]         [Display(Rename = "id")]         [Nest.PropertyName("id")]         public override int Id { get; set; }          /// <summary>         ///          /// </summary>         [Display(Rename = "name")]         [Nest.PropertyName("name")]         public string Name { get; set; }          /// <summary>         ///          /// </summary>         [Display(Rename = "trade_id")]         [Nest.PropertyName("trade_id")]         public int? TradeId { get; set; }          /// <summary>         ///          /// </summary>         [Display(Rename = "descption")]         [Nest.PropertyName("descption")]         public string Descption { get; set; }          /// <summary>         ///          /// </summary>         [Display(Rename = "create_time")]         [Nest.PropertyName("create_time")]         public DateTime CreateTime { get; set; }     }

[Display]和[Identity]属于Kogel.Dapper.Extension的特性如果[想了解更多请点击],[ElasticsearchType]和[Nest.PropertyName]属于Elasticsearch特性,如果没用到可以忽略

 

(二)定义表订阅

    /// <summary>     /// 定义表订阅     /// </summary>     public class OmsOrderDetailSubscribe : Subscribe<OmsOrderDetail>     {         /// <summary>         /// 设置连接配置         /// </summary>         /// <param name="builder"></param>         public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)         {             //此连接字符串账号需要有管理员权限             builder.BuildConnection("数据库连接字符串");         }     }

如果需要此表对应多张分表可以设置

//配置所有表分片 builder.BuildShards(new List<string>             {                 "t_oms_order_detail_1",                 "t_oms_order_detail_2",                 "t_oms_order_detail_3"             })

 

(1).如果想推送订阅到RabbitMQ中

builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory             {                 HostName = "localhost",                 UserName = "guest",                 Password = "guest"             })

可以通过BuildTopic设置交换机名称

builder.BuildTopic("kogel_subscribe_order_detail")

 

(2).如果想推送订阅到Kafka中

builder.BuildKafka(new ProducerConfig             {                 BootstrapServers = "localhost:9092",                 Acks = Acks.None             })

可以通过BuildTopic设置Topic名称

builder.BuildTopic("kogel_subscribe_order_detail")

 

(3).如果想推送订阅到Elasticsearch中

 builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>             {                 Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")),             })

如果有设置Basic授权

builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>             {                 Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))                     .BasicAuthentication("账号","密码")             })

如果想根据自己定义的分片逻辑插入到多个ES索引中可以通过WriteInterceptor

/// <summary>         /// 设置连接配置         /// </summary>         /// <param name="builder"></param>         public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)         {             //此连接字符串账号需要有管理员权限             builder.BuildConnection("数据库连接字符串");             //定义推送ES             builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>             {                 Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))                     .BasicAuthentication("账号", "密码"),                 WriteInterceptor = message => WriteInterceptor(message)             });         }          /// <summary>         /// 定义自己的索引逻辑         /// </summary>         /// <param name="messages"></param>         /// <returns></returns>         private EsSubscribeMessage<OmsOrderDetail> WriteInterceptor(SubscribeMessage<OmsOrderDetail> message)         {             string esIndexName;             //这里写自己索引分片的业务逻辑             if (message.Result.Id % 3 == 0)             {                 esIndexName = $"kogel_orders_2";             }             else             {                 esIndexName = $"kogel_orders_1";             }             return message.ToEsSubscribeMessage(esIndexName);         }

并且ES索引不存在的时候会动态创建

 

(4).如果想自定义实现订阅逻辑,在可以Subscribe订阅类中重写

/// <summary>         /// 订阅变更 (每一次sql的执行会触发一次Subscribe)         /// </summary>         /// <param name="messageList">消息列表表示所有影响到的数据变更(会受BuildLimit限制,没有查询完成的会在下一次查出)</param>         public override void Subscribes(List<SubscribeMessage<T>> messageList)         {             foreach (var message in messageList)             {                 Console.WriteLine($"执行动作:{message.Operation},更新的表:{message.TableName},更新的id:{message.Result.GetId()}");             }         }

 

以上订阅的优先级:

[Kogel.Subscribe.Mssql]SQL Server增量订阅,数据库变更监听

 

(三)订阅启动

启动监听所有继承自Subscribe<T>的类,在应用程序启动时执行即可

ApplicationProgram.Run();

如果是基础BaseSubscribe<T>中间基类需要定义成abstract,例如

  /// <summary>     /// 基础配置类需要定义成abstract     /// </summary>     /// <typeparam name="T"></typeparam>     public abstract class BaseSubscribe<T> : Subscribe<T>         where T : class, IBaseEntity     {     }

关闭监听,在应用程序退出时执行即可

ApplicationProgram.Close();

 

(四)其他配置

builder.BuildCdcConfig(new CdcConfig             {                 //扫描间隔(每次扫描变更表的间隔,单位毫秒) 默认10000毫秒/10秒                 ScanInterval = 10000,                  //变更捕捉文件在DB保存的时间(默认三天)                 Retention = 60 * 24 * 3,                  //是否首次扫描表全部数据再监听变更(默认false)                 IsFirstScanFull = false,                  //每次检索的变更量(默认10条)                 Limit = 10,                  //变更扫描的偏移量位置(默认从最后中止处开始)                 OffsetPosition = OffsetPositionEnum.Abort             })

 

 

框架开源,完整框架源码可以去Github上下载:

https://github.com/a935368322/Kogel.Subscribe.Mssql

如有问题也可以加QQ群讨论:

技术群 710217654

发表评论

评论已关闭。

相关文章