dotnetty 新的篇章- 开源

一、前言

      因为微服务引擎依赖于dotnetty组件,很多协议都是针对于dotnetty 进行扩展,然后对于老版本https://github.com/azure/dotnetty 停止更新后,本人下载源码进行修改更新,并且大家要求独立仓库进行开源,所以今天整理了代码开源至https://github.com/microsurging/DotNetty,  也希望大家一起贡献代码,让dotnetty 生态更强大。

HttpFlv:http://demo.kayakiot.cn:281/httpflv.html  (黑衣人)

 HttpFlv:http://demo.kayakiot.cn:281/httpflv1.html  (大红包)

HttpFlv:http://demo.kayakiot.cn:281/httpflv2.html  (鹿鼎记)

rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream2   (黑衣人)

rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream3   (大红包)

rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream4(鹿鼎记)

注:测试服务器带宽只有8MB, httpflv  缓冲做的没有rtmp好,然后httpflv卡就多刷新几次

  凯亚 (Kayak) 是什么?

       凯亚(Kayak)是基于.NET8.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。

     凯亚物联网平台:http://demo.kayakiot.cn:3100(用户名:fanly  密码:123456)

    链路跟踪Skywalking V8:http://117.72.121.2:8080/

      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)

二、ValueTask扩展支持

IValueTaskPromise:

    public interface IValueTaskPromise: IPromise     {                bool IsVoid { get; }              bool IsCompleted { get; }              bool IsSuccess { get; }              bool IsFaulted { get; }              bool IsCanceled { get; }              bool TryComplete();              void Complete();              bool TrySetException(Exception exception);              bool TrySetException(IEnumerable<Exception> exceptions);              void SetException(Exception exception);              void SetException(IEnumerable<Exception> exceptions);              bool TrySetCanceled();              void SetCanceled();              bool SetUncancellable();               IPromise Unvoid();         }

