# 3. 深入理解AWFlow
本章导读
AWFlow核心框架是使用C语言实现的,采用了面向对象编程思想,本章将详细介绍AWFlow框架的实现,以便为后续的节点开发工作打下夯实的基础。
# 3.1 对象基类
AWFlow充分使用了面向对象编程思想。在AWFlow中,节点、传递的数据等都是对象,它们都继承自对象基类(tk_object_t)。
tk_object_t通用对象类是在工具库(Toolkit Library)的定义的,ToolKit Library 中包含了一系列基础的软件,比如:动态数组、动态内存分配器、时间分发器、事件队列、链表、定时器、环形缓冲区等等。ToolKit Library现存于AWTK(https://www.zlg.cn/index/pub/awtk)中(源码位于{awtk}/src/tkc目录中),AWFlow复用了AWTK中的这一部分。
# 3.1.1 类型定义
tk_object_t通用对象类的定义如下所示。
typedef struct _tk_object_t tk_object_t;
struct _tk_object_t {
emitter_t emitter; /* 事件分发器,用于外部注册对象事件处理函数*/
int32_t ref_count; /* 引用计数,用于对象的自动释放(当引用计数为0时)*/
char *name; /* 对象的名称*/
uint32_t visiting : 1; /* 标识对象是否在访问中*/
const object_vtable_t *vt; /* 对象虚函数表(核心是包含了抽象方法的定义)*/
};
该类型的详细定义用户无需关心,一般也不会直接访问这些成员。只需要知道,vt成员所指向的结构体,包含了各个抽象方法的实现。换句话说,object_vtable_t类型即定义了任何对象实体都需要实现的抽象方法,其定义详如下所示。
typedef struct _object_vtable_t object_vtable_t;
typedef ret_t (*object_on_destroy_t)(tk_tk_object_t* obj);
typedef int32_t (*object_compare_t)(tk_object_t* obj, tk_object_t* other);
typedef ret_t (*object_remove_prop_t)(tk_object_t* obj, const char* name);
typedef ret_t (*object_get_prop_t)(tk_object_t* obj, const char* name, value_t* v);
typedef ret_t (*object_foreach_prop_t)(tk_object_t* obj, tk_visit_t on_prop, void* ctx);
typedef ret_t (*object_set_prop_t)(tk_object_t* obj, const char* name, const value_t* v);
typedef bool_t (*object_can_exec_t)(tk_object_t* obj, const char* name, const char* args);
typedef ret_t (*object_exec_t)(tk_object_t* obj, const char* name, const char* args);
struct _object_vtable_t {
const char *type; /* 类型*/
const char *desc; /* 描述*/
uint32_t size : 28; /* 具体类所占用的内存空间大小*/
uint32_t is_collection : 1; /* 是否是对象集合,对于单个对象通常为false*/
/* 函数指针 */
object_on_destroy_t on_destroy; /* 对象销毁*/
object_compare_t compare; /* 比较两个对象是否相同*/
object_get_prop_t get_prop; /* 获取对象属性*/
object_set_prop_t set_prop; /* 设置对象属性*/
object_remove_prop_t remove_prop; /* 移除对象属性*/
object_foreach_prop_t foreach_prop; /* 遍历对象属性*/
object_can_exec_t can_exec; /* 判断某一操作是否可以执行*/
object_exec_t exec; /* 执行某一操作*/
};
在_object_vtable_t结构体中,共计定义了8个函数指针(抽象方法),各抽象方法的简要介绍详见下表。
抽象方法 | 简介 |
---|---|
on_destroy | 在销毁对象时被调用 |
compare | 比较两个对象是否相同 |
get_prop | 获取属性,通过属性名,从对象中获取属性的值 |
set_prop | 设置属性,给定一个属性的名称和值,完成该属性的设置 |
remove_prop | 移除属性 |
foreach_prop | 遍历所有属性 |
can_exec | 判断某一操作是否可以执行 |
exec | 执行某一操作 |
这些抽象方法通常并不需要全部实现,比如在AWFlow中,通常仅需实现三个抽象方法:on_destroy、get_prop、set_prop。
在object_vtable_t类型的定义中,除了包含抽象方法的定义之外,还包含4个特殊的成员:type、desc、size、is_collection,用以表示一些基本的信息。
type是一个表示实际类型的字符串,desc是具有描述性的字符串,它们都是单纯的字符串信息,其在AWFlow框架层面并没有任何实质性的作用,主要用于可读性方面的提示,通常设置为一些具有描述意义的字符串即可。
size表示具体对象所占用的内存空间大小。tk_object_t是抽象的对象基类,实际类型往往继承自tk_object_t,并扩展一些自定义的属性和方法,size即表示了实际对象类型的大小,以便在创建具体对象时,根据size的值分配内存空间。
is_collection表示该对象是否是集合对象(集合对象用于包含一系列相同的对象,类似于对象的数组),通常情况下,该值为False,AWFlow中暂时不会使用。
通常情况下,在实现了各个抽象方法之后,即可完成一个object_vtable_t类型的常量定义,代码如下:
static ret_t __set_prop (tk_object_t* obj, const char* name, const value_t* v) {
xxx_t* vm = (xxx_t*)(obj);
/* 其它代码 */
}
static ret_t __get_prop(tk_object_t* obj, const char* name, value_t* v) {
xxx_t* vm = (xxx_t*)(obj);
/* 其它代码 */
}
static bool_t __can_exec(tk_object_t* obj, const char* name, const char* args) {
xxx_t* vm = (xxx_t*)(obj);
/* 其它代码 */
}
static ret_t __exec(tk_object_t* obj, const char* name, const char* args) {
xxx_t* vm = (xxx_t*)(obj);
/* 其它代码 */
}
static ret_t __on_destroy(tk_object_t* obj) {
xxx_t* vm = (xxx_t*)(obj);
/* 其它代码 */
}
static const object_vtable_t __g_imp_vtable = {
.type = "xxx",
.desc = "a simple example",
.size = sizeof(xxx_t), /* size为具体类型的大小 */
.exec = __exec,
.can_exec = __can_exec,
.get_prop = __get_prop,
.set_prop = __set_prop,
.on_destroy = __on_destroy
};
其中,xxx_t表示具体的对象类型,其往往继承自tk_object_t。比如后文介绍的节点类(aw_node_t)。
基于定义的常量(__g_imp_vtable),即可通过object_create()函数完成对象的创建,其函数原型为:
tk_object_t* object_create(const object_vtable_t* vt);
调用形式即为:
tk_object_t* obj = object_create(&__g_imp_vtable);
该函数会根据size成员分配对象空间。由于内存空间是按照具体类型的大小分配的,因此,返回的obj可以强制转换为具体类型的指针使用,比如:
xxx_t* vm = (xxx_t*)(obj);
注意,object_create()创建的对象必须是继承自tk_object_t的对象,在完成内存空间的分配之后,object_create()函数会自动完成基类(tk_object_t)中各个成员的初始化。
# 3.1.2 对象生命周期管理
在对象基类中,实现了一个非常重要的功能:对象生命周期管理。对象生命周期的管理核心是维护一个引用计数器(ref_count)。
一个对象被创建(object_create)时,引用计数器的值为1,后续可以使用TK_OBJECT_REF()来增加引用计数器的值,使用TK_OBJECT_UNREF()来减小引用计数器的值,当引用计数值减为0时,销毁整个对象。TK_OBJECT_REF()和TK_OBJECT_UNREF()的实现范例如下所示。
tk_object_t* TK_OBJECT_REF(tk_object_t* obj) {
obj->ref_count++;
return obj;
}
本文中展示的代码仅为原理性说明,省略了一些非核心的代码,比如参数检查等。
ret_t TK_OBJECT_UNREF(tk_object_t* obj) {
if (obj->ref_count == 1) {
object_destroy(obj);
} else {
obj->ref_count--;
}
return RET_OK;
}
为了便于使用,定义了两个宏:
#define TK_OBJECT_REF(obj) TK_OBJECT_REF((tk_object_t*)(obj))
#define TK_OBJECT_UNREF(obj) \
if ((obj) != NULL) { \
TK_OBJECT_UNREF((tk_object_t*)(obj)); \
(obj) = NULL; \
}
宏定义中主要添加了强制类型转换。实际对象类型都继承自tk_object_t,使用宏可以避免频繁使用强制类型转换。此外,在TK_OBJECT_UNREF()中,还将obj的值设置为了NULL,这是因为在调用TK_OBJECT_UNREF()时,已经意味着调用者不会再使用(访问)对象,将obj强制设置为NULL,可以避免后续可能存在的误操作。
为了保证对象正确的生命周期,在使用对象时,就必须一个最基本的规则:使用一个对象之前,使用TK_OBJECT_REF()完成对该对象的引用,不再使用时,使用TK_OBJECT_UNREF()释放对该对象的引用。
通常情况下,在一个模块中,针对某一特定对象,应该确保TK_OBJECT_REF()和TK_OBJECT_UNREF()都会出现,且操作是成对的,有引用的地方,就一定存在释放引用的地方。特别地,针对对象的创建者,object_create()和TK_OBJECT_UNREF()可以视为一对操作。
# 3.2 节点类
流图的核心是节点,在AWFlow中,定义了抽象的节点类(定义了需要由具体类实现的抽象方法)。为了便于对节点的生命周期管理和属性的配置,节点类(node类)继承自object类(通用对象类型,图中的object_t即tk_object_t,下同),对应的类图详见下图。

上图中仅展示了核心的继承关系和数据处理抽象方法(on_data())的定义,通用对象中的抽象方法(set_prop、get_prop等)并没有展示,这不会影响对AWFlow框架本身的理解,对AWFlow框架的理解重点是关注数据流。
在流图中,一个节点最核心的业务逻辑就是完成对数据的处理。显然,各种功能的节点对数据处理的方法不尽相同。为了统一节点处理数据的调用形式,在aw_node_t类型中,定义了数据处理的抽象方法(on_data),这就要求所有具体的节点类,必须实现该抽象方法。aw_node_t的完整定义如下所示。
struct _aw_node_t;
typedef struct _aw_node_t aw_node_t;
struct _aw_node_t {
tk_object_t object; /* 继承自tk_object_t */
bool_t enable; /* 使能开关,为False时不处理数据 */
char *id; /* 唯一标识,字符串 */
aw_node_category_t category; /* 节点类型 */
uint32_t version; /* 节点版本号 */
const prop_desc_t **prop_descs; /* 属性描述(具有的属性)*/
const func_desc_t **func_descs; /* 函数描述(可以执行的操作)*/
char *consumer_ids; /* 消费者id(多个id之间使用逗号分隔)*/
darray_t consumers; /* 消费者,所有消费者存入该数组中 */
aw_node_on_data_t on_data; /* 数据处理 */
aw_node_on_event_t on_event; /* 事件处理 */
aw_node_manager_t *manager; /* 节点管理器(该节点的管理者)*/
};
下面详细介绍节点类中涉及到的相关成员及概念。
# 3.2.1 节点数据处理
由于节点的核心功能是完成对数据特定的处理,因此,这里首先关注节点类中数据处理抽象方法的定义:on_data。其类型(aw_node_on_data_t)的具体定义为:
typedef ret_t (*aw_node_on_data_t)(aw_node_t* node, tk_object_t* data);
需要处理的数据通过data参数传入,这里的data同样是一个tk_object_t类型的通用对象。在实际应用中,数据包的形式多种多样,这里使用通用对象来表示数据包,可以非常灵活的通过不同名称的属性表示各式各样的数据包,在后文介绍的具体示例中,将会进一步体会到这种灵活性所带来的便捷。
为了统一数据处理函数的调用形式,node类对外提供了封装好的节点数据处理接口:aw_node_on_data(),其实现仅仅只是简单的调用on_data()方法,如下所示。
ret_t aw_node_on_data(aw_node_t* node, tk_object_t* data) {
if (!(node->enable)) {
return RET_FAIL;
}
return node->on_data(node, data);
}
能够处理数据的前提是该节点处于使能状态(enable为True),默认情况下,节点在创建后都处于使能状态。
节点数据处理的具体方法,与节点类别(pump、sink、filter、config)相关,具体数据处理函数的实现,后文还会进一步介绍,这也是理解整个数据流的关键所在。
# 3.2.2 节点事件处理
在节点类的定义中,还定义了一个on_event抽象方法,其类型(aw_node_on_event_t)的具体定义为:
typedef ret_t (*aw_node_on_event_t)(aw_node_t* node, event_t* event);
当前事件通过event参数传入。在具体类实现时,如果需要在特定的事件产生时完成相应的操作,可以实现该方法。若无特殊需求,不必实现该方法。
目前,系统内部产生的与节点相关的事件主要有两个:
# 1. AW_NODE_EVENT_LOADED
该事件在节点加载完毕时产生。节点中部分属性可能需要持久化(掉电不丢失),这些属性需要存储在非易失存储器中。在AWFlow中,将节点导入到系统之中通常会经历两个过程:一是普通的创建过程,此时创建的节点,所有属性都是默认值(创建函数是由节点自身提供的);一是加载(Load)过程,在这个过程中,会将存储在存储器中的持久化属性读取出来,并更新到节点的属性中。在加载完毕时,即会产生AW_NODE_EVENT_LOADED事件。
节点中的属性是否需要持久化,是通过"属性描述"确定的,在3.2.4节中,还会对属性描述作进一步介绍。
# 2. AW_NODE_EVENT_GRAPH_LOADED
该事件在流图加载完成时产生,一个流图中通常有多个节点,AW---Flow启动时,会依序加载各个节点,所有节点加载完毕后,产生AW_NODE_EVENT_GRAPH_LOADED事件。
例如,某一操作需要使用到持久化属性,则建议在AW_NODE_EVENT_LOADED事件产生后再执行,以使用存储在仓库中的最新参数。示意代码如下所示。
static ret_t __on_event(aw_node_t* node, event_t* event)
{
if (event->type == AW_NODE_EVENT_LOADED) {
/* 节点加载完毕,直接使用各种 持久化属性 (存储器中最新的值)*/
}
return RET_OK;
}
其中的__on_event函数即为节点类中on_event()方法的具体实现,在C语言中的表现形式即为将节点类中的on_event函数指针赋值为__on_event。
# 3.2.3 节点基础属性
在1.2节中,介绍了AWFlow中的一些基础概念,比如节点ID、类别、消费者。这些内容在节点类的定义中都有体现。此外,还新增了版本号信息,用于节点版本管理。
# 1. 节点ID
节点ID是每个节点的唯一标识,在AWFlow中,节点类中的id成员即用于表示节点ID,节点ID使用字符串表示。
为确保唯一性,节点ID通常由可视化工具自动分配,AWFlow-Designer就可以自动完成各个节点的ID分配。
# 2. 节点类别
根据节点的输入输出情况,可以将节点分为四个大的类别(详见1.2.2 节点分类)。节点类中的category成员即用于表示节点类别,类型aw_node_category_t是一个枚举类型,其定义如下所示
typedef enum _aw_node_category_t {
AW_NODE_PUMP = 0, /* 输入节点 */
AW_NODE_SINK, /* 输出节点 */
AW_NODE_FILTER, /* 处理节点 */
AW_NODE_CONFIG /* 配置节点(既无输出,也无输入,仅用于配置参数)*/
} aw_node_category_t;
# 3. 节点消费者
消费者是某一节点的后继节点,在节点类的定义中,有两个与消费者相关的成员:consumer_ids和consumers成员。
每个节点的消费者通过consumer_ids指定,其是一个字符串,值为所有消费者节点对应id的集合,当存在多个消费者时,各id之间通过逗号分隔。
显然,根据consumer_ids可以获知节点所有的消费者,但consumer_ids中存储的是消费者节点的ID,如果要将数据传递给消费者还要根据ID找到对应的节点,再调用其中的on_data()方法,实现数据的处理。如果每次数据传递时都要通过ID寻找一次节点,一定程度上会影响效率。基于此,节点类中还定义了consumers成员,其是一个用于存储各个消费者节点对象的数组。在系统启动时,会根据consumer_ids的值找到各个消费者节点,并存于该数组中(数组中直接存放了指向各个消费者节点的指针),以便在向消费者传递数据时,可以更快的完成数据分发,不必在每次数据分发时都根据id寻找一次消费者节点。
节点类提供了一个专用的接口,用于将数据传递给消费者,其原型为:
ret_t aw_node_emit(aw_node_t* node, tk_object_t* data);
不难想象,在该函数的实现中,仅仅需要遍历consumers数组中的各个节点,并在每个节点上调用aw_node_on_data()函数,进而完成将数据分发给每个消费者节点。由此可见,数据的传递是依次进行的,并不是同时传递给所有消费者节点,传递的顺序是依照consumer_ids中各个id的顺序(数组中的节点存放顺序与之保持一致)。示意代码如下所示。
ret_t aw_node_emit(aw_node_t* node, tk_object_t* data)
{
aw_node_t * iter = NULL;
aw_node_t **nodes = NULL;
if (!(node->enable)) {
return RET_FAIL;
}
nodes = (aw_node_t**)(node->consumers.elms); /* elms 存放了各个节点的指针 */
if (node != NULL) {
uint32_t i = 0;
uint32_t nr = node->consumers.size; /* size 是数组的大小 */
for (i = 0; i < nr; i++) {
iter = nodes[i];
if (aw_node_on_data(iter, data) == RET_STOP) {
break;
}
}
}
return RET_OK;
}
需要注意的是,如果某一消费者在处理数据时修改了原始数据(例如,将原数据缓存中的所有小写字母修改为了大写字母),那么将影响排在其后面的所有消费者,它们在处理数据时,缓存中将是上一个消费者处理的结果。这可能会破坏数据流关系,对同属某一节点的其它消费者造成"不良"影响。因此,在节点处理中,均不建议修改原始数据对象中的数据。
用户通常并不需要直接调用aw_node_emit()接口,系统内部会根据当前节点的消费者节点,自动调用该函数完成数据的分发。
# 4. 节点版本
节点版本使用3部分表示:主版本号、次版本号、修订号。组合成一个32位无符号整数(主版本号:高16位;次版本号:8 ~ 15位;修订号:0 ~ 7位),例如,1.2.3对应的版本号为:
(1 << 16) | (2 << 8) | (3)
# 5. 属性设置与获取函数
为了便于获取或设置这些基础属性,AWFlow提供了对应的设置和获取函数,范例程序如下所示。
ret_t aw_node_set_prop_default(aw_node_t* node, const char* name, const value_t* v) {
return_value_if_fail(node != NULL && name != NULL && v != NULL, RET_BAD_PARAMS);
if (tk_str_eq(name, AW_NODE_PROP_ID)) {
aw_node_set_id(node, value_str(v));
return RET_OK;
} else if (tk_str_eq(name, AW_NODE_PROP_NAME)) {
object_set_name(OBJECT(node), value_str(v));
return RET_OK;
} else if (tk_str_eq(name, AW_NODE_PROP_ENABLE)) {
aw_node_set_enable(node, value_bool(v));
return RET_OK;
} else if (tk_str_eq(name, AW_NODE_PROP_VERSION)) {
return RET_NOT_IMPL;
}
return RET_NOT_FOUND;
}
ret_t aw_node_get_prop_default(aw_node_t* node, const char* name, value_t* v) {
return_value_if_fail(node != NULL && name != NULL && v != NULL, RET_BAD_PARAMS);
if (tk_str_eq(name, AW_NODE_PROP_ID)) {
value_set_str(v, node->id);
return RET_OK;
} else if (tk_str_eq(name, AW_NODE_PROP_NAME)) {
value_set_str(v, node->object.name);
return RET_OK;
} else if (tk_str_eq(name, AW_NODE_PROP_ENABLE)) {
value_set_bool(v, node->enable);
return RET_OK;
} else if (tk_str_eq(name, AW_NODE_PROP_VERSION)) {
value_set_uint32(v, node->version);
return RET_OK;
}
return RET_NOT_FOUND;
}
程序中仅提供了4个属性的设置和获取,属性名称都使用了宏进行定义,如下所示。
#define AW_NODE_PROP_ID "id"
#define AW_NODE_PROP_NAME "name"
#define AW_NODE_PROP_ENABLE "enable"
#define AW_NODE_PROP_VERSION "version"
注意,消费者属性并没有提供设置和获取接口,因其仅在构建流图时使用,向消费者分发数据也提供了aw_node_emit() 函数。此外,每个节点还有一个额外的名称(name)属性,该属性是从tk_object_t类继承而来的。
在节点类的定义中,还存在一个名为manager的成员,该成员指向节点所属的节点管理器,节点管理器专门用于管理系统中的所有节点,该成员往往由节点管理器赋值,节点本身通常不会直接访问。后文还会进一步介绍节点管理器。
# 3.2.4 属性描述
一般的节点都有一些参数需要配置,比如Settings节点,可能具有分包大小、超时时间等配置项。这些配置项作为节点的属性,外界可以通过属性设置接口(set_prop)修改它们。
为了使外界可以获知一个节点具有的所有属性,以便实现统一管理,例如:后续可以基于属性描述,在上位机上对节点属性进行可视化展示;"节点仓库"(存储器,后文会进一步介绍)可以通过属性描述决定是否将该属性的值存储到非易失存储器中(部分配置项可能需要掉电不丢失)。节点类中定义了prop_descs成员,它指向一个属性描述的指针数组,该数组中的每个成员都指向了一个属性描述。
"属性描述"通过类型为prop_desc_t的常量数据表示,其完整定义如下所示。
typedef struct _value_desc_t {
uint8_t type; /* 类型 */
uint8_t format; /* 格式 */
uint16_t flags; /* 标志 */
const char *name; /* 名称 */
const char *display_name; /* 显示名称 */
const char *desc; /* 描述(属性的文字介绍)*/
} value_desc_t, prop_desc_t, arg_desc_t, ret_desc_t;
注意,程序定义了四个类型:value_desc_t、prop_desc_t、arg_desc_t、ret_desc_t。它们本质上都是相同的,不同的名称,只是为了应用在不同的描述场景。
# 1. 类型
type表示该数据的类型,常见的类型如下所示。
类型 | 简介 |
---|---|
VALUE_DESC_TYPE_INT8 | 有符号8位数 |
VALUE_DESC_TYPE_UINT8 | 无符号8位数 |
VALUE_DESC_TYPE_INT16 | 有符号16位数 |
VALUE_DESC_TYPE_UINT16 | 无符号16位数 |
VALUE_DESC_TYPE_INT32 | 有符号32位数 |
VALUE_DESC_TYPE_UINT32 | 无符号32位数 |
VALUE_DESC_TYPE_INT64 | 有符号64位数 |
VALUE_DESC_TYPE_UINT64 | 无符号64位数 |
VALUE_DESC_TYPE_FLOAT | 单精度浮点数 |
VALUE_DESC_TYPE_DOUBLE | 双精度浮点数 |
VALUE_DESC_TYPE_BOOL | 布尔类型 |
VALUE_DESC_TYPE_STRING | 字符串类型 |
VALUE_DESC_TYPE_BINARY | 二进制数据 |
VALUE_DESC_TYPE_INT_ENUMS | 整数枚举类型 |
VALUE_DESC_TYPE_STRING_ENUMS | 字符串枚举类型 |
其中绝大部分数据类型(比如各种位宽的整数类型)都与C语言中的标准整数类型含义一致,这里仅对几个特殊的类型作简要说明:
(1)字符串类型与二进制数据的区别
字符串类型的数据通常仅由ASCII码组成,且0x00表示字符串结尾,不能出现在数据中部;而二进制数据中可以包含任意数据(包括0x00)。
(2)整数枚举类型
整数枚举类型的具体值为一个字符串数组,每个字符串值由一个整数字符串和名称字符串组成,且它们之间使用冒号分割。例如,UART通信的数据位可能有5、6、7、8,则可以使用一个整数枚举类型来表示:
.enums = {"5:5bits", "6:6bits", "7:7bits", "8:8bits", NULL},
后文会介绍到,数据类型将决定该属性的实际类型(实际类型继承自prop_desc_t,可能会扩展一些该类型下的特有数据),对于整数枚举类型,其对应的实际类型为prop_desc_int_enums_t,并使用enums成员表示枚举的字符串数组,该字符串数组必须以NULL结尾,以表示数组的结束。
整数枚举类型对应的属性(使用set_prop或get_prop设置和获取时),值为整数,且只能是该数组中列举的整数。
(3)字符串枚举类型
字符串枚举类型的具体值同样为一个为字符串数组,但数组中的每个字符串都是一个单纯的字符串,不再像整数枚举类型那样使用冒号分割出两个部分。例如,证件上的性别只能是男(male)或女(female),则可以使用字符串枚举类型表示:
.enums = {"male", "female", NULL},
对于字符串枚举类型,其对应的实际类型为prop_desc_string_enums_t,并使用enums成员表示枚举的字符串数组,该字符串数组必须以NULL结尾,以表示数组的结束。
字符串枚举类型对应的属性(使用set_prop或get_prop设置和获取时),值为字符串,且只能是该数组中列举的字符串。
数据类型(type)同时还会决定该属性的实际类型(实际类型继承自prop_desc_t,可能会扩展一些该类型下的特有数据),几个典型数据类型的类图详见下图。

对于各类数值类型(int8_t、uint8_t、int16_t、uint16_t、int32_t、uint32_t、int64_t、uint64_t、float、double等),它们扩展的成员都是相同的(仅类型不同):defvalue、min、max、step、unit。因而图中仅展示了有符号8位数的类图。在扩展的各个成员中:defvalue表示默认值;min表示最小值;max表示最大值;step表示步长,主要用于后续扩展使用(比如通过可视化界面单步调节整数值时使用),未使用时可以忽略该值的设定;unit表示单位,比如电压可能是"V",而电流可能是"A",其主要用于后续在可视化界面上显示,未使用时依然可以直接忽略。
bool类型取值只能是TRUE或FALSE,在类型中仅扩展了一个defvalue成员,表示该属性的默认值。
string类型扩展了3个成员:defvalue、min、max。defvalue表示默认值;min表示字符串的最小长度;max表示字符串的最大长度。
binary类型仅扩展了2个成员:min和max。分别表示二进制数据的最小长度和最大长度。注意,二进制数据无默认值。
int_enums(整数枚举类型)和string_enums(字符串枚举类型)均扩展了两个成员:defvalue和enums。defvalue表示默认值,enums用于列举具体的枚举值。
由于这里为每种类型的数据都定义了专有类型,使得类型繁多,但每种类型的差异性很小(成员类型不同),理解起来都是十分容易的。为避免赘述,这里仅展示prop_desc_int8_t类型的完整定义(其它类型的定义非常类似,可以此类推),如下所示。
typedef struct _value_desc_int8_t {
value_desc_t value_desc; /* 继承自基类 */
int8_t defvalue; /* 默认值 */
int8_t min; /* 最小值 */
int8_t max; /* 最大值 */
int8_t step; /* 步长 */
const char *unit; /* 单位 */
} value_desc_int8_t, prop_desc_int8_t, arg_desc_int8_t;
# 2. 格式
format可以进一步表示数据的格式。例如,对于一个字符串数据(string)或一包二进制数据(binary),若需正确解析,可能还需指定数据的格式,常见的格式有:XML、JSON、UBJSON、URL等。AWFlow中已经定义了一些常见的格式,如下所示。
格式 | 简介 |
---|---|
VALUE_FORMAT_TYPE_NONE | 无特殊格式(默认) |
VALUE_FORMAT_TYPE_HEX | 16进制格式 |
VALUE_FORMAT_TYPE_UNSIGNED | 非负格式(用于进一步限制float/double) |
VALUE_FORMAT_TYPE_PHONE | 手机号码格式 |
VALUE_FORMAT_TYPE_EMAIL | 邮件格式 |
VALUE_FORMAT_TYPE_IP4_ADDRESS | ip4地址格式 |
VALUE_FORMAT_TYPE_DOMAIN_NAME | 域名格式 |
VALUE_FORMAT_TYPE_URL | URL格式 |
VALUE_FORMAT_TYPE_JSON | JSON格式 |
VALUE_FORMAT_TYPE_XML | XML格式 |
VALUE_FORMAT_TYPE_UBJSON | UBJSON格式(JSON文件的一种具体表示法) |
格式只是针对数据类型的一个辅助信息,用于辅助数据接收方对数据进行正确的解析,具体这些不同的格式,可以灵活的应用于多种数据类型。未使用时,可以将format设置为0,即无格式:VALUE_FORMAT_TYPE_NONE。
# 3. 标志
flags用于指定一些特殊的标志,这些标志可以辅助上层系统对节点进行控制,常用的一些标志如下所示。
格式 | 简介 |
---|---|
PROP_DESC_FLAG_OBJECT | 单个对象的属性 |
PROP_DESC_FLAG_CLASS | 同一个类所有实例共享属性 |
PROP_DESC_FLAG_GLOBAL | 全局属性 |
PROP_DESC_FLAG_READABLE | 可读取 |
PROP_DESC_FLAG_WRITBALE | 可修改 |
PROP_DESC_FLAG_CONFIGURABLE | 可配置 |
PROP_DESC_FLAG_PERSISTENT | 需要持久化(掉电不丢失) |
flags的值可以是单个标志,也可以是多个标志的集合(使用C语言中的"|"运算符连接多个标志)。
(1)属性作用范围
FLAG_OBJECT、FLAG_CLASS、FLAG_GLOBAL这三个标志指定了属性的作用范围。
FLAG_OBJECT表示属性是单个对象(使用节点类创建的对象)的属性。换句话说,每个对象都会在内部独立的维护一个这样的属性,各个对象内部的属性值可以是不同的。
FLAG_CLASS表示属性是属于某一类对象的,也就是说,所有该类型的对象,共享该属性(针对这一类的对象,无论对象数目是多少,都只会存在一个唯一的属性值)。
FLAG_GLOBAL表示属性是属于整个系统的,所有对象共享(可以视为该属性是一个全局变量)。
在绝大部分情况下,都是每个对象维护一个独立的属性,以免互相影响,因此属性作用范围标志通常都直接设置为:PROP_DESC_FLAG_OBJECT。
(2)属性读写特性
PROP_DESC_FLAG_READABLE标志用于指定该属性是可读的,PROP_DESC_FLAG_WRITBALE标志用于指定该属性是可写的。一般情况下,都会同时设置这两个标志,以表明属性是可读可写的。
(3)属性配置特性
PROP_DESC_FLAG_CONFIGURABLE标志用于指定该属性是可配置的,对于可配置的属性,后续可以通过可视化工具将可配置属性展现出来,以供配置。
如果一个属性是不可配置的,表明该属性仅在系统内部使用,用户无需参与该属性的设置或获取。
(4)持久化特性
PROP_DESC_FLAG_PERSISTENT标志用于指定该属性需要持久化,即掉电不丢失。通常情况下,如果一个属性是可配置的,往往都需要持久化。以便保存用户配置,避免每次上电都要重新配置。
为便于使用,AWFlow中定义了两个宏,用于表示一般属性和可配置属性的典型标志。
一般属性标志(不可配置、可读写):
#define PROP_FLAGS_DEFAULT \
(PROP_DESC_FLAG_OBJECT | PROP_DESC_FLAG_READABLE | PROP_DESC_FLAG_WRITBALE)
可配置属性标志(可配置、可读写):
#define PROP_FLAGS_CONFIGURABLE \
(PROP_FLAGS_DEFAULT | PROP_DESC_FLAG_CONFIGURABLE | PROP_DESC_FLAG_PERSISTENT)
在实际开发过程中,往往直接使用这两个复合标志宏即可。
# 4. 名称
属性的名称。在使用set_prop和get_prop获取或设置属性时,需要通过name参数指定对应的属性名。这就要求具体的节点类在实现set_prop和get_prop方法时,所比较判断的名称与属性描述中的名称保持一致。
例如,在属性描述中描述了一个名了"timeout"的属性,用于控制超时时间,那么在实现set_prop和get_prop方法时,也应该确保对名为"timeout"属性的支持,范例程序如下所示。
static ret_t __set_prop (tk_object_t* obj, const char* name, const value_t* v) {
if (tk_str_eq("timeout", name)) {
p_this->timeout = value_int32(v); /* 将超时时间设置到当前对象中 */
return RET_OK;
} else if (/*...*/) {
/* 其它属性 */
}
return aw_node_set_prop_default(AW_NODE(obj), name, v);
}
static ret_t __get_prop (tk_object_t* obj, const char* name, value_t* v) {
aw_node_filter_add_t* filter_add = AW_NODE_FILTER_ADD(obj);
if (tk_str_eq("timeout", name)) {
value_set_int32(v, p_this->timeout); /* 从当前对象中提取出超时时间 */
return RET_OK;
} else if (/*...*/) {
/* 其它属性 */
}
return aw_node_get_prop_default(AW_NODE(obj), name, v);
}
前面提到,所有具体的节点类都继承自aw_node_t,aw_node_t类的几个基础属性也应支持通过set_prop和get_prop方法设置和获取:节点ID、是否使能(enable)等。因此,在子类实现set_prop和get_prop方法时,如果某一属性不是属于子类的特有属性,则可能是父类的属性,此时可以直接调用aw_node_set_prop_default()或aw_node_get_prop_default()尝试操作父类的属性。这也是子类在实现set_prop和get_prop方法时,往往将这两个函数放在最后的原因。
# 5. 显示名称
显示名称主要用于后续在可视化界面上显示的名称,仅供显示之用。
# 6. 描述
该描述主要是一些文字描述,通常是对该属性的简要介绍,也仅供可视化显示时使用。
例如,某一节点具有分片大小和超时时间两个属性,基于上面对属性描述中各个成员的介绍,可以完成属性描述的定义,范例如下所示。
static const prop_desc_int32_t s_prop_segment_size_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_INT32,
.name = "segment_size",
.desc = "segment Size. if it is 0, will not segment.",
.flags = PROP_FLAGS_CONFIGURABLE,
.format = 0
},
.defvalue = 128,
.min = 0,
.max = 1000
};
static const prop_desc_uint32_t s_prop_timeout_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_UINT32,
.name = "timeout",
.desc = "timeout for read(ms)",
.flags = PROP_FLAGS_CONFIGURABLE,
.format = 0
},
.defvalue = 1000,
.min = 0,
.max = 1000
};
static const prop_desc_t* s_prop_descs[] = {
(const prop_desc_t*)(&s_prop_segment_size_desc),
(const prop_desc_t*)(&s_prop_timeout_desc),
NULL
};
属性描述定义完成后,即可将s_prop_descs作为节点中prop_descs的值。
属性描述中包含了所有可以通过set_prop/get_prop设置或获取的属性,基于此,节点类提供了一个属性值有效性检查函数:aw_node_check_prop_value(),实现范例如下所示。
ret_t aw_node_check_prop_value(aw_node_t* node, const char* name, const value_t* v) {
const prop_desc_t* prop = aw_node_find_prop(node, name);
return_value_if_fail(v != NULL, RET_BAD_PARAMS);
if (prop == NULL &&
!tk_str_eq(name, AW_NODE_PROP_NAME) &&
!tk_str_eq(name, AW_NODE_PROP_ENABLE) &&
!tk_str_eq(name, AW_NODE_PROP_ID) &&
!tk_str_eq(name, AW_NODE_PROP_VERSION) &&
!tk_str_eq(name, "x") &&
!tk_str_eq(name, "y") &&
!tk_str_eq(name, "z")) {
value_t v1;
if (object_get_prop(OBJECT(node), name, &v1) != RET_OK) {
log_debug("not supported prop %s\n", name);
}
}
return RET_OK;
}
对于属性值,仅检查了v是否为NULL,对属性值中的具体内容并没有做额外的检查。而对于属性名的检查,仅在属性名不存在时,输出一段调试信息:
log_debug("not supported prop %s\n", name);
主要从三个方面判定属性值是否存在:
- 属性是否已经通过属性描述进行了描述
在aw_node_check_prop_value()函数中,调用了aw_node_find_prop()函数,根据名称获取该属性对应的属性描述,该函数的实现范例如下所示。
static const prop_desc_t* aw_node_find_prop(aw_node_t* node, const char* name) {
return_value_if_fail(node != NULL, NULL);
if (node->prop_descs != NULL) {
uint32_t i = 0;
const prop_desc_t** prop_descs = node->prop_descs;
for (i = 0; prop_descs[i] != NULL; i++) {
const prop_desc_t* iter = prop_descs[i];
if (tk_str_eq(iter->name, name)) {
return iter;
}
}
}
return NULL;
}
其实现的核心逻辑就是遍历节点中的属性描述列表,根据名称找到对应的属性描述。若不存在该名称对应的属性描述,返回NULL。
若aw_node_find_prop()可以返回有效的属性描述(prop != NULL),则表明该属性是有效的。
- 属性是否是节点基础属性
若属性并不存在对应的属性描述(prop == NULL),则继续查看是否为基础属性(name、enable、id、version)。如果是基础属性,则认为属性也是有效的。
特别地,在基础属性的判定中,额外判定了三个属性:"x"、"y"、"z",这三个属性是比较特殊的属性,整个AWFlow框架中并不会使用到它们,这几个属性主要来源于可视化上位机,它们绘制的流图中,可能会包含节点的坐标信息。这里排除了这几个属性,以避免可能会输出的警告信息。
- 是否能成功获取到属性
如果属性也并非基础属性,则再做最后的尝试:从节点对象中获取该属性,若获取成功,则表明属性有效;否者,属性无效,输出调试信息。
在判定过程中,可能会调用到节点对象的属性获取函数:object_get_prop(),因此,aw_node_check_prop_value()只能在属性设置函数中调用,不能在属性获取函数中调用(会形成循环调用,导致死循环)。
# 3.2.5 函数描述
节点就是一个对象,对象除了具有属性外,还具有一些行为(例如根据命令执行某些特定的操作)。on_data 函数是节点都具有的,不需要特别描述,一些特殊的行为才用函数来描述。
与属性描述类似,为了使外界可以获知一个节点支持的所有操作,以便实现统一管理,节点类中定义了func_descs成员,它指向一个函数描述的指针数组,该数组中的每个成员都指向了一个函数描述。
"函数描述"通过类型为func_desc_t的常量数据表示,其完整定义如下所示。
typedef struct _func_desc_t {
const char *name; /* 操作名 */
const char *desc; /* 操作描述 */
func_exec_t exec; /* 具体的执行函数 */
const arg_desc_t **args; /* 执行函数的参数描述 */
const value_desc_t *return_value; /* 返回值描述 */
} func_desc_t;
# 1. 名称
操作的名称。为了便于直接运行节点支持的某一操作,节点类对外提供了一个函数:aw_node_exec()。该函数的原型为:
ret_t aw_node_exec(aw_node_t* node, const char* name, value_t* ret, tk_object_t* args);
其中的name参数即用于指定需要执行的操作名称,在系统内部,会通过该名称与函数描述中的名称进行对比,以找到特定名称的函数(exec)。找到对应的函数后直接执行,ret参数和args参数都将直接传递给具体的exec函数。
aw_node_exec()函数的实现范例如下所示。
static const func_desc_t* aw_node_find_func(aw_node_t* node, const char* name) {
return_value_if_fail(node != NULL, NULL);
if (node->func_descs != NULL) {
uint32_t i = 0;
const func_desc_t** func_descs = node->func_descs;
for (i = 0; func_descs[i] != NULL; i++) {
const func_desc_t* iter = func_descs[i];
if (tk_str_eq(iter->name, name)) {
return iter;
}
}
}
return NULL;
}
ret_t aw_node_exec(aw_node_t* node, const char* name, value_t* ret, tk_object_t* args) {
const func_desc_t* func = aw_node_find_func(node, name);
return_value_if_fail(func != NULL && name != NULL && ret != NULL && args != NULL, RET_BAD_PARAMS);
return func->exec(node, ret, args);
}
程序中,单独定义了一个内部函数aw_node_find_func(),用于在节点的fun_descs中查找指定名称的函数描述,找到对应的函数描述后,直接执行其中的exec()方法。
# 2. 描述
该描述主要是一些文字描述,通常是对该操作所完成功能的简要介绍。
# 3. 执行函数
具体执行的函数,是整个操作的核心。其类型func_exec_t定义如下:
typedef ret_t (*func_exec_t)(void* obj, value_t* ret, tk_object_t* args);
对于节点来讲,参数obj是指向节点对象自身的指针。ret用于返回数据,返回值的具体类型等信息通过return_value所指定的"返回值描述"进行描述。args是执行该操作的一些参数,为了确保参数的灵活性,这里将args的类型也定义为了通用对象类型,可以通过通用对象中的一个或多个属性来传递参数。具体具有哪些属性名的参数,是通过函数描述中的args成员描述的。
# 4. 参数描述
args描述了执行函数exec的各个参数。每个参数通过一个arg_desc_t类型的"参数描述"进行描述。args指向了一个指针数组,且其中的每个指针都指向了一个参数描述,这一结构非常类似于前面介绍的属性描述。
由3.2.4中struct _value_desc_t的定义可知,arg_desc_t与prop_desc_t的定义是完全相同的。type、format、name、display_name、desc等成员的含义是相同的,唯一不同的是flags标志的设置,对于参数描述来讲,目前只有一个标志可用:ARG_DESC_FLAGS_REQUIRED。该标志用于指定该参数是可选的还是必选的。设置了该标志时,表示该参数是必选的(不可省略,必须提供),否则,表示该参数是可以省略的,省略时将使用参数描述中定义的默认值。
# 5. 返回值描述
返回值描述用于描述exec执行完成时的返回值(通过ret参数返回)的具体情况,由于返回值有且只有一个,因而返回值描述也仅需一个,在函数描述中使用了名为return_value的指针指向对应的描述。
返回值描述的类型为value_desc_t,由3.2.4中struct _value_desc_t的定义可知,其与prop_desc_t的定义是完全相同的。
例如,某一具体节点类可以执行名为"get_sys_info"的操作,用于获取系统的信息。系统信息分为4类:状态信息、数据缓存信息、配置信息、运行时间,因此通过一个名为"type"的参数指定需要获取的系统信息。对应的函数描述范例如下所示。
/* 返回值描述 */
static const value_desc_string_t s_get_sys_info_ret_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_STRING,
.name = "return",
.display_name = "return",
.desc = "system info",
.flags = PROP_FLAGS_DEFAULT,
},
.defvalue = "ok",
.min = 0,
.max = 1024
};
/* 单个参数描述 */
static const arg_desc_int_enums_t s_arg_get_info_desc_type_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_INT_ENUMS,
.name = "type",
.display_name = "type",
.desc = "system info type select",
.flags = ARG_DESC_FLAGS_REQUIRED,
},
.defvalue = 0,
.enums = {"0:state info", "1:data buffer info", "2:settings info", "3:run time", NULL},
};
/*参数描述集合 */
static const arg_desc_t* s_get_sys_info_arg_descs[] = {
(const arg_desc_t*)(&s_arg_get_info_desc_type_desc),
NULL
};
/* 具体的执行函数 */
static ret_t __exec_get_sys_info (void* obj, value_t* ret, tk_object_t* args) {
int type = object_get_prop_int(args, "type", 0); /* 需要获取的信息类型 */
switch(type) {
case 0: /* 获取系统状态,通过 ret 返回值返回 */
if (/* 状态正常 */) {
value_set_string(ret, "ok");
} else {
value_set_string(ret, "error");
}
break;
/* ...... 其它类型的信息获取 */
}
return RET_OK;
}
static const func_desc_t s_func_get_sys_info_desc = {
.name = "get_sys_info",
.desc = "get infomation of the system",
.exec = __exec_get_sys_info,
.return_value = (const arg_desc_t*)&(s_get_sys_info_ret_desc),
.args = (const arg_desc_t**)(s_get_sys_info_arg_descs)
};
static const func_desc_t* s_func_descs[] = {&s_func_get_sys_info_desc, NULL};
函数描述定义完成后,即可将s_func_descs作为节点中prop_descs的值。在实际应用中,函数描述应用得较少,读者对函数描述有一个基础的认识即可。
# 3.2.6 节点初始化和解初始化
节点类提供了节点初始化函数(aw_node_init)和解初始化函数(aw_node_deinit),它们的实现如下。
ret_t aw_node_init(aw_node_t* node, aw_node_category_t category, aw_node_on_data_t on_data) {
node->enable = TRUE;
node->on_data = on_data;
node->category = category;
darray_init(&(node->consumers), 0, (tk_destroy_t)(TK_OBJECT_UNREF), (tk_compare_t)aw_node_compare);
return RET_OK;
}
由此可见,consumers成员是一个动态数组,用以存储各个消费者节点对象。在调用初始化函数时,需要传入节点的类型和节点数据处理函数。
ret_t aw_node_deinit(aw_node_t* node) {
return_value_if_fail(node != NULL && node->on_data != NULL, RET_BAD_PARAMS);
TKMEM_FREE(node->id);
TKMEM_FREE(node->consumer_ids);
darray_deinit(&(node->consumers));
return RET_OK;
}
解初始化函数主要作用是释放相关资源。
节点初始化函数和解初始化函数通常并不需要由用户直接调用,而是在特定类别的节点基类中调用,下文在介绍四种类型的节点基类时,还会进一步展现。
# 3.3 四种类型的节点基类
前面提到,根据节点的输入输出情况,可以将节点分为四个大的类别:pump、filter、sink和config。由于他们的输入输出情况不同,因此对数据的处理也有所不同,例如:对于sink节点,其没有消费者,无需向消费者转发数据。为此,分别为各类别的节点定义了基类。这些基类均继承自节点类(aw_node_t),一个总的类图详见下图,后文还会对各节点类进行详细的介绍。

