Dlang 并行化

Dlang 并行化

好难受,dlang 生态太差,没办法,学了半天才明白。

我尽量以精炼的语言解释。

采用 定义,例子(代码),解释 的步骤讲解。

所以你可能看到很多代码,一点解释……

我会省略一些 import,让代码短一些

parallelism 并行

感觉好废物,这一小部分了解即可。

这部分只需要会 parallelmap & amap 其实就差不多了。

介绍比较实用的几种方法。

parallel 迭代

foreach (i; parallel(range, work_uint_size = 100)) {     // do something here } 

其中 work_unit_size 表示最多同时运行的数量。

例子

import std.stdio, std.parallelism; import core.thread;  struct Producer {     void produce() {         Thread.sleep(1.seconds);          writeln("Process +1");     } };  void main() {     auto prods = new Producer[](10);      foreach (prod; parallel(prods)) {         prod.produce();     } } 

Task

创建任务:

auto theTask = task!anOperation(arguments); // or auto theTask = task(&someFunction, parameters...) 

运行任务:theTask.executeInNewThread()

查看是否完成:if (theTask.done) { ... }

获取结果:auto result = theTask.yeildForce()


asyncBuf

感觉没啥用。

并行保存多个需要长时间制作的元素。还需要保证使用的长时间的……

例子:

struct Producer {     int i, total;      bool empty() const {         return total <= i;     }      int front() const {         return i;     }      void popFront() {         writefln("Producing product ID: %d", i);         Thread.sleep(1.seconds / 2);         ++i;     } };  void main() {     auto prods = Producer(0, 10);     foreach (prod; taskPool.asyncBuf(prods, 3)) {         writef("Got product id: %dn", prod);         Thread.sleep(1.seconds);         writeln("Used product...");     } } 

map & amap

先看例子:

int increase(int x) {     Thread.sleep(500.msecs);     return x + 3; }  void main() {     int[] nums;     foreach (i; 0 .. 10) {         nums ~= i;     }      // auto results = taskPool.map!increase(nums);     auto results = taskPool.amap!increase(nums);     foreach (result; results) {         writeln(result);     } } 

可以类比 python 中的 map

两者的区别:

  • map 可以指定同时运行的数量,而 amap 是有多少运行多少。

  • map 会一定程度上按顺序执行,而 amap 并不是顺序执行,它依靠 RandomAccessRange,也就是随机顺序执行。


消息并发

我不知道怎么翻译,反正就是 Message Passing Concurrency

核心方法: spawn (唤起)

我们可以形象的认为,spawn 方法可以唤起一个新的工人(线程)来为我们工作。

并且这个工人与主线程是分开的(先看代码后面解释):

import std.stdio; import std.concurrency; import core.thread; void worker() {     foreach (i; 0 .. 5) {         Thread.sleep(500.msecs);         writeln(i, " (worker) in ", thisTid);      }  } void main() {     Tid myWorkerTid = spawn(&worker);     foreach (i; 0 .. 5) {         Thread.sleep(300.msecs);         writeln(i, " (main) in ", thisTid);      }      writeln("main is done!"); } 

最终输出:

0 (main) in Tid(7f0eb19bc0b0) 0 (worker) in Tid(7f0eb19bc000) 1 (main) in Tid(7f0eb19bc0b0) 2 (main) in Tid(7f0eb19bc0b0) 1 (worker) in Tid(7f0eb19bc000) 3 (main) in Tid(7f0eb19bc0b0) 2 (worker) in Tid(7f0eb19bc000) 4 (main) in Tid(7f0eb19bc0b0) main is done! 3 (worker) in Tid(7f0eb19bc000) 4 (worker) in Tid(7f0eb19bc000) 

实际输出可能略有差异。

解释

  • spawn(&worker) 唤起了一个新的线程运行 worker 函数,并返回了新的线程的 id 是一个结构体 Tid

  • thisTid 类似于一个宏,用于获取当前所在线程的 id


发送消息

先看代码后解释:

void worker() {     int value = 0;     while (value >= 0) {         value = receiveOnly!int();         double result = cast(double)value / 7;         ownerTid.send(result);     } }  void main() {     Tid myWorker = spawn(&worker);      foreach (val; 0 .. 10) {         myWorker.send(val);         double result = receiveOnly!double();         writefln("Send %s got %s", val, result);     }      myWorker.send(-1); // terminate worker process } 

最终输出:

Send 0 got 0 Send 1 got 0.142857 Send 2 got 0.285714 Send 3 got 0.428571 Send 4 got 0.571429 Send 5 got 0.714286 Send 6 got 0.857143 Send 7 got 1 Send 8 got 1.14286 Send 9 got 1.28571 

解释

  • ownerTid 类似于一个宏,用于取得唤醒自己的线程的 Tid,从而发送消息。

  • Tid.send(...) 可以向 Tid 代表的那个线程发送一条消息。

    • 如果同时要发送多个东西,在发送的地方是 Tid.send(a, b, c, ...)

    • 在接受的地方要变化为 receiveOnly!(typeof(a), typeof(b), typeof(c), ...),最终得到的是一个 tuple,可以通过下标访问。

  • receiveOnly!type() 表示只接受类型为 type 的消息。

  • 最后 myWorker.send(-1) 是根据代码逻辑结束的,并不属于通法。

如果我们需要更灵活的接受方法怎么办?

void workerFunc() {     bool isDone = false;     while (!isDone) {         void intHandler(int message) {             writeln("handling int message: ", message);              if (message == -1) {                 writeln("exiting");                 isDone = true;             }         }          void stringHandler(string message) {             writeln("handling string message: ", message);         }                  receive(&intHandler, &stringHandler);     }     } 

我们可以指定多种 Handler 以处理不同的数据类型。利用 receive 注册 到处理类型消息的函数中。


更优雅的方式

处理更多的类型:

struct Exit {}  void worker() {     bool done = false;      while (!done) {         receive(             (int message) {                 writeln("int message ", message);             },              (string message) {                 writeln("string message", message);             },              (Exit message) {                 writeln("Exit message");                 done = true;             },              (Variant message) {                 writeln("Unexpected message: ", message);             }         );     } }  void main() {     Tid myWorker = spawn(&worker);      myWorker.send(10);     myWorker.send("hello");     myWorker.send(10.1);     myWorker.send(Exit()); } 

主要是使用了匿名函数……

解释

  • 利用 std.variant.Variant 以接收任何类型的数据。但是需要保证,处理所有类型数据的方法应该放在最后面,不然会导致全部被判断成 Variant

超时接受

我们可以定一个超时时间,超过这个时间就直接返回。

先看代码:

struct Exit {}  void worker() {     bool done = false;      while (!done) {         bool received = receiveTimeout(600.msecs,             (Exit message) {                 writeln("Exit message");                 done = true;             },              (Variant message) {                 writeln("Some message: ", message);             }         );         if (!received) {             writeln("no message yet...");         }     } }  void main() {     Tid myWorker = spawn(&worker);      myWorker.send(10);     myWorker.send("hello");     Thread.sleep(1.seconds);     myWorker.send(10.1);     myWorker.send(Exit()); } 

最终输出

Some message: 10 Some message: hello no message yet... Some message: 10.1 Exit message 

解释

  • receiveTimeout 只比 recieve 多了一个参数,用于指定超时时间。

  • 返回一个 bool 变量,如果为 false 则没有接收到任何消息。


等待所有线程结束thread_joinAll()

一般来说放在需要放的地方……即可。


数据共享

终于讲到这里了。

我们先考虑一个程序:

import std.stdio; import std.concurrency; import core.thread;  int variable;  void printInfo(string message) {     writefln("%s: %s (@%s)", message, variable, &variable); }  void worker() {     variable = 42;     printInfo("Before the worker is terminated"); }  void main() {     spawn(&worker);     thread_joinAll();     printInfo("After the worker is terminated"); } 

其输出是这样的:

Before the worker is terminated: 42 (@7F308C88C530) After the worker is terminated: 0 (@7F308C98D730) 

可以发现,同样的变量在不同的线程里面地址是不一样的,也就是说数据是独立的,所以要有共享。

此时我们只需要修改:

shared int variable; 

即可。

实际上写为 shared(int) variable; 会更标准,但是好麻烦……

当然,不得不说,有了消息传递,那么数据共享就是备用的方案了。


Data Race

数据竞争是一个很常见的问题。

例子

void worker(shared int* i) {     foreach (t; 0 .. 200000) {         *i = *i + 1;     } }  void main() {     shared int i = 0;      foreach (id; 0 .. 10) {         spawn(&worker, &i);     }      thread_joinAll();     writeln("after i to ", i); } 

期望输出 2000000,但是实际输出可能远小于此。

所以我们要考虑同步:

void worker(shared int* i) {     foreach (t; 0 .. 200000) {         synchronized {             *i = *i + 1;         }     } } 

解释

  • synchronized 会隐式地创建一个锁,保证只有一个线程会持有这个锁,并且执行这些操作。

  • 有些时候,synchronized 会使得因为等待锁的额外开销使得程序变慢。但有些时候,我们可以通过更好的方法避免等待的开销,例如使用原子操作。

  • synchronized 创建的锁只会对于这一个代码块生效,不会影响到其他的代码块。


共用锁

void increase(shared int* i) {     foreach (t; 0 .. 200000) {         synchronized {             *i = *i + 1;         }     } }  void decrese(shared int* i) {     foreach (t; 0 .. 200000) {         synchronized {             *i = *i - 1;         }     } }  void main() {     shared int i = 0;      foreach (id; 0 .. 10) {         if (id & 1) spawn(&increase, &i);         else spawn(&decrese, &i);     }      thread_joinAll();     writeln("after i to ", i); } 

期望输出 0 但是实际输出……不知道。所以我们需要共用锁:

synchronized (lock_object) {     // ... } 

修改后的代码

class Lock {} shared Lock lock = new Lock();  void increase(shared int* i) {     foreach (t; 0 .. 200000) {         synchronized (lock) {             *i = *i + 1;         }     } }  void decrese(shared int* i) {     foreach (t; 0 .. 200000) {         synchronized (lock) {             *i = *i - 1;         }     } } 

现在就可以得到正确的答案了。


同步类

我们可以使用 synchronized 修饰一个类。这相当于在每一个代码块里面嵌套一个 synchronzied

synchronized class Cls {     void func() {         // ...     } } 

上面的等价于:

class Cls {     void func() {         synchronized (this) {             // ...         }     } } 

同步初始化

我们考虑这份代码:

static this() {     writeln("executing static this()"); }  void worker() { } void main() {     spawn(&worker);     thread_joinAll(); } 

最终会输出两次 executing static this()

如果我们修改为 shared static this() { ... },那么最终只会输出一次。


原子操作

需要用到 core.atomic 库。

有代码:

atomic!"+="(var, x); atomic!"-="(var, x); // ... like *= /= ^= ... 

这些都是原子操作。

有方法:

shared(int) *value; bool is_mutated = cas(value, currentValue, newValue); 

如果返回 true,那么值会改变,否则没有。

原子操作一般来说快于 synchronized

同时,原子操作也可以作用于结构体上,这里不作为讲解。

更多操作可以参考标准库:

  • core.sync.barrier

  • core.sync.condition

  • core.sync.config

  • core.sync.exception

  • core.sync.mutex

  • core.sync.rwmutex

  • core.sync.semaphore

发表评论

评论已关闭。

相关文章