Using the Unit of Work Pattern with MongoDB in ASP.NET 6

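Hi everyone, I'm Edison.

I recently needed MongoDB transactions at work, so after reading around I packaged a small component that provides a basic CRUD Repository base class and a UnitOfWork implementation. This post is a short introduction to that component.

About MongoDB Transactions

MongoDB introduced multi-document transactions for replica sets in 4.0 and extended them to sharded clusters in 4.2, which was about four years ago at the time of writing. We may not have replaced a traditional relational database such as MySQL or SQL Server with MongoDB in our projects, but there is no denying that its transaction support has matured considerably.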

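In MongoDB, a "transaction" mainly means a multi-document transaction, and it is used in much the same way as in a traditional relational database. One thing to watch out for: multi-document transactions only work against a replica set or through a mongos node (a sharded cluster). If all you have is a single standalone mongod instance, you cannot use them.

Aside: if you are interested in MongoDB, have a look at my blog series 《MongoDB入门到实践学习之旅》 (MongoDB: A Learning Journey from Basics to Practice).

So how do we get started with transactions quickly?

Transactions in the Mongo Shell

The following shows how to commit a transaction that touches multiple documents from the Mongo Shell: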

var session = db.getMongo().startSession();
session.startTransaction({
    readConcern: { level: 'majority' },
    writeConcern: { w: 'majority' }
});

try {
    var coll1 = session.getDatabase('students').getCollection('teams');
    coll1.update({ name: 'yzw-football-team' }, { $set: { members: 20 } });

    var coll2 = session.getDatabase('students').getCollection('records');
    coll2.update({ name: 'Edison' }, { $set: { gender: 'Female' } });

    // On success, commit the transaction
    session.commitTransaction();
} catch (e) {
    // On failure, roll the transaction back
    session.abortTransaction();
    throw e;
}

Transactions in a .NET Application

The following example runs a transaction from a .NET application through the MongoDB Driver:

using (var clientSession = mongoClient.StartSession())
{
    // The transaction must be started explicitly before any operation joins it
    clientSession.StartTransaction();
    try
    {
        // Pass the session to every operation; otherwise it runs outside the transaction
        var contacts = clientSession.Client.GetDatabase("testDB").GetCollection<Contact>("contacts");
        contacts.ReplaceOne(clientSession, c => c.Id == "1234455", contact);
        var books = clientSession.Client.GetDatabase("testDB").GetCollection<Book>("books");
        books.DeleteOne(clientSession, b => b.Id == "1234455");

        clientSession.CommitTransaction();
    }
    catch (Exception ex)
    {
        // to do some logging
        clientSession.AbortTransaction();
    }
}

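In most real applications we are used to doing CRUD through the Repository pattern, and to using the Unit of Work pattern to coordinate several repositories in a single transaction. So how do we get that in our own project?

After consulting some references I built a small base component, which I'll call EDT.MongoProxy for now. Let's look at how it is put together.

A Singleton MongoClient

Following MongoDB best practice, MongoClient is best registered as a singleton. In MongoDB.Driver, MongoClient is designed to be thread-safe and shared across threads, so a singleton avoids the cost of instantiating it repeatedly and the poor performance that can cause in extreme cases.

Here I define a MongoDbConnection class that wraps the MongoClient, and register it in the IoC container as a singleton.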

public class MongoDbConnection : IMongoDbConnection
{
    public IMongoClient DatabaseClient { get; }
    public string DatabaseName { get; }

    public MongoDbConnection(MongoDatabaseConfigs configs, IConfiguration configuration)
    {
        DatabaseClient = new MongoClient(configs.GetMongoClientSettings(configuration));
        DatabaseName = configs.DatabaseName;
    }
}

The MongoDatabaseConfigs class reads the configuration from appsettings and turns it into the corresponding MongoClient settings.

using System.Security.Cryptography.X509Certificates;
using Microsoft.Extensions.Configuration;
using MongoDB.Driver;

/** Config Example
"MongoDatabaseConfigs": {
  "Servers": "xxx01.edisontalk.net,xxx02.edisontalk.net,xxx03.edisontalk.net",
  "Port": 27017,
  "ReplicaSetName": "edt-replica",
  "DatabaseName": "EDT_Practices",
  "AuthDatabaseName": "admin",
  "ApplicationName": "Todo",
  "UserName": "service_testdev",
  "Password": "xxxxxxxxxxxxxxxxxxxxxxxx",
  "UseTLS": true,
  "AllowInsecureTLS": true,
  "SslCertificatePath": "/etc/pki/tls/certs/EDT_CA.cer",
  "UseEncryption": true
}
**/
public class MongoDatabaseConfigs
{
    private const string DEFAULT_AUTH_DB = "admin"; // Default AuthDB: admin

    public string Servers { get; set; }
    public int Port { get; set; } = 27017; // Default Port: 27017
    public string ReplicaSetName { get; set; }
    public string DatabaseName { get; set; }
    public string DefaultCollectionName { get; set; } = string.Empty;
    public string ApplicationName { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public string AuthDatabaseName { get; set; } = DEFAULT_AUTH_DB; // Default AuthDB: admin
    public string CustomProperties { get; set; } = string.Empty;
    public bool UseTLS { get; set; } = false;
    // Note: true skips certificate validation; consider false in production
    public bool AllowInsecureTLS { get; set; } = true;
    public string SslCertificatePath { get; set; } = string.Empty;
    public bool StoreCertificateInKeyStore { get; set; } = false;

    public MongoClientSettings GetMongoClientSettings(IConfiguration configuration = null)
    {
        // ArgumentNullException(paramName, message): the one-argument overload
        // would treat the message as the parameter name
        if (string.IsNullOrWhiteSpace(Servers))
            throw new ArgumentNullException(nameof(Servers), "Mongo Servers Configuration is Missing!");

        if (string.IsNullOrWhiteSpace(UserName) || string.IsNullOrWhiteSpace(Password))
            throw new ArgumentNullException($"{nameof(UserName)}/{nameof(Password)}", "Mongo Account Configuration is Missing!");

        // Base Configuration
        MongoClientSettings settings = new MongoClientSettings
        {
            ApplicationName = ApplicationName,
            ReplicaSetName = ReplicaSetName
        };

        // Credential
        settings.Credential = MongoCredential.CreateCredential(AuthDatabaseName, UserName, Password);

        // Servers
        var mongoServers = Servers.Split(",", StringSplitOptions.RemoveEmptyEntries).ToList();
        if (mongoServers.Count == 1) // Standalone
        {
            settings.Server = new MongoServerAddress(mongoServers.First(), Port);
            settings.DirectConnection = true;
        }

        if (mongoServers.Count > 1) // Cluster
        {
            var mongoServerAddresses = new List<MongoServerAddress>();
            foreach (var mongoServer in mongoServers)
            {
                var mongoServerAddress = new MongoServerAddress(mongoServer, Port);
                mongoServerAddresses.Add(mongoServerAddress);
            }
            settings.Servers = mongoServerAddresses;
            settings.DirectConnection = false;
        }

        // SSL
        if (UseTLS)
        {
            settings.UseTls = true;
            settings.AllowInsecureTls = AllowInsecureTLS;
            if (string.IsNullOrWhiteSpace(SslCertificatePath))
                throw new ArgumentNullException(nameof(SslCertificatePath), "SslCertificatePath is Missing!");

            if (StoreCertificateInKeyStore)
            {
                // Optionally import the CA certificate into the local trusted root store
                var localTrustStore = new X509Store(StoreName.Root);
                var certificateCollection = new X509Certificate2Collection();
                certificateCollection.Import(SslCertificatePath);
                try
                {
                    localTrustStore.Open(OpenFlags.ReadWrite);
                    localTrustStore.AddRange(certificateCollection);
                }
                finally
                {
                    localTrustStore.Close();
                }
            }

            var certs = new List<X509Certificate> { new X509Certificate2(SslCertificatePath) };
            settings.SslSettings = new SslSettings();
            settings.SslSettings.ClientCertificates = certs;
            settings.SslSettings.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls13;
        }

        return settings;
    }
}
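
The server handling in GetMongoClientSettings comes down to one decision: a single host means a direct (standalone) connection, several hosts mean a cluster. As a rough illustration only, the sketch below isolates that decision without the driver. `ServerListParser` and its tuple return type are hypothetical names, not part of EDT.MongoProxy, and trimming whitespace around host names is an extra assumption the original code does not make.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Demo with the same comma-separated format used in the config example
var (addresses, direct) = ServerListParser.Parse("xxx01.edisontalk.net, xxx02.edisontalk.net", 27017);
Console.WriteLine($"{addresses.Count} host(s), direct connection: {direct}");
// prints "2 host(s), direct connection: False"

// Hypothetical helper: splits the "Servers" string and decides the connection mode
public static class ServerListParser
{
    public static (List<(string Host, int Port)> Addresses, bool DirectConnection) Parse(
        string servers, int port = 27017)
    {
        if (string.IsNullOrWhiteSpace(servers))
            throw new ArgumentException("At least one server is required.", nameof(servers));

        // Every host shares the same port, as in MongoDatabaseConfigs
        var addresses = servers
            .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
            .Select(host => (Host: host, Port: port))
            .ToList();

        if (addresses.Count == 0)
            throw new ArgumentException("At least one server is required.", nameof(servers));

        // One host: connect directly; more than one: let the driver discover the cluster
        return (addresses, addresses.Count == 1);
    }
}
```

Keep in mind that a single-host setup only supports multi-document transactions if that host is itself a member of a replica set.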

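The Core: MongoDbContext

Modelled loosely on DbContext, MongoDbContext obtains the singleton MongoClient from the IoC container and wraps starting and committing a transaction, which keeps the application code simple.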

using MongoDB.Driver;

public class MongoDbContext : IMongoDbContext
{
    private readonly IMongoDatabase _database;
    private readonly IMongoClient _mongoClient;
    // Operations queued by the repositories; they only run when Commit/CommitAsync is called
    private readonly IList<Func<IClientSessionHandle, Task>> _commands
        = new List<Func<IClientSessionHandle, Task>>();

    public MongoDbContext(IMongoDbConnection dbClient)
    {
        _mongoClient = dbClient.DatabaseClient;
        _database = _mongoClient.GetDatabase(dbClient.DatabaseName);
    }

    public void AddCommand(Func<IClientSessionHandle, Task> func)
    {
        _commands.Add(func);
    }

    public Task AddCommandAsync(Func<IClientSessionHandle, Task> func)
    {
        _commands.Add(func);
        return Task.CompletedTask;
    }

    /// <summary>
    /// NOTES: Only works in Cluster mode
    /// </summary>
    public int Commit(IClientSessionHandle session)
    {
        try
        {
            session.StartTransaction();

            foreach (var command in _commands)
            {
                // Wait for each command to finish; otherwise the commit
                // could run before the queued writes have completed
                command(session).GetAwaiter().GetResult();
            }

            session.CommitTransaction();
            return _commands.Count;
        }
        catch (Exception)
        {
            // Roll back if the transaction is still open; 0 means nothing was committed
            if (session.IsInTransaction)
                session.AbortTransaction();
            return 0;
        }
        finally
        {
            _commands.Clear();
        }
    }

    /// <summary>
    /// NOTES: Only works in Cluster mode
    /// </summary>
    public async Task<int> CommitAsync(IClientSessionHandle session)
    {
        try
        {
            session.StartTransaction();

            foreach (var command in _commands)
            {
                await command(session);
            }

            await session.CommitTransactionAsync();
            return _commands.Count;
        }
        catch (Exception)
        {
            // Roll back if the transaction is still open; 0 means nothing was committed
            if (session.IsInTransaction)
                await session.AbortTransactionAsync();
            return 0;
        }
        finally
        {
            _commands.Clear();
        }
    }

    public IClientSessionHandle StartSession()
    {
        var session = _mongoClient.StartSession();
        return session;
    }

    public async Task<IClientSessionHandle> StartSessionAsync()
    {
        var session = await _mongoClient.StartSessionAsync();
        return session;
    }

    public IMongoCollection<T> GetCollection<T>(string name)
    {
        return _database.GetCollection<T>(name);
    }

    public void Dispose()
    {
        GC.SuppressFinalize(this);
    }
}
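
The key idea in MongoDbContext is deferred execution: adding a command only stores a delegate, and nothing touches the database until commit. To make that visible without a running MongoDB, here is a minimal in-memory sketch. `FakeSession` and `CommandQueue` are hypothetical stand-ins for IClientSessionHandle and MongoDbContext, and the list plays the role of a collection; a real transaction would also roll back partial writes on the server, which this sketch does not model.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var store = new List<string>();          // stands in for a MongoDB collection
var queue = new CommandQueue();

// Queuing a command does not execute it
queue.Add(session => { store.Add($"order via {session.Id}"); return Task.CompletedTask; });
queue.Add(session => { store.Add($"task via {session.Id}"); return Task.CompletedTask; });
Console.WriteLine(store.Count);          // prints "0"

// Commit runs every queued command against the same session
int executed = await queue.CommitAsync(new FakeSession("s1"));
Console.WriteLine($"{executed} executed, {store.Count} stored");   // prints "2 executed, 2 stored"

// Hypothetical stand-in for IClientSessionHandle
public sealed record FakeSession(string Id);

// Hypothetical, driver-free version of the command queue in MongoDbContext
public sealed class CommandQueue
{
    private readonly List<Func<FakeSession, Task>> _commands = new();

    public void Add(Func<FakeSession, Task> command) => _commands.Add(command);

    public async Task<int> CommitAsync(FakeSession session)
    {
        try
        {
            foreach (var command in _commands)
                await command(session);
            return _commands.Count;      // evaluated before the finally block clears the queue
        }
        catch
        {
            return 0;                    // mirrors the "0 means nothing committed" convention
        }
        finally
        {
            _commands.Clear();           // the queue is single-use per commit
        }
    }
}
```

Because the queue holds per-request state, it makes sense that the context is registered with a scoped lifetime while only the MongoClient wrapper is a singleton.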

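The Repository: MongoRepositoryBase

In a real project we want a RepositoryBase class that already wraps the CRUD methods, so that each concrete Repository only has to inherit from it instead of rewriting CRUD again and again. That is what MongoRepositoryBase is for: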

using System.Linq.Expressions;
using MongoDB.Bson;
using MongoDB.Driver;

public class MongoRepositoryBase<TEntity> : IMongoRepositoryBase<TEntity>
    where TEntity : MongoEntityBase, new()
{
    protected readonly IMongoDbContext _dbContext;
    protected readonly IMongoCollection<TEntity> _dbSet;
    private readonly string _collectionName;
    private const string _keyField = "_id";

    public MongoRepositoryBase(IMongoDbContext mongoDbContext)
    {
        _dbContext = mongoDbContext;
        // Collection name: the [Table] attribute if present, otherwise the entity type name
        _collectionName = typeof(TEntity).GetAttributeValue((TableAttribute m) => m.Name)
            ?? typeof(TEntity).Name;
        if (string.IsNullOrWhiteSpace(_collectionName))
            throw new ArgumentNullException(nameof(_collectionName), "Mongo collection name can't be NULL! Please set the Table attribute on your entity class.");

        _dbSet = mongoDbContext.GetCollection<TEntity>(_collectionName);
    }

    // Convention for all write methods: without a session the operation runs immediately;
    // with a session it is queued and runs inside the transaction when the unit of work commits.
    // The session is passed to the driver call so that the write actually joins the transaction.

    #region Create Part

    public async Task AddAsync(TEntity entity, IClientSessionHandle session = null)
    {
        if (session == null)
            await _dbSet.InsertOneAsync(entity);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.InsertOneAsync(s, entity));
    }

    public async Task AddManyAsync(IEnumerable<TEntity> entityList, IClientSessionHandle session = null)
    {
        if (session == null)
            await _dbSet.InsertManyAsync(entityList);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.InsertManyAsync(s, entityList));
    }

    #endregion

    #region Delete Part

    public async Task DeleteAsync(string id, IClientSessionHandle session = null)
    {
        var filter = Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id));
        if (session == null)
            await _dbSet.DeleteOneAsync(filter);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.DeleteOneAsync(s, filter));
    }

    public async Task DeleteAsync(Expression<Func<TEntity, bool>> expression, IClientSessionHandle session = null)
    {
        if (session == null)
            await _dbSet.DeleteOneAsync(expression);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.DeleteOneAsync(s, expression));
    }

    public async Task<DeleteResult> DeleteManyAsync(FilterDefinition<TEntity> filter, IClientSessionHandle session = null)
    {
        if (session == null)
            return await _dbSet.DeleteManyAsync(filter);

        await _dbContext.AddCommandAsync(async s => await _dbSet.DeleteManyAsync(s, filter));
        // Placeholder result: the real count is unknown until the transaction commits
        return new DeleteResult.Acknowledged(10);
    }

    public async Task<DeleteResult> DeleteManyAsync(Expression<Func<TEntity, bool>> expression, IClientSessionHandle session = null)
    {
        if (session == null)
            return await _dbSet.DeleteManyAsync(expression);

        await _dbContext.AddCommandAsync(async s => await _dbSet.DeleteManyAsync(s, expression));
        // Placeholder result: the real count is unknown until the transaction commits
        return new DeleteResult.Acknowledged(10);
    }

    #endregion

    #region Update Part

    public async Task UpdateAsync(TEntity entity, IClientSessionHandle session = null)
    {
        if (session == null)
            await _dbSet.ReplaceOneAsync(item => item.Id == entity.Id, entity);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.ReplaceOneAsync(s, item => item.Id == entity.Id, entity));
    }

    public async Task UpdateAsync(Expression<Func<TEntity, bool>> expression, Expression<Action<TEntity>> entity, IClientSessionHandle session = null)
    {
        var fieldList = new List<UpdateDefinition<TEntity>>();

        // Turn each "Property = value" binding of `x => new TEntity { ... }` into a $set
        if (entity.Body is MemberInitExpression param)
        {
            foreach (var item in param.Bindings)
            {
                if (item is not MemberAssignment memberAssignment) continue;

                var propertyName = item.Member.Name;
                object propertyValue;
                if (memberAssignment.Expression is ConstantExpression constantExpression)
                    propertyValue = constantExpression.Value;
                else // captured variables, method calls, etc. are evaluated here
                    propertyValue = Expression.Lambda(memberAssignment.Expression, null).Compile().DynamicInvoke();

                if (propertyName != _keyField)
                {
                    fieldList.Add(Builders<TEntity>.Update.Set(propertyName, propertyValue));
                }
            }
        }

        var update = Builders<TEntity>.Update.Combine(fieldList);
        if (session == null)
            await _dbSet.UpdateOneAsync(expression, update);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.UpdateOneAsync(s, expression, update));
    }

    public async Task UpdateAsync(FilterDefinition<TEntity> filter, UpdateDefinition<TEntity> update, IClientSessionHandle session = null)
    {
        if (session == null)
            await _dbSet.UpdateOneAsync(filter, update);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.UpdateOneAsync(s, filter, update));
    }

    public async Task UpdateManyAsync(Expression<Func<TEntity, bool>> expression, UpdateDefinition<TEntity> update, IClientSessionHandle session = null)
    {
        if (session == null)
            await _dbSet.UpdateManyAsync(expression, update);
        else
            await _dbContext.AddCommandAsync(async s => await _dbSet.UpdateManyAsync(s, expression, update));
    }

    public async Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<TEntity> filter, IClientSessionHandle session = null)
    {
        var t = new TEntity();
        // Fields to be updated: only dictionary keys that match a property of TEntity
        var list = new List<UpdateDefinition<TEntity>>();
        foreach (var item in t.GetType().GetProperties())
        {
            if (!dic.ContainsKey(item.Name)) continue;
            var value = dic[item.Name];
            list.Add(Builders<TEntity>.Update.Set(item.Name, value));
        }
        var updatefilter = Builders<TEntity>.Update.Combine(list);

        if (session == null)
            return await _dbSet.UpdateManyAsync(filter, updatefilter);

        await _dbContext.AddCommandAsync(async s => await _dbSet.UpdateManyAsync(s, filter, updatefilter));
        // Placeholder result: the real counts are unknown until the transaction commits
        return new UpdateResult.Acknowledged(10, 10, null);
    }

    #endregion

    #region Read Part

    public async Task<TEntity> GetAsync(Expression<Func<TEntity, bool>> expression, bool readFromPrimary = true)
    {
        var readPreference = GetReadPreference(readFromPrimary);
        return await _dbSet.WithReadPreference(readPreference)
            .Find(expression)
            .FirstOrDefaultAsync();
    }

    public async Task<TEntity> GetAsync(string id, bool readFromPrimary = true)
    {
        var readPreference = GetReadPreference(readFromPrimary);
        return await _dbSet.WithReadPreference(readPreference)
            .Find(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id)))
            .FirstOrDefaultAsync();
    }

    public async Task<IEnumerable<TEntity>> GetAllAsync(bool readFromPrimary = true)
    {
        var readPreference = GetReadPreference(readFromPrimary);
        return await _dbSet.WithReadPreference(readPreference)
            .Find(Builders<TEntity>.Filter.Empty)
            .ToListAsync();
    }

    public async Task<long> CountAsync(Expression<Func<TEntity, bool>> expression, bool readFromPrimary = true)
    {
        var readPreference = GetReadPreference(readFromPrimary);
        return await _dbSet.WithReadPreference(readPreference).CountDocumentsAsync(expression);
    }

    public async Task<long> CountAsync(FilterDefinition<TEntity> filter, bool readFromPrimary = true)
    {
        var readPreference = GetReadPreference(readFromPrimary);
        return await _dbSet.WithReadPreference(readPreference).CountDocumentsAsync(filter);
    }

    public async Task<bool> ExistsAsync(Expression<Func<TEntity, bool>> predicate, bool readFromPrimary = true)
    {
        var readPreference = GetReadPreference(readFromPrimary);
        return await _dbSet.WithReadPreference(readPreference).Find(predicate).AnyAsync();
    }

    public async Task<List<TEntity>> FindListAsync(FilterDefinition<TEntity> filter, string[]? field = null, SortDefinition<TEntity>? sort = null, bool readFromPrimary = true)
    {
        return await BuildQuery(filter, field, sort, readFromPrimary).ToListAsync();
    }

    public async Task<List<TEntity>> FindListByPageAsync(FilterDefinition<TEntity> filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition<TEntity>? sort = null, bool readFromPrimary = true)
    {
        // pageIndex is 1-based: page 1 skips nothing
        return await BuildQuery(filter, field, sort, readFromPrimary)
            .Skip((pageIndex - 1) * pageSize)
            .Limit(pageSize)
            .ToListAsync();
    }

    // Shared by FindListAsync and FindListByPageAsync: optional sort, optional field projection
    private IFindFluent<TEntity, TEntity> BuildQuery(FilterDefinition<TEntity> filter, string[]? field, SortDefinition<TEntity>? sort, bool readFromPrimary)
    {
        var query = _dbSet.WithReadPreference(GetReadPreference(readFromPrimary)).Find(filter);
        if (sort != null)
            query = query.Sort(sort);

        if (field != null && field.Length > 0)
        {
            var projection = Builders<TEntity>.Projection.Combine(
                field.Select(f => Builders<TEntity>.Projection.Include(f)));
            query = query.Project<TEntity>(projection);
        }

        return query;
    }

    #endregion

    #region Protected Methods

    protected ReadPreference GetReadPreference(bool readFromPrimary)
    {
        if (readFromPrimary)
            return ReadPreference.PrimaryPreferred;
        else
            return ReadPreference.SecondaryPreferred;
    }

    #endregion
}
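
The expression-based UpdateAsync overload is the least obvious part of the repository: it walks an object-initializer expression and turns each assignment into a field update. The sketch below shows just that extraction step with no driver involved, under the assumption that callers pass something like `x => new Todo { ... }`. `Todo` and `UpdateExpressionReader` are hypothetical names used only for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

var title = "Write post";   // captured variable: not a constant inside the expression tree
Expression<Action<Todo>> update = t => new Todo { Title = title, Done = true };

foreach (var (name, value) in UpdateExpressionReader.Read(update))
    Console.WriteLine($"{name} = {value}");
// prints "Title = Write post" then "Done = True"

// Hypothetical entity used only for this demo
public class Todo
{
    public string Title { get; set; } = "";
    public bool Done { get; set; }
}

// Hypothetical helper mirroring the binding loop in UpdateAsync
public static class UpdateExpressionReader
{
    public static List<(string Name, object? Value)> Read<T>(Expression<Action<T>> expression)
    {
        var result = new List<(string Name, object? Value)>();

        // Only object initializers such as `new T { A = 1 }` carry member bindings
        if (expression.Body is not MemberInitExpression init)
            return result;

        foreach (var binding in init.Bindings)
        {
            if (binding is not MemberAssignment assignment) continue;

            // Literals are read directly; anything else is compiled and evaluated
            object? value = assignment.Expression is ConstantExpression constant
                ? constant.Value
                : Expression.Lambda(assignment.Expression).Compile().DynamicInvoke();

            result.Add((binding.Member.Name, value));
        }

        return result;
    }
}
```

In the repository each (name, value) pair then becomes a `Builders<TEntity>.Update.Set(name, value)` definition. Compiling a lambda per non-constant binding is relatively slow, so this style is probably better suited to occasional updates than to hot paths.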

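The Unit of Work: UnitOfWork

In a real project, after operating on several repositories we want a single commit that makes the whole set of changes atomic. A UnitOfWork can act as the coordinator for that: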

public class UnitOfWork : IUnitOfWork
{
    private readonly IMongoDbContext _context;

    public UnitOfWork(IMongoDbContext context)
    {
        _context = context;
    }

    public bool SaveChanges(IClientSessionHandle session)
    {
        return _context.Commit(session) > 0;
    }

    public async Task<bool> SaveChangesAsync(IClientSessionHandle session)
    {
        return await _context.CommitAsync(session) > 0;
    }

    public IClientSessionHandle BeginTransaction()
    {
        return _context.StartSession();
    }

    public async Task<IClientSessionHandle> BeginTransactionAsync()
    {
        return await _context.StartSessionAsync();
    }

    public void Dispose()
    {
        _context.Dispose();
    }
}

Wrapping the Registration: ServiceCollectionExtensions

To make registration quick in an application, we can wrap the core pieces in a simple extension method:

public static class ServiceCollectionExtensions
{
    /// <summary>
    /// MongoDB Config Injection
    /// </summary>
    public static IServiceCollection AddMongoProxy(this IServiceCollection services, IConfiguration configuration)
    {
        if (!configuration.GetSection(nameof(MongoDatabaseConfigs)).Exists())
            return services;

        services.Configure<MongoDatabaseConfigs>(configuration.GetSection(nameof(MongoDatabaseConfigs)));
        services.AddSingleton(sp => sp.GetRequiredService<IOptions<MongoDatabaseConfigs>>().Value);
        services.AddSingleton<IMongoDbConnection, MongoDbConnection>();
        services.AddScoped<IMongoDbContext, MongoDbContext>();
        services.AddScoped<IUnitOfWork, UnitOfWork>();

        return services;
    }
}

How to Use It: Three Steps

Step 1: Register the MongoProxy core

Configure the MongoDB connection details in appsettings:

"MongoDatabaseConfigs": {
  "Servers": "xxx01.edisontalk.net,xxx02.edisontalk.net,xxx03.edisontalk.net",
  "Port": 27017,
  "ReplicaSetName": "edt-replica",
  "DatabaseName": "EDT_Practices",
  "UserName": "xxxxxxxxxxxxx",
  "Password": "xxxxxxxxxxxxx"
}

Then register the MongoProxy pieces through the extension method:

builder.Services.AddMongoProxy(builder.Configuration);

Step 2: Add the Entity and the Repository

Sample Entity:

[Table("Orders")]
public class OrderEntity : MongoEntityBase, IEntity
{
    public string OrderNumber { get; set; }
    public List<TransmissionEntity> Transmissions { get; set; }
}

Sample Repository:

public interface ITodoItemRepository : IMongoRepositoryBase<TodoItem>
{
}

public class TodoItemRepository : MongoRepositoryBase<TodoItem>, ITodoItemRepository
{
    public TodoItemRepository(IMongoDbContext mongoDbContext)
        : base(mongoDbContext)
    {
    }
}

services.AddScoped<ITodoItemRepository, TodoItemRepository>();
services.AddScoped<IOrderRepository, OrderRepository>();
......

Step 3: Use the Repository and the UnitOfWork

// Non-transactional mode
await _taskRepository.AddManyAsync(newTasks);

// Transactional mode (through the UnitOfWork)
private readonly IUnitOfWork _unitOfWork;

public OrderService(IUnitOfWork unitOfWork, ......)
{
    _unitOfWork = unitOfWork;
    ......
}

public async Task Example()
{
    using var session = await _unitOfWork.BeginTransactionAsync();
    // With a session, these calls only queue the writes
    await _taskRepository.AddManyAsync(newTasks, session);
    await _orderRepository.AddAsync(newOrder, session);

    // Runs the queued writes in a single transaction and commits it
    await _unitOfWork.SaveChangesAsync(session);
}

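Summary

This post covered the basics of MongoDB transactions and how to drive them from .NET, and focused on the design of the small EDT.MongoProxy component, which lets an ASP.NET 6 application work with MongoDB transactions quickly and conveniently through the Repository and Unit of Work patterns.

Source Code

Not all of the code is shown in this post. For the rest, see the repository below, and feel free to leave a star as encouragement.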

GitHub: https://github.com/Coder-EdisonZhou/EDT.MongoProxy

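References

追逐时光者, 《.NET Core MongoDB数据仓储和工作单元实操》 (.NET Core MongoDB Repository and Unit of Work in Practice). The design in this post is based mainly on that article, and it is well worth a read.

TheCodeBuzz, 《MongoDB Repository Implementation in .NET Core》

Bryan Avery, 《ASP.NET Core - MongoDB Repository Pattern & Unit Of Work》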

 