# 3.3.1 pump节点类
pump节点的典型特征是其前面没有任何节点,只有后置节点。换句话说,其具有消费者节点,但其本身不是任何节点的消费者。
AWFlow定义了pump节点类型:aw_node_pump_t,其继承自aw_node_t,对应类图详见下图。

aw_node_pump_t类型的完整定义如下。
struct _aw_node_pump_t;
typedef struct _aw_node_pump_t aw_node_pump_t;
struct _aw_node_pump_t {
aw_node_t node; /* 继承自aw_node_t */
aw_node_pump_input_t input; /* 需要具体类实现的虚函数:获取数据 */
aw_node_pump_output_t output; /* 需要具体类实现的虚函数:反馈数据 */
};
为便于区分,pump节点父类中的category成员会被赋值为AW_NODE_PUMP。在实际应用中,可以根据category成员的值确定节点类的实际类型。
pump类定义了两个抽象方法:input()和output(),它们需要由具体的pump类来实现。input()方法用于从外界获取数据,类型aw_node_pump_input_t的定义为:
typedef tk_object_t* (*aw_node_pump_input_t)(aw_node_t* node);
该方法的返回值为tk_object_t类型的数据,该值将作为data在数据流中传递(on_data()方法的参数)。对于pump节点来讲,其最重要的功能就是从外界获取数据,使数据进入数据流中。因而在实际应用中,input()方法是必须实现的。
output()方法用于向外界反馈一些额外的信息,类型aw_node_pump_output_t的定义为:
typedef ret_t (*aw_node_pump_output_t)(aw_node_t* node, tk_object_t* data);
要向外界反馈的数据存于data参数中。在实际应用中,output()方法可以选择性实现,没有特殊需求时往往无需实现,保持为NULL即可。output()方法主要应用于某些特殊的节点,其既可以从外界读取数据,又可以向外界发送数据,有点像是一个pump节点和一个sink节点的结合体,但其又不能拆分成两个节点,因为发送和接收往往需要使用到相同的资源。例如,在基于socket的网络通信中,同一个连接,接收数据和发送数据都是基于该socket实现的,都要使用该socket,因而只能将接收和发送数据放在同一个节点中。
pump节点的主要作用就是通过input()方法获取数据,并将数据传递给其所有的消费者以进一步处理。pump节点的数据处理工作就是完成数据的分发,基于此,在AWFlow中,pump节点父类中的on_data()方法的实现即为:aw_node_emit()。pump节点初始化函数的实现如下。
ret_t aw_node_pump_init(aw_node_t* node, aw_node_pump_input_t input) {
aw_node_pump_t* pump = AW_NODE_PUMP(node);
return_value_if_fail(node != NULL && input != NULL, RET_BAD_PARAMS);
aw_node_init(node, AW_NODE_PUMP, aw_node_emit);
pump->input = input;
return RET_OK;
}
在初始化节点时,将类别类别设置为了AW_NODE_PUMP,数据处理函数设置为了aw_node_emit。在调用初始化函数时,需要传入具体类实现的input()方法。
与初始化函数对应的是解初始化函数,当前解初始化函数没有特别的实现,直接调用了aw_node_deinit(),范例程序如下所示。
ret_t aw_node_pump_deinit(aw_node_t* node) {
return aw_node_deinit(node);
}
此外,为便于使用,pump节点对外提供了一个API: aw_node_pump_input_and_emit()。用于启动整个数据流开始工作,其原型为:
ret_t aw_node_pump_input_and_emit(aw_node_t* node);
在该函数的实现中,首先会调用input()方法获取数据,然后调用on_data()方法(对于pump节点,该方法的实现本质上就是aw_node_emit())完成对数据的处理(将数据传递给后继所有消费者节点),示意代码如下所示。
ret_t aw_node_pump_input_and_emit(aw_node_t* node) {
tk_object_t* data = ((aw_node_pump_t*)(node))->input(node); /* 获取数据 */
if (data != NULL) {
aw_node_on_data(node, data); /* 数据处理(实际就是分发给消费者处理)*/
TK_OBJECT_UNREF(data); /* 解除对数据的引用 */
}
return RET_OK;
}
当整个数据流图构建好后,只需要在主循环中以一定的时间间隔调用该函数,即可实现周期性的从系统外部获取数据,并提交到AWFlow数据流中处理。
在AWFlow中,为了数据传递的灵活性,数据本身也是一个对象(tk_object_t),以便通过对象中的多个属性传递多个数据,数据也就存在生命周期管理。
在上述代码中使用TK_OBJECT_UNREF()解除了对数据的引用,但整个函数中并没有调用TK_OBJECT_REF()引用数据对象。input()方法产生了一个数据,这就要求在input()方法中,需要完成对数据对象的引用(可能是创建一个新数据对象,也可能是引用一个现有的数据对象,总之,为了确保一致性,必须引用一次数据对象)。
一个最简单的方法就是在input()中创建一个对象,这样每次调用input()都会创建一个新对象,分发结束后(整个数据流结束),TK_OBJECT_UNREF(data)中就会销毁整个数据对象。
在后文的示例中还会看到,如果不期望每次input()都创建一个新的数据对象,那么可以在节点创建时一并创建数据对象,而在input()中,仅仅使用TK_OBJECT_REF()引用一次数据对象。
# 3.3.2 filter节点类
filter 节点就是普通节点,负责对数据进行过滤、转换和处理等。其典型特征是具有消费者,同时也是其它某些节点的消费者。
AWFlow定义了filter节点类型:aw_node_filter_t,其继承自aw_node_t,对应类图详见下图。

