Stream Commands
Stream is a feature in BangDB that lets users ingest, parse, process and analyse data in a continuous manner. The CLI provides many commands to work with streams in a simple way.
The important steps to set up streams are:
- Create Schema (using cli or can author json schema file separately)
- Register the schema
- Ingest and process data
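For orientation, a minimal end-to-end session might look like the following (these commands are covered in detail below; the file name and event values are just placeholders):
register schema sample_schema.json
insert into myschema.stream1 values null {"a":"a123","b":"b123","g":10.2,"h":5.5}
select * from myschema.stream1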
A schema is a JSON document which defines how the streams will be processed and stored, and what further actions may be taken. The structure of the JSON is quite simple. Let's look at a sample schema:
{
  "schema":"myschema",
  "streams":[
    {
      "name":"stream1",
      "type":1,
      "swsz":86400,
      "inpt":[],
      "attr":[
        {
          "name":"a",
          "type":5,
          "kysz":32,
          "sidx":1,
          "ridx":1,
          "stat":1
        },
        {
          "name":"b",
          "type":5,
          "kysz":32,
          "stat":2
        },
        {
          "name":"g",
          "type":11,
          "stat":3
        },
        {
          "name":"h",
          "type":11
        }
      ],
      "catr":[
        {
          "name":"m",
          "type":9,
          "opnm":"ADD",
          "stat":3,
          "iatr":[
            "b",
            "a"
          ]
        },
        {
          "name":"mexp",
          "type":9,
          "opnm":"MATH_EXP",
          "iatr":[
            "((($g+$h)*2)+($g*$h))"
          ]
        }
      ],
      "gpby":[
        {
          "gpat":[
            "a",
            "b"
          ],
          "iatr":"g",
          "stat":3,
          "gran":600,
          "kysz":48
        }
      ],
      "refr":[
        {
          "name":"myrefr1",
          "iatr":[
            "c"
          ],
          "rstm":"stream2",
          "ratr":[
            "p1"
          ],
          "jqry":{
            "cond":[
              "b"
            ],
            "opnm":"ATTR",
            "args":[
              "b"
            ]
          }
        }
      ]
    },
    {
      "name":"stream2",
      "type":1,
      "inpt":[],
      "attr":[
        {
          "name":"p1",
          "type":5,
          "kysz":24,
          "sidx":1
        },
        {
          "name":"b",
          "type":5,
          "kysz":32
        }
      ]
    }
  ]
}
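A few notes on the fields used above, as reflected in the CLI workflow later in this section:
- "swsz" is the sliding window size in seconds (default 86400, i.e. one day)
- stream "type": raw/normal primary (1), fltr output (2), join/cep output (3), entity (4)
- attribute "type": string (5), long (9), double (11); "kysz" is the key size in bytes
- "sidx" and "ridx" enable secondary and reverse indexes; "stat" enables stats: count (1), unique count (2), running stats (3)
- "catr" defines a computed attribute: "opnm" is the operation, "iatr" the input attributes it operates on
- "gpby" aggregates the "iatr" attribute grouped by the "gpat" attributes, at "gran" seconds granularity
- "refr" copies attributes ("ratr") from another stream ("rstm") into this stream when the "jqry" join condition is satisfied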
Another example could be the following. This runs a CEP query in a continuous manner to find a pattern where, for two different stocks, the current price is exactly the same.
{
  "schema":"test",
  "streams":[
    {
      "name":"s1",
      "type":1,
      "swsz":86400,
      "inpt":[],
      "attr":[
        {
          "name":"stock",
          "type":5,
          "kysz":16
        },
        {
          "name":"price",
          "type":9
        }
      ],
      "cepq":[
        {
          "name":"mystock",
          "type":1,
          "tloc":3000,
          "ratr":[
            "price"
          ],
          "rstm":"s1",
          "iatr":[
            "stock",
            "price"
          ],
          "jqry":{
            "cond":[
              "stock",
              "price"
            ],
            "opid":11,
            "args":[
              "stock",
              "price"
            ],
            "cmp":[
              "NE",
              "EQ"
            ],
            "seq":1
          },
          "fqry":{
            "name":"{\"query\":[{\"key\":\"price\", \"cmp_op\":0, \"val\":70.5}]}",
            "type":1
          },
          "cond":[
            {
              "name":"NUMT",
              "val":1,
              "opid":1
            },
            {
              "name":"DUR",
              "val":1000,
              "opid":0
            }
          ],
          "ostm":"s2"
        }
      ]
    },
    {
      "name":"s2",
      "type":3,
      "swsz":86400,
      "inpt":[
        "s1"
      ],
      "attr":[
        {
          "name":"stock",
          "type":5,
          "kysz":16
        },
        {
          "name":"price",
          "type":9
        }
      ]
    }
  ]
}
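To exercise this CEP pattern, we could ingest two events into s1 within the 1000 ms "DUR" window; the values below are illustrative, assuming the same insert syntax shown later in this section. Two different stocks with the same price should satisfy the pattern ("NE" on stock, "EQ" on price), and the matched data should land in the output stream s2:
insert into test.s1 values null {"stock":"sA","price":70.5}
insert into test.s1 values null {"stock":"sB","price":70.5}
select * from test.s2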
Another example joins two streams in a continuous manner. This joins temperature and pressure for any given point and sends the joined data to temp_pressure_stream.
{
  "schema":"myschema",
  "streams":[
    {
      "name":"temp_stream",
      "type":1,
      "swsz":86400,
      "inpt":[],
      "attr":[
        {
          "name":"temp",
          "type":11
        },
        {
          "name":"point",
          "type":9
        }
      ],
      "join":[
        {
          "name":"temp_pressure_join",
          "type":3,
          "tloc":3000,
          "iatr":[
            "temp",
            "point"
          ],
          "rstm":"pressure_stream",
          "ratr":[
            "pressure"
          ],
          "jqry":{
            "cond":[
              "point"
            ],
            "opid":11,
            "args":[
              "point"
            ]
          },
          "ostm":"temp_pressure_stream"
        }
      ]
    },
    {
      "name":"pressure_stream",
      "type":1,
      "inpt":[],
      "attr":[
        {
          "name":"pressure",
          "type":11
        },
        {
          "name":"point",
          "type":9
        }
      ],
      "join":[
        {
          "name":"temp_pressure_join",
          "type":5,
          "tloc":3000,
          "ratr":[
            "temp",
            "point"
          ],
          "rstm":"temp_stream",
          "iatr":[
            "pressure"
          ],
          "jqry":{
            "cond":[
              "point"
            ],
            "opid":11,
            "args":[
              "point"
            ]
          },
          "ostm":"temp_pressure_stream"
        }
      ]
    },
    {
      "name":"temp_pressure_stream",
      "type":3,
      "inpt":[
        "temp_stream",
        "pressure_stream"
      ],
      "attr":[
        {
          "name":"point",
          "type":9
        },
        {
          "name":"temp",
          "type":11
        },
        {
          "name":"pressure",
          "type":11
        }
      ]
    }
  ]
}
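To see this join in action, we could ingest a temperature event and a pressure event for the same point; the values below are illustrative, assuming the same insert syntax shown later in this section. Since the "jqry" condition joins on "point", the two events should produce a joined event in temp_pressure_stream:
insert into myschema.temp_stream values null {"temp":22.5,"point":101}
insert into myschema.pressure_stream values null {"pressure":1013.2,"point":101}
select * from myschema.temp_pressure_stream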
To register the given schema: we can write a schema (like the ones above) and directly register it with the server using the CLI. Suppose the schema is in the file "sample_schema.json"; then we can register it as below.
register schema sample_schema.json
@@@@ time taken to init stream [ myschema__stream2 ] is ...
Usage :
-------------------------
| Time : 0.003 msec |
-------------------------
User : 0.01 msec
Sys : 0 msec
-------------------------
@@@@ time taken to init stream [ myschema__stream1 ] is ...
Usage :
-------------------------
| Time : 89.097 msec |
-------------------------
User : 89.975 msec
Sys : 1 msec
-------------------------
success
To de-register (drop) the schema
deregister schema myschema
success
Alternatively, we can create a schema using the workflow provided by the CLI, by issuing the "create schema" command.
create schema myschema
do you wish to read earlier saved schema for editing/adding? [ yes | no ]: no
Select "no" as we are creating from scratch. We should select yes. If we have some schema on the disk and we would like to add/edit to it. Then it asks to select the stream name.
what's the name of the stream that you wish to add?: stream1
We input the name of the stream, "stream1", and press Enter. After this, we must select the type of the stream.
what's the type of the stream [ raw/normal primary (1) | fltr output (2) | join/cep output (3) | Entity (4) | skip (5) ] (or Enter for default (1)):1
Since this is the primary/raw stream, we select 1.
Now define the size of the sliding window; press Enter for the default of one day.
what's the size of the sliding window in seconds [ or enter for default 86400 sec (a day) ]:
By now we have an empty stream schema defined; next we must define what goes inside it. Enter 1 to add attributes first.
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]:1
This starts the sub-workflow to add attributes.
add attributes
attribute name: a
attribute type (press Enter for default(5)) [ string(5) | long(9) | double (11) ]: 5
enable sidx [ 0 | 1 ]: 1
attribute key size in bytes (press Enter for default(24 bytes)): 32
enable ridx [ 0 | 1 ]: 1
enable stat [ none(0) | count (1) | unique count (2) ]: 1
add another attribute ? [ yes | no ]:
Now let's also create another stream, "stream2", and add two attributes, p1 and b, as shown above, following the same workflow. Next, let's create "catr" (computed attributes); this starts a sub-workflow of its own.
add computed attributes (catr)
attribute name (press Enter to end): m
attribute type (press Enter for default (5)) [ string(5) | long(9) | double (11) ]: 9
available default ops are
[ COPY | ADD | MUL | DIV | PERCENT | SUB | UPPER | LOWER | COPY_VAL
| LOG_E | LOG_2 | LOG_10 | MATH_EXP | PRED | TS | YEAR | YEAR_EPOCH
| MONTH | MONTH_EPOCH | WEEK | WEEK_MONTH | WEEK_EPOCH | DAY | DAY_WEEK
| DAY_MONTH | DAY_EPOCH | HOUR | HOUR_EPOCH | MINUTE | MINUTE_EPOCH
| SECOND | ABS ]
or
Enter custom udf name
enter the name of the intended operation from the above default ops (press Enter to end): ADD
Select "ADD" as the ops here, so we wish to add something. We can ADD attributes or some fixed val.
Here let's say we wish to add the attributes, therefore we need to select attributes that we wish to add.
enter the name of the intended operation from the above default ops (press Enter to end): ADD
enter the input attributes on which this ops will be performed, (press Enter once done): b
enter the input attributes on which this ops will be performed, (press Enter once done): a
enter the input attributes on which this ops will be performed, (press Enter once done):
enter sequence [ 0 | 1 ], if 1 then it will be done before refr else post refr: 1
The sequence matters if, say, we wish to apply an operation to an attribute that will be referred from another stream; the order ensures we execute the catr before or after the refr.
We can now select "sidx" and "stat" for this computed attribute as well.
enable sidx [ 0 | 1 ]: 0
enable stat [ none(0) | count (1) | running stats (3) ]: 0
should add, replace or add only if present [ add (1) | replace (2) | add only if not present (3) ]: 1
add another computed attribute ? [ yes | no ]: yes
Let's add another computed attribute now. This time we will use a math expression to compute the attribute from the existing attributes. Here we enter ((($g+$h)*2)+($g*$h)) as the math expression.
attribute name (press Enter to end): mexp
attribute type (press Enter for default (5)) [ string(5) | long(9) | double (11) ]: 9
available default ops are
[ COPY | ADD | MUL | DIV | PERCENT | SUB | UPPER | LOWER | COPY_VAL |
LOG_E | LOG_2 | LOG_10 | MATH_EXP | PRED | TS | YEAR | YEAR_EPOCH |
MONTH | MONTH_EPOCH | WEEK | WEEK_MONTH | WEEK_EPOCH | DAY | DAY_WEEK |
DAY_MONTH | DAY_EPOCH | HOUR | HOUR_EPOCH | MINUTE | MINUTE_EPOCH |
SECOND | ABS ]
or
Enter custom udf name
enter the name of the intended operation from the above default ops (press Enter to end): MATH_EXP
enter math expression: ((($g+$h)*2)+($g*$h))
enter sequence [ 0 | 1 ], if 1 then it will be done before refr else post refr: 1
enable sidx [ 0 | 1 ]: 0
enable stat [ none(0) | count (1) | running stats (3) ]: 0
should add, replace or add only if present [ add (1) | replace (2) | add only if not present (3) ]: 1
add another computed attribute ? [ yes | no ]: no
Now let's add a groupby ("gpby"); it's also a sub-workflow.
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]: 4
add groupby (gpby)
name of the attribute that would be aggregated: g
enter name of groupby attributes (press Enter once done): a
enter name of groupby attributes (press Enter once done): b
enter name of groupby attributes (press Enter once done):
attribute key size in bytes (note gpby is name mangled with aggr and groupby attr names, hence should be properly allocated): 48
granularity for the aggregate (in seconds): 600
enable stat (1,2,3) [ count (1) | unique count (2) | running stat (3) ]: 1
add another gpby ? [ yes | no ]: no
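Note that the groupby creates a derived stream whose name is mangled from the stream, aggregated attribute and groupby attribute names; for the gpby above it appears later (in the describe schema output) as myschema__stream1__a__b__g, which is why "kysz" should be allocated generously.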
Now let's create "refr"
, refer attribute in another stream.
What would you like to add (press Enter when done)
[ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]: 3
add refers (refr)... refr name: myrefr1
enter name of input attribute that will get created after referring the other stream: c
enter name of input attribute that will get created after referring the other stream:
enable stat (1,2,3) on this created attribute? [ none(0) | count (1) | unique count (2) | running stat (3) ]: 0
enter refr (other stream) stream name: stream2
enter refr attribute name, the attribute which will get copied into this stream if refer condition is satisfied: p1
enter refr attribute name, the attribute which will get copied into this stream if refer condition is satisfied:
enter name of condition attribute need to join two events (press Enter once done): b
enter name of condition attribute need to join two events (press Enter once done):
The available jqry op names tell whether the condition attributes should be compared with another attribute (ATTR), some fixed value (FIXED), a math expression (MATH_EXP), both attribute and fixed value (HYBRID), or your custom udf name.
enter opid (operation name)[ ATTR (a) | FIXED (f) | MATH_EXP (m) | HYBRID (h) ]: a
enter name of arguments (attribute or fixed val or hybrid or math_exp as opid selected previously) for joining (press Enter once done): b
enter name of arguments (attribute or fixed val or hybrid or math_exp as opid selected previously) for joining (press Enter once done):
enter the comparison operators [ EQ | NE | GT | LT ] for join (press Enter once done): EQ
enter the comparison operators [ EQ | NE | GT | LT ] for join (press Enter once done):
enter seq (sequence to tell if strictly consecutive events are required (1) or we may have other events in between (0)) [ or Enter for default (0) ]: 0
should add, replace or add only if present [ add (1) | replace (2) | add only if not present (3) ]: 1
add another refr ? [ yes | no ]: no
That's all it takes to create the first schema (as shown above); now simply finish the workflow and confirm registration, and the CLI will create the schema.
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]:
add another stream ? [ yes | no ]: no
do you wish to register the schema now? [ yes | no ]: yes
@@@@ time taken to init stream [ myschema__stream2 ] is ...
Usage :
-------------------------
| Time : 0.002 msec |
-------------------------
User : 0.008 msec
Sys : 0 msec
-------------------------
@@@@ time taken to init stream [ myschema__stream1 ] is ...
Usage :
-------------------------
| Time : 100.218 msec |
-------------------------
User : 0.884 msec
Sys : 1.001 msec
-------------------------
success
schema [ myschema ] registered successfully
Finally, it asks if you wish to store the schema on the disk:
do you wish to save the schema (locally) for later reference? [ yes | no ]: no
done with the schema [ myschema ] processing
To see the existing schemas
schema list fetched successfully
+----------------+---------+
|name |state |
+----------------+---------+
|myschema |1 |
+----------------+---------+
fetched [ 1 ] schemas
To see the schema
select schema from myschema pretty
To insert events into the streams
insert into myschema.stream2 values null {"p1":"P123","b":"b123"}
success
insert into myschema.stream1 values null {"a":"a123","b":"b123","g":10.2,"h":5.5}
success
Select data from stream1 now
select * from myschema.stream1
scanning for pk range [null : null] and query = null
+------------------+----------------------------------------------------------------------------------------------------------------------+
|key               |val                                                                                                                   |
+------------------+----------------------------------------------------------------------------------------------------------------------+
|1612193259884284  |{"a":"a123","b":"b123","g":10.2,"h":5.5,"_pk":1612193259884284,"m":7746999651695517567,"mexp":87,"c":"P123","_v":1}   |
+------------------+----------------------------------------------------------------------------------------------------------------------+
total rows retrieved = 1 (1)
As you can see, attributes m and mexp got added due to "catr", and c due to "refr".
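For instance, mexp is evaluated from the MATH_EXP catr with g = 10.2 and h = 5.5: ((10.2 + 5.5) * 2) + (10.2 * 5.5) = 31.4 + 56.1 = 87.5, stored as 87 since the attribute type is long (9). The value of c ("P123") was copied from attribute p1 of stream2, since the refr condition on b ("b123") matched the event inserted there earlier.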
To see the count of events in each stream of a given schema
select stream from myschema
+-------------+--------+------------------+
|stream       |type    |num_events        |
+-------------+--------+------------------+
|stream2      |1       | 1                |
+-------------+--------+------------------+
|stream1      |1       | 1                |
+-------------+--------+------------------+
successful in getting the streams [ 2 num ] info for schema [ myschema ]
To describe the schema (this shows the dependencies among streams and other entities)
describe schema myschema pretty
{
  "schema":"myschema",
  "schemaid":4766750373947953813,
  "node_info":[
    {
      "refr-by":[],
      "cepq":[],
      "gpby":[],
      "fltr":[],
      "refr":[],
      "refr-to":[],
      "parent":[],
      "type":0,
      "children":[
        "myschema__stream2",
        "myschema__stream1"
      ],
      "node":"dummy_head",
      "enty":[],
      "joins":[]
    },
    {
      "refr":[],
      "fltr":[],
      "gpby":[],
      "cepq":[],
      "refr-by":[
        "myschema__stream1"
      ],
      "enty":[],
      "node":"myschema__stream2",
      "joins":[],
      "children":[],
      "type":1,
      "parent":[
        "dummy_head"
      ],
      "refr-to":[]
    },
    {
      "refr-to":[
        "myschema__stream2"
      ],
      "children":[
        "myschema__stream1__a__b__g"
      ],
      "type":1,
      "parent":[
        "dummy_head"
      ],
      "joins":[],
      "enty":[],
      "node":"myschema__stream1",
      "refr-by":[],
      "cepq":[],
      "fltr":[],
      "gpby":[
        "g"
      ],
      "refr":[
        "myschema__stream2"
      ]
    },
    {
      "gpby":[],
      "fltr":[],
      "refr":[],
      "refr-by":[],
      "cepq":[],
      "enty":[],
      "joins":[],
      "node":"myschema__stream1__a__b__g",
      "refr-to":[],
      "parent":[],
      "type":5,
      "children":[]
    }
  ]
}
To describe a stream of any schema
describe stream myschema.stream1
{
  "node":6578472670278090808,
  "attributes":[
    {
      "name":"b",
      "type":5,
      "kysz":48,
      "sidx":0,
      "stat":2,
      "ridx":0
    },
    {
      "name":"g",
      "type":11,
      "kysz":8,
      "sidx":0,
      "stat":3,
      "ridx":0
    },
    {
      "name":"m",
      "type":9,
      "kysz":8,
      "sidx":0,
      "stat":0,
      "ridx":0
    },
    {
      "name":"a",
      "type":5,
      "kysz":32,
      "sidx":1,
      "stat":1,
      "ridx":1
    },
    {
      "name":"h",
      "type":11,
      "kysz":8,
      "sidx":0,
      "stat":0,
      "ridx":0
    },
    {
      "name":"c",
      "type":5,
      "kysz":0,
      "sidx":0,
      "stat":0,
      "ridx":0
    },
    {
      "name":"mexp",
      "type":9,
      "kysz":8,
      "sidx":0,
      "stat":0,
      "ridx":0
    }
  ],
  "parents":[
    {
      "name":"dummy_head",
      "stid":0
    }
  ],
  "children":[
    {
      "name":"myschema__stream1__a__b__g",
      "stid":2601729947950351671
    }
  ],
  "catr":[
    {
      "name":"m",
      "attr-type":9,
      "fnr":1,
      "iatr":[
        "b",
        "a"
      ]
    },
    {
      "name":"mexp",
      "attr-type":9,
      "fnr":1,
      "iatr":[
        "g",
        "h"
      ]
    }
  ],
  "gpby":[
    {
      "gpby-attr":"g",
      "gpby-attr-type":11,
      "gran":600,
      "swsz":86400,
      "gpat":[
        "a",
        "b"
      ]
    }
  ],
  "refr-to":[
    {
      "name":"myschema__stream2",
      "stid":5641729835205121944
    }
  ],
  "refr-by":[],
  "fltr":[],
  "joins":[],
  "refr":[
    {
      "refr-name":"myrefr1",
      "refr-type":0,
      "joins-with":"myschema__stream2",
      "iatr":[
        "c",
        "b"
      ],
      "ratr":[
        "b",
        "p1"
      ]
    }
  ],
  "cepq":[],
  "enty":[],
  "name":"myschema__stream1",
  "type":1
}
You may also run the above command with the "pretty" qualifier.
To register a notification
This creates a notification template which can be used to send notifications when certain events happen: for example, when a CEP query is satisfied, when a filter matches, when a join has happened, etc. Notification creation also starts a sub-workflow which is intuitive enough to follow.
add notification details:
notification name: notif1
notification id: 1234
enter notification msg: This is a sample notification
frequency in seconds (minimum num of seconds to wait before sending same notification): 60
priority of the notification [1, 2 or 3] (1 is highest): 1
enter the schema name for which this notification may be used?: myschema
enter mail ids [whom notifications will be sent to] (press enter to break): sachin@bangdb.com
enter mail ids [whom notifications will be sent to] (press enter to break): admin@bangdb.com
enter mail ids [whom notifications will be sent to] (press enter to break):
enter API end points [whom notifications will be sent to] (press enter to break): http://192.168.1.3:10101/account
enter API end points [whom notifications will be sent to] (press enter to break):
{
  "schemaid":4766750373947953813,
  "endpoints":[
    "http://192.168.1.3:10101/account"
  ],
  "mailto":[
    "sachin@bangdb.com",
    "admin@bangdb.com"
  ],
  "freq":60,
  "pri":1,
  "name":"notif1",
  "notifid":1234,
  "msg":"This is a sample notification"
}
notification registered successfully
To de-register the notification
deregister notification 1234
To select the list of registered notifications
select * from reg_notif
scanning for pk range [null : null] and query = null
+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| key    | val                                                                                                                                                                                                                        |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1234   | {"name":"notif1","notifid":1234,"msg":"This is a sample notification","freq":60,"pri":1,"schemaid":4766750373947953813,"mailto":["sachin@bangdb.com","admin@bangdb.com"],"endpoints":["http://192.168.1.3:10101/account"]} |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
total rows retrieved = 1 (1)
Stats or aggregated data selection
While creating the schema we added "stat":1 for attr "a" and "stat":2 for attr "b". stat 1 is for count, 2 is for unique count, and 3 is for all (running) stats.
select aggr(a) from myschema.stream1
the query json is = {"proc-type":6,"attrs":["a"],"rollup":1,"from_ts":1,"to_ts":9223372036854775807}
+------+------------------------------------------------------------+
|key |val |
+------+------------------------------------------------------------+
|1 |{"fromts":1,"tots":9223372036854775807,"aggr_val":{"cnt":1}}|
+------+------------------------------------------------------------+
total rows retrieved = 1 (1)
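Similarly, since "b" was created with "stat":2 (unique count), an aggregate selection on "b" should return unique counts; assuming the same syntax as above:
select aggr(b) from myschema.stream1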
Plot - we can plot the data in a chart on the terminal or save it to a file on disk. We can plot data by adding the "plot" command at the end of the query, along with optional values. The syntax for plot is as follows:
plot {"title":"mychart","term":1,"saveto":"file1.png","type":"line","just_plot":1,"attrs":["cnt","min","max"]}
select aggr(a) from myschema.stream1 plot {"type":"line"}
the query json is = {"proc-type":6,"attrs":["a"],"rollup":1,"from_ts":1,"to_ts":9223372036854775807}
the time range has not been selected, hence limiting the num of data points to 1000 and just_plot =1
Warning: empty y range [0:0], adjusting to [-1:1]
bangdb_plot aggr 1
+--------------------------------------------------------------------+
| + + + + + + + + + |
| |
| |
0.5 |-+ +-|
| |
| |
| |
| |
0 |-+ *********************************************************** +-|
| |
| |
| |
-0.5 |-+ +-|
| |
| |
| |
| + + + + + + + + + |
-1 +--------------------------------------------------------------------+
12:00 14:00 16:00 18:00 20:00 22:00 24:00 26:00 28:00 30:00 32:00
total rows retrieved = 1000 (1000)
more data to come, continue .... [y/n]:
There are more stream-related commands in the CLI; they follow the same patterns as above and are self-explanatory.