DefaultValueTaskPromise:

    public class DefaultValueTaskPromise: IValueTaskPromise     {         private readonly CancellationToken _token; #if NET         private readonly TaskCompletionSource _tcs; #else         private readonly ManualResetValueTaskSource<object> _tcs; #endif          private int v_uncancellable = SharedConstants.False;          public DefaultValueTaskPromise()         {             _token = CancellationToken.None; #if NET             _tcs = new TaskCompletionSource(); #else             _tcs = new ManualResetValueTaskSource<object>(); #endif         }          public DefaultValueTaskPromise(object state)         { #if NET             _tcs = new TaskCompletionSource(state); #else             _tcs = new ManualResetValueTaskSource<object>(state); #endif         }          public DefaultValueTaskPromise(CancellationToken cancellationToken)         {             _token= cancellationToken;         }            public ValueTask ValueTask         {             [MethodImpl(InlineMethod.AggressiveOptimization)]             get => _tcs.AwaitVoid(_token);         }          public bool IsVoid => false;          public bool IsSuccess => ValueTask.IsCompletedSuccessfully;          public bool IsCompleted => ValueTask.IsCompleted;          public bool IsFaulted => ValueTask.IsFaulted;          public bool IsCanceled => ValueTask.IsCanceled;         public  Task  Task => ValueTask.AsTask();          public virtual bool TryComplete()         { #if NET             return _tcs.TrySetResult(); #else             return _tcs.SetResult(0); #endif         }          public virtual void Complete()         { #if NET             _tcs.SetResult(); #else             _tcs.SetResult(0); #endif         }         public virtual void SetCanceled()         {             if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return; }             _tcs.SetCanceled();         }          public virtual void SetException(Exception exception)         {             if (exception is AggregateException aggregateException)             {                 SetException(aggregateException.InnerExceptions);                 return;             }             _tcs.SetException(exception);         }          public virtual void SetException(IEnumerable<Exception> exceptions)         {             _tcs.SetException(exceptions.FirstOrDefault());         }          public virtual bool TrySetCanceled()         {             if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return false; }               _tcs.SetCanceled();             return true;         }          public virtual bool TrySetException(Exception exception)         {             if (exception is AggregateException aggregateException)             {                 return TrySetException(aggregateException.InnerExceptions);             }               _tcs.SetException(exception);             return true;         }          public virtual bool TrySetException(IEnumerable<Exception> exceptions)         {               _tcs.SetException(exceptions.FirstOrDefault());             return true;         }          public bool SetUncancellable()         {             if (SharedConstants.False >= (uint)Interlocked.CompareExchange(ref v_uncancellable, SharedConstants.True, SharedConstants.False))             {                 return true;             }             return !IsCompleted;         }          public override string ToString() => "TaskCompletionSource[status: " + ValueTask.AsTask().Status.ToString() + "]";          public IPromise Unvoid() => this;               }

ManualResetValueTaskSource:

 internal interface IStrongBox<T>  {      ref T Value { get; }       bool RunContinuationsAsynchronously { get; set; }  }   public enum ContinuationOptions  {      None,       ForceDefaultTaskScheduler  }   public class ManualResetValueTaskSource<T> : IStrongBox<ManualResetValueTaskSourceLogic<T>>, IValueTaskSource<T>, IValueTaskSource  {      private ManualResetValueTaskSourceLogic<T> _logic;      private readonly Action _cancellationCallback;       [MethodImpl(MethodImplOptions.AggressiveInlining)]      public ManualResetValueTaskSource(ContinuationOptions options = ContinuationOptions.None)      {          _logic = new ManualResetValueTaskSourceLogic<T>(this, options,null);          _cancellationCallback = SetCanceled;      }       public ManualResetValueTaskSource(object state, ContinuationOptions options = ContinuationOptions.None)      {          _logic = new ManualResetValueTaskSourceLogic<T>(this, options,state);          _cancellationCallback = SetCanceled;      }       public short Version => _logic.Version;        [MethodImpl(MethodImplOptions.AggressiveInlining)]      public bool SetResult(T result)      {          lock (_cancellationCallback)          {              if (_logic.Completed)              {                  return false;              }               _logic.SetResult(result);              return true;          }      }       [MethodImpl(MethodImplOptions.AggressiveInlining)]      public void SetException(Exception error)      {          if (Monitor.TryEnter(_cancellationCallback))          {              if (_logic.Completed)              {                  Monitor.Exit(_cancellationCallback);                  return;              }               _logic.SetException(error);              Monitor.Exit(_cancellationCallback);          }      }       public void SetCanceled() => SetException(new TaskCanceledException());       public T GetResult(short token) => _logic.GetResult(token);       void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);       public ValueTaskSourceStatus GetStatus(short token) => _logic.GetStatus(token);       public bool RunContinuationsAsynchronously { get; set; } = true;       public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _logic.OnCompleted(continuation, state, token, flags);       ref ManualResetValueTaskSourceLogic<T> IStrongBox<ManualResetValueTaskSourceLogic<T>>.Value => ref _logic;       [MethodImpl(MethodImplOptions.AggressiveInlining)]      public ValueTask<T> AwaitValue(CancellationToken cancellation)      {          CancellationTokenRegistration? registration = cancellation == CancellationToken.None              ? (CancellationTokenRegistration?)null              : cancellation.Register(_cancellationCallback);          return _logic.AwaitValue(this, registration);      }       public ValueTask AwaitVoid(CancellationToken cancellation)      {          CancellationTokenRegistration? registration = cancellation == CancellationToken.None              ? (CancellationTokenRegistration?)null              : cancellation.Register(_cancellationCallback);          return _logic.AwaitVoid(this, registration);      }       public void Reset() => _logic.Reset();  }   internal struct ManualResetValueTaskSourceLogic<TResult>  {      private static readonly Action<object> s_sentinel = s => throw new InvalidOperationException();       private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent;      private readonly ContinuationOptions _options;      private Action<object> _continuation;      private object _continuationState;      private object _capturedContext;      private ExecutionContext _executionContext;      private bool _completed;      private TResult _result;      private ExceptionDispatchInfo _error;      private CancellationTokenRegistration? _registration;             public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent, ContinuationOptions options,object state)      {          _parent = parent ?? throw new ArgumentNullException(nameof(parent));          _options = options;          _continuation = null;          _continuationState = null;          _capturedContext = null;          _executionContext = null;          _completed = state != null;          _result =state==null? default(TResult): (TResult)state;          _error = null;          Version = 0;          _registration = null;      }       public short Version { get; private set; }       public bool Completed => _completed;       private void ValidateToken(short token)      {          if (token != Version)          {               throw new InvalidOperationException();          }      }       public ValueTaskSourceStatus GetStatus(short token)      {         // ValidateToken(token);           return              !_completed ? ValueTaskSourceStatus.Pending :              _error == null ? ValueTaskSourceStatus.Succeeded :              _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :              ValueTaskSourceStatus.Faulted;      }       public TResult GetResult(short token)      {         // ValidateToken(token);           if (!_completed)          {              return _result;          }           TResult result = _result;          ExceptionDispatchInfo error = _error;          Reset();           error?.Throw();          return result;      }       public void Reset()      {          Version++;           _registration?.Dispose();           _completed = false;          _continuation = null;          _continuationState = null;          _result = default(TResult);          _error = null;          _executionContext = null;          _capturedContext = null;          _registration = null;      }       public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)      {          if (continuation == null)          {              throw new ArgumentNullException(nameof(continuation));          }           ValidateToken(token);            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)          {              _executionContext = ExecutionContext.Capture();          }           if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)          {              SynchronizationContext sc = SynchronizationContext.Current;              if (sc != null && sc.GetType() != typeof(SynchronizationContext))              {                  _capturedContext = sc;              }              else              {                  TaskScheduler ts = TaskScheduler.Current;                  if (ts != TaskScheduler.Default)                  {                      _capturedContext = ts;                  }              }          }           _continuationState = state;          if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null)          {              _executionContext = null;               object cc = _capturedContext;              _capturedContext = null;               switch (cc)              {                  case null:                      Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);                      break;                   case SynchronizationContext sc:                      sc.Post(s =>                      {                          var tuple = (Tuple<Action<object>, object>)s;                          tuple.Item1(tuple.Item2);                      }, Tuple.Create(continuation, state));                      break;                   case TaskScheduler ts:                      Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);                      break;              }          }      }       public void SetResult(TResult result)      {          _result = result;          SignalCompletion();      }       [MethodImpl(MethodImplOptions.AggressiveInlining)]      public void SetException(Exception error)      {          _error = ExceptionDispatchInfo.Capture(error);          SignalCompletion();      }       private void SignalCompletion()      {          if (_completed)          {              throw new InvalidOperationException("Double completion of completion source is prohibited");          }           _completed = true;           if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null)          {              if (_executionContext != null)              {                  ExecutionContext.Run(                      _executionContext,                      s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(),                      _parent ?? throw new InvalidOperationException());              }              else              {                  InvokeContinuation();              }          }      }       private void InvokeContinuation()      {          object cc = _capturedContext;          _capturedContext = null;           if (_options == ContinuationOptions.ForceDefaultTaskScheduler)          {              cc = TaskScheduler.Default;          }           switch (cc)          {              case null:                  if (_parent.RunContinuationsAsynchronously)                  {                      var c = _continuation;                      if (_executionContext != null)                      {                          ThreadPool.QueueUserWorkItem(s => c(s), _continuationState);                      }                      else                      {                          ThreadPool.UnsafeQueueUserWorkItem(s => c(s),  _continuationState);                      }                  }                  else                  {                      _continuation(_continuationState);                  }                  break;               case SynchronizationContext sc:                  sc.Post(s =>                  {                      ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value;                      logicRef._continuation(logicRef._continuationState);                  }, _parent ?? throw new InvalidOperationException());                  break;               case TaskScheduler ts:                  Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);                  break;          }      }       public ValueTask<T> AwaitValue<T>(IValueTaskSource<T> source, CancellationTokenRegistration? registration)      {          _registration = registration;          return new ValueTask<T>(source, Version);      }       public ValueTask AwaitVoid(IValueTaskSource source, CancellationTokenRegistration? registration)      {          _registration = registration;          return new ValueTask(source, Version);      }  }

然后把DefaultPromise 替换成DefaultValueTaskPromise,如下图所示

dotnetty 新的篇章- 开源

三、扩展支持rtmp编解码

如下图所示 :

dotnetty 新的篇章- 开源

四、demo展示

开启了三个通道进行推流,cpu,内存都比较稳定

dotnetty 新的篇章- 开源

 在凯亚物联网平台你也可以创建rtmp组件

dotnetty 新的篇章- 开源

 

视频中心

dotnetty 新的篇章- 开源

 

发表评论

评论已关闭。

相关文章