aw_node_filter_t类型的完整定义如下所示。
struct _aw_node_filter_t;
typedef struct _aw_node_filter_t aw_node_filter_t;
struct _aw_node_filter_t {
aw_node_t node; /* 继承自aw_node_t */
aw_node_filter_transform_t transform; /* 需要具体类实现的虚函数:获取数据 */
};
同理,为便于区分,filter节点父类中的category成员会被赋值为AW_NODE_FILTER。filter类仅定义了一个抽象方法:transform(),该抽象方法需要由具体的filter类来实现。该方法用于完成数据的具体转换(或处理),类型aw_node_filter_transform_t的定义为:
typedef tk_object_t* (*aw_node_filter_transform_t)(aw_node_t* node, tk_object_t* data);
data参数即为要处理的数据,该数据从filter节点的前置节点传递而来。返回值为处理的结果,结果仍然以obejct_t类型的数据存在。
transform方法仅需完成数据的处理即可,数据继续传递给后续消费者节点的操作由AWFlow框架完成。也就是说,AWFlow中针对filter类,会实现其父类中定义的on_data()方法:先调用transform()完成数据的处理,再调用aw_node_emit()将数据分发给后续消费者处理,该过程对应的示意代码如下所示。
static ret_t aw_node_filter_transform_and_emit(aw_node_t* node, tk_object_t* idata) {
tk_object_t* data = aw_node_filter_transform(node, idata);
if (data != NULL) {
aw_node_emit(node, data);
TK_OBJECT_UNREF(data);
}
return RET_OK;
}
此处实现的函数将作为父类中on_data()方法的实现(该函数将作为on_data函数指针的赋值)。filter节点初始化函数的实现如下所示。
ret_t aw_node_filter_init(aw_node_t* node, aw_node_filter_transform_t transform) {
aw_node_filter_t* filter = AW_NODE_FILTER(node);
return_value_if_fail(node != NULL && transform != NULL, RET_BAD_PARAMS);
aw_node_init(node, AW_NODE_FILTER, aw_node_filter_transform_and_emit);
filter->transform = transform;
return RET_OK;
}
在初始化节点时,将类别类别设置为了AW_NODE_FILTER,数据处理函数设置为了aw_node_filter_transform_and_emit。在调用初始化函数时,需要传入具体类实现的transform()方法。
与初始化函数对应的是解初始化函数,当前解初始化函数没有特别的实现,直接调用了aw_node_deinit(),范例程序如下所示。
ret_t aw_node_filter_deinit(aw_node_t* node) {
return aw_node_deinit(node);
}
在aw_node_filter_transform_and_emit()函数中,使用TK_OBJECT_UNREF()解除了对数据的引用,但整个函数中并没有调用TK_OBJECT_REF()引用数据对象。数据来源于transform(),这就要求在transform()方法中,需要完成对数据对象的引用(可能是创建一个新数据对象,也可能是引用一个现有的数据对象)。
通常情况下,在实现transform()方法时,返回的对象可能来源于三种情况:
- 方法一:每次都创建新对象
在transform()中创建一个新的数据对象,示意代码如下所示。
static tk_object_t *__transform(aw_node_t* node, tk_object_t* idata) {
tk_object_t *data = object_default_create(); /* 创建一个默认对象,作为数据对象 */
/* 提取 idata 中的相关数据,处理完成后,将相关数据设置到 data 中 */
return data;
}
- 方法二:使用事先(创建节点时)创建好的数据对象
在创建filter节点时,创建一个数据对象,在transform()中仅仅使用TK_OBJECT_REF()引用一次该对象,示意代码如下所示。
static tk_object_t *__transform(aw_node_t* node, tk_object_t* idata) {
aw_node_xxx_t *filter = (aw_node_xxx_t *)node;
/* 提取 idata 中的相关数据,完成理完成后,将相关数据设置到 filter->data中 */
return TK_OBJECT_REF(filter->data);
}
filter->data是在节点创建时一并创建的,在后续的示例中会进一步看到详细的示例。
- 方法三:直接使用通过参数传入的idata数据对象,示意代码如下所示。
static tk_object_t *__transform(aw_node_t* node, tk_object_t* idata) {
/* 提取 idata 中的相关数据,完成理完成后,将相关数据设置到 idata中 */
return TK_OBJECT_REF(idata);
}
显然,方式一每次都要创建新的对象,效率较低,极少选用。而方式二与方式三相比,主要是资源消耗的不同,方式三直接使用传入的idata数据对象,更加节省资源。但是,方式三也存在缺点。
我们知道,pump节点和filter节点都存在消费者节点,将数据分发给后续消费者节点是通过aw_node_emit()函数完成的,由aw_node_emit()函数的实现可知,data数据对象会被依次传递给各个消费者,也就是说,各个消费者节点输入的是同一个数据对象。
例如,节点A将数据分发给节点B、E、G,它们输入的数据是同一个数据对象,示意图详见下图。

节点B和节点E都是filter节点,如果节点B在处理数据过程(transform())中修改了idata数据对象中的属性,也就意味着修改了原始数据对象,那么节点E收到的数据就是修改过后的,如果节点E继续修改原始数据对象,那么节点G收到的数据就与原始数据差距更大了。
这可能会导致意料之外的结果。例如,节点B是对数据执行"加值"(ADD)操作(增加一个常量),而节点E本意是打印输出原始数据,如果节点B直接修改原始数据,将使节点E打印输出的数据是执行加值操作之后的数据。
因此,对于任意节点,均不建议在数据处理过程中修改原始数据,以避免对同级别的消费者节点产生负面影响。在实现filter节点的transform()方法时,均建议使用方式二,在节点内部维护一个特有的数据,确保所有数据可控。
在一些资源受限的场合,可能会采用方式三,此时,应谨慎处理整个数据流关系,特别考虑在修改数据时可能产生的副作用。
# 3.3.3 sink节点类
sink节点负责将数据传递给系统外部。这里的系统外部可以是控制台、图形用户界面、数据库或其它系统。
sink节点作为数据流的最后一级节点,不再存在后继消费者,相比于普通的filter节点,其少了一个将数据转发给后继消费者的操作,因而仅需完成对数据的处理。
AWFlow定义了sink节点类型:aw_node_sink_t,其继承自aw_node_t,对应类图详见下图。

aw_node_sink_t类型的完整定义如下所示。
struct _aw_node_sink_t;
typedef struct _aw_node_sink_t aw_node_sink_t;
struct _aw_node_sink_t {
aw_node_t node; /* 继承自aw_node_t */
};
为便于区分,sink节点父类中的category成员会被赋值为AW_NODE_SINK。由于除了数据处理之外没有其它额外的操作,因此,需要具体的sink类直接实现父类要求实现的on_data方法。sink节点初始化函数的实现如下所示。
ret_t aw_node_sink_init(aw_node_t* node, aw_node_on_data_t on_data) {
aw_node_init(node, AW_NODE_SINK, on_data);
return RET_OK;
}
在初始化节点时,将类别类别设置为了AW_NODE_SINK,具体的数据处理函数是通过on_data参数传入的,也就是说sink节点类并没有实现on_data()方法,需要由具体类实现。
与初始化函数对应的是解初始化函数,当前解初始化函数没有特别的实现,直接调用了aw_node_deinit(),范例程序如下所示。
ret_t aw_node_sink_deinit(aw_node_t* node) {
return aw_node_deinit(node);
}
正如filter节点中所介绍的那样,为了避免对同级别的其它消费者节点产生"不良"影响,在sink节点对数据进行处理时,也不建议对原始数据对象中的数据进行修改,通常情况下,sink节点仅仅只是访问数据对象中的数据。
# 3.3.4 config节点类
config节点是一种特殊的节点,仅用于配置参数(既无输入也无输出):利用config节点属性完成对某些参数的配置。
AWFlow定义了config节点类型:aw_node_config_t,其继承自aw_node_t,对应类图详见下图。

