单片机高效并发编程:基于命名协程的轻量级多任务方案

引言

在嵌入式开发中,如何在资源受限的单片机上实现高效并发一直是个挑战。传统RTOS虽然功能强大,但内存开销和复杂性较高。
本文介绍一种基于协程的轻量级并发方案,实现起来非常简单,通过创新的宏设计实现了代码段命名,大幅提升了代码的可读性和可维护性。
这个编程思路的灵感是lua语言带给我的,如果你学过lua你会发现我就是再精简的模拟lua语言的协程。

协程的基本原理

协程是一种用户态的轻量级线程,其切换由程序控制而不涉及内核态切换,因此开销极小。我们的实现基于 Duff's Device 技术,
通过 switch-case 语句和标号计算实现函数的多重入口。

协程框架核心设计

enum CoroutineStatus{   COROUTINE_READY = 0,       // 就绪状态   COROUTINE_RUNNING = 1,     // 运行状态   COROUTINE_SUSPENDED = 2,   // 挂起状态   COROUTINE_FINISHED = 3     // 完成状态 };  struct Coroutine{   void *args, *res;                   //协程参数和返回值   size_t pc;                          //模拟程序计数器   enum CoroutineStatus status;        // 当前状态   void (*func)(struct Coroutine *);   // 协程函数 };  typedef struct Coroutine Coroutine; typedef void (*CoroutineFuncType)(Coroutine *); 

核心管理函数

// 初始化协程 void coroutineInit(Coroutine *cor, void *args, void *res, CoroutineFuncType func){   cor->args = args;   cor->res = res;   cor->func = func;   cor->pc = 1;   cor->status = COROUTINE_READY; }  // 运行协程单步 int runCoroutine(Coroutine *cor) {   if (cor->status == COROUTINE_READY || cor->status == COROUTINE_SUSPENDED){     cor->status = COROUTINE_RUNNING;     cor->func(cor);     return 0;   }   return -1; }  // 轮询调度器 void schedule(Coroutine *cor, size_t len){   int active;   do{     active = 0;     for (int i = 0; i < len; ++i){       if (cor[i].status != COROUTINE_FINISHED){         runCoroutine(&cor[i]);         active += 1;       }     }   }while (active); } 

协程控制宏

// 跳转到指定位置并挂起 #define COROUTINE_YIELD_SET_PC(cor, val)           do{                                                (cor)->pc = val;                                 (cor)->status = COROUTINE_SUSPENDED;             return ;                                       }while(0)  // 跳转到相对位置并挂起 #define COROUTINE_YIELD_NEXT(cor, delt)            do{                                                (cor)->pc += delt;                               (cor)->status = COROUTINE_SUSPENDED;             return ;                                       }while(0)  // 结束协程 #define COROUTINE_END(cor)                         do{                                                (cor)->status = COROUTINE_FINISHED;              return ;                                       }while (0)  // 生成标签枚举 #define MAKE_LABEL(n1, p1, n2, p2, n3, p3, n4, p4,                          n5, p5, n6, p6, n7, p7, n8, p8,                          n9, p9, n10, p10, n11, p11,                              n12, p12, n13, p13, n14, p14,                            n15, p15, n16, p16, ...)                enum{                                                      n1 = 1, n2, n3, n4, n5, n6, n7, n8, n9, n10,             n11, n12, n13, n14, n15, n16                           }  // 生成case语句切片,模仿goto // 由于标准c语言语法中goto后面只能加常量标签所以这里用switch-case模拟goto语句 #define MAKE_LABEL_SLICE(n1, p1, n2, p2, n3, p3, n4, p4,                                  n5, p5, n6, p6, n7, p7, n8, p8,                                  n9, p9, n10, p10, n11, p11,                                      n12, p12, n13, p13, n14, p14,                                    n15, p15, n16, p16, ...)                  case n1: p1; case n2: p2; case n3: p3; case n4: p4;              case n5: p5; case n6: p6; case n7: p7; case n8: p8;              case n9: p9; case n10: p10; case n11: p11; case n12: p12;        case n13: p13; case n14: p14; case n15: p15; case n16: p16;      default: break;  // 协程主体定义宏 #define COROUTINE_PROGN(cor, ...)                                  MAKE_LABEL(__VA_ARGS__, N16, , N15, , N14, ,N13,                            , N12, , N11, , N10, , N9, , N8, , N7,                           , N6, , N5, , N4, , N3, , N2, , N1,);                 switch ((cor)->pc){                                                MAKE_LABEL_SLICE(__VA_ARGS__, N16, , N15, , N14, ,N13,                            , N12, , N11, , N10, , N9, , N8, , N7,                           , N6, , N5, , N4, , N3, , N2, , N1,);         } 

这里我只写了16个case,也就是说如果超过16个代码片段就会被抛弃,如果你的程序真的很复杂也可以自己再加,
不过16个应该能满足绝大多数情况了。
另一个问题是此方法可能要求编译器有一定优化能力,因为如果只写了几个片段剩下的case就是空的,
不过本人测试了一下,像clang和gcc对空case的优化特别好,哪怕不开编译优化也不会增加代码体积。
接下来我们写一个最简单的多协程计算数组所有元素的和,示范一下:

struct I32AddStructure{   int *array;   size_t len, idx; };  typedef struct I32AddStructure I32AddStructure;  void add(Coroutine *cor){   int *sum = (int *)cor->res;   I32AddStructure *add = (I32AddStructure *)cor->args;   COROUTINE_PROGN(cor,                   ADD_ONE_NUM /*给代码段命名,一般情况下可能无用,                   如果你想COROUTINE_YIELD_SET_PC进行绝对跳转的时候特别有用*/,                   {                     if (add->idx == add->len)                       COROUTINE_END(cor); // 已经到最后一个元素关闭协程                     *sum += add->array[add->idx++];                     COROUTINE_YIELD_NEXT(cor, 0); // 挂起,并且下次执行再执行此代码段                   }     ); }  int main(){   Coroutine cor[2];   int array[100];   I32AddStructure add1, add2;   int sum = 0;   for (int i = 0; i < 100; ++i){     array[i] = i;   }    add1.array = array;   add1.len = 50;   add1.idx = 0;    add2.array = array + 50;  add2.len = 50;   add2.idx = 0;    coroutineInit(&cor[0], &add1, &sum, add);   coroutineInit(&cor[1], &add2, &sum, add);   schedule(cor, 2);   printf("%dn", sum);   return 0; } 

当然这并不是再单片机上运行的程序,只是简单的举个例子。核心用法就是使用COROUTINR_PROGN生成需要分割的代码段。
每个代码段用COROUTINE_YIELD_NEXT(相对跳转并挂起)或者COROURINE_YIELD_SET_PC(绝对跳转并挂起)主动的让出cpu
也就是说可以在一些耗时等待其他硬件操作完成时主动调用COROUTINE_YIELD让出cpu,或者在一段时间内完成多个作业也可以使用这个框架。
接下来再来一个复杂的例子(伪代码):

// 传感器参数 typedef struct {     uint8_t sensor_pin;     float temperature;     float humidity;     uint32_t sample_count; } SensorParams;  void sensorCoroutine(Coroutine *cor) {   SensorParams *params = (SensorParams*)cor->args;   static uint32_t last_sample_time = 0;    COROUTINE_PROGN(cor,                   // 命名代码段:初始化传感器                   INIT_SENSOR,                   {                     printf("初始化传感器引脚 %dn", params->sensor_pin);                     sensorInit(params->sensor_pin);                     last_sample_time = getSystemTime();                     COROUTINE_YIELD_NEXT(cor, 1); //挂起,下次运行时运行下一片段,也就是WAIT_SAMPLE_INTERVAL片段                   }, // 别忘了这里的逗号                    // 命名代码段:等待采样间隔                   WAIT_SAMPLE_INTERVAL,                   {                     if (getSystemTime() - last_sample_time < 1000) { // 1秒间隔                       COROUTINE_YIELD_NEXT(cor, 0); // 保持当前状态                     }                     COROUTINE_YIELD_NEXT(cor, 1); //挂起,下次运行时运行下一片段,也就是READ_SENSOR_DATA片段                   }, // 别忘了这里的逗号                    // 命名代码段:读取传感器数据                   READ_SENSOR_DATA,                   {                     params->temperature = readTemperature(params->sensor_pin);                     params->humidity = readHumidity(params->sensor_pin);                     params->sample_count++;                     last_sample_time = getSystemTime();                     printf("第%lu样本: 温度=%.2fC, 湿度=%.2f%%n",                            params->sample_count, params->temperature, params->humidity);                     COROUTINE_YIELD_NEXT(cor, -1); // 回到等待状态                     // 或者使用COROUTINE_YIELD_SET_PC(cor, WAIT_SAMPLE_INTERVAL);                   }                   ); } 

使用相对跳转还是使用绝对跳转要看情况,使用绝对跳转可以在以后维护添加代码时在一定程度上不受影响,相反的相对跳转会受影响。
相对跳转适合挂起后接着运行下一段代码片段。

技术优势

1. 极低的内存开销

每个协程仅需约20字节内存(还可以接着优化,比如pc和status可以都用uint8_t)
无需为每个任务分配独立堆栈

2. 高效的上下文切换

切换开销仅为几个寄存器操作
无系统调用开销
确定性执行时间

3. 避免复杂的同步机制

协程在明确位置主动让出CPU
无需互斥锁、信号量等同步原语
降低死锁风险

4. 高度可移植性

纯C实现,不依赖特定硬件特性
可在任何支持标准C的平台运行
与RTOS兼容,可作为补充方案

5. 灵活的调度策略

支持轮询、优先级等多种调度方式
可根据系统负载动态调整

关于拓展

值得注意的是我并没有写协程休眠的机制,其实也很好写,需要在Coroutine结构体中增加一个变量,
在CoroutineStatus增加COROUTINE_SLEEPING状态,并添加几个休眠宏函数,再修改runCoroutine函数就可以了:

enum CoroutineStatus{   COROUTINE_READY = 0,   COROUTINE_RUNNING = 1,   COROUTINE_SUSPENDED = 2,   COROUTINE_SLEEPING = 3, //增加睡眠状态   COROUTINE_FINISHED = 4 };  #define GET_TIME() clock(); //在单片机中可以用SysTick获取运行时间  int runCoroutine(Coroutine *cor) {   if (cor->status == COROUTINE_READY || cor->status == COROUTINE_SUSPENDED){     cor->status = COROUTINE_RUNNING;     cor->func(cor);     return 0;   } else if (cor->status == COROUTINE_SLEEPING){     //如果是睡眠状态检测是否到达唤醒时间     if (GET_TIME() >= cor->sleepEndTime){       cor->status = COROUTINE_SUSPENDED;     }     return 0;   }   return -1; }  //睡眠并指定下一次唤醒跳转到哪里,绝对跳转 #define COROUTINE_SLEEP_SET_PC(cor, time, pc_val)          do{                                                        (cor)->pc = val;                                         (cor)->sleepEndTime = GET_TIME() + time;                 (cor)->status = COROUTINE_SLEEPING;                      return ;                                               }while(0)  //睡眠并指定下一次唤醒跳转到哪里,相对跳转 #define COROUTINE_SLEEP_NEXT(cor, time, delt)              do{                                                        (cor)->pc += delt;                                       (cor)->sleepEndTime = GET_TIME() + time;                 (cor)->status = COROUTINE_SLEEPING;                      return ;                                               }while(0)  

不过可能很多人觉得这个和状态机很像,确实是这样,叫它封装的状态机也可以。最重要的是这种封装方式不仅简化代码,
而且也美观了一些不是吗?

结语

本文提出的基于协程的轻量级并发方案,为资源受限的嵌入式系统提供了一种简洁高效的并发编程范式。通过借鉴Lua语言的协程思想,
并巧妙运用C语言的宏定义和Duff's Device技术,我们成功地在单片机上实现了内存开销极低、切换效率极高的协程框架。
该方案的核心优势在于:
极简设计:每个协程仅需约20字节内存,无需独立堆栈
高效切换:纯用户态切换,无系统调用开销
代码优雅:通过宏定义实现了代码段命名,大幅提升了状态机代码的可读性和可维护性
高度可移植:纯C实现,不依赖特定硬件平台

与传统RTOS相比,本方案在满足大多数嵌入式并发需求的同时,避免了复杂的内存管理和同步机制,降低了系统复杂度和死锁风险。
特别是对于那些对内存和实时性要求极高的应用场景,这种轻量级协程框架展现出了独特的价值。
展望未来,该框架还可以进一步扩展,如增加优先级调度、协程间通信、动态创建销毁等功能。希望这个从Lua语言中汲取灵感的实现方案,能够为嵌入式开发者提供新的思路,在资源受限的环境中依然能够编写出清晰、高效的并发代码。
正如编程语言的设计哲学所示:简洁并不等于简单,优雅的解决方案往往来自于对问题本质的深刻理解。

发表评论

评论已关闭。

相关文章