C#多线程之线程高级(下)

四、Monitor信号构造

信号构造本质:一个线程阻塞直到收到另一个线程发来的通知。

C#多线程之线程高级(下)

当多线程Wait同一对象时,就形成了一个“等待队列(waiting queue)”,和用于等待获得锁的“就绪队列(ready queue)”不同,每次调用Pulse时会释放队头线程,它会进入就绪队列,然后重新获取锁。可以把它想象成一个自动停车场,首先你在收费站(等待队列)排队验票,然后在栅栏前(就绪队列)排队等待放行。

这个队列结构天然有序,但是,对于Wait/Pulse应用通常不重要,在这种场景下把它想象成一个等待线程的“池(pool)”更好理解,每次调用Pulse都会从池中释放一个等待线程。

PulseAll释放整个等待队列或者说等待池。收到Pulse的线程不会完全同时开始执行,而是有序的执行,因为每个Wait语句都要试图重新获取同一把锁。他们的效果就是,PulseAll将线程从等待队列移到就绪队列中,让它们可以继续有序执行。

使用Wait/Pulse需要注意:

  • Wait / Pulse不能lock块之外使用,否则会抛异常。
  • Pulse最多释放一个线程,而PulseAll释放所有线程。
  • Wait会立即释放当前持有的锁,然后进入阻塞,等待脉冲
  • 收到脉冲会立即尝试重新获取锁,如果在指定时间内重新获取,则返回true,如果在超过指定时间获取,则返回false,如果没有获取锁,则一直阻塞不会返回