aw_node_config_t类型的完整定义如下所示。
struct _aw_node_config_t;
typedef struct _aw_node_config_t aw_node_config_t;
struct _aw_node_config_t {
aw_node_t node;
};
为便于区分,config节点父类中的category成员会被赋值为AW_NODE_CONFIG。config节点中不存在数据流动,即不存在数据处理,因而on_data()方法无需实现。但在AWFlow中,给定了一个默认空实现(保证结构一致性),如下所示。
static ret_t aw_node_config_on_data(aw_node_t* node, tk_object_t* data) {
return RET_NOT_IMPL;
}
config节点初始化函数的实现如下所示。
ret_t aw_node_config_init(aw_node_t* node) {
aw_node_init(node, AW_NODE_CONFIG, aw_node_config_on_data);
return RET_OK;
}
在初始化节点时,将类别类别设置为了AW_NODE_CONFIG,数据处理函数设置为了aw_node_config_on_data。
与初始化函数对应的是解初始化函数,当前解初始化函数没有特别的实现,直接调用了aw_node_deinit(),范例程序如下所示。
ret_t aw_node_config_deinit(aw_node_t* node) {
return aw_node_deinit(node);
}
通过上面对四种节点的介绍可知,在实际应用中,并不是直接实现aw_node_t类,往往需要根据节点的类别,继承自特定类型的节点类,然后实现该类要求实现的方法。pump类节点要求实现input()和output()方法;filter类节点要求实现transform()方法;sink类节点要求实现on_data()方法;config类节点无需实现数据处理方法。完整的设计类图详见下图。

具体类的实现与实际业务逻辑和功能相关,后文将以一个简单的示例展现各种类型节点的设计方法。
# 3.4 节点实现示例
在AWFlow中,一个最简单的系统仅由三个节点组成:一个pump节点负责获取(或产生)数据;一个filter节点负责处理数据;一个sink节点负责输出数据。
例如,pump节点产生一个随机数,filter节点把输入的数据加上一个增量,sink节点把数据记录下来(或输出到控制台)。对应的流图详见下图。

在这个流图中,随机数产生器(random_number)节点负责产生数据,驱动整个数据流工作。这个流图看起来非常简洁合理,但也存在一个不得不思考的问题:随机数产生的时机如何确定呢?几种典型的方案:
- 全速不间断产生(就好比放到一个while(1)里面不间断产生);
- 定时产生,比如每隔1秒产生一次;
- 某种特定的事件触发时产生。
简言之,数据产生的时机可能是非常灵活的,如果将数据产生的时机放到random_number节点内部处理,将使节点变得"臃肿",职责不单一:其不仅要负责生成随机数,还要考虑生成随机数的时机。这不利于节点的复用和维护。
一种更好的解决方案是:randon_number仅负责生成随机数,而生成随机数的时机由其它专有节点实现。例如,针对每隔一定时间产生一次随机数这种情景,"每隔一定时间"是典型的定时器特性,可以由一个专门的定时器节点来实现,对应的流图详见下图。

此时,random_number就是一个filter节点,timer定时器节点是pump节点,负责每隔一定时间产生一个数据,这个数据的内容可能并不重要,但其可以驱动整个数据流开始工作。
按照这个思路,接下来尝试实现这四个节点:
- timer节点:pump节点,用于周期性的产生一个数据;
- 随机数(Random number)节点:filter节点,不处理输入数据,输出一个随机数;
- 数据增加(add)节点:filter节点,将数据增加一个特定的值(数据处理);
- 日志记录(log)节点:sink节点,把数据记录下来,并打印输出到控制台。
通过对前面的介绍可知(详见图3.9),timer节点、Random number节点、add节点、log节点作为具体的节点,应分别继承自pump节点、filter节点、filter节点和sink节点,对应的类图详见下图。

