Joining temperature, pressure and vibration streams and finding a simple pattern

Let's take an example IoT scenario. There are three sensors, for temperature, pressure and vibration. We wish to join these streams so that for each point we get all three values as they arrive, and in the end we would like to send a notification when temperature, pressure and vibration all satisfy some condition, here when (temp > 71.1 and pressure > 11.0 and vibration > 35).

The streams are joined on a simple condition, i.e. where s1.point = s2.point. This is done by

``"jqry":{"cond":["point"], "opid":11, "args":["point"]}``

We can add `"cmp":["GT", "LTE"]` in the jqry to use other comparison operators. When `cmp` is omitted, the implicit value is `"cmp":["EQ"]`.
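To make the join condition concrete, here is a minimal Python sketch of how such a jqry could be evaluated against a pair of events. It is only an illustration, not the engine's implementation: the function name `join_matches`, the operator table, and the reading that `cond` names attributes of the current stream, `args` names attributes of the other stream, and `cmp` applies position by position are all assumptions.

```python
import operator

# Assumed mapping from the "cmp" names mentioned in the text to Python operators.
CMP_OPS = {
    "EQ": operator.eq,
    "GT": operator.gt,
    "LTE": operator.le,
}


def join_matches(left, right, jqry):
    """Return True if a pair of events satisfies the join query (hypothetical helper)."""
    cond = jqry["cond"]   # attributes of the current stream (assumption)
    args = jqry["args"]   # attributes of the other stream (assumption)
    # "EQ" is the implicit comparison when "cmp" is absent.
    cmps = jqry.get("cmp", ["EQ"] * len(cond))
    return all(
        CMP_OPS[op](left[l_attr], right[r_attr])
        for l_attr, r_attr, op in zip(cond, args, cmps)
    )


jqry = {"cond": ["point"], "opid": 11, "args": ["point"]}
same_point = join_matches({"temp": 70.1, "point": 1}, {"pressure": 10.2, "point": 1}, jqry)
other_point = join_matches({"temp": 70.1, "point": 1}, {"pressure": 10.2, "point": 2}, jqry)
```

With the default `EQ` comparison, `same_point` is true and `other_point` is false, which is the s1.point = s2.point behaviour described above.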

``````
temperature stream --+
                     +==> temp_pressure stream --+
pressure stream -----+                           +==> temp_pressure_vibration stream
vibration stream --------------------------------+
``````

Example

Here we will use join_type = 1, a simple join: both streams actively join with each other, but once two events have been joined, the same event data is not used for the next join.

Let's define the sample schema and its streams first.

``````{
"schema":"myschema",
"streams":[
{
"name":"temp_stream",
"type":1,
"swsz":81600,
"inpt":[

],
"attr":[
{
"name":"temp",
"type":11
},
{
"name":"point",
"type":9
}
],
"join":[
{
"name":"temp_pressure_join",
"type":1,
"tloc":300,
"iatr":[
"temp",
"point"
],
"rstm":"pressure_stream",
"ratr":[
"pressure"
],
"jqry":{
"cond":[
"point"
],
"opid":11,
"args":[
"point"
]
},
"ostm":"temp_pressure_stream"
}
]
},
{
"name":"pressure_stream",
"type":1,
"inpt":[

],
"attr":[
{
"name":"pressure",
"type":11
},
{
"name":"point",
"type":9
}
],
"join":[
{
"name":"temp_pressure_join",
"type":1,
"tloc":300,
"ratr":[
"temp",
"point"
],
"rstm":"temp_stream",
"iatr":[
"pressure"
],
"jqry":{
"cond":[
"point"
],
"opid":11,
"args":[
"point"
]
},
"ostm":"temp_pressure_stream"
}
]
},
{
"name":"vibration_stream",
"type":1,
"inpt":[

],
"attr":[
{
"name":"vibration",
"type":9
},
{
"name":"point",
"type":9
}
],
"join":[
{
"name":"temp_pressure_vibration_join",
"type":1,
"tloc":300,
"ratr":[
"temp",
"pressure",
"point"
],
"rstm":"temp_pressure_stream",
"iatr":[
"vibration"
],
"jqry":{
"cond":[
"point"
],
"opid":11,
"args":[
"point"
]
},
"ostm":"temp_pressure_vibration_stream"
}
]
},
{
"name":"temp_pressure_stream",
"type":3,
"inpt":[
"temp_stream",
"pressure_stream"
],
"attr":[
{
"name":"point",
"type":9
},
{
"name":"temp",
"type":11
},
{
"name":"pressure",
"type":11
}
],
"join":[
{
"name":"temp_pressure_vibration_join",
"type":1,
"tloc":300,
"iatr":[
"temp",
"pressure",
"point"
],
"rstm":"vibration_stream",
"ratr":[
"vibration"
],
"jqry":{
"cond":[
"point"
],
"opid":11,
"args":[
"point"
]
},
"ostm":"temp_pressure_vibration_stream"
}
]
},
{
"name":"temp_pressure_vibration_stream",
"type":3,
"inpt":[
"temp_pressure_stream",
"vibration_stream"
],
"attr":[
{
"name":"point",
"type":9
},
{
"name":"temp",
"type":11
},
{
"name":"pressure",
"type":11
},
{
"name":"vibration",
"type":9
}
],
"cepq":[
{
"name":"temp_press_vib_cond",
"type":6,
"tloc":86400,
"fqry":{
"name":"{\"query\":[{\"key\":\"temp\", \"cmp_op\":0, \"val\":71.1},{\"joinop\":0},{\"key\":\"pressure\", \"cmp_op\":0, \"val\":11.0},{\"joinop\":0},{\"key\":\"vibration\", \"cmp_op\":0, \"val\":35}]}",
"type":1
},
"notf":12345
}
]
}
],
"notifs":[
{
"notifid":12345,
"name":"notif1",
"msg":"users msg",
"pri":1,
"mailto":[

],
"endpoints":[
"http://192.168.1.49:10102/iot"
],
"schemaid":1234,
"notif_streamid":4321,
"notif_stream_name":"sdf",
"freq":1,
"tags":[
"a"
],
"rel_streams":[
"s1"
]
}
]
}``````

There are three input streams, which measure temp, pressure and vibration respectively for a point p. temp_stream and pressure_stream join with each other and send the joined data to temp_pressure_stream. In turn, vibration_stream and temp_pressure_stream join with each other and send the joined data to temp_pressure_vibration_stream.

The join condition for all of these streams is s1.point = s2.point, i.e. join on point. Finally, a notification is sent from temp_pressure_vibration_stream when temp is greater than 71.1 AND pressure is greater than 11.0 AND vibration is greater than 35.
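The filter condition can be checked by hand with a few lines of Python. This is a rough sketch only: `event_matches` is a hypothetical helper, and it assumes that `cmp_op` 0 means "greater than" and `joinop` 0 means AND, which is how the condition is described in the text. Other operator codes are not modelled.

```python
import json

# The filter query as carried in the "fqry" of the schema.
FQRY = (
    '{"query":[{"key":"temp", "cmp_op":0, "val":71.1},{"joinop":0},'
    '{"key":"pressure", "cmp_op":0, "val":11.0},{"joinop":0},'
    '{"key":"vibration", "cmp_op":0, "val":35}]}'
)


def event_matches(event, fqry_text):
    """Evaluate the filter against one joined event (hypothetical helper)."""
    results = []
    for term in json.loads(fqry_text)["query"]:
        if "key" in term:
            if term["cmp_op"] != 0:
                raise ValueError("only cmp_op 0 (greater than) is modelled")
            results.append(event[term["key"]] > term["val"])
        elif term.get("joinop") != 0:
            raise ValueError("only joinop 0 (AND) is modelled")
    # Every joinop is AND, so all comparisons must hold.
    return all(results)


joined_events = [
    {"temp": 70.1, "pressure": 10.2, "vibration": 30, "point": 1},
    {"temp": 71.1, "pressure": 11.1, "vibration": 40, "point": 1},
    {"temp": 71.2, "pressure": 11.5, "vibration": 50, "point": 1},
]
matches = [event_matches(e, FQRY) for e in joined_events]
```

Note that a temp of exactly 71.1 does not pass, since the comparison is strictly greater than.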

Now, let's push some data in the following order:

``````put [ temp_stream ] : {"temp":70.1, "point":1}
put [ pressure_stream ] : {"pressure":10.2, "point":1}
put [ pressure_stream ] : {"pressure":11.1, "point":1}
put [ vibration_stream ] : {"vibration":30, "point":1}
put [ pressure_stream ] : {"pressure":11.5, "point":1}
put [ vibration_stream ] : {"vibration":40, "point":1}
put [ pressure_stream ] : {"pressure":11.8, "point":1}
put [ temp_stream ] : {"temp":71.1, "point":1}
put [ vibration_stream ] : {"vibration":50, "point":1}
put [ temp_stream ] : {"temp":71.2, "point":1}
put [ vibration_stream ] : {"vibration":60, "point":1}``````
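To reason about what the joins should produce for this sequence, the pushes can be replayed against a small in-memory model. The sketch below is not how the database implements joins. It assumes that each event is used in at most one join, that an arriving event is matched with the oldest waiting event of the other stream having the same point, and it ignores the time windows (`tloc`, `swsz`) and the generated `_pk`, `_jpk1` and `_v` fields. The class `SimpleJoin` and the function `put` are hypothetical names.

```python
class SimpleJoin:
    """Consume-once join between two streams on a single key (simplified model)."""

    def __init__(self, left, right, key="point"):
        self.key = key
        self.pending = {left: [], right: []}      # events still waiting for a partner
        self.other = {left: right, right: left}

    def put(self, stream, event):
        waiting = self.pending[self.other[stream]]
        for i, candidate in enumerate(waiting):
            if candidate[self.key] == event[self.key]:
                waiting.pop(i)                    # the matched event is not reused
                return {**candidate, **event}
        self.pending[stream].append(event)        # no partner yet, so wait
        return None


tp_join = SimpleJoin("temp_stream", "pressure_stream")
tpv_join = SimpleJoin("temp_pressure_stream", "vibration_stream")
temp_pressure, temp_pressure_vibration = [], []


def put(stream, event):
    """Route one event through the two chained joins."""
    if stream in ("temp_stream", "pressure_stream"):
        joined = tp_join.put(stream, event)
        if joined is None:
            return
        temp_pressure.append(joined)
        stream, event = "temp_pressure_stream", joined
    joined = tpv_join.put(stream, event)
    if joined is not None:
        temp_pressure_vibration.append(joined)


pushes = [
    ("temp_stream", {"temp": 70.1, "point": 1}),
    ("pressure_stream", {"pressure": 10.2, "point": 1}),
    ("pressure_stream", {"pressure": 11.1, "point": 1}),
    ("vibration_stream", {"vibration": 30, "point": 1}),
    ("pressure_stream", {"pressure": 11.5, "point": 1}),
    ("vibration_stream", {"vibration": 40, "point": 1}),
    ("pressure_stream", {"pressure": 11.8, "point": 1}),
    ("temp_stream", {"temp": 71.1, "point": 1}),
    ("vibration_stream", {"vibration": 50, "point": 1}),
    ("temp_stream", {"temp": 71.2, "point": 1}),
    ("vibration_stream", {"vibration": 60, "point": 1}),
]
for stream, event in pushes:
    put(stream, event)
```

Under these assumptions the model ends up with three events in `temp_pressure` and three in `temp_pressure_vibration`, while pressure 11.8 and vibration 60 stay unmatched.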

Now let's scan and see the data in the different streams. Here is the data in the temp stream:

``````{
"temp":70.10000000000001,
"point":1,
"_pk":1584944534256620,
"_v":1
}
{
"temp":71.10000000000001,
"point":1,
"_pk":1584944534362863,
"_v":1
}
{
"temp":71.2,
"point":1,
"_pk":1584944534396958,
"_v":1
}``````

Here is the data in the pressure stream:

``````{
"pressure":10.2,
"point":1,
"_pk":1584944534266887,
"_v":1
}
{
"pressure":11.1,
"point":1,
"_pk":1584944534280460,
"_v":1
}
{
"pressure":11.5,
"point":1,
"_pk":1584944534308709,
"_v":1
}
{
"pressure":11.8,
"point":1,
"_pk":1584944534332148,
"_v":1
}``````

Here is the data in the vibration stream:

``````{
"vibration":30,
"point":1,
"_pk":1584944534296090,
"_v":1
}
{
"vibration":40,
"point":1,
"_pk":1584944534320131,
"_v":1
}
{
"vibration":50,
"point":1,
"_pk":1584944534384031,
"_v":1
}
{
"vibration":60,
"point":1,
"_pk":1584944534397786,
"_v":1
}``````

Here is the data in the temp_pressure stream:

``````{
"pressure":10.2,
"_pk":1584944534266887,
"temp":70.10000000000001,
"point":1,
"_jpk1":1584944534256620,
"_v":1
}
{
"temp":71.10000000000001,
"point":1,
"_pk":1584944534362863,
"pressure":11.1,
"_jpk1":1584944534280460,
"_v":1
}
{
"temp":71.2,
"point":1,
"_pk":1584944534396958,
"pressure":11.5,
"_jpk1":1584944534308709,
"_v":1
}``````

Here is the data in the temp_pressure_vibration stream:

``````{
"vibration":30,
"_pk":1584944534296090,
"temp":70.10000000000001,
"pressure":10.2,
"point":1,
"_jpk1":1584944534266887,
"_v":1
}
{
"temp":71.10000000000001,
"pressure":11.1,
"point":1,
"_pk":1584944534362863,
"vibration":40,
"_jpk1":1584944534320131,
"_v":1
}
{
"temp":71.2,
"pressure":11.5,
"point":1,
"_pk":1584944534396958,
"vibration":50,
"_jpk1":1584944534384031,
"_v":1
}``````

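Finally, a notification is sent out from the joined stream. Only the third joined event satisfies all three conditions (temp 71.2 > 71.1, pressure 11.5 > 11.0, vibration 50 > 35); the second fails because its temp of 71.1 is not greater than 71.1. Here is the notification: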

``````{
"notifid":12345,
"name":"notif1",
"msg":"users msg",
"pri":1,
"mailto":[

],
"endpoints":[
"http://192.168.1.49:10102/iot"
],
"schemaid":1234,
"notif_streamid":4321,
"notif_stream_name":"sdf",
"freq":1,
"tags":[
"a"
],
"rel_streams":[
"s1"
],
"count":1,
"dur":0,
"notif_event":{
"temp":71.2,
"pressure":11.5,
"point":1,
"_pk":1584944534396958,
"vibration":50,
"_jpk1":1584944534384031,
"_v":1
},
"count":1,
"count_this_notif":1,
"dur":0
}``````