Wait和Pulse

  1. 定义一个字段,作为同步对象

    private readonly object _locker = new object(); 
  2. 定义一个或多个字段,作为阻塞条件

    private bool _ok; 
  3. 当你希望阻塞的时候

    Monitor.Wait在等待脉冲时,同步对象上的锁会被释放,并且进入阻塞状态,直到收到 _locker上的脉冲,收到脉冲后重新获取 _locker,如果此时 _locker 已经被别的线程占有,则继续阻塞,直至_获取 _locker

    lock (_locker)  {     while (!_ok)     {         Monitor.Wait (_locker);     } } 
  4. 当你希望改变阻塞条件时

    lock (_locker) {     _ok = true;     Monitor.Pulse(_locker);  // Monitor.PulseAll(_locker); } 

WaitPulse几乎是万能的,通过一个bool标识我们就能实现AutoResetEvent/ManualResetEvent的功能,同理使用一个整形字段,就可以实现CountdownEvent/Semaphore

性能方面,调用Pulse花费大概约是在等待句柄上调用Set三分之一的时间。但是,使用WaitPulse进行信号同步,对比事件等待句柄有以下缺点:

  • Wait / Pulse不能跨越应用程序域和进程使用。

  • 必须通过锁保护所有信号同步逻辑涉及的变量。

等待超时

调用Wait方法时,你可以设定一个超时时间,可以是毫秒或TimeSpan的形式。如果因为超时而放弃了等待,那么Wait方法就会返回false

public static bool Wait(object obj, TimeSpan timeout) 

如果在超时到达时仍然没有获得一个脉冲,CLR会主动给它发送一个虚拟的脉冲(virtual pulse),使其能够重新获得锁,然后继续执行,就像收到一个真实脉冲一样。

下面这个例子非常有用,它可以定期的检查阻塞条件。即使其它线程无法按照预期发送脉冲,例如程序之后被其他人修改,但没能正确使用Pulse,这样也可以在一定程度上免疫 bug。因此在复杂的同步设计中可以给所有Wait指定超时时间。

lock (_locker)   while (/* <blocking-condition> */)     Monitor.Wait (_locker, /* <timeout> */); 

Monitor.Wait的boolean类型返回值其实还可以这么理解:其返回值意味着是否获得了一个“真实的脉冲“。

如果”虚拟的脉冲“并不是期待的行为,可以记录日志或抛出异常。

Wait等待一个变量上的脉冲,Pulse对一个变量发送脉冲。脉冲也是一种信号形式,相对于事件等待句柄那种锁存(latching)信号,脉冲顾名思义是一种非锁存或者说易失的信号

双向信号与竞争状态

Monitor.Pulse是一种单向通信机制:发送脉冲的线程不关心发出的脉冲被谁收到了,他没有返回值,不会阻塞,内部也没有确认机制。

当一个线程发起一次脉冲:

  • 如果等待队列中没有任何线程,那么这次发起的脉冲不会有任何效果。
  • 如果等待队列中有线程,线程发送完脉冲并释放锁后,并不能保证接到脉冲信号的等待线程能立即开始工作。

然后我们有一些场景依赖等待线程能够在收到脉冲后及时的响应,此时,双向信号出现了,这是一种自定义的确认机制。

在上文的信号构造基础上改造一个竞争状态的案例:

public class 竞争状态测试 {     private readonly ITestOutputHelper _testOutputHelper;     private readonly object _locker = new object();     private bool _ok;      public 竞争状态测试(ITestOutputHelper testOutputHelper)     {         _testOutputHelper = testOutputHelper;     }      [Fact]     void Show()     {         new Thread(() =>  // Worker         {             for (int i = 0; i < 5; i++)                 lock (_locker)                 {                     while (!_ok) Monitor.Wait(_locker);                     _ok = false;                     _testOutputHelper.WriteLine("Wassup?");                 }         }).Start();          for (int i = 0; i < 5; i++)         {             lock (_locker)             {                 _ok = true;                 Monitor.Pulse(_locker);             }         }     } } 

我们期待的结果:

Wassup? Wassup? Wassup? Wassup? Wassup? 

实际上这个这个程序可能一次”Wassup?“都不会输出:主线程可能在工作线程启动之前完成,这五次Pulse啥事都没干

还记得我们讲事件等待句柄时,使用AutoResetEvent来模拟的双向信号吗?现在使用Monitor来实现一个扩展性更好的版本

public class 双向信号测试 {     private readonly ITestOutputHelper _testOutputHelper;     private readonly object _locker = new();     private bool _entry; // 我是否可以工作了     private bool _ready; // 我是否可以继续投递了      public 双向信号测试(ITestOutputHelper testOutputHelper)     {         _testOutputHelper = testOutputHelper;     }      [Fact]     void Show()     {         new Thread(() =>         {             Thread.Sleep(100);             for (int i = 0; i < 5; i++)             {                 lock (_locker)                 {                     _ready = true;                     Monitor.PulseAll(_locker);                     while (!_entry) Monitor.Wait(_locker);                     _entry = false;                     _testOutputHelper.WriteLine("Wassup?");                 }             }         }).Start();          for (int i = 0; i < 5; i++)         {             lock (_locker)             {                 while (!_ready) Monitor.Wait(_locker);                 _ready = false;                 _entry = true;                 Monitor.PulseAll(_locker);             }         }     } } 

我们仍然使用_ready来作为上游脉冲线程的自旋条件,使用_entry作为下游等待线程的自旋条件。由于我们的逻辑都在lock语句中,即使之后引入了第三个线程,我们的逻辑仍然不会出问题,_ready_entry的读写总是原子的。

升级生产消费队列

  1. 这次,我们将允许多个消费者,各自拥有独立的消费线程。使用一个数组来存放这些线程,并且他们接收的不再是string,而是更加灵活的委托:

    private Thread[] _workers; private Queue<Action> _queue = new Queue<Action>(); 
  2. 和上次一样,我们传递null来告知消费者线程退出:

    foreach (var worker in _workers) {     AddTask(null); } 
  3. 在告知消费线程退出后Join这些线程,等待未完成的任务被消费:

    foreach (var worker in _workers) {      worker.Join(); } 
  4. 每个工作线程会执行一个名为Consume的方法。我们在构造队列时循环创建和启动这些线程:

    _workers = new Thread[workerCount]; for (int i = 0; i < workerCount; i++) {     _workers[i] = new Thread(Consume);     _workers[i].Start(); } 
  5. 消费Comsume方法,一个工作线程从队列中取出并执行一个项目。我们希望工作线程没什么事情做的时候,或者说当队列中没有任何项目时,它们应该被阻塞。因此,我们的阻塞条件是_queue.Count == 0

    private void Consume() {     while (true)     {         Action task;         lock (_locker)         {             while (_queue.Count == 0)             {                 Monitor.Wait(_locker);  // 队列里没任务,释放锁,进入等待             }             // 获取新任务,重新持有锁             task = _queue.Dequeue();         }                  if (task == null) return;  // 空任务代表退出         task();  // 执行任务     } } 
  6. 添加一个任务。出于效率考虑,加入一个任务时,我们调用Pulse而不是PulseAll。这是因为每个项目只需要唤醒(至多)一个消费者。如果你只有一个冰激凌,你不会把一个班 30 个正在睡觉的孩子都叫起来排队获取它。

    public void AddTask(Action task) {     lock (_locker)     {         _queue.Enqueue(task);         Monitor.Pulse(_locker);     } } 

模拟等待句柄

在双向信号中,你可能注意到了一个模式:_flag在当前线程被作为自旋阻塞条件,在另一线程中被设置为true,跳出自旋

lock(_locker) {     while (!_flag) Monitor.Wait(_locker); 	_flag = false; } 

ManualResetEvent

事实上它的工作原理就是模仿AutoResetEvent。如果去掉_flag=false,就得到了ManualResetEvent的基础版本。

private readonly object _locker = new object(); private bool _signal; void WaitOne() {     lock (_locker)     {         while (!_signal) Monitor.Wait(_locker);     } } void Set() {     lock (_locker)     {         _signal = true;         Monitor.PulseAll(_locker);     } } void Reset() {     lock (_locker) _signal = false; } 

使用PulseAll,是因为可能存在多个被阻塞的等待线程。而EventWaitHandle.WaitOne()的通行条件就是:是开着的,ManualResetEvent被放行通过后不会自己关门,只能通过Reset将门关上,再次期间其它所有阻塞线程都能通行。

AutoResetEvent

实现AutoResetEvent非常简单,只需要将WaitOne方法改为:

lock (_locker) {     while (!_signal) Monitor.Wait(_locker);     _signal = false;  // 添加一条,自己关门 } 

然后将Set方法改为:

lock (_locker) {     _signal = true;     Monitor.Pulse(_locker);  // PulseAll替换成Pulse: } 

Semaphore

_signal替换为一个整型字段可以得到Semaphore的基础版本

public class 模拟信号量 {     private readonly object _locker = new object();     private int _count, _initialCount;     public 模拟信号量(int initialCount)     {         _initialCount = initialCount;     }          void WaitOne()  // +1     {         lock (_locker)         {             _count++;             while (_count >= _initialCount)             {                 Monitor.Wait(_locker);             }         }     }      void Release()  // -1     {         lock (_locker)         {             _count --;             Monitor.Pulse(_locker);         }     } } 

模拟CountdownEvent

是不是非常类似信号量?

public class 模拟CountdownEvent {     private object _locker = new object();     private int _initialCount;      public 模拟CountdownEvent(int initialCount)     {         _initialCount = initialCount;     }      public void Signal()  // +1     {         AddCount(-1);     }      public void AddCount(int amount)  // +amount     {         lock (_locker)         {             _initialCount -= amount;             if (_initialCount <= 0) Monitor.PulseAll(_locker);         }     }      public void Wait()     {         lock (_locker)         {             while (_initialCount > 0)                 Monitor.Wait(_locker);         }     } } 

线程会合

CountdownEvent

利用我们刚刚实现的模拟CountdownEvent,来实现两个线程的会和,和同步基础中提到的WaitHandle.SignalAndWait一样。

并且我们也可以通过initialCount将会和的线程扩展到更多个,显而易见的强大。

public class 线程会和测试 {     private readonly ITestOutputHelper _testOutputHelper;     private 模拟CountdownEvent _countdown = new 模拟CountdownEvent(2);      public 线程会和测试(ITestOutputHelper testOutputHelper)     {         _testOutputHelper = testOutputHelper;     }      [Fact]     public void Show()     {         // 每个线程都睡眠一段随机时间         Random r = new Random();         new Thread(Mate).Start(r.Next(10000));         Thread.Sleep(r.Next(10000));          _countdown.Signal();         _countdown.Wait();          _testOutputHelper.WriteLine("Mate! ");     }      void Mate(object delay)     {         Thread.Sleep((int)delay);          _countdown.Signal(); //+1         _countdown.Wait();          _testOutputHelper.WriteLine("Mate! ");     } } 

上面例子,每个线程随机休眠一段时间,然后等待对方,他们几乎在同时打印”Mate!“,这被称为线程执行屏障(thread execution barrier)

当你想让多个线程执行一个系列任务,希望它们步调一致时,可以用到线程执行屏障。然而,我们现在的解决方案有一定限制:我们不能重用同一个Countdown对象来第二次会合线程,至少在没有额外信号构造的情况下不能。为解决这个问题,Framework 4.0 提供了一个新的类Barrier

Barrier

Framework 4.0 加入的一个信号构造。它实现了线程执行屏障(thread execution barrier),允许多个线程在一个时间点会合。这个类非常快速和高效,它是建立在Wait / Pulse和自旋锁基础上的。

  1. 实例化它,指定有多少个线程参与会合(可以调用AddParticipants / RemoveParticipants来进行更改)。

    public Barrier(int participantCount) 
  2. 当希望会合时,调用SignalAndWait。表示参与者已到达障碍,并等待所有其他参与者到达障碍

    public void SignalAndWait() 

    他还实现了协作取消模式

    public void SignalAndWait(CancellationToken cancellationToken) 

    并提供了超时时间的重载,返回一个bool类型,true标识在规定的时间,其他参与者到达障碍,false标识没有全部到达

    public bool SignalAndWait(TimeSpan timeout) 

实例化Barrier,参数为 3 ,意思是调用SignalAndWait会被阻塞直到该方法被调用 3 次。但与CountdownEvent不同,它会自动复位:再调用SignalAndWait仍会阻塞直到被调用 3 次。这允许你保持多个线程“步调一致”,让它们执行一个系列任务。

C#多线程之线程高级(下)

下边的例子中,三个线程步调一致地打印数字 0 到 4:

private readonly ITestOutputHelper _testOutputHelper; private Barrier _barrier = new Barrier(3); public Barrier测试(ITestOutputHelper testOutputHelper) {     _testOutputHelper = testOutputHelper; } [Fact] void Show() {     new Thread(Speak).Start();     new Thread(Speak).Start();     new Thread(Speak).Start(); } void Speak() {     for (int i = 0; i < 5; i++)     {         _testOutputHelper.WriteLine(i.ToString());         _barrier.SignalAndWait();     } } 

Barrier还提供一个非常用有的构造参数,他是一个委托,会在每个会和处执行。不用担心抢占,因为当它被执行时,所有的参与者都是被阻塞的。

public Barrier(int participantCount, Action<Barrier>? postPhaseAction) 

五、拓展

前景回顾:

还记得我们在讲同步的时候提到的最小化共享数据无状态设计吗?经过前面的学习,稍加思考,其实引发线程安全的本质是多线程并发下的数据交互问题。如果我们的数据在线程之间没有交互,或者说我们的数据都是只读的,那不就天然的线程安全了吗?

现在你能理解为什么只读字段是天然线程安全的了吗?

然而有的场景下又需要对公共数据进行读写,同步篇中我们通过很简单的排它锁来保证线程安全,在这里,我们不在满足这种粗暴的粒度(事实上多数时候读总是多于写),这时,读写锁出现了。

ReaderWriterLockSlim

ReaderWriterLockSlim在 Framework 3.5 加入的,被加入了standard 1.0,此类型是线程安全的,用于保护由多个线程读取的资源。

ReaderWriterLockSlim出现的目的是为了取缔ReaderWriterLock,他简化了递归规则以及锁状态的升级和降级规则。避免了许多潜在的死锁情况。 另外,他的性能显著优于ReaderWriterLock。 建议对所有新开发的项目使用ReaderWriterLockSlim

然而如果与普通的lockMonitor.Enter / Exit)对比,他还是要慢一倍。

ReaderWriterLockSlim有三种模式:

  • 读取模式:允许任意多的线程处于读取模式

  • 可升级模式:只允许一个线程处于可升级模式,与读锁兼容

  • 写入模式:完全互斥,不允许任何模式下的线程获取任何锁

ReaderWriterLockSlim定义了如下的方法来获取和释放读 / 写锁:

public void EnterReadLock(); public void ExitReadLock(); public void EnterWriteLock(); public void ExitWriteLock(); 

另外,对应所有EnterXXX的方法,都有相应的TryXXX版本,可以接受一个超时参数,与Monitor.TryEnter类似。

让我们来看一个案例:

模拟三个读线程,两个写线程,并行执行

new Thread(Read).Start(); new Thread(Read).Start(); new Thread(Read).Start(); new Thread(Write).Start(); new Thread(Write).Start(); 

读方法是这样的

while (true) {     _rw.EnterReadLock();     foreach (int number in _items)     {         Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);         Thread.Sleep(100);     }     _rw.ExitReadLock(); } 

写方法是这样的

while (true) {     int number = _rand.Value.Next(100);     _rw.EnterWriteLock();     _items.Add(number);     _rw.ExitWriteLock();     Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);     Thread.Sleep(100); } 

随机数生成方法就是用的TLS讲过的

new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode())); 

需要注意ReaderWriterLockSlim实现了IDisposable,用完了请记得释放

public class ReaderWriterLockSlim : IDisposable 

运行结果:

Thread 11 added 42 Thread 8 reading 42 Thread 6 reading 42 Thread 7 reading 42 Thread 10 added 98 Thread 8 reading 42 ... 

显而易见的,并发度变高了

锁递归

ReaderWriterLockSlim提供一个构造参数LockRecursionPolicy用于配置锁递归策略

public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy) 
public enum LockRecursionPolicy {   /// <summary>If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.</summary>   NoRecursion,   /// <summary>A thread can enter a lock recursively. Some classes may restrict this capability.</summary>   SupportsRecursion, } 

默认情况下是使用NoRecursion策略:不允许递归或重入,这与GO的读写锁设计不谋而合,建议使用此默认策略,因为递归引入了不必要的复杂性,并使代码更易于死锁。

public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion) 

开启支持递归策略后,以下代码不会抛出LockRecursionException异常

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion); rw.EnterReadLock(); rw.EnterReadLock(); rw.ExitReadLock(); rw.ExitReadLock(); 

递归锁定级别只能越来越小,级别顺序如下:读锁,可升级锁,写锁。下面代码会抛出LockRecursionException异常

void F() {     var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);     rw.EnterReadLock();     rw.EnterWriteLock();     rw.EnterWriteLock();     rw.ExitReadLock(); } Assert.Throws<LockRecursionException>(F); 

可升级锁例外,把可升级锁升级为写锁是合法的。

var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); rw.EnterUpgradeableReadLock(); rw.EnterWriteLock(); rw.ExitWriteLock(); rw.ExitUpgradeableReadLock(); 

思考一个问题:为什么只允许一个线程处于可升级模式?

SQL Server ReaderWriterLockSlim
共享锁(Share lock) 读锁(Read lock)
排它锁(Exclusive lock) 写锁(Write lock)
更新锁(Update lock) 可升级锁(Upgradeable lock)

Timer

如果你需要使用规律的时间间隔重复执行一些方法,这个例子会使得一个线程永远被占用

while (true) {     // do something     Thread.Sleep(1000); } 

这时候你会需要Timer

创建计时器时,可以指定在方法首次执行之前等待的时间 dueTime ,以及后续执行之间等待的时间period。 类 Timer 的分辨率与系统时钟相同。 这意味着,如果period小于系统时钟的分辨率,委托将以系统时钟分辨率定义的时间间隔执行,在Windows 7 和Windows 8系统上大约为 15 毫秒。

public Timer(TimerCallback callback, object? state, int dueTime, int period) 

下面这个例子首次间隔1s,之后间隔500ms打印tick...

Timer timer = new Timer ((data) => {     _testOutputHelper.WriteLine(data.ToString()); }, "tick...", 1000, 500); Thread.Sleep(3000); timer.Dispose(); 

计时器委托是在构造计时器时指定的,不能更改。 该方法不会在创建计时器的线程上执行;而是在线程池(thread pool)执行。

如果计时器间隔period小于执行回调所需的时间,或者如果所有线程池线程都在使用,并且回调被多次排队,则可以在两个线程池线程上同时执行回调。

只要使用 Timer,就必须保留对它的引用。 与任何托管对象一样,当没有对其引用时,会受到垃圾回收的约束。 即使 Timer 仍然处于活动状态也不会阻止它被收集。

不再需要计时器时,请调用 Dispose 释放计时器持有的资源。请注意,调用 Dispose() 后仍然可能会发生回调,因为计时器将回调排队供线程池线程执行。可以使用public bool Dispose(WaitHandle notifyObject)重载等待所有回调完成。

System.Threading.Timer是一个普通计时器。 它会回调一个线程池线程(来自工作池)。

System.Timers.Timer是一个System.ComponentModel.Component ,它包装System.Threading.Timer ,并提供一些用于在特定线程上调度的附加功能。

发表评论

相关文章