要实现一个具体的节点类,主要需要完成以下内容:
- 确定继承关系,定义具体的节点类型;
- 实现父节点(pump、filter、sink)要求实现的方法;
- 实现on_event方法(如有需要);
- 定义属性描述(prop_descs);
- 实现属性的设置和获取方法:set_prop和get_prop(tk_object_t中定义的抽象方法);
- 定义函数描述(func_descs);
- 定义节点销毁和创建函数。
注意,在实际应用中,函数描述通常不会使用到,在设计之初可以不用理会,维持NULL即可,在有具体需求时,再实现它们也不迟。
"节点创建函数"是对外提供的唯一接口,该函数的主要功能是创建一个节点,完成节点中相关成员的赋值。上层系统可以直接通过该函数完成一个具体节点对象的创建。为了便于统一管理(后文会进一步介绍节点工厂,这些创建函数最终会统一注册到节点工厂中),AWFlow规约了节点创建函数的原型:
typedef aw_node_t* (*aw_node_create_t)(void);
也即是说,所有节点创建函数,均无参数,创建的节点通过返回值返回。作为示例,下面分别展示出timer、random number、add和log节点的具体实现。
# 3.4.1 timer节点实现
# 1. 定义具体的节点类
timer节点类继承自pump类,定义示例如下。
struct _aw_node_pump_timer_t;
typedef struct _aw_node_pump_timer_t aw_node_pump_timer_t;
struct _aw_node_pump_timer_t {
aw_node_pump_t aw_node_pump; /* 继承自pump节点 */
uint32_t duration; /* 数据产生周期时间(单位:毫秒)*/
tk_object_t *data; /* pump节点产生的数据对象 */
uint32_t timer_id; /* 使用的定时器ID */
};
该类型节点的主要作用是产生原始数据。前面提到,在AWFlow中,为了便于数据的传递,数据使用tk_object_t类型的对象表示,因而扩展了一个data数据成员,使得可以在节点内部维护一个始终有效的数据,进而不必在input()方法的实现中再创建新的数据对象。
对于那些可以通过set_prop/get_prop设置或获取的属性,属性名可能在很多地方用到(属性描述中、属性设置和获取函数中等等),出于便利性考虑,通常会将它们的属性名使用宏的形式定义好,比如:
#define AW_NODE_PUMP_TIMER_PROP_DURATION "duration"
注意,data和timer_id主要用于内部业务逻辑处理,不需要对用户展示,不属于可以通过set_prop/get_prop设置或获取的属性,因而没有定义相应的名称宏。
# 2. 实现input方法
对于pump节点来讲,绝大部分情况下都不需要实现output方法,这里仅实现核心的input方法,范例程序如下所示。
static tk_object_t* aw_node_pump_timer_input(aw_node_t* node) {
value_t v;
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(node);
tk_object_t* data = pump_timer->data;
value_set_uint64(&v, time_now_ms());
object_set_prop(data, AW_NODE_DATA_PROP_PAYLOAD, &v);
object_set_prop_str(data, AW_NODE_DATA_PROP_TOPIC, "timer");
return TK_OBJECT_REF(data);
}
由于在各个方法的实现中,都需要将aw_node_t *类型的指针转换为实际的timer节点类指针使用,为此,定义了一个名为AW_NODE_PUMP_TIMER()的宏,用于完成类型的强转,其定义如下:
#define AW_NODE_PUMP_TIMER(node) ((aw_node_pump_timer_t*)(node))
程序中,将payload属性的值设置为了当前时间(通过time_now_ms()),将topic属性的值设置为了"timer"。
在函数实现的最后,调用TK_OBJECT_REF(data)完成了对数据的引用并返回。在pump节点类的介绍中提到,数据在分发完成后,会自动调用一次TK_OBJECT_UNREF()解除对数据的引用,因而这里需要使用TK_OBJECT_REF()引用一次,避免在数据分发结束后数据对象被销毁。
这里使用的是节点中已经存在的数据对象(pump_timer->data),节点中data成员的赋值通常在节点创建函数(节点创建函数在后文会进一步介绍)中完成,即随节点一起被创建,形如:
pump_random_number->data = object_default_create();
object_default是Toolkit Library中实现的默认对象类,其实现了set_prop和get_prop方法,内部使用动态数组来保存设置的属性和值。在设置属性时,如果该属性不存在(通过属性名判定),则会自动新增一个属性;否则,就是修改一个已有属性的值。使用default对象可以很方便的管理一系列键值对(name就相当于键,value就是对应的值)。
当将对象(tk_object_t)用来存储(或传递)数据时,通常都直接使用系统中实现好的默认(default)object类型,可以直接通过object_default_create()函数完成这类对象的创建。
# 3. 实现on_event方法
在介绍pump节点时提到,其提供了一个API: aw_node_pump_input_and_emit(),用于启动整个数据流开始工作,其核心就是调用input()方法,并将获取到的数据分发给后续消费者节点。
按照定时器节点的定义(每隔一定时间......),实现定时器节点的关键就是:每隔一定的时间(由duration属性值决定)调用一次aw_node_pump_input_and_emit()。这就需要启动一个周期性定时器,并将定时时间设置为duration,在定时时间到时调用aw_node_pump_input_and_emit()。
由于定时时间值由duration决定,duration是一个可配置的属性,针对这类属性值,通常会在产生"节点加载完毕"事件(AW_NODE_EVENT_LOADED)后使用,据此,可以将启动定时器的操作放在对AW_NODE_EVENT_LOADED事件的处理中完成,范例程序如下所示。
static ret_t aw_node_pump_timer_on_event(aw_node_t* node, event_t* event) {
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(node);
if (event->type == AW_NODE_EVENT_LOADED && node->manager != NULL) {
aw_loop_t* loop = node->manager->loop;
uint32_t duration = pump_timer->duration;
pump_timer->timer_id = aw_loop_add_timer(loop, aw_node_pump_timer_on_timer, node, duration);
}
return RET_OK;
}
程序中出现了一些陌生的概念(node->manager和node->manager->loop),node->manager是节点管理器,其管理了所有节点,这里的关键是通过mangaer获取到了loop对象,最终通过loop对象启动了一个定时器(aw_loop_add_timer())。loop可以视为运行整个流图应用的主循环,其它程序模块可以向其中添加需要周期性执行的事务,aw_node_pump_timer_on_event的代码可以视为节点类启动定时器的标准程序,读者应掌握这种惯用法。有关loop的更多细节,在3.5.5节中还会进一步详细介绍。
程序的核心是调用aw_loop_add_timer()启动了一个定时器,定时器的定时时间为duration,在定时时间到时,会执行**aw_node_pump_timer_on_timer()**回调函数,其实现如下所示。
static ret_t aw_node_pump_timer_on_timer(const timer_info_t* info) {
aw_node_t* node = AW_NODE(info->ctx);
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(info->ctx);
if (node->enable) {
aw_node_pump_input_and_emit(AW_NODE(pump_timer));
}
return RET_REPEAT;
}
由此可见,在定时器回调函数中,调用了aw_node_pump_input_and_emit()函数。在函数的最后,需要返回RET_REPEAT,以便周期性重复定时。
aw_loop_add_timer(向loop循环中添加一个定时器)会返回一个定时器ID,在aw_node_pump_timer_on_event函数中,将该ID保存在了pump_timer->timer_id中,该ID可以用于后续在销毁节点时,移除此处添加的定时器,在节点销毁函数中,可以进一步看到如何使用timer_id。
# 4. 定义属性描述
在timer节点类中,扩展了3个属性:duration、data和timer_id。data和timer_id主要用于内部业务逻辑处理,不需要对用户展示,不属于可以通过set_prop/get_prop设置或获取的属性,因此,在属性描述中,并不需要包含对data和timer_id的描述,仅需描述duration属性,以便用户通过设置该参数调整定时周期时间。
duration属性描述的定义,范例程序如下所示。
static const prop_desc_uint32_t s_prop_duration_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_UINT32,
.name = AW_NODE_PUMP_TIMER_PROP_DURATION,
.display_name = "Duration(ms)",
.desc = "Duration(ms)",
.flags = PROP_FLAGS_CONFIGURABLE,
.format = 0
},
.defvalue = 1000,
.min = 0,
.max = 10000000
};
static const prop_desc_t* s_prop_descs[] = {(const prop_desc_t*)(&s_prop_duration_desc), NULL};
# 5. 实现属性的设置和获取方法
属性设置和获取方法,范例程序如下所示。
static ret_t aw_node_pump_timer_set_prop(tk_object_t* obj, const char* name, const value_t* v) {
aw_node_t* node = AW_NODE(obj);
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(obj);
return_value_if_fail(aw_node_check_prop_value(AW_NODE(obj), name, v) == RET_OK, RET_BAD_PARAMS);
if (tk_str_eq(AW_NODE_PUMP_TIMER_PROP_DURATION, name)) {
pump_timer->duration = value_uint32(v);
if (pump_timer->timer_id != TK_INVALID_ID && node->manager != NULL) {
aw_loop_t* loop = node->manager->loop;
uint32_t duration = pump_timer->duration;
aw_loop_remove_timer(loop, pump_timer->timer_id);
pump_timer->timer_id = aw_loop_add_timer(loop,aw_node_pump_timer_on_timer,node,duration);
}
return RET_OK;
}
return aw_node_set_prop_default(AW_NODE(obj), name, v);
}
static ret_t aw_node_pump_timer_get_prop(tk_object_t* obj, const char* name, value_t* v) {
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(obj);
if (tk_str_eq(AW_NODE_PUMP_TIMER_PROP_DURATION, name)) {
value_set_uint32(v, pump_timer->duration);
return RET_OK;
}
return aw_node_get_prop_default(AW_NODE(obj), name, v);
}
属性的设置和获取相对比较简单,唯一需要注意的是,在设置duration属性时,需要先通过timer_id移除已经添加的定时器,然后再使用新的duration启动定时器。
代码中调用了aw_node_check_prop_value()函数,检查了属性的有效性。这是节点类提供的一个方法,建议在设置属性前,都通过该函数检查一下待设置属性的有效性。
# 6. 节点销毁函数
节点销毁函数主要用于销毁相关的资源,范例程序如下所示。
static ret_t aw_node_pump_timer_on_destroy(tk_object_t* obj) {
aw_node_t* node = AW_NODE(obj);
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(obj);
TK_OBJECT_UNREF(pump_timer->data);
if (node->manager != NULL && pump_timer->timer_id != TK_INVALID_ID) {
aw_loop_t* loop = node->manager->loop;
aw_loop_remove_timer(loop, pump_timer->timer_id);
}
aw_node_pump_deinit(AW_NODE(obj));
return RET_OK;
}
在销毁函数中,首先销毁了节点中创建的数据对象,然后根据timer_id移除了节点添加的定时器,最后,作为pump节点,还需调用aw_node_pump_deinit(),这是pump节点类提供的解初始化函数接口。
# 7. 节点创建函数
基于上面实现的input()和on_event()方法,可以实现节点创建函数,实现范例如下所示。
static const object_vtable_t s_pump_timer_vtable = {
.type = "timer",
.desc = "aw_node_pump_timer",
.size = sizeof(aw_node_pump_timer_t),
.get_prop = aw_node_pump_timer_get_prop,
.set_prop = aw_node_pump_timer_set_prop,
.on_destroy = aw_node_pump_timer_on_destroy
};
aw_node_t* aw_node_pump_timer_create(void) {
aw_node_t* node = NULL;
tk_object_t* obj = object_create(&s_pump_timer_vtable);
aw_node_pump_timer_t* pump_timer = AW_NODE_PUMP_TIMER(obj);
return_value_if_fail(pump_timer != NULL, NULL);
node = AW_NODE(obj);
aw_node_pump_init(node, aw_node_pump_timer_input);
node->prop_descs = s_prop_descs;
node->func_descs = s_func_descs;
node->on_event = aw_node_pump_timer_on_event;
node->version = (0 << 16) | (1 << 8) | (0); /* 0.1.0 */
pump_timer->duration = s_prop_duration_desc.defvalue;
pump_timer->data = object_default_create();
return node;
}
为了使用object_create()完成对象的创建,定义了s_pump_timer_vtable常量。
在程序中使用了aw_node_pump_init()函数,其是通用pump类提供的用于初始化节点的方法,在初始化pump节点时,需要传递具体类实现的input方法(即aw_node_pump_timer_input函数)。
至此,完成了timer节点类相关的所有内容。值得注意的是,在实际编程中,各部分内容并不一定是严格按照文档讲解的顺序依次实现的,通常会先搭建好框架(比如先定义好相关函数的名称和参数,函数体留空),再不断完善。一个典型的例子是:销毁函数中具体要销毁哪些资源,会随着节点类使用资源的变化而变化。
# 3.4.2 Random number节点实现
# 1. 定义具体的节点类
Random number类继承自pump类,定义示例如下所示。
struct _aw_node_pump_random_number_t;
typedef struct _aw_node_pump_random_number_t aw_node_pump_random_number_t;
struct _aw_node_pump_random_number_t {
aw_node_filter_t aw_node_filter; /* 继承自pump节点 */
int32_t min; /* 控制随机数的最小值 */
int32_t max; /* 控制随机数的最大值 */
char *topic; /* 数据主题 */
tk_object_t *data; /* 存储产生的随机数 */
};
在该节点中,由于要产生新的随机数,在filter节点的介绍中提到,在数据处理过程中,应尽可能避免修改输入数据(原始数据),为此,扩展了一个data数据成员,以便在数据处理过程中将产生的随机数存放在自身特有的数据对象中。topic是一个主题字符串,该字符串会被设置到data中,作为数据的主题,数据主题用户可以通过set_prop设置。此外,为了限制产生的随机数范围,扩展了min和max两个属性。
类似地,为便于使用,将属性的名称使用宏的形式进行了定义:
#define AW_NODE_FILTER_RANDOM_NUMBER_PROP_MIN "min"
#define AW_NODE_FILTER_RANDOM_NUMBER_PROP_MAX "max"
#define AW_NODE_FILTER_RANDOM_NUMBER_PROP_TOPIC "topic"
# 2. 实现transform方法
对于filter节点来讲,核心是实现transform方法,范例程序如下所示。
static tk_object_t* aw_node_filter_random_number_transform(aw_node_t* node, tk_object_t* data) {
aw_node_filter_random_number_t* filter_random_number = AW_NODE_FILTER_RANDOM_NUMBER(node);
tk_object_t* ret_data = filter_random_number->data;
int32_t value = random();
int32_t min = filter_random_number->min;
int32_t max = filter_random_number->max;
int32_t range = max - min;
value = range ? (min + value % range) : min;
if (filter_random_number->topic != NULL) {
object_set_prop_str(ret_data, AW_NODE_DATA_PROP_TOPIC, filter_random_number->topic);
}
object_set_prop_int(ret_data, AW_NODE_DATA_PROP_PAYLOAD, value);
return TK_OBJECT_REF(ret_data);
}
类似地,定义了一个名为AW_NODE_PUMP_RANDOM_NUMBER的宏,用于完成类型的强转,其定义如下:
#define AW_NODE_PUMP_RANDOM_NUMBER(node) ((aw_node_pump_random_number_t*)(node))
程序中创建了随机数value(限定在min和max范围内),并将value值设置到了数据对象中,同时将topic也设置到了数据对象中,最后调用TK_OBJECT_REF(ret_data)完成了对数据的引用并返回。
# 3. 实现on_event方法
在on_event()方法的实现中,重点就是看是否需要在节点加载后处理事务,如有需要,就将相关事务添加到对AW_NODE_EVENT_LOADED事件的处理中。当前,产生随机数并不需要做额外的操作,on_event()的实现可以留空,范例程序如下所示。
static ret_t aw_node_filter_random_number_on_event(aw_node_t* node, event_t* event) {
if (event->type == AW_NODE_EVENT_LOADED) {
/*TODO:*/
}
return RET_OK;
}
为了保持良好的程序结构,即使没有实现,也可以定义一个空函数,以便在有需要时可以很方便的修改调整。
# 4. 定义属性描述
在random number类中,有三个属性需要描述:min、max、topic(data属于要传递的数据,不属于可以获取或设置的属性),属性描述定义范例程序如下所示。
static const prop_desc_int32_t s_prop_min_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_INT32,
.name = AW_NODE_FILTER_RANDOM_NUMBER_PROP_MIN,
.display_name = "Min Value",
.desc = "Min value of the random number",
.flags = PROP_FLAGS_DEFAULT | PROP_DESC_FLAG_PERSISTENT | PROP_DESC_FLAG_CONFIGURABLE,
.format = 0
},
.defvalue = 0,
.min = 0,
.max = 1000
};
static const prop_desc_int32_t s_prop_max_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_INT32,
.name = AW_NODE_FILTER_RANDOM_NUMBER_PROP_MAX,
.display_name = "Max Value",
.desc = "Max value of the random number",
.flags = PROP_FLAGS_DEFAULT | PROP_DESC_FLAG_PERSISTENT | PROP_DESC_FLAG_CONFIGURABLE,
.format = 0
},
.defvalue = 500,
.min = 0,
.max = 1000
};
static const prop_desc_string_t s_prop_topic_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_STRING,
.name = AW_NODE_FILTER_RANDOM_NUMBER_PROP_TOPIC,
.display_name = "Topic",
.desc = "Topic",
.flags = PROP_FLAGS_DEFAULT | PROP_DESC_FLAG_PERSISTENT |
PROP_DESC_FLAG_CONFIGURABLE,
.format = 0
},
.defvalue = "number",
.min = 0,
.max = 1000
};
static const prop_desc_t* s_prop_descs[] = {
(const prop_desc_t*)(&s_prop_min_desc),
(const prop_desc_t*)(&s_prop_max_desc),
(const prop_desc_t*)(&s_prop_topic_desc),
NULL
};
# 5. 实现属性的设置和获取方法
属性设置和获取方法的实现范例程序如下所示。
static ret_t aw_node_filter_random_number_set_prop(tk_object_t* obj, const char* name, const value_t* v){
aw_node_filter_random_number_t* filter_random_number = AW_NODE_FILTER_RANDOM_NUMBER(obj);
return_value_if_fail(aw_node_check_prop_value(AW_NODE(obj), name, v) == RET_OK, RET_BAD_PARAMS);
if (tk_str_eq(AW_NODE_FILTER_RANDOM_NUMBER_PROP_MIN, name)) {
filter_random_number->min = value_int32(v);
return RET_OK;
} else if (tk_str_eq(AW_NODE_FILTER_RANDOM_NUMBER_PROP_MAX, name)) {
filter_random_number->max = value_int32(v);
return RET_OK;
} else if (tk_str_eq(AW_NODE_FILTER_RANDOM_NUMBER_PROP_TOPIC, name)) {
filter_random_number->topic = tk_str_copy(filter_random_number->topic, value_str(v));
return RET_OK;
}
return aw_node_set_prop_default(AW_NODE(obj), name, v);
}
static ret_t aw_node_filter_random_number_get_prop(tk_object_t* obj, const char* name, value_t* v) {
aw_node_filter_random_number_t* filter_random_number = AW_NODE_FILTER_RANDOM_NUMBER(obj);
if (tk_str_eq(AW_NODE_FILTER_RANDOM_NUMBER_PROP_MIN, name)) {
value_set_int32(v, filter_random_number->min);
return RET_OK;
} else if (tk_str_eq(AW_NODE_FILTER_RANDOM_NUMBER_PROP_MAX, name)) {
value_set_int32(v, filter_random_number->max);
return RET_OK;
} else if (tk_str_eq(AW_NODE_FILTER_RANDOM_NUMBER_PROP_TOPIC, name)) {
value_set_str(v, filter_random_number->topic);
return RET_OK;
}
return aw_node_get_prop_default(AW_NODE(obj), name, v);
}
# 6. 节点销毁函数
节点销毁函数主要用于销毁相关的资源,范例程序如下所示。
static ret_t aw_node_filter_random_number_on_destroy(tk_object_t* obj) {
aw_node_filter_random_number_t* filter_random_number = AW_NODE_FILTER_RANDOM_NUMBER(obj);
TKMEM_FREE(filter_random_number->topic);
TK_OBJECT_UNREF(filter_random_number->data);
aw_node_filter_deinit(AW_NODE(obj));
return RET_OK;
}
在销毁函数中,首先销毁了节点中创建的数据对象,作为filter节点,还需调用aw_node_filter_deinit(),这是filter节点类提供的解初始化函数接口。
# 7. 节点创建函数
节点创建函数的实现范例如下所示。
static const object_vtable_t s_filter_random_number_vtable = {
.type = "random_number",
.desc = "aw_node_filter_random_number",
.size = sizeof(aw_node_filter_random_number_t),
.get_prop = aw_node_filter_random_number_get_prop,
.set_prop = aw_node_filter_random_number_set_prop,
.on_destroy = aw_node_filter_random_number_on_destroy
};
aw_node_t* aw_node_filter_random_number_create(void) {
aw_node_t* node = NULL;
tk_object_t* obj = object_create(&s_filter_random_number_vtable);
aw_node_filter_random_number_t* filter_random_number = AW_NODE_FILTER_RANDOM_NUMBER(obj);
return_value_if_fail(filter_random_number != NULL, NULL);
node = AW_NODE(obj);
aw_node_filter_init(node, aw_node_filter_random_number_transform);
node->prop_descs = s_prop_descs;
node->func_descs = s_func_descs;
node->on_event = aw_node_filter_random_number_on_event;
node->version = (0 << 16) | (1 << 8) | (0); /*0.1.0*/
filter_random_number->min = s_prop_min_desc.defvalue;
filter_random_number->max = s_prop_max_desc.defvalue;
filter_random_number->topic = tk_strdup(s_prop_topic_desc.defvalue);
filter_random_number->data = object_default_create();
return node;
}
类似地,定义了s_filter_random_number_vtable常量用于创建对象。程序中调用了aw_node_filter_init()函数,其是filter节点类提供的用于初始化节点的方法,在初始化filter节点时,需要传递具体类实现的transform方法(即aw_node_filter_random_number_transform函数)。
# 3.4.3 add节点实现
# 1. 定义具体的节点类
add类继承自filter类,定义示例如下所示。
struct _aw_node_filter_add_t;
typedef struct _aw_node_filter_add_t aw_node_filter_add_t;
struct _aw_node_filter_add_t {
aw_node_filter_t aw_node_filter;
int32_t delta;
tk_object_t *data;
};
该类型节点的主要作用是对原始数据增加一个特定值。为了控制具体的增量,扩展了一个delta属性(使用有符号数类型,表明增量也可能为负数)。类似地,作为filter节点,扩展了一个data数据成员,以便在数据处理过程中将处理后的数据存放在自身特有的数据对象中,避免对原始数据进行修改。
类似地,为便于使用,将属性的名称使用宏的形式进行了定义:
#define AW_NODE_FILTER_ADD_PROP_DELTA "delta"
# 2. 实现transform方法
对于filter节点来讲,核心是实现transform方法,范例程序如下所示。
static tk_object_t* aw_node_filter_add_transform(aw_node_t* node, tk_object_t* data) {
value_t v;
aw_node_filter_add_t* filter_add = AW_NODE_FILTER_ADD(node);
if (object_get_prop(data, AW_NODE_DATA_PROP_PAYLOAD, &v) == RET_OK) {
int32_t value = value_int(&v) + filter_add->delta;
object_set_prop_int(filter_add->data, AW_NODE_DATA_PROP_PAYLOAD, value);
return TK_OBJECT_REF(filter_add->data);
} else {
return NULL;
}
}
类似地,定义了一个名为AW_NODE_FILTER_ADD的宏,用于完成类型的强转,其定义如下:
#define AW_NODE_FILTER_ADD(node) ((aw_node_filter_add_t*)(node))
程序中,首先从输入数据(data)中获取到payload属性值,然后将该值增加delta,再将其设置到节点自身维护的数据对象(filter_add->data)中,最后调用TK_OBJECT_REF()完成了对数据的引用并返回。
# 3. 实现on_event方法
当前并不需要对事件做任何处理,on_event()的实现可以留空,可以选择定义一个空函数以维持程序结构(如aw_node_filter_random_number_on_event),也可以不定义,保持节点中的on_event为NULL。为了分别展示两种情形,这里选择不定义on_event()函数。
# 4. 定义属性描述
在add类中,扩展了一个delta属性,用于指定数据的增量(data属于要传递的数据,不属于可以获取或设置的属性),该属性的描述范例程序如下所示。
static const prop_desc_int32_t s_prop_delta_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_INT32,
.name = AW_NODE_FILTER_ADD_PROP_DELTA,
.display_name = "Delta",
.desc = "Delta",
.flags = PROP_FLAGS_CONFIGURABLE,
.format = 0
},
.defvalue = 0,
.min = -1000,
.max = 1000
};
static const prop_desc_t* s_prop_descs[] = {(const prop_desc_t*)(&s_prop_delta_desc), NULL};
# 5. 实现属性的设置和获取方法
属性设置和获取方法的实现范例如下所示。
static ret_t aw_node_filter_add_set_prop(tk_object_t* obj, const char* name, const value_t* v)
{
aw_node_filter_add_t* filter_add = AW_NODE_FILTER_ADD(obj);
return_value_if_fail(aw_node_check_prop_value(AW_NODE(obj), name, v) == RET_OK, RET_BAD_PARAMS);
if (tk_str_eq(AW_NODE_FILTER_ADD_PROP_DELTA, name)) {
filter_add->delta = value_int32(v);
return RET_OK;
}
return aw_node_set_prop_default(AW_NODE(obj), name, v);
}
static ret_t aw_node_filter_add_get_prop(tk_object_t* obj, const char* name, value_t* v) {
aw_node_filter_add_t* filter_add = AW_NODE_FILTER_ADD(obj);
if (tk_str_eq(AW_NODE_FILTER_ADD_PROP_DELTA, name)) {
value_set_int32(v, filter_add->delta);
return RET_OK;
}
return aw_node_get_prop_default(AW_NODE(obj), name, v);
}
# 6. 节点销毁函数
节点销毁函数主要用于销毁相关的资源,范例程序如下:
static ret_t aw_node_filter_add_on_destroy(tk_object_t* obj) {
aw_node_filter_add_t* filter_add = AW_NODE_FILTER_ADD(obj);
TK_OBJECT_UNREF(filter_add->data);
aw_node_filter_deinit(AW_NODE(obj));
return RET_OK;
}
在销毁函数中,首先销毁了节点中创建的数据对象,作为filter节点,还需调用aw_node_filter_deinit(),这是filter节点类提供的解初始化函数接口。
# 7. 节点创建函数
基于上面各个部分的实现,可以实现节点创建函数,范例如下所示。
static const object_vtable_t s_filter_add_vtable = {
.type = "add",
.desc = "aw_node_filter_add",
.size = sizeof(aw_node_filter_add_t),
.get_prop = aw_node_filter_add_get_prop,
.set_prop = aw_node_filter_add_set_prop,
.on_destroy = aw_node_filter_add_on_destroy
};
aw_node_t* aw_node_filter_add_create(void) {
aw_node_t* node = NULL;
tk_object_t* obj = object_create(&s_filter_add_vtable);
aw_node_filter_add_t* filter_add = AW_NODE_FILTER_ADD(obj);
return_value_if_fail(filter_add != NULL, NULL);
node = AW_NODE(obj);
aw_node_filter_init(node, aw_node_filter_add_transform);
node->prop_descs = s_prop_descs;
node->version = (0 << 16) | (1 << 8) | (0); /*0.1.0*/
filter_add->delta = s_prop_delta_desc.defvalue;
filter_add->data = object_default_create();
return node;
}
类似地,定义了s_filter_add_vtable常量用于创建对象。在上述代码中使用了aw_node_filter_init()函数,其是filter节点类提供的用于初始化节点的方法,初始化时传入了具体实现的transform方法(即aw_node_filter_add_transform函数)。
# 3.4.4 log节点实现
# 1. 定义具体的节点类
log类继承自sink类,定义示例如下所示。
struct _aw_node_sink_log_t;
typedef struct _aw_node_sink_log_t aw_node_sink_log_t;
struct _aw_node_sink_log_t {
aw_node_sink_t aw_node_sink;
bool_t log_to_console; /* 是否把信息输出到控制台 */
bool_t log_to_client; /* 是否把信息输出到调试窗口 */
bool_t log_to_mem; /* 是否把信息记录在内存中(用于测试)*/
str_t log; /* 数据记录:使用字符串存储 */
};
该类型节点的主要作用是完成数据对"外界"的展示或将数据输出到外部系统。共计扩展了4个成员。其中,log_to_console、log_to_client、log_to_mem是布尔类型的控制开关,用于控制是否将日志记录输出到特定的位置:
- log_to_console用于控制是否将信息输出至控制台;
- log_to_client用于控制是否将信息输出至客户端(至于客户端是什么,系统可以根据实际情况灵活实现,后文会进一步说明);
- log_to_mem用于将信息保存在节点内部的log属性中,主要用于测试。
节点类中定义的log属性即用于当log_to_mem为TRUE时,保存日志信息。这一运行逻辑在on_data()方法的实现中还可以进一步明确。log_to_mem和log的存在,使得对日志节点的测试更为便利:提供一种机制,可以获得当前的日志信息,进而将其与期望的日志信息对比,实现自动化测试。
类似地,为便于使用,将属性的名称使用宏的形式进行了定义:
#define AW_NODE_SINK_LOG_PROP_LOG_TO_CONSOLE "log_to_console"
#define AW_NODE_SINK_LOG_PROP_LOG_TO_CLIENT "log_to_client"
#define AW_NODE_SINK_LOG_PROP_LOG_TO_MEM "log_to_mem"
#define AW_NODE_SINK_LOG_PROP_LOG "log"
值得注意的是,与pump类节点(比如timer)、filter类节点(比如random_number和add)的定义不同,在log节点类的定义中,不再包含数据对象(data),这是因为sink节点不再需要继续向后传递数据,其往往只需要访问原始数据进行特定的操作,不会修改原始数据,因而也就不需要在内部维护一个特有的数据对象。
在实际应用中,通常并不能一次性定义好节点类型,在设计之初,可以定义好一个初步的框架(比如继承自sink节点),在后续的功能实现中再逐步完善。
# 2. 实现on_data方法
作为sink类节点,核心是实现on_data方法,范例程序如下所示。
static ret_t aw_node_sink_log_on_data(aw_node_t* node, tk_object_t* data) {
value_t v;
int32_t len = 0;
int32_t type = 0;
aw_node_sink_log_t* sink_log = AW_NODE_SINK_LOG(node);
if (object_get_prop(data, AW_NODE_DATA_PROP_PAYLOAD, &v) != RET_OK) {
return RET_FAIL;
}
len = object_get_prop_int(data, AW_NODE_DATA_PROP_PAYLOAD_LENGTH, 0);
type = object_get_prop_int(data, AW_NODE_DATA_PROP_PAYLOAD_TYPE, 0);
if (len > 0 && v.type == VALUE_TYPE_POINTER) {
char* payload = (char*)value_pointer(&v);
if (sink_log->log_to_mem) {
str_append_with_len(&(sink_log->log), (char*)payload, len);
}
if (type == AW_FLOW_CONTENT_TYPE_JSON || type == AW_FLOW_CONTENT_TYPE_CSV) {
if (sink_log->log_to_console) {
log_debug("%s\n", payload);
}
if (sink_log->log_to_client) {
aw_node_manager_log(node->manager, node, payload);
}
}
} else {
char buff[64];
const char* str = value_str_ex(&v, buff, sizeof(buff) - 1);
if (str != NULL) {
if (sink_log->log_to_console) {
log_debug("%s\n", str);
}
if (sink_log->log_to_mem) {
str_append(&(sink_log->log), str);
str_append(&(sink_log->log), "\n");
}
if (sink_log->log_to_client) {
aw_node_manager_log(node->manager, node, str);
}
}
}
return RET_OK;
}
可以将数据的处理分为两个部分:日志信息的产生和日志信息的发送(存储)。
- 日志信息的产生
首先提取出了payload,包括payload、payload_length和payload_type,如果长度有效且payload值的类型为指针,则将数据本身视为一个字符串进行处理,整个字符串就是日志信息,无需额外的加工。
否则,可能是其它类型的数据,比如整数、浮点数等等,此时就使用value_str_ex()函数将payload值转换为字符串(比如把整数123转换为字符串"123"),这个字符串信息就是日志信息。
- 日志信息的发送(存储)
这与log_to_console、log_to_client、log_to_mem的值相关。若log_to_console的值为TRUE,则会将信息输出到console控制台;若log_to_mem的值为TRUE,则会将字符串添加到log中,以将日志信息记录在内存中;若log_to_client的值为TRUE,则会调用aw_node_manager_log()将日志信息输出到客户端,具体客户端是什么,与具体实现相关,可能是远程的客户端,可能是UI窗口等,这会在后文介绍节点管理器时进一步说明。
在实际应用中,传递二进制或字符串数据时通常会使用到三个属性:
- AW_NODE_DATA_PROP_PAYLOAD("payload")
- AW_NODE_DATA_PROP_PAYLOAD_LENGTH("payloadLength")
- AW_NODE_DATA_PROP_PAYLOAD_TYPE("payloadType")
PAYLOAD属性和PAYLOAD_LENGTH属性表示数据起始地址以及数据的长度,PAYLOAD_TYPE属性表示数据的类型,以指示这类二进制数据或字符串具体使用的数据格式(比如XML、JSON、CSV、UBJSON等),便于接收方正确解析。换句话说,针对使用PAYLOAD表示的数据,可能会使用多个属性来额外表示一些信息。
此外,由于在各个方法的实现中,都需要将aw_node_t *类型的指针转换为实际的log类指针使用,为此,定义了一个名为AW_NODE_SINK_LOG的宏,用于完成类型的强转,其定义如下:
#define AW_NODE_SINK_LOG(node) ((aw_node_sink_log_t*)(node))
# 3. 实现on_event方法
作为日志记录节点,可以把产生的事件也记录在内,on_event()实现范例如下所示。
static ret_t aw_node_sink_log_on_event(aw_node_t* node, event_t* event) {
aw_node_sink_log_t* sink_log = AW_NODE_SINK_LOG(node);
if (event->type == AW_NODE_EVENT_LOADED) {
str_append(&(sink_log->log), "loaded\n");
} else if (event->type == AW_NODE_EVENT_GRAPH_LOADED) {
str_append(&(sink_log->log), "graph_loaded\n");
}
return RET_OK;
}
程序仅用作示意,将事件信息保存在了内部log中。实际上,也可以根据log_to_console、log_to_client、log_to_mem等值,将这些事件信息相关的日志输出到指定的位置。
# 4. 定义属性描述
在log类中,扩展了4个属性:log_to_console、log_to_client、log_to_mem和log,但log是用于存储日志信息的字符串的对象,主要用于内部测试,并不需要由用户直接配置(比如在可视化工具中配置),因此,不需要对log属性进行描述,基于此,其它三个属性的描述范例程序如下所示。
static const prop_desc_bool_t s_prop_log_to_console_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_BOOL,
.name = AW_NODE_SINK_LOG_PROP_LOG_TO_CONSOLE,
.display_name = "Console",
.desc = "print the data to console",
.flags = PROP_FLAGS_DEFAULT | PROP_DESC_FLAG_PERSISTENT | PROP_DESC_FLAG_CONFIGURABLE,
.format = 0
},
.defvalue = TRUE
};
static const prop_desc_bool_t s_prop_log_to_client_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_BOOL,
.name = AW_NODE_SINK_LOG_PROP_LOG_TO_CLIENT,
.display_name = "Debug Window",
.desc = "send log to client debug window",
.flags = PROP_FLAGS_DEFAULT | PROP_DESC_FLAG_PERSISTENT | PROP_DESC_FLAG_CONFIGURABLE,
.format = 0
},
.defvalue = TRUE
};
static const prop_desc_bool_t s_prop_log_to_mem_desc = {
.value_desc = {
.type = VALUE_DESC_TYPE_BOOL,
.name = AW_NODE_SINK_LOG_PROP_LOG_TO_MEM,
.display_name = "Hold log in mem",
.desc = "hold log in mem(for unit test only)",
.flags = PROP_FLAGS_DEFAULT,
.format = 0
},
.defvalue = FALSE
};
static const prop_desc_t* s_prop_descs[] = {
(const prop_desc_t*)(&s_prop_log_to_console_desc),
(const prop_desc_t*)(&s_prop_log_to_client_desc),
(const prop_desc_t*)(&s_prop_log_to_mem_desc),
NULL
};
注意,log_to_mem主要用于内部测试,其值为TRUE时,会将日志信息存一份到字符串对象log中(位于内存中),该功能主要用于测试。该参数并不需要通过可视化工具配置,也不需要掉电保存,因此,在其对应的描述的flags中,并没有设定PROP_DESC_FLAG_PERSISTENT 和 PROP_DESC_FLAG_CONFIGURABLE标志。
实际上,在属性描述中,不对log_to_mem属性进行描述也是可以的,这里主要是为了展示这样一种情况,即出于代码可读性考虑,也可以对一些不需要配置、不需要持久化的属性进行描述(这类描述可以提高代码的可读性和可维护性),但是,这些描述毕竟会占用一定的存储空间,也应注意平衡,避免对一些无关紧要,完全在内部使用的一些属性进行描述。
# 5. 实现属性的设置和获取方法
属性设置和获取方法的实现范例如下所示。
static ret_t aw_node_sink_log_set_prop(tk_object_t* obj, const char* name, const value_t* v) {
aw_node_sink_log_t* sink_log = AW_NODE_SINK_LOG(obj);
return_value_if_fail(aw_node_check_prop_value(AW_NODE(obj), name, v) == RET_OK, RET_BAD_PARAMS);
if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG_TO_CONSOLE, name)) {
sink_log->log_to_console = value_bool(v);
return RET_OK;
} else if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG_TO_CLIENT, name)) {
sink_log->log_to_client = value_bool(v);
return RET_OK;
} else if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG_TO_MEM, name)) {
sink_log->log_to_mem = value_bool(v);
return RET_OK;
} else if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG, name)) {
str_set(&(sink_log->log), "");
return RET_OK;
}
return aw_node_set_prop_default(AW_NODE(obj), name, v);
}
static ret_t aw_node_sink_log_get_prop(tk_object_t* obj, const char* name, value_t* v) {
aw_node_sink_log_t* sink_log = AW_NODE_SINK_LOG(obj);
if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG_TO_CONSOLE, name)) {
value_set_bool(v, sink_log->log_to_console);
return RET_OK;
} else if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG_TO_CLIENT, name)) {
value_set_bool(v, sink_log->log_to_client);
return RET_OK;
} else if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG_TO_MEM, name)) {
value_set_bool(v, sink_log->log_to_mem);
return RET_OK;
} else if (tk_str_eq(AW_NODE_SINK_LOG_PROP_LOG, name)) {
value_set_str(v, sink_log->log.str);
return RET_OK;
}
return aw_node_get_prop_default(AW_NODE(obj), name, v);
}
# 6. 节点销毁函数
节点销毁函数主要用于销毁相关的资源,范例程序如下所示。
static ret_t aw_node_sink_log_on_destroy(tk_object_t* obj) {
aw_node_sink_log_t* sink_log = AW_NODE_SINK_LOG(obj);
str_reset(&(sink_log->log));
aw_node_sink_deinit(AW_NODE(obj));
return RET_OK;
}
在销毁函数中,销毁了节点中存储的记录(sink_log->log),同时,作为sink节点,还需调用aw_node_sink_deinit(),这是sink节点类提供的解初始化函数接口。
# 7. 节点创建函数
log节点类的创建函数实现范例如下所示。
static const object_vtable_t s_sink_log_vtable = {
.type = "log",
.desc = "aw_node_sink_log",
.size = sizeof(aw_node_sink_log_t),
.get_prop = aw_node_sink_log_get_prop,
.set_prop = aw_node_sink_log_set_prop,
.on_destroy = aw_node_sink_log_on_destroy
};
aw_node_t* aw_node_sink_log_create(void) {
aw_node_t* node = NULL;
tk_object_t* obj = object_create(&s_sink_log_vtable);
aw_node_sink_log_t* sink_log = AW_NODE_SINK_LOG(obj);
return_value_if_fail(sink_log != NULL, NULL);
node = AW_NODE(obj);
aw_node_sink_init(node, aw_node_sink_log_on_data);
node->prop_descs = s_prop_descs;
node->func_descs = s_func_descs;
node->on_event = aw_node_sink_log_on_event;
node->version = (0 << 16) | (1 << 8) | (0); /*0.1.0*/
sink_log->log_to_console = s_prop_log_to_console_desc.defvalue;
sink_log->log_to_client = s_prop_log_to_client_desc.defvalue;
sink_log->log_to_mem = s_prop_log_to_mem_desc.defvalue;
str_init(&(sink_log->log), 0);
return node;
}
程序中定义了s_sink_log_vtable常量用于创建对象。在上述代码中使用了aw_node_sink_init()函数,其是sink节点类提供的用于初始化节点的方法,初始化时传入了具体实现的on_data()方法(即aw_node_sink_log_on_data函数)。
# 3.5 AWFlow中的其它重要组成部分
AWFlow是一个框架,仅仅只有节点是远远不够的,本节将会介绍几个重要部分:节点工厂、graph加载器、节点仓库、节点管理器、loop循环等。
# 3.5.1 节点工厂
在上一小节中,实现了四个具体的节点(timer、random number、add、log),它们各自都提供了相应节点创建函数,如下所示。
节点类型 | 对应创建函数的原型 |
---|---|
timer | aw_node_t* aw_node_pump_timer_create(void); |
random number | aw_node_t* aw_node_filter_random_number_create(void); |
add | aw_node_t* aw_node_filter_add_create(void); |
log | aw_node_t* aw_node_sink_log_create(void); |
在一个实际的系统中,会有各种不同的节点,它们对应的创建函数各不相同。显然,框架不能直接创建具体的节点,否则增加新的节点就要修改框架,使系统明显违背"开闭原则"。为了解决这个问题,AWFlow引入了"节点工厂",负责创建节点。新增的节点通过工厂提供的接口把节点创建函数注册到工厂中。这样框架就可以通过工厂创建节点,而不需要知道节点的具体类型了。
基于节点工厂,系统创建一个节点的流程详见下图。

