大家好,欢迎来到立芯嵌入式。
今天终于来到状态机系列的最后一篇,我们来尝试解决一个问题:“状态机本身实现可能不是大问题,真正头疼的是状态机周边的代码,以及它们如何与外界互动。”
很多坑,其实都埋在状态机之外——我们怎么把它塞进系统里?它怎么跟其他部分“聊天”?
其实不光是状态机,你想想,任何一个处理单元,不管是状态机、一个简单的函数、一个并发线程,甚至是一个基于栈的解析器,只要它需要跟“外部世界”(环境)打交道,或者被外部世界访问,就得先回答几个灵魂拷问:
我要的数据准备好了吗? 这数据现在还靠谱吗?会不会过期了? 我去拿数据的时候,它会不会正好在变?(并发读写问题)
为了说起来方便,这篇文章里,我就把这些需要处理任务的家伙(状态机、线程、函数等)统称为“机器”(Machine)。
另外,根据你的项目是干啥的(应用领域),可能还会有额外的要求,比如:
结果必须在规定时间内给出来(实时性要求)。 没事儿的时候,得赶紧把 CPU 让出来,别占着茅坑不拉X(低功耗、高效率要求)。
这篇文章呢,主要是想带大家快速浏览一下,在设计和实现这种需要多个“机器”互相协作的并发系统时,可能会遇到哪些情况,有哪些设计上的考量点。咱们不抠每一个细节,代码片段也主要是为了启发思路,展示概念,别直接复制粘贴到你的项目里哈(除非你真的完全理解了并且适配了)。
如何给“机器”喂数据?轮询驱动 vs. 事件驱动
最开始接触状态机,还是在开发游戏的时候,那时候我偷了个懒,用了个简化设计:游戏的主循环(那个 super-loop
)像个调度员,每一轮给每个“活着”的游戏对象分配一小片时间(time slice),大家轮流执行(cooperative round-robin),谁也别想一直霸占 CPU。状态机就在分给它的那点时间里,执行当前状态对应的函数,检查各种条件,看看是不是该触发点啥反应,或者跳到下一个状态。
这种玩法,我后来才知道,学名叫“输入驱动”(Input-Driven)的状态机。你可以把它想象成一种轮询机制——状态函数一遍又一遍地去检查输入条件有没有满足。这种模式下,机器没法“睡大觉”等着事情发生,只能不停地问:“来了吗?来了吗?到底来了吗?”
这种设计的优点很明显:实现起来简单粗暴,省去了定义事件、搭建事件队列和分发机制的麻烦。
但缺点也很要命:
扩展性差:所有对象,不管是不是在摸鱼(比如等个定时器),每一轮都会被叫起来溜达一圈(执行 doYourJob
)。效率低下:明明条件可能很久才变一次,却要一遍遍地去查。同一个条件可能被好几个机器在同一轮里反复检查。 调度器依赖:外部调度器变得非常关键,它决定了机器多久被调用一次。如果调度器太简单(比如一个没有任何同步的死循环),你的机器就会以 CPU 能达到的最快速度疯狂执行,可能导致系统其他部分饿死,或者功耗飙高。 依赖噩梦:如果你的机器需要从系统的好几个地方获取输入,很容易搞出一个“上帝对象”(God Object),它几乎依赖所有东西,重构和复用就成了天方夜谭。 测试地狱:测这种机器,你得把它依赖的所有外部环境(那些被轮询的部分)都 mock 出来。依赖项一多,mock 的工作量能让你怀疑人生。 并发陷阱:如果你轮询的是一个共享变量,而这个变量可能随时被其他地方以非原子方式修改,那你就得引入同步机制(比如互斥锁、双缓冲)来避免数据竞争(critical runs)。
来看个“输入驱动”系统骨架:
#include <stdbool.h> // for bool type
#include <stddef.h> // for NULL
// "机器"接口定义 (使用函数指针模拟多态)
typedefstruct Machine_s {
// 指向具体机器实例数据的指针
void* p_instance_data;
// "干活"的函数指针
void (*doYourJob)(void* p_instance_data);
} Machine;
// 假设我们有一个全局的机器数组 (实际项目中可能用链表或动态数组)
#define MAX_MACHINES 10
Machine g_active_machines[MAX_MACHINES];
int g_num_active_machines = 0;
// 调度器函数
void scheduler(Machine machines[], int num_machines) {
while(true) {
for(int i = 0; i < num_machines; ++i) {
// 确保函数指针有效再调用
if (machines[i].doYourJob != NULL) {
// 把实例数据传回去,让函数知道自己在操作哪个对象
machines[i].doYourJob(machines[i].p_instance_data);
}
}
// 在这里进行同步或等待,比如等待下一个时钟滴答、中断,或者短暂休眠
// platform_specific_wait();
}
}
// --- 示例机器实现 ---
typedefstruct MyMachineData_s {
int some_state_variable;
// ... 其他状态数据 ...
} MyMachineData;
void my_machine_do_job(void* p_instance_data) {
MyMachineData* self = (MyMachineData*)p_instance_data;
// 在这里轮询各种输入/条件
// bool condition1 = check_sensor_A();
// bool condition2 = is_timer_expired();
// if (condition1 && self->some_state_variable == 0) { ... }
// 根据条件执行动作和状态转换...
}
// 初始化时...
// MyMachineData my_instance_data;
// g_active_machines[g_num_active_machines].p_instance_data = &my_instance_data;
// g_active_machines[g_num_active_machines].doYourJob = my_machine_do_job;
// g_num_active_machines++;
擂台的另一边,是“事件驱动”(Event-Driven)的状态机。这种机器平时就“躺平”着,啥也不干,直到有事件(Event)送上门来,它才会醒过来处理一下。它没有固定的“时间片”,不处理事件的时候,CPU 占用几乎为零。
这类状态机执行起来轻量,功耗也低。并发处理也相对容易搞定,只要保证一个机器一次只处理一个事件就好。
最后,事件提供了一种统一的方式把信息喂给机器。一方面代码更清晰,降低了理解成本(认知负荷);另一方面,测试也变得简单多了。测事件驱动的机器,不需要 mock 一堆外部依赖,你只需要创建相应的事件,然后塞给它就行了。
当然,天下没有免费的午餐。缺点是你需要设计一套消息传递系统:
事件结构:得定义好事件的数据结构,能承载各种可能产生的事件信息。通常用一个 enum
表示事件类型,再用union
或者void*
带上事件的具体数据。寻址:事件的生产者得知道要把事件送给哪个(或哪些)接收者。 队列:如果事件产生和消费不是同步的(通常都不是),就需要一个队列来缓存事件。队列可以是每个机器一个,也可以是全局共享一个。
下面是一个使用全局共享队列的事件驱动系统骨架:
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h> // for malloc/free if using dynamic event data
#include <pthread.h> // 假设使用 pthread 做线程安全,或者用平台相关的锁
// 事件类型枚举
typedefenum {
EVENT_TYPE_TIMEOUT,
EVENT_TYPE_SENSOR_READING,
EVENT_TYPE_USER_INPUT,
// ... 其他事件类型
} EventType;
// 基础事件结构
typedefstruct {
EventType type;
void* p_data; // 指向事件数据的指针 (需要根据 type 来解释)
size_t data_size; // 可选:数据大小,方便管理内存
} Event;
// "机器"接口定义 (这次是处理事件的 dispatch)
typedefstruct Machine_s {
void* p_instance_data;
// 分发事件的函数指针
void (*dispatch)(void* p_instance_data, const Event* p_event);
} Machine;
// --- 简化的全局事件队列 (非线程安全,需要加锁) ---
#define EVENT_QUEUE_SIZE 50
Event g_event_queue[EVENT_QUEUE_SIZE];
int g_queue_head = 0;
int g_queue_tail = 0;
int g_queue_count = 0;
pthread_mutex_t g_queue_lock = PTHREAD_MUTEX_INITIALIZER; // 需要初始化锁
// 发送事件到全局队列 (需要考虑线程安全)
bool send_global_event(const Event* p_event) {
bool success = false;
pthread_mutex_lock(&g_queue_lock);
if (g_queue_count < EVENT_QUEUE_SIZE) {
// 注意:这里是浅拷贝事件结构体,如果 p_data 指向动态内存,
// 需要制定好内存管理规则 (谁申请谁释放,或者使用引用计数)
g_event_queue[g_queue_head] = *p_event;
g_queue_head = (g_queue_head + 1) % EVENT_QUEUE_SIZE;
g_queue_count++;
success = true;
} else {
// 队列满了!处理错误,比如丢弃事件或报错
// fprintf(stderr, "Event queue full!\n");
}
pthread_mutex_unlock(&g_queue_lock);
// 可以考虑在这里发出信号,唤醒处理事件的线程
// pthread_cond_signal(&g_queue_cond);
return success;
}
// --- 事件分发器 (通常在单独的线程或主循环中运行) ---
Machine g_all_machines[MAX_MACHINES]; // 假设在别处定义和填充
int g_num_all_machines = 0;
void dispatcher_loop() {
Event current_event;
bool event_found;
while(true) {
event_found = false;
// 检查队列是否有事件 (需要加锁)
pthread_mutex_lock(&g_queue_lock);
if (g_queue_count > 0) {
current_event = g_event_queue[g_queue_tail];
g_queue_tail = (g_queue_tail + 1) % EVENT_QUEUE_SIZE;
g_queue_count--;
event_found = true;
}
pthread_mutex_unlock(&g_queue_lock);
if (event_found) {
// 把事件广播给所有机器
for (int i = 0; i < g_num_all_machines; ++i) {
if (g_all_machines[i].dispatch != NULL) {
g_all_machines[i].dispatch(g_all_machines[i].p_instance_data, ¤t_event);
}
}
// 如果事件数据 p_data 是动态分配的,考虑在这里或者由最后一个使用者释放
// free(current_event.p_data); // 内存管理策略要清晰!
} else {
// 队列为空,可以做点别的事,或者等待新事件
// pthread_cond_wait(&g_queue_cond, &g_queue_lock); // 配合条件变量等待
// platform_specific_yield(); // 或者让出 CPU
}
}
}
这个全局队列的版本有个毛病:没办法让机器只接收它感兴趣的事件。结果就是,所有事件都发给了所有机器,机器收到后还得自己判断一下:“这玩意儿跟我有关系吗?”没关系就直接扔掉。这样做效率不高,但实现简单,对于小项目可能够用了。
想解决这个问题,可以给每个机器配一个专属队列。这样,发送者在发送事件时,就得明确知道该把事件塞到哪个机器的队列里。这种方式解决了“兴趣匹配”的问题,但需要更多内存(每个机器一个队列),并且需要一套机制让发送者能拿到接收者的“地址”(队列句柄)。
看看每个机器一个队列的骨架:
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <pthread.h> // for mutex
// 事件结构 (同上)
typedefenum { /* ... EventType ... */ } EventType;
typedefstruct { EventType type; void* p_data; size_t data_size; } Event;
// 简化的机器专属事件队列 (例如,用链表实现)
typedefstruct EventNode_s {
Event event;
struct EventNode_s* p_next;
} EventNode;
typedefstruct {
EventNode* p_head;
EventNode* p_tail;
int count;
pthread_mutex_t lock; // 每个队列一个锁,用于保护并发访问
} MachineQueue;
// 初始化队列
void machine_queue_init(MachineQueue* p_queue) {
p_queue->p_head = NULL;
p_queue->p_tail = NULL;
p_queue->count = 0;
pthread_mutex_init(&p_queue->lock, NULL); // 初始化互斥锁
}
// 销毁队列 (需要释放所有节点和锁)
void machine_queue_destroy(MachineQueue* p_queue) {
pthread_mutex_lock(&p_queue->lock);
EventNode* current = p_queue->p_head;
while (current != NULL) {
EventNode* next = current->p_next;
// 如果 p_data 是动态分配的,在这里释放
// if (current->event.p_data) free(current->event.p_data);
free(current);
current = next;
}
p_queue->p_head = p_queue->p_tail = NULL;
p_queue->count = 0;
pthread_mutex_unlock(&p_queue->lock);
pthread_mutex_destroy(&p_queue->lock);
}
// 向特定机器的队列发送事件 (线程安全)
bool machine_send_event(MachineQueue* p_queue, const Event* p_event) {
EventNode* p_new_node = (EventNode*)malloc(sizeof(EventNode));
if (!p_new_node) returnfalse; // 内存分配失败
// 复制事件内容 (注意 p_data 的内存管理!)
p_new_node->event = *p_event;
// 如果 p_event->p_data 是动态内存,需要深拷贝或转移所有权
// Example: if(p_event->data_size > 0) {
// p_new_node->event.p_data = malloc(p_event->data_size);
// memcpy(p_new_node->event.p_data, p_event->p_data, p_event->data_size);
// }
p_new_node->p_next = NULL;
pthread_mutex_lock(&p_queue->lock);
if (p_queue->p_tail != NULL) {
p_queue->p_tail->p_next = p_new_node;
p_queue->p_tail = p_new_node;
} else {
p_queue->p_head = p_queue->p_tail = p_new_node;
}
p_queue->count++;
pthread_mutex_unlock(&p_queue->lock);
// 这里可以发出信号,通知该机器的处理线程(如果它在等待)
// machine_signal_event_arrival(p_machine);
returntrue;
}
// "机器"结构,包含自己的队列和处理逻辑
typedefstruct Machine_s {
void* p_instance_data;
void (*dispatch)(void* p_instance_data, const Event* p_event);
MachineQueue event_queue;
} Machine;
// 机器处理自己队列中的事件
void machine_process_events(Machine* p_machine) {
EventNode* p_node_to_process = NULL;
// 一次处理完队列里所有当前事件,避免长时间持有锁
while (true) {
// 从队列头取一个事件
pthread_mutex_lock(&p_machine->event_queue.lock);
p_node_to_process = p_machine->event_queue.p_head;
if (p_node_to_process != NULL) {
p_machine->event_queue.p_head = p_node_to_process->p_next;
if (p_machine->event_queue.p_head == NULL) {
p_machine->event_queue.p_tail = NULL; // 队列空了
}
p_machine->event_queue.count--;
}
pthread_mutex_unlock(&p_machine->event_queue.lock);
if (p_node_to_process == NULL) {
break; // 队列空了,退出处理循环
}
// 调用机器自己的 dispatch 函数处理事件
if (p_machine->dispatch != NULL) {
p_machine->dispatch(p_machine->p_instance_data, &p_node_to_process->event);
}
// 释放事件节点内存
// 注意:如果事件数据是深拷贝的,也要在这里释放
// if (p_node_to_process->event.p_data) free(p_node_to_process->event.p_data);
free(p_node_to_process);
}
}
// 调度器,轮流让每个机器处理自己的事件队列
void scheduler_per_machine(Machine machines[], int num_machines) {
while(true) {
for(int i = 0; i < num_machines; ++i) {
machine_process_events(&machines[i]);
}
// 同步/等待
// platform_specific_wait_or_yield();
}
}
这种“每个机器有自己的队列”的模式,看起来是不是有点像我们最开始说的“输入驱动”?调度器还是在轮流给时间片,只不过机器拿到时间片后,是去处理自己的事件队列,而不是去轮询外部输入。代码里没展示的是,发送者如何知道要把事件发给哪个机器的队列(通常需要某种形式的注册或服务发现机制)。
互相转换?
软件是灵活的,想把输入驱动改成事件驱动,或者反过来,也不是完全不行:
输入驱动 -> 事件驱动:机器在它的时间片里,不去轮询外部硬件/变量,而是去检查自己的“收件箱”(事件队列)里有没有新邮件(事件)。 事件驱动 -> 输入驱动:搞一个特殊的“时间到了”事件(Time-Slice Event),调度器每一轮都给机器发一个这种事件,机器收到后就去执行原本轮询输入的逻辑。
真正难的,是从根本上改变你的环境(Environment)——从需要你主动去轮询状态的“被动式环境”,改造成状态变化时会主动触发事件并发送给你的“主动式环境”。
阻塞?非阻塞?这是个问题
不管你选了哪种模型(输入驱动还是事件驱动),你的机器可能总会遇到需要等待某个“慢操作”完成的情况。比如:
等另一个线程算完一个复杂结果。 等一个 I/O 操作(读串口、写 Flash、网络收发)完成。 等一个互斥锁被释放。
如果我们直接在机器的执行路径上阻塞(block)当前线程,傻等这些事情发生,那麻烦就大了。
为啥?在只有一个线程的裸机 MCU 上,阻塞了当前唯一的线程,整个系统就卡死了,这很直观。但即使是在多核、多线程的现代系统(比如 Linux、Windows,甚至带 RTOS 的嵌入式系统)上,阻塞也通常是个坏主意。
有两点值得你好好琢磨一下(我自己也花了好久才想明白):
上下文切换是开销大户:当你阻塞一个线程时,操作系统通常会把它挪开,让 CPU 去干别的“就绪”状态的活儿。这个过程叫“上下文切换”(Context Switch),它需要保存当前线程的状态,加载下一个线程的状态,这玩意儿挺耗时的。频繁地阻塞和唤醒,会导致大量的上下文切换,拖慢整个应用程序乃至整个系统的性能。 “一人生病,全家吃药”:如果你的多个状态机(或其他处理单元)都跑在同一个线程上(这在事件驱动模型中很常见,一个线程负责处理事件队列并分发给各个机器),一旦其中一个机器因为等待某个资源而阻塞了整个线程,那么这个线程上跑的所有其他机器也跟着一起卡住了,即使它们本来有事件要处理,也只能干等着。
因此,强烈建议:尽可能避免任何形式的阻塞调用!
常见的阻塞操作:
文件/设备 IO: read()
,write()
(的阻塞版本)同步锁: pthread_mutex_lock()
(如果锁已被占用),sem_wait()
(如果信号量为 0)条件等待: pthread_cond_wait()
休眠: sleep()
,usleep()
等待子进程/线程: waitpid()
,pthread_join()
怎么办?
拥抱异步:尽可能使用这些操作的异步(asynchronous)版本。比如,发起一个 IO 请求,然后马上返回,让硬件/驱动在后台处理,完成后通过中断、回调函数或者事件来通知你。 委托给“后台小弟”:如果某个操作实在没有异步版本,或者非常耗时,可以把它丢给一个专门的工作线程(可能来自一个线程池)去做。主线程(跑状态机的线程)继续干别的活。当工作线程完成后,再通过发事件或者调用回调的方式通知主线程:“老板,搞定了!”
状态机肚子里的“货”怎么掏?
让状态机对外提供信息,这个问题跟喂数据有点像。
最直观的方法:Getter 函数
大家(尤其是有 C++/Java 背景的同学)可能首先想到的就是提供一个 Getter 函数。面向对象编程教我们通过 getXxx()
这样的函数来访问对象的内部状态,这能实现一定程度的封装。
// --- Getter 方式 ---
typedefstruct MyMachineData_s {
int current_value;
// ...
pthread_mutex_t data_lock; // 可能需要锁来保护
} MyMachineData;
// Getter 函数
int my_machine_get_value(MyMachineData* self) {
// 如果 current_value 可能在其他线程被修改,需要加锁
pthread_mutex_lock(&self->data_lock);
int value = self->current_value;
pthread_mutex_unlock(&self->data_lock);
return value;
}
// 调用者线程:
// MyMachineData* the_machine = ...;
// int val = my_machine_get_value(the_machine);
这种方法,如果你只是想获取一个不会变的信息(比如查询某个配置参数,像数据包接收超时时间),那可能还行。
但如果想获取的是一个随时间变化的状态,问题就来了。假设这个状态机在后台异步地收集信息,计算出一个值,并且这个值可能还会过期。
调用 Getter 函数的是调用者线程。 更新这个值的状态机逻辑跑在机器线程。
这意味着,你调用 Getter 的时候:
值可能还没准备好(无效)。 值可能是有效的。 值可能正好在被机器线程写入(读到一半的数据?)。
你完全没法确定!
就算你加了锁(比如 pthread_mutex_lock
)来同步访问,又回到了我们刚才讨论的问题:
可能会阻塞调用者线程(等机器线程释放锁)。 可能会阻塞机器线程(等调用者线程释放锁)。 或者两个线程轮流阻塞。
而且,即使加了锁,你拿到的也可能是一个刚刚过期的值。这还是不靠谱。
更好的方法:发个“请求”事件
既然直接去“拿”有风险,不如改成“要”。我们可以建立一套请求/响应(Request/Response)的消息模式。
调用者(Requester)想知道某个值,就给目标机器(Provider)发一个“请求值”的事件(比如 EVENT_TYPE_QUERY_VALUE
)。这个事件里最好带上“我是谁”(比如 Requester 的机器 ID 或队列地址),方便 Provider 回复。Provider 收到请求事件后,在其状态机逻辑中准备好当前的值。 Provider 把值打包成一个“响应值”的事件(比如 EVENT_TYPE_VALUE_RESPONSE
),发送回给 Requester (根据请求事件里带的“我是谁”信息)。
为了避免 Requester 发出请求后傻等响应,Requester 自己最好也是事件驱动的。这样,它发出请求后就可以继续干别的,等收到响应事件后再处理结果。
// --- 请求/响应 模式 (概念 C 代码) ---
// 事件类型扩展
typedefenum {
// ... 其他事件 ...
EVENT_TYPE_QUERY_VALUE_REQUEST,
EVENT_TYPE_VALUE_RESPONSE,
} EventType;
// 请求事件的数据可以包含请求者的标识
typedefstruct {
Machine* p_requester_machine; // 指向请求者机器结构的指针 (或者用 ID)
} QueryValueRequestData;
// 响应事件的数据包含结果值
typedefstruct {
int the_value;
} ValueResponseData;
// --- Requester 机器 ---
typedefstruct RequesterData_s {
Machine base_machine; // 包含 dispatch, event_queue 等
// ... requester state ...
bool waiting_for_response;
} RequesterData;
void requester_dispatch(void* p_inst, const Event* p_event) {
RequesterData* self = (RequesterData*)p_inst;
switch (p_event->type) {
case EVENT_TYPE_VALUE_RESPONSE: {
if (self->waiting_for_response) {
ValueResponseData* resp_data = (ValueResponseData*)p_event->p_data;
printf("Requester received value: %d\n", resp_data->the_value);
self->waiting_for_response = false;
// ... 处理收到的值 ...
}
break;
}
// ... 其他事件处理 ...
default: break;
}
}
void requester_ask_for_value(RequesterData* self, Machine* p_provider_machine) {
if (!self->waiting_for_response) {
Event query_event;
QueryValueRequestData req_data;
req_data.p_requester_machine = (Machine*)self; // 把自己传过去
query_event.type = EVENT_TYPE_QUERY_VALUE_REQUEST;
query_event.p_data = &req_data; // 注意生命周期和数据复制
query_event.data_size = sizeof(req_data);
// 发送请求给 Provider (假设有函数可以发送到指定机器)
// machine_send_event(&p_provider_machine->event_queue, &query_event);
self->waiting_for_response = true;
printf("Requester sent query.\n");
}
}
// --- Provider 机器 ---
typedefstruct ProviderData_s {
Machine base_machine;
int current_internal_value;
// ... provider state ...
} ProviderData;
void provider_dispatch(void* p_inst, const Event* p_event) {
ProviderData* self = (ProviderData*)p_inst;
switch (p_event->type) {
case EVENT_TYPE_QUERY_VALUE_REQUEST: {
QueryValueRequestData* req_data = (QueryValueRequestData*)p_event->p_data;
Machine* p_requester = req_data->p_requester_machine;
// 准备响应事件
Event response_event;
ValueResponseData resp_data;
resp_data.the_value = self->current_internal_value; // 获取当前值
response_event.type = EVENT_TYPE_VALUE_RESPONSE;
response_event.p_data = &resp_data; // 注意生命周期和数据复制
response_event.data_size = sizeof(resp_data);
// 发送响应给 Requester
// machine_send_event(&p_requester->event_queue, &response_event);
printf("Provider sent response with value %d.\n", resp_data.the_value);
break;
}
// ... 其他事件处理 ...
default: break;
}
}
另一种选择:回调函数
还有一种方法是使用回调函数(Callback)。
Requester 在初始化时(或者通过一个专门的事件),向 Provider 注册一个回调函数。 当 Provider 的数据发生变化时(或者定期),Provider 就调用这个注册的回调函数,把新数据传给它。
// --- 回调函数 模式 (简化版代码) ---
// 值类型定义
typedefint ValueType;
// 回调函数指针类型定义
// 参数:新值,传递给回调的用户数据指针 (通常是 Requester 的实例指针)
typedef void (*ValueChangeCallback)(ValueType newValue, void* p_user_data);
// --- Provider 机器 ---
typedefstruct ProviderData_s {
Machine base_machine;
int current_internal_value;
ValueChangeCallback registered_callback; // 存储注册的回调函数指针
void* callback_user_data; // 存储传递给回调的用户数据
pthread_mutex_t callback_lock; // 保护回调指针和用户数据的锁
} ProviderData;
// Provider 提供的注册接口
void provider_register_callback(ProviderData* self, ValueChangeCallback cb, void* p_user_data) {
pthread_mutex_lock(&self->callback_lock);
self->registered_callback = cb;
self->callback_user_data = p_user_data;
pthread_mutex_unlock(&self->callback_lock);
}
// Provider 内部更新值并触发回调
void provider_update_value(ProviderData* self, ValueType new_value) {
self->current_internal_value = new_value; // 更新内部值
// 检查是否有回调注册,并调用它
pthread_mutex_lock(&self->callback_lock);
if (self->registered_callback != NULL) {
// 在 Provider 的线程上下文中调用回调!
self->registered_callback(new_value, self->callback_user_data);
}
pthread_mutex_unlock(&self->callback_lock);
}
// --- Requester 机器 ---
typedefstruct RequesterData_s {
Machine base_machine;
// ... requester state ...
} RequesterData;
// Requester 定义的回调函数 (静态或全局函数)
void requester_on_value_changed(ValueType newValue, void* p_user_data) {
RequesterData* self = (RequesterData*)p_user_data;
printf("Callback received value: %d in Requester context.\n", newValue);
// !!! 关键点:回调函数是在 Provider 线程执行的 !!!
// 不要做耗时或阻塞操作。
// 最安全的方式是:在回调里发一个事件给自己!
Event value_update_event;
ValueResponseData event_data; // 可以复用之前的响应数据结构
event_data.the_value = newValue;
value_update_event.type = EVENT_TYPE_VALUE_RESPONSE; // 复用事件类型
value_update_event.p_data = &event_data; // 小心生命周期,如果复杂需要拷贝
value_update_event.data_size = sizeof(event_data);
// 把事件发送给自己机器的队列
machine_send_event(&self->base_machine.event_queue, &value_update_event);
}
// Requester 初始化时注册回调
void requester_init_and_register(RequesterData* self, ProviderData* p_provider) {
// ... requester 初始化 ...
// machine_queue_init(&self->base_machine.event_queue);
// self->base_machine.dispatch = requester_dispatch; // 设置处理函数
// self->base_machine.p_instance_data = self;
// 向 Provider 注册回调,把自己的实例指针作为 user_data 传过去
provider_register_callback(p_provider, requester_on_value_changed, self);
}
使用回调要注意的关键点:
回调函数是在 Provider 的线程(也就是机器线程)里执行的! 这意味着你在回调函数里花费的任何时间,都是从 Provider 处理其他事件的时间里“偷”走的。 因此,回调函数必须非常快,绝对不能包含任何阻塞操作。
那么,如果 Requester 收到回调里的新值后,需要做一些稍微复杂(甚至可能阻塞)的操作怎么办?
最佳实践:在回调函数里,只做一件事——把收到的新值打包成一个事件,然后发送给 Requester 自己的事件队列。这样,实际的处理逻辑就会回到 Requester 自己的状态机(在 Requester 的线程上下文中安全地)执行,避免了在 Provider 线程里做不该做的事。上面 C 代码示例中的 requester_on_value_changed
就是这么做的。
事件队列的那些“坑”
用了事件驱动和事件队列,是不是就万事大吉了?不, 队列本身也会带来一些让人头疼的问题。
1. 并发访问队列的“坑”
你的机器通常是在事件分发器(Dispatcher)处理队列的时候获得 CPU 时间片的。如果在机器处理某个事件的过程中,它又想往同一个事件队列里发送新的事件(比如,处理完一个请求,立刻发出下一个请求),就可能存在并发访问队列的问题。
如果你的队列实现(比如 C++ 的 std::deque
或某些 C 库的队列)在插入/删除元素时,可能会让其他指向队列元素的迭代器或指针失效(Iterator Invalidation),或者你没有使用正确的锁来保护队列操作,那么多线程并发读写就可能导致数据错乱、内存访问错误甚至程序崩溃。
解决方案:确保你的队列实现是线程安全的,或者在所有访问队列(添加、移除、检查空)的地方都使用互斥锁(Mutex)或其他同步原语(比如我们前面 C 代码示例中加的 pthread_mutex_t
)。
2. 定时器事件的“幽灵”
假设你设计了这样一个状态机:
进入状态 A ( StateA
) 时,启动一个超时定时器 T1(比如 100ms),因为你在等待一个可能来也可能不来的消息Msg
。如果在超时之前收到了 Msg
事件,你会取消定时器 T1,然后转换到状态 B (StateB
)。如果等了 100ms 还没收到 Msg
,定时器 T1 就会触发一个超时事件Timeout
,你可能在状态 A 处理这个超时(比如报错或重试)。
现在,想象一下这个极端但可能发生的场景:
定时器 T1 快要到期了(比如还剩 1ms)。 就在这时,你期待的 Msg
事件到达了,被放进了事件队列。紧接着(在你的状态机处理 Msg
之前),那 1ms 过去了,定时器 T1 也触发了,它的Timeout
事件也被放进了队列。现在你的队列里排着两个事件: [Msg, Timeout]
。你的状态机开始处理队列。第一个事件是 Msg
。太好了!正是想要的。状态机执行动作:取消定时器 T1,然后转换到状态 B。进入状态 B 后,假设你又启动了一个新的超时定时器 T2(可能也是 100ms),因为在状态 B 你在等另一个消息。 状态机继续处理队列。下一个事件是 Timeout
。状态机拿出 Timeout
事件一看:“咦?这是一个Timeout
事件,跟我现在(在状态 B)正在等待的 T2 定时器超时是同一种类型的事件啊!”
这个 Timeout
事件其实是已经取消了的定时器 T1 产生的“幽灵”,但因为它在你取消 T1 之前就已经进入了队列,现在它被你的状态机(在状态 B)当作了是新定时器 T2 的超时!这就导致了错误的行为。
这种“事件别名”(Aliasing)的问题很容易踩进去(别问我怎么知道的,问就是踩过不止一次...)。
怎么破?
区分事件/定时器:让不同定时器产生的超时事件可以被区分开。 方法一:定义不同的超时事件类型( TimeoutA
,TimeoutB
...)。方法二:在超时事件的数据 p_data
里包含一个唯一的定时器 ID。状态机收到超时事件后,检查 ID 是否与当前正在等待的定时器 ID 匹配,不匹配就忽略。定时器事件特殊处理: 方法一:不把定时器超时事件放进普通队列。定时器到期时直接调用一个(非常轻量级的)回调,在回调里判断状态机是否还在等待这个超时,如果是,再生成一个真正的事件放入队列。 方法二:或者,把定时器事件放进一个更高优先级的队列,保证它们能被尽快处理。 让“取消”操作真正生效:让取消定时器的操作,能够把队列里对应的那个待处理的超时事件也移除掉。 方法一:物理删除。取消定时器时,遍历队列,找到并删除那个“幽灵”超时事件(如果队列支持高效查找和删除)。 方法二:逻辑删除。给每个定时器一个“世代号”(generation count)或者状态(active/cancelled)。取消时标记为 cancelled。状态机从队列里取出超时事件时,检查关联的定时器 ID 和它的状态/世代号,如果是已取消的,就直接丢弃这个事件。
不管用哪种方法,如果你在系统里用了事件队列和可取消的定时器,一定要意识到这个潜在的“幽灵事件”问题,不然迟早被它坑到。
总结
这篇文章,我们算是浅尝辄止地聊了聊在一个系统中,不同部分(状态机、线程等)需要互相通信时可能遇到的一些问题和挑战,以及一些可以帮助你应对的设计思路。
几乎每个嵌入式项目,多多少少都会遇到这些情况:需要处理中断、需要同时独立地执行多个任务、需要考虑低功耗等等。
从“输入驱动”转向“事件驱动”,是解锁许多性能、功耗和并发性优化的关键第一步。
总的来说,把状态机(或者其他处理单元)设计成相对封闭的系统,尽量不提供直接访问内部状态的 Getter/Setter 方法,而是通过消息(事件)来进行所有交互(包括设置状态和查询信息),是避免许多并发问题的有效途径。
最后,使用事件队列虽好,但也别忘了它可能带来的“惊喜”,比如那个让人头疼的“幽灵超时事件”。
好啦,关于嵌入式系统状态机的这个系列文章,到这里就暂时告一段落了。希望这趟旅程对你有所启发和帮助!
如果你有任何想法、问题或者踩过的其他坑,欢迎在评论区留言交流!下次再见!
