LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper是一个规则引擎,即按一定的规则去处理数据。eKuiper灵活地解决了:从哪里取数,进行什么处理,处理完发到哪去 这些问题。
eKuiper虽然不是edgex的一部分,但与edgex做了比较深的集成。
关于eKuiper的基本使用,可以参考官方文档:https://ekuiper.org/docs/zh/latest/ 。
本文仅专注于如何在edgex中使用eKuiper的图拓扑规则来进行一些数据的处理。同时指出eKuiper哪些能做到,哪些做不到,有哪些坑。
注:基于eKuiper版本1.6.2。
在物联网的场景下,一些设备的数据是需要进行加工或处理的,比如:
做一些简单计算 典型的是温度采集上来都是int型的,需要除以10或100再-40得到真实值。这种线型标定,可以在采集时进行。比如edgex中的device-modbu采集服务就可以通过配置来实现。 但如果采集阶段不由我们控制,我们不好干涉,需要上报后处理,那一种办法是在自定义的application service中写逻辑处理,也可以使用eKuiper进行处理。
进行采样或聚合 想计算下每分钟的平均温度,然后再发送到云端或者发送到application service中去处理或者触发对设备的控制。这种场景非常适合使用eKuiper进行。
因此,希望通过edgex和eKuiper的图规则来实现以下场景:
如图所示:
然后,开始配置图拓扑规则。图拓扑规则的API的body数据格式如下:
{
"id": "graph_rule_demo", //规则id,唯一标识
"name": "Graph Rule - control", //规则名称
"graph": {
"nodes": { //节点列表,定义图上的节点
"local-edgex-source": {
"type": "source",
"nodeType": "edgex",
"props": {
"confKey": "my_conf"
}
},
……
},
"topo": { //关系列表,定义图中的连线
"sources": ["local-edgex-source"],
"edges": {
"local-edgex-source": ["device-filter"],
……
}
}
}
}
关于图拓扑配置的官方文档都很少。
拓扑中nodes的json结构如下:
"your-source-name": {
"type": "source",
"nodeType": "edgex",
"props": {
……
}
}
然后你看官方文档提到的配置:
看似你能在props下面去自己定义这些参数。但实际上,这些属性都不能生效。也就是说:
总结一下,目前从edgex的消息总线中接收数据,有2种方式:
name: default
messageType: event
port: 6379
protocol: redis
server: edgex-redis
topic: rules-events
type: redis
这种情况下,node配置中props缺省就行了。
"your-source-name": {
"type": "source",
"nodeType": "edgex",
"props": {
}
}
但其实展示的是错的,实际订阅的topic是edgex/events/device/#
,这在ekuiper控制台日志里可以看到。
name: my_conf
messageType: request //如果从消息总线,这里必须是request
port: 6379
protocol: redis
server: edgex-redis //实际redis的ip或域名
topic: edgex/events/device/# //这个topic可以按需进行,但要按照edgex的层级
type: redis
注意:
如果从消息总线中接收消息,那messageType要使用request。 topic前缀为edgex/events/device
,并遵循edgex的层级结构:edgex/events/device/<device-profile-name>/<device-name>/<source-name>
。
但其实dtos.Event 和 requests.AddEventRequest 的区别不大,从源码看,AddEventRequest在Event外面包了一层:
此时,props中的confKey配为已有配置即可。
"your-source-name": {
"type": "source",
"nodeType": "edgex",
"props": {
"confKey": "my_conf"
}
},
图拓扑中的规则配置,请参考官方文档:https://ekuiper.org/docs/zh/latest/rules/graph_rule.html
但文档中都是些简单的示例。所以我们要把之前的SQL转换成json的配置项,只能不断尝试。下面是一部分规则的注意点。
"device-filter": {
"type": "operator",
"nodeType": "filter",
"props": {
"expr": "meta(deviceName) = \"DemoModbusDevice\" AND Temperature < 30"
}
},
注意:
逻辑拼接,使用AND / OR,而不是 && |
"temp-cal": {
"type": "operator",
"nodeType": "function",
"props": {
"expr": "power(Temperature, 2) AS TemperaturePredict"
}
},
注意:
power(Temperature, 2) AS TemperaturePredict, ln(Temperature, 2) AS TemperaturePredict2
形式是不行的,只有第1个会被计算,后面的参数会被忽略。在计算节点之后,会增加新属性,但原有属性也会保留。也就是说,如果此时再流回消息总线,那流回的这个消息也会重新进入到ekuiper源中,也就是成了循环,消息会指数级增长,很快把服务器资源用满。
为了避免成环,1种方法是在订阅的topic中加过滤,即在edgex源配置中,topic上配到详细的层级。这种的情况缺点是你只能处理某个设备或某类属性的数据。 还有1种方法是,我只向edgex总线发新生成的属性,这就用到了pick类型的node。
配置很简单:
"field-pick": {
"type": "operator",
"nodeType": "pick",
"props": {
"fields": ["TemperaturePredict"]
}
},
把消息流入edgex总线,最基础的配置如下:
"local-edgex-sink": {
"type": "sink",
"nodeType": "edgex",
"props": {
"host":"edgex-redis",
"port": 6379,
"protocol": "redis",
"messageType": "request",
"topicPrefix": "edgex/events/device",
"contentType": "application/json"
}
}
},
此时会发现,由于没有带过来元数据信息,流入edgex消息总线的数据里,deviceName、profileName和sourceName都使用了如下所示的默认值:
{
"apiVersion":"v2",
"id":"37720fa9-ef58-427e-b636-0f2a4a6bf3f3",
"deviceName":"ekuiper",
"profileName":"ekuiperProfile",
"sourceName":"graph_rule_edgex_demo",
"origin":1666248283338039300,
"readings":[
{
"id":"2988a557-1384-4244-9570-f8fe409cbbc9",
"origin":1666248283338074000,
"deviceName":"ekuiper",
"resourceName":"TemperaturePredict",
"profileName":"ekuiperProfile",
"valueType":"Float64",
"value":"2.433600e+02"
}
]
}
如果只处理一个设备的数据,那在props中直接配置topic就可以了,如topic: edgex/events/device/ProfileAAA/DeviceAAA/TemperaturePredict
。或者配置topicPrefix,并配置deviceName、profileName和sourceName,如:
"topicPrefix": "edgex/events/device",
"deviceName":"DeviceAAA",
"resourceName":"TemperaturePredict",
"profileName":"ProfileAAA",
如果我处理的是多个设备,那deviceName就不能配死了,这就需要配置动态元数据,来保留之前的元数据。
按照文档 动态元数据 中所示:
把这个转换到图拓扑的配置就是,先要加一个计算node,产生edgex_meta这个字段,然后在edgex sink中配置metadata,如下所示:
"meta-fill": {
"type": "operator",
"nodeType": "function",
"props": {
"expr": "meta(*) AS edgex_meta"
}
},
"local-edgex-sink": {
"type": "sink",
"nodeType": "edgex",
"props": {
"host":"edgex-redis",
"port": 6379,
"protocol": "redis",
"messageType": "request",
"topicPrefix": "edgex/events/device",
"contentType": "application/json"
"metadata": "edgex_meta"
}
}
注意:
这就有问题了,我的场景下其实只是想保留deviceName、profileName,而sourceName指定为新生成的字段TemperaturePredict
,也就是说仅仅deviceName、profileName是动态的,但这在目前没办法做到。
个人感觉应该是props中deviceName、profileName和sourceName的配置优先级更高,用户的主观配置去覆盖meta中的数据才合理。
整个规则的配置如下,仅供参考:
{
"id": "graph_rule_edgex_demo",
"name": "Test Graph Rule",
"graph": {
"nodes": {
"local-edgex-source": {
"type": "source",
"nodeType": "edgex",
"props": {
"confKey": "my_conf"
}
},
"device-filter": {
"type": "operator",
"nodeType": "filter",
"props": {
"expr": "meta(deviceName) = \"DemoModbusDevice\" AND Temperature < 30"
}
},
"temp-cal": {
"type": "operator",
"nodeType": "function",
"props": {
"expr": "power(Temperature, 2) AS TemperaturePredict"
}
},
"field-pick": {
"type": "operator",
"nodeType": "pick",
"props": {
"fields": ["TemperaturePredict"]
}
},
"meta-fill": {
"type": "operator",
"nodeType": "function",
"props": {
"expr": "meta(*) AS edgex_meta"
}
},
"local-mqtt-sink": {
"type": "sink",
"nodeType": "mqtt",
"props": {
"server": "tcp://local-emqx:1883",
"topic": "demo/sink/greater"
}
},
"local-edgex-sink": {
"type": "sink",
"nodeType": "edgex",
"props": {
"host":"edgex-redis",
"port": 6379,
"protocol": "redis",
"messageType": "request",
"topicPrefix": "edgex/events/device",
"contentType": "application/json"
}
}
},
"topo": {
"sources": ["local-edgex-source"],
"edges": {
"local-edgex-source": ["device-filter"],
"device-filter": ["temp-cal"],
"temp-cal": ["field-pick"],
"field-pick": ["meta-fill"],
"meta-fill": ["local-mqtt-sink", "local-edgex-sink"]
}
}
}
}
上面的配置并不能直接完成最初的场景设定,即:通过计算生成新的属性,并流回edgex的消息总线(虽然流回了消息总线,但里面的数据格式不是理想的)。 work around方法是,我把edgex_meta作为一个属性,放回消息总线,在application service中二次处理。
最终,流入消息总线的消息如下:
topic: edgex/events/device/ekuiperProfile/ekuiper/graph_rule_edgex_demo
{
"apiVersion":"v2",
"id":"7274c14e-ba66-4c55-9ef3-45856f9d75c0",
"deviceName":"ekuiper",
"profileName":"ekuiperProfile",
"sourceName":"graph_rule_edgex_demo",
"origin":1666250712010773200,
"readings":[
{
"id":"3138b5eb-8b5a-40d3-bca2-b3352ef6ca98",
"origin":1666250712010787000,
"deviceName":"ekuiper",
"resourceName":"edgex_meta",
"profileName":"ekuiperProfile",
"valueType":"Object",
"objectValue":{
"Temperature":{
"deviceName":"DemoModbusDevice",
"id":"b9652a7f-3ff2-4cb4-8789-2c8db1f263b8",
"origin":1666250712009348900,
"profileName":"DemoModbusProfile",
"valueType":"Float32"
},
"correlationid":"29e2f9f5-38d6-42a8-bfdf-56811619a8d6",
"deviceName":"DemoModbusDevice",
"id":"dacaeb8f-7568-44d2-af7f-e4ce6ecd375c",
"origin":1666250712009531400,
"profileName":"DemoModbusProfile",
"sourceName":"Temperature",
"tags":null
}
},
{
"id":"98d45a05-c613-441b-ad20-957327e80bf4",
"origin":1666250712010809000,
"deviceName":"ekuiper",
"resourceName":"TemperaturePredict",
"profileName":"ekuiperProfile",
"valueType":"Float64",
"value":"2.433600e+02"
}
]
}
发到mqtt的消息如下:
[
{
"TemperaturePredict":243.36001190185561,
"edgex_meta":{
"Temperature":{
"deviceName":"DemoModbusDevice",
"id":"ac8764e1-dcfd-4210-b410-8e56e00bc722",
"origin":1666252953917595800,
"profileName":"DemoModbusProfile",
"valueType":"Float32"
},
"correlationid":"af8b46ac-a9da-4475-8db5-b9644f09db15",
"deviceName":"DemoModbusDevice",
"id":"8baf53af-4633-4ffd-aa48-64b0ae55be0d",
"origin":1666252953917725800,
"profileName":"DemoModbusProfile",
"sourceName":"Temperature",
"tags":null
}
}
]
可以看到,edgex_meta被当做一个属性进行了包装,进入了edgex消息总线。