graph加载器是系统需要"创建节点"的部分,所谓graph加载器,即根据用户描述的数据流图(graph),完成节点的加载(创建各个节点,并组织好各个节点之间的关系)。显然,在加载过程中,会涉及到节点的创建,由上图可知,节点创建工作将委托给节点工厂完成。有关graph加载器的更多内容,后文还会进一步介绍。
对于用户来讲,要实现这一过程,需要完成两个步骤:一个是创建节点工厂;另一个是将各种不同类型的"节点创建函数"注册到节点工厂中。
# 8. 创建工厂
AWFlow提供了工厂创建函数,其原型为:
aw_node_factory_t* aw_node_factory_create(void);
该接口无任何参数,返回值即为创建的工厂,使用起来非常便捷。aw_node_factory_t是工厂类型,用户无需了解这个类型的详细定义,此处的返回值仅作为工厂的引用,后续可用于其它接口的参数传递。
# 9. 注册创建函数
在一个系统中,各个具体节点都会实现一个创建函数(详见3.5.1的创建函数原型),为了使工厂可以基于这些函数完成节点的创建,需要将这些创建函数注册到工厂中。
节点工厂提供了相应的注册函数,其原型为:
ret_t aw_node_factory_register(aw_node_factory_t* factory, const char* type, aw_node_create_t create);
其中,factory表示需要将节点创建函数注册到哪个工厂中(其值为创建工厂时的返回值);type为该节点的类型,其值为字符串,后续系统即可使用该字符串指定需要创建的节点类型;create即为各个具体节点类提供的创建函数。
例如,要将各个节点对应的创建函数注册到节点工厂中,则示意代码如下所示。
aw_node_factory_t* factory = aw_node_factory_create();
aw_node_factory_register(factory, "timer", aw_node_pump_timer_create);
aw_node_factory_register(factory, "random_number", aw_node_filter_random_number_create);
aw_node_factory_register(factory, "add", aw_node_filter_add_create);
aw_node_factory_register(factory, "log", aw_node_sink_log_create);
程序中,将type分别指定为了"timer"、"random_number"、"add"、"log",后续即可使用这些字符串指定对应的节点类型。
# 10. 创建节点
将节点创建函数注册到工厂之后,上层系统(比如graph加载器)就可以通过节点工厂完成节点的创建。创建节点的函数原型为:
aw_node_t* aw_node_factory_create_node(aw_node_factory_t* factory, const char* type);
其中,factory表示具体的工厂,type即为待创建节点的类型。系统将根据type找到注册的创建函数,然后调用创建函数完成节点的创建,最后通过返回值返回创建的节点。
# 11. 销毁工厂
当不再使用工厂时,可以执行销毁操作,以释放相关资源,销毁函数的原型为:
ret_t aw_node_factory_destroy(aw_node_factory_t* factory);
具体使用时,直接通过factory参数传入使用创建函数创建的节点工厂即可:
aw_node_factory_t* factory = aw_node_factory_create();
// ..... 其它操作
aw_node_factory_destroy (factory);
# 3.5.2 流图加载器
前面介绍了单个节点的设计方法,而实际应用通常是由多个节点组成的,多个节点以及它们之间的关系构成了数据流图("graph")。graph加载器的作用就是解析流图中的节点信息,然后利用工厂创建好相关节点,以在程序中构建整个流图应用。
流图的表现形式多种多样(后文会介绍两种典型的表示形式)。显然,不同表现形式的流图需要对应不同的加载器,但加载器的核心都是"加载",据此,AWFlow定义了抽象的加载器类:aw_graph_loader_t,并针对两种流图表示方法实现了两种对应的加载器,对应类图详见下图。

AWFlow框架已经实现了常用的加载器,用户无需关心定义细节,后文会介绍各种具体加载器的使用方法。
流图表示形式多种多样,最直观的是用可视化工具表示(比如AWFlow-Designer)。可视化工具可以方便用户绘制流图以及配置各节点的属性,但其本质上只是一种"展示"工具,流图相关的信息最终还是要以某种格式的数据保存起来,以方便存储、解析、甚至传输(将一张流图从一个设备发送到另一个设备)。
例如,针对图3.11所示的数据流图,完善关键属性的值,一张完整的流图示意图详见下图。

通常情况下,如果使用可视化工具绘制流图,id和consumer_ids两个属性可以根据节点的前后关系自动生成,其它属性可以通过可视化的配置界面进行配置,十分便捷。这里为了原理性说明,将id和consumer_ids两个属性也显式的表现在了graph中。
如何保存这样一张流图呢?在AWFlow中,提供了两种格式的存储方式,对应了两种加载器。
# 1. 流图加载器(JSON)
流图通常采用可视化工具绘制,AWFlow-Designer会使用JSON格式的文件存储所有流图信息,文件示意内容如下所示。
[
{
"id": "2",
"type": "timer",
"duration": "1000",
"wires": [ "3"]
},
{
"id": "3",
"type": "random_number",
"min": "10",
"max": "1000",
"wires": [ "4"]
},
{
"id": "4",
"type": "add",
"delta": "1000",
"wires": ["5"]
},
{
"id": "5",
"type": "log",
"log_to_console": "true",
"wires": []
}
]
其核心就是使用一系列键值对来表示各个属性值。文件中使用了名为"wires"的数组来表示节点后续与哪些节点用"线"直接连接,该值直接决定了节点的消费者。这里仅作示意性展示,实际应用中,可能会包含更多信息,比如节点在可视化界面中的坐标信息,但这些信息只是对显示有用,对流图本身的业务逻辑关系不会有任何影响。
针对JSON格式的流图信息,AWFlow实现了对应加载器:aw_graph_loader_json_t。该加载器用于将json文件中的相关信息读取出来,然后根据这些信息创建相关节点,并设定好各个节点的属性值。该加载提供了一个创建函数,通过调用创建函数,即可实现加载器的快速创建,其函数原型为:
aw_graph_loader_t* aw_graph_loader_json_create(aw_node_factory_t* factory, const char* uri);
在创建graph加载器时,需要通过factory参数指定该加载器所使用的节点工厂,以便后续加载器可以通过节点工厂创建指定类型(type)的节点。同时,还需通过uri参数指定JSON文件的路径。函数的返回值即为创建的graph加载器。
可使用如下语句完成流图加载器(JSON)的创建:
aw_node_factory_t *factory = aw_node_factory_create();
aw_graph_loader_t *loader = aw_graph_loader_json_create(factory, "file://./tests/test1.json");
流图加载器涉及到JSON文件,这就要求加载器必须有读取文件、解析JSON数据的能力,这不仅增加了资源消耗,也降低了加载的效率,在一些低资源的嵌入式平台可能并不适用。因此,AWFlow定义了另外一种流图表示形式:C数组形式。以满足低端平台的使用(没有文件系统,不涉及JSON格式解析)。
# 2. 流图加载器(C数组形式)
为了简化加载流程,提高加载速度,直接将流图以C数组的形式保存在C代码中,经过编译,这些信息最终将直接存储在程序固件中,这样就可以省略文件读取、JSON格式解析等相关操作。C数组的定义范例如下所示(对应流图详见图3.15)。
static const aw_graph_desc_t s_flow2[] = {
{"timer", "2", "3", "duration\0:1000\0\0\0"},
{"random_number", "3", "4", "min\0:10\0max\0:1000\0\0\0"},
{"add", "4", "5", "delta\0:1000\0\0\0"},
{"log", "5", NULL, "log\0:\0prefix\0:\0max_size\0:1000\0log_to_console\0:true\0\0\0"},
{NULL, NULL, NULL, NULL}
};
在该C数组中,除最后一个成员以外(最后一个成员中的所有信息均为NULL,用以表示整个的数组的结束),每个成员都表示了一个节点相关的所有信息(id、类型、消费者等等),数组成员的类型为:aw_graph_desc_t,其定义如下所示。
typedef struct _aw_graph_desc_t {
const char *type; /* 类型 */
const char *id; /* ID */
const char *consumer_ids; /* 消费者id */
const char *props; /* 其它属性列表 */
} aw_graph_desc_t;
其中包含了4个成员,每个成员的值都是一个字符串。type表示节点的类型,graph加载器将通过该信息完成节点的创建,因此,这里的type要与注册节点类对应的创建函数时指定的type一致;id即为该节点的id,其值将作为节点类中id成员的值;consumer_ids为该节点的消费者,其值将作为节点类中consumer_ids的值(若存在多个消费者,则多个消费者的id之间使用逗号分隔);props为创建该节点时,对该节点其它属性的设置(为节点的其它属性设定初始值)。
props指定了节点其它属性的初始值,不同节点需要设置的属性可能不尽相同,比如randrom number有min和max属性,而add节点仅有delta属性。在使用字符串表示时,每个属性使用一个键值对表示(键作为属性名,值作为属性值),具体格式为:
"name\0:value\0"
即属性名和属性值之间使用冒号分割。此外,属性名和属性值后应紧跟一个特殊字符'\0'(即C语言中字符串结束符),在属性名和属性值后都紧跟一个字符串结束符,主要是为了便于字符串的解析。注意,属性名不能为空,但属性值可以为空。
部分节点可能存在多个属性,那么在属性与属性之间无需任何间隔,直接紧紧相连即可:
"name1\0:value1\0name2\0:value2\0name3\0:value3\0"
特别地,在最后一个属性表示完成后,要额外增加两个字符'\0',以表示整个属性字符串结束。例如,要描述3个属性,则在第三个属性描述完成后额外再增加两个字符'\0'(由于value3后本身就有一个'\0',因此形式上最后有连续的3个'\0'):
"name1\0:value1\0name2\0:value2\0name3\0:value3\0\0\0"
在加载器加载一个节点时,会从props中解析出各个属性值,并使用节点提供的set_prop()方法完成这些属性值的设置。
例如,要对timer、random number、add、filter的各个扩展属性设置初始值,且设置的初始值如下所示。
节点类型 | 属性 | 值 |
---|---|---|
timer | duration | 1000 |
Random number | min | 10 |
Random number | max | 1000 |
add | delta | 1000 |
log | log_to_console | true |
则四个节点对应的props应分别设置为:
"duration\0:1000\0\0\0"
"min\0:10\0max\0:1000\0\0\0"
"delta\0:1000\0\0\0"
"log_to_console\0:true\0\0\0"
没有通过props设置的属性,将保持节点设计时指定的默认值。根据图3.15中所示的各个信息,可以完成数据流图的描述,
注意,在数组最后需要添加一个全为NULL的节点描述成员,以标定数组的结束。如果读者自行尝试来编写这个数组可能会发现,维护这个数组是极为困难的:
- 属性字符串中存在大量的冒号、'\0',修改极为不便;
- 需要确保ID的唯一性,人为保证是比较困难的;
- 流图中的节点关系非常不直观,无法明显看出数据流关系(这还是一个十分简单的流图)。
在实际应用开发过程中,并不会全靠人工来维护这个数组的,往往会使用可视化工具AWFlow-Designer绘制流图,然后通过工具将流图转换为这样的数组信息(比如将JSON文件中的信息转换为这样的数组)。这里介绍这个数组的原因,是为了让读者从代码层面理解整个加载器的原理。
# 3. 销毁流图加载器
前面介绍了两种加载器的创建方法,它们均返回了通用流图加载器:aw_graph_loader_t。当不再使用流图加载器时,可以执行销毁操作,以释放相关资源,销毁函数的原型为:
ret_t aw_graph_loader_destroy(aw_graph_loader_t* loader);
具体使用时,直接通过loader参数传入使用创建函数创建的graph加载器即可:
aw_graph_loader_t *loader = aw_graph_loader_default_create(factory);
// ..... 其它操作
aw_graph_loader_destroy(loader);
# 4. 加载器设计
graph加载器的作用就是从graph的描述信息中解析出相关的节点信息(节点类型、节点属性、节点之间的关系等)。
AWFlow充分考虑了graph具体表示方法的灵活性,定义了抽象的graph加载器。例如,针对JSON格式的graph描述信息,可以实现一个graph加载器;针对XML格式的graph描述信息,可以实现一个graph加载器。使得AWFlow不会限制graph的具体表现形式。
但是,为了简化加载流程,提高加载速度,在嵌入式系统中,更加推荐使用C代码来表示(比如C数组)graph,这样的graph可以直接编译到程序中,效率很高。基于此,AWFlow实现了一个默认的graph加载器,graph通过数组描述,加载器直接从数组中获取信息,完成加载。示意图详见下图。

默认的加载器提供了一个add_config()方法,该方法即用于向默认加载器中添加graph信息(一个包含各个节点信息的数组)。由add_config的命名也可得知,该方法是"添加"信息,因而默认加载器支持维护多个graph(虽然某一确定的时刻只能使用一张确定的graph,但可以通过更换graph切换应用程序功能)。默认的加载器是依赖于节点工厂的,因为其要使用节点工厂完成节点的创建。
默认加载器的设计,似乎限制了graph的具体形式(必须为C数组)。但实际上,AWFlow提供了工具,当graph使用其它形式表示时(比如JSON),可以通过工具先将其转换为C中的数组,然后再使用默认的graph加载器进行加载。这就相当于提前完成了JSON文件的解析(不必在最终的应用程序固件中包含JSON文件的解析代码,否则,如果直接设计一个能够加载JSON文件的加载器,该加载器势必包含对JSON文件的解析代码),提高了实际运行中graph的加载效率。因此,对于用户来讲,依然可以使用默认的加载器。
当然,在一些资源丰富的平台中,如果增加对JSON文件的解析影响很小,出于便捷性考虑,也可以直接实现一个基于json文件信息的加载器(实际上,AWFlow中也实现了基于JSON信息格式的加载器:aw_graph_loader_json_t)。
核心的加载原理是一样的,无非是"graph"的具体表现形式不同,本章仅以默认加载器为例进行原理说明。
加载器部分AWFlow已经实现,也无需用户自己实现特有的加载器,因此,相关类型(aw_graph_loader_t和aw_graph_loader_default_t)仅需了解即可,无需深入研究。接下来,主要对默认加载器的使用方法作简要介绍。
# 5. 创建graph加载器
默认的graph加载器提供了对应的创建函数,其函数原型为:
aw_graph_loader_t* aw_graph_loader_default_create(aw_node_factory_t* factory);
在创建graph加载器时,需要通过factory参数指定该加载器所使用的节点工厂,以便后续加载器可以通过节点工厂创建指定类型(type)的节点。函数的返回值即为创建的graph加载器,aw_graph_loader_t是graph加载器类型,用户无需了解这个类型的详细定义,此处的返回值仅作为加载器的引用,后续可用于其它接口的参数传递。
如需创建一个默认类型的graph加载器,则可使用如下语句:
aw_node_factory_t *factory = aw_node_factory_create();
aw_graph_loader_t *loader = aw_graph_loader_default_create(factory);
# 6. 构建数据流图(C数组)
对于默认加载器,数据流图使用数组常量表示,数组中的每个成员都描述了一个节点,一个节点使用aw_graph_desc_t类型的数据表示,该类型的定义如下所示。
typedef struct _aw_graph_desc_t {
const char *type; /* 类型 */
const char *id; /* ID */
const char *consumer_ids; /* 消费者id */
const char *props; /* 其它属性列表 */
} aw_graph_desc_t;
其中包含了4个成员,每个成员的值都是一个字符串。type表示节点的类型,graph加载器将通过该信息完成节点的创建,因此,这里的type要与注册节点类对应的创建函数时指定的type一致;id即为该节点的id,其值将作为节点类中id成员的值;consumer_ids为该节点的消费者,其值将作为节点类中consumer_ids的值(若存在多个消费者,则多个消费者的id之间使用逗号分隔);props为创建该节点时,对该节点其它属性的设置(为节点的其它属性设定初始值)。
props指定了节点其它属性的初始值,不同节点需要设置的属性可能不尽相同,比如randrom number有min和max属性,而add节点仅有delta属性。在使用字符串表示时,每个属性使用一个键值对表示(键作为属性名,值作为属性值),具体格式为:
"name\0:value\0"
即属性名和属性值之间使用冒号分割。此外,属性名和属性值后应紧跟一个特殊字符'\0'(即C语言中字符串结束符),在属性名和属性值后都紧跟一个字符串结束符,主要是为了便于字符串的解析。注意,属性名不能为空,但属性值可以为空。
部分节点可能存在多个属性,那么在属性与属性之间无需任何间隔,直接紧紧相连即可:
"name1\0:value1\0name2\0:value2\0name3\0:value3\0"
特别地,在最后一个属性表示完成后,要额外增加两个字符'\0',以表示整个属性字符串结束。例如,要描述3个属性,则在第三个属性描述完成后额外再增加两个字符'\0'(由于value3后本身就有一个'\0',因此形式上最后有连续的3个'\0'):
"name1\0:value1\0name2\0:value2\0name3\0:value3\0\0\0"
在加载器加载一个节点时,会从props中解析出各个属性值,并使用节点提供的set_prop()方法完成这些属性值的设置。
例如,要对timer、random number、add、filter的各个扩展属性设置初始值,且设置的初始值如下所示。
节点类型 | 属性 | 值 |
---|---|---|
timer | duration | 1000 |
Random number | min | 10 |
Random number | max | 1000 |
add | delta | 1000 |
log | log_to_console | true |
则四个节点对应的props应分别设置为:
"duration\0:1000\0\0\0"
"min\0:10\0max\0:1000\0\0\0"
"delta\0:1000\0\0\0"
"log_to_console\0:true\0\0\0"
没有通过props设置的属性,将保持节点设计时指定的默认值。
根据图中所示的各个信息,可以完成数据流图的描述,范例程序如下所示。
static const aw_graph_desc_t s_flow2[] = {
{"timer", "2", "3", "duration\0:1000\0\0\0"},
{"random_number", "3", "4", "min\0:10\0max\0:1000\0\0\0"},
{"add", "4", "5", "delta\0:1000\0\0\0"},
{"log", "5", NULL, "log\0:\0prefix\0:\0max_size\0:1000\0log_to_console\0:true\0\0\0"},
{NULL, NULL, NULL, NULL}
};
注意,在数组最后需要添加一个全为NULL的节点描述成员,以标定数组的结束。如果读者自行尝试来编写这个数组可能会发现,维护这个数组是极为困难的:
- 属性字符串中存在大量的冒号、'\0',修改极为不便;
- 需要确保ID的唯一性,人为保证是比较困难的;
- 流图中的节点关系非常不直观,无法明显看出数据流关系(这还是一个十分简单的流图)。
在实际应用开发过程中,并不会全靠人工来维护这个数组的,往往会使用可视化工具AWFlow-Designer绘制流图,然后通过工具将流图转换为这样的数组信息。这里介绍这个数组的原因,是为了让读者从代码层面理解整个加载器的原理。
# 7. 将数据流图的描述添加到加载器中
为了将数据流图的描述(C数组)添加到加载器中,默认加载器提供了add_config()接口,其函数原型为:
ret_t aw_graph_loader_default_add_config(aw_graph_loader_t *loader, const char *name, const aw_graph_desc_t *desc);
其中,loader为前面创建的fraph加载器;name为数据流图的名称,当向加载器中添加了多个数据流图时,使用名称可以区分不同的数据流图;desc即为数据流图的描述。
例如,要将数据流图s_flow2添加到加载器中,则范例程序如下所示。
aw_graph_loader_t * loader = aw_graph_loader_default_create(factory);
aw_graph_loader_default_add_config(loader, "default", s_flow2);
注意,这里的添加仅仅是将这些信息告知加载器,加载器并不会立即启动加载工作。具体的加载时机统一由后文介绍的节点管理器控制。
# 3.5.3 节点仓库(node repository)
在介绍节点时提到,节点通常具有一些可配置属性,有的配置属性是临时的,但有的配置属性是需要持久化的(属性描述的标志中具有PROP_DESC_FLAG_PERSISTENT标志)。这些需要持久化的属性应该保存到非易失存储器中,以实现掉电不丢失。
节点仓库即用于实现这一功能,在一个系统中,通常只会使用一个节点仓库,用以保存所有节点的持久化属性。
考虑到不同应用场景保存属性的方式不尽相同。可能通过文件保存,也可能直接通过非易失存储器的接口保存到非易失存储器(比如EEPROM、SPI Nor FLASH、Nand Flash等)中,此外,具体保存的格式也可能不同,可能使用JSON格式、XML格式、甚至自定义格式。总之,具体保存属性的方法多种多样。
# 1. node repository接口类
为了统一存储持久化属性的接口,定义了node repository类,如下所示。
struct _aw_node_repository_t {
aw_node_repository_save_t save;
aw_node_repository_load_t load;
aw_node_repository_destroy_t destroy;
};
其中包含了三个核心的抽象方法:save、load和destroy。
save方法用于保存一个节点中所有需要持久化的属性,aw_node_repository_save_t的定义如下:
typedef ret_t (*aw_node_repository_save_t)(aw_node_repository_t* repository, aw_node_t* node);
其中的node参数即为需要保存属性值的节点,在具体类实现该方法时,应该通过节点的属性描述找到那些需要持久化的参数,并通过get_prop从节点中将该参数对应的属性值获取出来,然后将获取到的属性值保存到存储器中。
load方法用于恢复一个节点中所有需要持久化的属性,aw_node_repository_load_t的定义如下:
typedef ret_t (*aw_node_repository_load_t)(aw_node_repository_t* repository, aw_node_t* node);
其中的node参数即为需要恢复属性值的节点,在具体类实现该方法时,应该通过节点的属性描述找到那些需要持久化的参数,并将存储的属性值读取出来,通过节点类提供的set_prop()方法设置到节点中。
destroy方法在销毁一个节点仓库对象时使用,用于释放对象所占用的资源。具体占用了哪些资源,通常由创建函数决定(具体实现方法将在下节介绍具体类时详细介绍)。
这些抽象方法将由具体实现类实现,为了使上层应用可以更方便的使用这些方法,系统还基于抽象方法封装了几个API,它们直接调用了抽象方法,如下所示。
ret_t aw_node_repository_save(aw_node_repository_t* repository, aw_node_t* node) {
return repository->save(repository, node);
}
ret_t aw_node_repository_load(aw_node_repository_t* repository, aw_node_t* node) {
return repository->load(repository, node);
}
ret_t aw_node_repository_destroy(aw_node_repository_t* repository) {
return repository->destroy(repository);
}
上层应用可以通过调用aw_node_repository_save()存储一个节点的持久化参数,调用aw_node_repository_load()加载一个节点的持久化参数,调用aw_node_repository_destroy()销毁一个节点仓库。
# 2. node repository实现类
具体实现类与实际应用相关,不同的存储方案对应的实现可能不同。无论何种实现类,实现的核心都是load和save方法,这两个方法无非就是一些属性值的设置和获取操作。作为操作属性值的示例,这里使用简单的内存进行模拟存储(数据存储到内存中)。
注意,内存会掉电丢失,这里仅作示意之用,实际应用应该选择一些非易失存储器。它们的核心原理是完全一致的,读者按照本节介绍的方法,可以很容易实现其它类型的实现类(将数据保存到非易失存储器中)。
(1)定义具体实现类
具体实现类(aw_node_repository_mem_t)的定义如下所示。
typedef struct _aw_node_repository_mem_t {
aw_node_repository_t repository;
tk_object_t *store;
} aw_node_repository_mem_t;
为了简便,这里扩展了一个名为store的对象,程序中使用该对象来保存属性,即将节点中需要保存的属性直接保存到store对象中。store对象直接使用默认对象即可(默认对象可以通过object_default_create()创建),当使用set_prop向默认对象中添加一个新属性时,默认对象会自动在内存中开辟一段空间(malloc())保存下该属性(属性名和属性值)。
(2)实现save方法
save方法的实现范例如下所示。
static ret_t aw_node_repository_mem_save(aw_node_repository_t* repository, aw_node_t* node) {
value_t v;
uint32_t i = 0;
char key[TK_NAME_LEN + 1];
const prop_desc_t** prop_descs = node->prop_descs;
aw_node_repository_mem_t *repository_mem = AW_NODE_REPOSITORY_MEM(repository);
tk_object_t *store = repository_mem->store;
for (i = 0; prop_descs[i] != NULL; i++) {
const prop_desc_t* iter = prop_descs[i];
if (iter->flags & PROP_DESC_FLAG_PERSISTENT) {
if (object_get_prop(OBJECT(node), iter->name, &v) == RET_OK) {
tk_snprintf(key, sizeof(key), "%s:%s", node->id, iter->name);
object_set_prop(OBJECT(store), key, &v);
}
}
}
return RET_OK;
}
该程序在for循环中依次遍历了节点的各个属性描述,当属性具有持久化标志(PROP_DESC_FLAG_PERSISTENT)时,表明该属性需要保存。此时,首先通过get_prop()获取出节点中该属性的值,然后使用set_prop()将该属性保存至了store对象中。
值的注意的是,节点仓库要用于所有节点的属性保存,不同节点可能具有相同名称的属性(此外,同类型的节点必然具有相同的属性名,比如多个log节点,每个log节点的属性名都是相同的),为了使各个节点之间的属性互不冲突,在保存数据时,将保存的属性名设置为了"id:name"的格式,即在原名称前增加了id,并以冒号分割。比如id为"2"的min属性,在保存其对应的属性值时,会将属性名设置为:"2:min"。由于id具有唯一性,可以确保各个节点保存的属性不会冲突。
在实际应用中,可能存在一些属性值没有发生变化的情况,为了提高保存数据的效率,可以在保存属性之前做一个简单的判断:当属性值与保存在存储器中的属性相同时,无需再次保存,直接返回即可。
程序中的AW_NODE_REPOSITORY_MEM()只是一个简单的类型转换宏,其定义如下:
#define AW_NODE_REPOSITORY_MEM(repository) ((aw_node_repository_mem_t*)(repository))
(3)实现load方法
load方法的实现范例如下所示。
static ret_t aw_node_repository_mem_load(aw_node_repository_t* repository, aw_node_t* node)
{
value_t v;
uint32_t i = 0;
char key[TK_NAME_LEN + 1];
const prop_desc_t** prop_descs = node->prop_descs;
aw_node_repository_mem_t *repository_mem = AW_NODE_REPOSITORY_MEM(repository);
tk_object_t *store = repository_mem->store;
for (i = 0; prop_descs[i] != NULL; i++) {
const prop_desc_t* iter = prop_descs[i];
if (iter->flags & PROP_DESC_FLAG_PERSISTENT) {
tk_snprintf(key, sizeof(key), "%s:%s", node->id, iter->name);
if (object_get_prop(OBJECT(store), key, &v) == RET_OK) {
object_set_prop(OBJECT(node), iter->name, &v);
}
}
}
return RET_OK;
}
该方法的实现与save()方法的实现基本类似,唯一的区别在于,当遍历到一个持久化属性时,执行的操作恰好相反:首先从store对象中获取到相应的属性值,然后使用set_prop()设置到node节点中。值的注意的是,在从store对象中获取相应的属性值时,使用的属性名为:"id:name",这是由存储时指定的属性名决定的。
(4)实现创建函数
与节点类似,为了使外界可以通过创建函数完成节点仓库的创建,需要对外提供一个节点仓库的创建函数。destroy()方法的实现通常与创建函数的实现存在一定的关联(创建函数的逆操作),因此,destoty()方法的实现就不再单独介绍。
aw_node_repository_mem_t类对应创建函数的实现范例如下所示。
static ret_t aw_node_repository_mem_destroy(aw_node_repository_t* repository)
{
aw_node_repository_mem_t* repository_mem = AW_NODE_REPOSITORY_MEM(repository);
TK_OBJECT_UNREF(repository_mem->store);
TKMEM_FREE(repository_mem);
return RET_OK;
}
aw_node_repository_t* aw_node_repository_mem_create(void)
{
aw_node_repository_t *repository = NULL;
aw_node_repository_mem_t *repository_mem = TKMEM_ZALLOC(aw_node_repository_mem_t);
repository = AW_NODE_REPOSITORY(repository_mem);
repository_mem->store = object_default_create();
if (repository_mem->store != NULL) {
repository->save = aw_node_repository_mem_save;
repository->load = aw_node_repository_mem_load;
repository->destroy = aw_node_repository_mem_destroy;
} else {
TKMEM_FREE(repository_mem);
repository = NULL;
}
return repository;
}
基于此,上层应用就可以直接使用aw_node_repository_mem_create()函数完成一个"节点仓库"(基于内存模拟存储)的创建。
# 3.5.4 节点管理器
前面分别介绍了AWFlow中与节点相关的各个部分:节点、节点工厂、节点加载器、节点仓库等。每个部分的功能都是明确且独立的。但实际应用往往需要各个部分协作完成。为此,AWFlow还引入了"节点管理器",其作为整个系统的"管理员",协调各个部分有序工作。
节点管理器主要完成了以下任务:
(1)从graph中加载节点列表,并构建数据流图(利用graph完成节点加载,并将各个节点管理起来,按照消费者关系在系统中构建好整个流图,这个过程会根据consumers_ids完成节点对象中consumers成员的赋值,其中包含了各个消费者节点的指针);
(2)负责从node respository中获取存储的节点属性;
(3)提供额外的用户管理接口(如节点遍历功能等)。
节点管理器与节点、节点仓库、节点加载器均相关,对应的类图详见下图。

节点工厂仅由graph加载器访问,因而节点管理器与节点工厂没有直接联系。
AWFlow已经实现了节点管理器,用户直接使用即可。常用接口如下所示。
函数原型 | 功能简介 |
---|---|
aw_node_manager_t* aw_node_manager_create(aw_graph_loader_t *loader, aw_node_repository_t *repository) | 创建一个节点管理器 |
ret_t aw_node_manager_load_graph(aw_node_manager_t *manager, const char *name); | 从graph中加载节点列表,构建数据流图 |
ret_t aw_node_manager_destroy(aw_node_manager_t* manager); | 销毁节点管理器,释放相关资源 |
# 1. 创建节点管理器
创建节点管理器的函数原型为:
aw_node_manager_t* aw_node_manager_create(aw_graph_loader_t *loader, aw_node_repository_t *repository);
在创建一个节点管理器时,需要指定该节点管理器所关联的graph加载器和节点仓库。因而该创建函数也具有两个相应的参数:loader和repository。
假定使用AWFlow提供的默认加载器和基于内存实现的节点仓库,则创建一个节点管理器的范例程序如下所示。
aw_node_factory_t *factory = aw_node_factory_create();
aw_graph_loader_t *loader = aw_graph_loader_default_create(factory);
aw_node_repository_t *repository = aw_node_repository_mem_create();
aw_node_manager_t *manager = aw_node_manager_create(loader, repository);
# 2. 加载节点,构建数据流图
在创建好节点管理器后,即可在需要的时候通知节点管理器加载指定的graph,从而构建整个应用程序。节点管理器提供了一个专用函数用于完成该操作,其原型为:
ret_t aw_node_manager_load_graph(aw_node_manager_t *manager, const char *name);
其中,manager参数指定具体的节点管理器(通过创建函数创建的),name指定需要加载的graph的名称。
在介绍graph加载器时提到,当使用aw_graph_loader_default_add_config()函数将节点描述信息添加到加载器中时,需要指定一个name参数,其就是graph的名称,graph的名称为"default"。
aw_node_manager_load_graph(manager, "default");
注意,aw_graph_loader_default_add_config()函数的调用必须在加载之前进行,否则,加载时将找不到名为"default"的graph。
在实际应用中,可以向graph加载器中添加多张数据流图(add_config),进而通过加载不同的graph切换应用功能。应用可以在任何需要的时候,调用该接口加载新的graph。值得注意的是,同一时刻只能运行一个graph,当加载新的graph时,之前的graph将被移除,不再有效。
# 3. 销毁节点管理器
当不再使用节点管理器时,可以执行销毁操作,以释放相关资源,销毁函数的原型为:
ret_t aw_node_manager_destroy(aw_node_manager_t* manager);
具体使用时,直接通过manager参数传入使用创建函数创建的节点管理器即可:
aw_node_manager_t *manager = aw_node_manager_create(loader, repository);
// ..... 其它操作
aw_node_manager_destroy(manager);
# 3.5.5 loop循环
在介绍具体pump节点(timer节点类)的实现时提到,为了驱动整个数据流正常工作,使用aw_loop_add_timer()添加了一个周期性事务,以便每隔一定时间调用一次aw_node_pump_input_and_emit()函数。
在AWFlow中,所有事务都在一个loop(大循环)中顺序处理,各事务依序处理,不会相互抢占。这极大的增强了AWFlow的可移植性。AWFlow不依赖于操作系统服务(比如多任务)。在OS中,可以单独创建一个任务来运行loop循环;而在裸机系统中,可以直接在主循环中运行loop循环。
AWFlow定义了抽象的loop类,不同系统只需实现loop类中定义的几个抽象方法即可,类图详见下图。

具体定义如下。
struct _aw_loop_t;
typedef struct _aw_loop_t aw_loop_t;
struct _aw_loop_t {
aw_loop_run_t run;
aw_loop_quit_t quit;
aw_loop_wakeup_t wakeup;
aw_loop_destroy_t destroy;
aw_loop_add_timer_t add_timer;
aw_loop_remove_timer_t remove_timer;
aw_loop_add_idle_t add_idle;
aw_loop_remove_idle_t remove_idle;
aw_loop_add_source_t add_source;
aw_loop_remove_source_t remove_source;
aw_loop_remove_source_by_tag_t remove_source_by_tag;
};
类中定义了11个抽象方法,它们的基本用途如下所示。
方法 | 功能简介 |
---|---|
run | 进入主循环开始运行 |
quit | 退出循环 |
wakeup | 将系统从睡眠中唤醒(可选) |
destroy | 销毁 |
add_timer | 添加一个周期性(定时)事务 |
remove_timer | 移除已经添加的周期性(定时)事务 |
add_idle | 添加一个空闲事务 |
remove_idle | 移除一个空闲事务 |
add_source | 添加一个事件源 |
remove_source | 移除一个事件源 |
remove_source_by_tag | 移除一个事件源(通过制定TAG) |
AWFlow框架提供了一个默认的loop实现类,对于用户来讲,内部实现细节无需关心,只需调用其提供的创建函数即可完成loop对象的创建,创建函数的原型为:
aw_loop_t* aw_loop_default_create(void);
调用形式如下:
aw_loop_t* loop = aw_loop_default_create();
换句话说,用户可以很容易获取到一个loop对象。
在实现timer节点类时,通过aw_loop_add_timer()添加了一个周期性事务,实际上,通过抽象方法的定义可以发现,loop类总共支持3类事务:周期性事务、空闲事务、事件性质的事务。
- 周期性事务
周期性事务就是需要周期性执行的事务,这个已经在timer节点类的实现中涉及到了。
- 空闲事务
空闲事务是一旦空闲就执行的事务,可以理解为空闲事务是添加到主循环中全速运行的事务。
- 事件性质的事务
顾名思义,就是当某种特定的事件产生时,才执行的事务。其主要用于基于socket的通信中,一种典型的应用场景:在TCP通信中,当数据可读时("数据可读"视为一种事件),执行特定回调,进而读取数据,开始处理。
本节将重点介绍loop类对上层用户提供了哪些常用接口,便于在实现节点类时,向loop中添加需要处理的事务。常用接口如下表所示。
函数原型 | 功能简介 |
---|---|
uint32_t aw_loop_add_timer(aw_loop_t *loop, timer_func_t on_timer, void *ctx, uint32_t duration); | 添加一个周期性事务 |
ret_t aw_loop_remove_timer(aw_loop_t* loop, uint32_t id); | 移除一个周期性事务 |
uint32_t aw_loop_add_idle(aw_loop_t *loop, idle_func_t on_idle, void *ctx); | 添加一个空闲事务 |
ret_t aw_loop_remove_idle(aw_loop_t* loop, uint32_t id); | 移除一个空闲事务 |
ret_t aw_loop_run(aw_loop_t* loop); | 进入主循环开始运行 |
ret_t aw_loop_quit(aw_loop_t* loop); | 退出主循环 |
ret_t aw_loop_destroy(aw_loop_t* loop); | 销毁loop对象,释放相关资源 |
# 1. 添加一个空闲事务
在前面介绍的Random number节点类中,使用到了该函数添加空闲事务(以便在系统空闲时执行获取数据并分发操作),其函数原型为:
uint32_t aw_loop_add_idle(aw_loop_t* loop, idle_func_t on_idle, void* ctx);
on_idle即为空闲任务,ctx为空闲任务上下文(以便在空闲任务中使用ctx)。idle_func_t的类型定义为:
typedef ret_t (*idle_func_t)(const idle_info_t* idle);
空闲任务对应的函数,仅有一个idle_info_t类型的指针作为参数。在添加空闲任务时指定的ctx,包含在参数中,可以通过idle->ctx提取。范例程序如下所示。
ret_t aw_node_pump_input_and_emit_idle(const idle_info_t* info) {
aw_node_pump_input_and_emit(AW_NODE(info->ctx));
return RET_REPEAT;
}
aw_loop_add_idle(node->manager->loop, aw_node_pump_input_and_emit_idle, node);
值的注意的是,空闲任务的返回值可以决定该空闲任务是否在后续循环中继续执行(即是否被移除)。若需重复执行,则返回值应设置为RET_REPEAT;否则,将自动移除该任务,后续主循环空闲时,不会再执行。
aw_loop_add_idle()函数的返回值是一个无符号32位整数,该整数是一个唯一ID,作为本次添加任务的标识。后续需要操作本次添加的空闲任务时(比如移除操作),可以通过该ID指定。特别地,当ID的值为TK_INVALID_ID时,表示添加失败。
# 2. 移除一个空闲事务
通过aw_loop_add_idle()函数添加的空闲事务,可以通过该函数移除,其函数原型为:
ret_t aw_loop_remove_idle(aw_loop_t* loop, uint32_t id);
参数id为添加事务时的返回值。典型使用方法如下:
uint32_t id = aw_loop_add_idle(loop, aw_node_pump_input_and_emit_idle, node);
// 其它操作
aw_loop_remove_idle(id);
# 3. 添加一个周期性事务
空闲任务只要一空闲就会执行,每次主循环都会执行一次。部分操作可能并不需要以如此高的频率执行,而是以一定的时间间隔周期性运作(比如每100ms执行一次)。添加周期性任务的函数原型为:
uint32_t aw_loop_add_timer(aw_loop_t* loop, timer_func_t on_timer, void* ctx, uint32_t duration);
on_timer即为需要周期性执行的任务,ctx为任务的上下文(以便在任务中使用ctx),duration为执行周期(单位:ms)。timer_func_t的类型定义为:
typedef ret_t (*timer_func_t)(const timer_info_t* timer);
周期性任务对应的函数,仅有一个timer_info_t类型的指针作为参数。在添加周期性任务时指定的ctx,包含在参数中,可以通过timer->ctx提取。
假定pump节点对应的数据获取和分发函数仅需100ms执行一次,则对应的范例程序如下所示。
ret_t aw_node_pump_input_and_emit_timer(const timer_info_t* info) {
aw_node_pump_input_and_emit(AW_NODE(info->ctx));
return RET_REPEAT;
}
aw_loop_add_timer(node->manager->loop, aw_node_pump_input_and_emit_timer, node,100);
周期性任务的返回值同样可以决定该周期性任务是否继续存在(若存在,则会继续按照指定周期执行)。若需重复执行,则返回值应设置为RET_REPEAT;否则,将自动移除该任务,周期性任务不会再被执行。
同理,aw_loop_add_timer()函数的返回值是一个无符号32位整数,该整数是一个唯一ID,作为本次添加任务的标识。后续需要操作本次添加的任务时(比如移除操作),可以通过该ID指定。特别地,当ID的值为TK_INVALID_ID时,表示添加失败。
# 4. 移除一个周期性事务
通过aw_loop_add_timer()函数添加的周期性事务,可以通过该函数移除,其函数原型为:
ret_t aw_loop_remove_timer(aw_loop_t* loop, uint32_t id);
参数id为添加事务时的返回值。典型使用方法如下:
uint32_t id = aw_loop_add_timer(node->manager->loop, aw_node_pump_input_and_emit_timer, node,100);
// 其它操作
aw_loop_remove_timer(id);
# 5. 进入主循环开始运行
进入主循环开始运行的函数原型为:
ret_t aw_loop_run(aw_loop_t* loop);
调用该函数后,将进入主循环开始运行,循环中将依照期望执行空闲任务和周期性任务。该函数的实现仅仅是简单的调用了run()方法:
ret_t aw_loop_run(aw_loop_t* loop) {
return loop->run(loop);
}
显然,run()方法的具体实现与loop实现类相关。但通常情况下,主循环核心代码的实现形式为:
loop->running = TRUE;
while (loop->running) {
// 主循环事务处理
}
在类的定义中,除了5个抽象方法外,还定义了一个bool类型的running成员,其表示当前loop是否处于运行状态。显然,进入主循环开始运行之后,如果不修改running的值,那么将一直在主循环中运行,aw_loop_run()函数后面的语句将不会执行。
为了使主循环可以在需要时(比如结束应用程序)退出,loop类还专门提供了退出接口。
# 6. 退出主循环
当需要退出主循环时,可以使用该函数,其函数原型为:
ret_t aw_loop_quit(aw_loop_t* loop);
在该函数的实现中,仅仅是将running的值修改为了FALSE。quit接口的实现如下所示。
ret_t aw_loop_quit(aw_loop_t* loop)
{
loop->running = FALSE;
return RET_OK;
}
# 7. 销毁loop对象
当不再使用一个loop对象时,可以将其销毁,销毁函数的原型为:
ret_t aw_loop_destroy(aw_loop_t* loop);
当退出主循环时,往往意味着应用程序的结束,因此,可以将销毁语句放在aw_loop_run()函数之后,例如:
aw_loop_run(loop);
aw_loop_destroy(loop);
在实际应用中,往往还有很多其它需要销毁的对象,比如节点管理器、节点仓库、graph加载器、节点工厂等等。
aw_loop_destroy()函数的实现也非常简单,其直接调用了destroy()方法:
ret_t aw_loop_destroy(aw_loop_t* loop) {
return loop->destroy(loop);
}
# 3.6 综合范例程序
# 3.6.1 四个节点构成的基础应用
前面详细介绍了AWFlow中的各个部分,同时实现了三种具体节点类:Random number、add、log。前面列举的代码大多是一些程序片段,但都不是完整的可执行程序。
一个完整的范例程序(具体功能对应于图3.10所示的数据流图)如下所示(假定直接在PC上运行测试,入口为main()函数)。
#include "tkc/platform.h"
#include "aw_flow/base/aw_node_manager.h"
#include "aw_flow/base/aw_loop_default.h"
#include "aw_flow/base/aw_node_factory.h"
#include "aw_flow/nodes/pump_random_number/aw_node_pump_random_number.h"
#include "aw_flow/nodes/filter_add/aw_node_filter_add.h"
#include "aw_flow/nodes/sink_log/aw_node_sink_log.h"
#include "aw_flow/base/aw_node_repository_mem.h"
#include "aw_flow/base/aw_graph_loader_default.h"
/* graph加载器(默认加载器)*/
#define aw_graph_loader_create aw_graph_loader_default_create
/* 节点仓库(默认的基于内存模拟的仓库)*/
#define aw_node_repository_create aw_node_repository_mem_create
/* 数据流图(节点描述)*/
static const aw_graph_desc_t s_flow2[] = {
{"random_number", "2", "3", "min\0:10\0max\0:1000\0\0\0"},
{"add", "3", "4", "delta\0:1000\0\0\0"},
{"log", "4", NULL, "log\0:\0prefix\0:\0max_size\0:1000\0log_to_console\0:true\0\0\0"},
{NULL, NULL, NULL, NULL}
};
/* 将节点创建函数注册到节点工厂中 */
static ret_t register_nodes(aw_node_factory_t* factory) {
aw_node_factory_register(factory, "random_number", aw_node_pump_random_number_create);
aw_node_factory_register(factory, "add", aw_node_filter_add_create);
aw_node_factory_register(factory, "log", aw_node_sink_log_create);
return RET_OK;
}
/* 添加数据流图 */
static ret_t register_graphs(aw_graph_loader_t* loader) {
aw_graph_loader_default_add_config(loader, "default", s_flow2);
return RET_OK;
}
static int flow_app_start(void) {
aw_node_factory_t *factory = aw_node_factory_create();
aw_graph_loader_t *loader = aw_graph_loader_default_create(factory);
aw_node_repository_t *repository = aw_node_repository_mem_create();
aw_node_manager_t *manager = aw_node_manager_create(loader, repository);
aw_loop_t *loop = aw_loop_default_create(manager);
register_nodes(factory);
register_graphs(loader);
aw_node_manager_load_graph(manager, "default");
aw_loop_run(loop);
aw_loop_destroy(loop);
aw_graph_loader_destroy(loader);
aw_node_manager_destroy(manager);
aw_node_factory_destroy(factory);
aw_node_repository_destroy(repository);
return 0;
}
int main(int argc, char* argv[]) {
return_value_if_fail(platform_prepare() == RET_OK, RET_FAIL);
TK_ENABLE_CONSOLE();
flow_app_start();
return 0;
}
在main()函数中,首先调用了platform_prepare()函数准备好平台,主要做一些与平台相关的初始化操作(比如计时器等),不同平台的实现可能不同。
TK_ENABLE_CONSOLE()为一个使能宏,用于使能控制台,以便log节点中通过log_debug()输出的字符串信息可以打印到控制台中。
flow_app_start()为真正的应用启动函数:首先创建了各种对象(节点工厂、graph加载器、节点仓库、节点管理器、loop等),接着使用register_nodes()和register_graphs()注册了具体的节点类以及数据流图(节点描述),然后使应用程序进入主循环,当应用程序退出时,最终销毁之前创建的各个对象。
读者可以将flow_app_start()函数的实现视为模板,通常情况下,该函数无需作任何修改,而需要根据应用修改的地方只有5处:
(1)graph加载器的创建函数
程序中使用了默认的graph加载器,若需使用用户自定义的其它加载器,仅需将aw_graph_loader_create宏修改为实际的创建函数。
(2)节点仓库的创建函数
程序中使用了基于内存模拟的节点仓库(便于测试),若需使用用户自定义的其它节点仓库,仅需将aw_node_repository_create宏修改为实际的创建函数。
(3)register_nodes函数实现
register_nodes()函数负责注册所有具体节点类对应的创建函数,在实际应用中,应该根据需要注册具体节点类的创建函数。
(4)register_graphs函数实现
register_nodes()函数负责向加载器中添加数据流图的描述,在实际应用中,应根据需要添加。绝大部分应用可能都只会使用"一张"数据流图,则register_nodes()函数的实现可以保持不变(只需要保证定义节点描述数组时,其数组名为s_flow2)。
(5)数据流图描述(节点描述数组)
节点描述数组应根据实际情况构建,不同应用该数组内容通常都是不同的。
当节点类型、节点仓库、graph加载器都没有发生变化时,如果需要修改应用功能,则只需修改节点描述数组即可。
# 3.6.2 添加log节点,实现对输入的监控
运行程序后,控制台会每秒打印一次信息,形如:
value:1081
value:1045
value:1204
value:1272
value:1340
value:1889
value:1057
该应用的主要功能是:add节点将随机数节点产生的数据增加1000,并通过log节点打印出增加后的值。但实际上,只看结果并不能验证整个数据流工作的正确性。为了查看整个数据流,最好是能够将整个数据流的原始输入数据(产生的随机数)也打印出来,如此一来,通过比较输入和输出,就可以很好的判断数据流工作的正确性。
log节点可以"监视"数据,为了查看输入数据,可以在random_number节点之后增加一个log节点作为其消费者。更新后的数据流图详见下图。

由于该应用只是添加了一个已有类型的节点(log),节点类型、节点仓库、graph加载器都没有发生变化,因而只需要在原应用程序的基础上,修改节点描述数组的内容。
在定义数组内容之前,为便于更加直观的查看各个节点所分配的id及属性值,绘制了带完整信息的数据流图,详见下图。

为了便于区分输入数据和输出数据,对于输入数据对应的log节点(id为"5"),设置了前缀为"input:";对于输出数据对应的log节点(id为"4"),设置了前缀为"output:"。根据图中所示的各个信息,可以完成数据流图的描述,范例程序如下所示。
static const aw_graph_desc_t s_flow2[] = {
{"random_number", "2", "5,3", "min\0:10\0max\0:1000\0\0\0"},
{"add", "3", "4", "delta\0:1000\0\0\0"},
{"log", "4", NULL,"log\0:\0prefix\0:output: \0max_size\0:1000\0log_to_console\0:true\0\0\0"},
{"log", "5", NULL,"log\0:\0prefix\0:input: \0max_size\0:1000\0log_to_console\0:true\0\0\0"},
{NULL, NULL, NULL, NULL}
};
将其与上述程序中的数组信息对比可以发现,这里增加了一个id为"5"的log节点,且将该节点作为了"random_number"的首个消费者。
运行该应用程序,将可以同时看到输入信息和输出信息:
input: value:884
output: value:1884
input: value:598
output: value:1598
input: value:658
output: value:1658
可以看到,输出数据恰好比输入数据大1000,说明整个数据流工作正常。在介绍"节点消费者"时提到,AWFlow在将数据分发给消费者时,分发的顺序与consumer_ids中各个id的顺序保持一致。应该特别注意可能修改原始数据的消费者节点,因为它的修改会影响到后续消费者。在本应用中,add节点也是"random_number"的消费者,且其会直接修改原始数据(详见程序aw_node_filter_add_transform,其直接将data中的数值型属性值增加了一个特定的值),基于此,为了查看输入数据,在"random_number"的消费者中,log节点(id为"5")必须排在add节点(id为"3")的前面,因而consumer_ids的值为"5,3"。
作为验证,读者可以尝试将"5,3"修改为"3,5"。此时,由于log节点排在add节点之后,log节点打印的值将是add操作之后的值,其将会与输出数据一致,进而看不到原始的输入数据。输出信息形如:
output: value:1994
input: value:1994
output: value:1993
input: value:1993
output: value:1061
input: value:1061
由此可见,input和output的值是完全相同的。同时,还可以发现另外一个现象,就是以"output"作为前缀的信息先输出,而以"input"作为前缀的信息后输出。这是由于id为"3"的add节点作为random_number的首个消费者,数据将首先分发给它,其对数据的处理过程为:对数据进行处理(增值操作),然后分发给其消费者(id为"4"的log节点)。也就是说,add节点对数据的处理过程包含了其消费者对数据的处理,因而id为"4"的log节点(前缀为"output")会先于id为"5"的log节点(前缀为"input")接收到数据。