Stream Manager (Embedded)
static BangDBStreamManager *getInstance(BangDBDatabase *bdb, BangDBMLHelper *bmlh);
To get instance of BangDBStreamManager. It takes BangDB Database as parameter and also BangDBMLHelper as input parameter. The BangDBMLHelper could be NULL in case we don't wish to train and predict on streams. When successful it returns the instance of the stream manager, else NULL for error.
char *registerSchema(const char *schema_json);
BangDB stream works on schema of the set of streams or what we call app. We need to pass the app here which is in json format. It returns json string with error code as 0 for success else -1 for error. It also contains reason for failure when it fails. The app / schema details are covered in the stream section.
char *deregisterSchema(const char *schema_name);
To de-register an app or schema, simply pass the name of the schame and it will return json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails. The app / schema details are covered in the stream section.
char *addStreams(long schemaid, const char *streams);
This is helpful when we wish to add streams in the existing schema or app. The streams input here is json string that contains array of streams to be added. It takes schemaid as input for which the set of streams to be added. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails. The app / schema details are covered in the stream section.
// streams
{"schema":"myschema", "streams":[{"name":"mystream"}]}
char *deleteStreams(long schemaid, const char *streams);
This deletes the set of streams defined in the streams json containing names of the streams. It takes schemaid as input for which the set of streams to be deleted. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails. The app / schema details are covered in the stream section.
char *addUdfs(long schema_id, const char *udfs);
The stream manager may leverage user defined functions for many computing as defined in the schema. Using this API, one can add udfs for given scheamid. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails. The know more about udf, please see the udf section.
char *delUdfs(long schema_id, const char *udfs);
This API can delete udfs for given scheamid. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails. The know more about udf, please see the udf section.
char *put(long schemaid, long streamid, const char *doc);
To put event into the stream manager, this api is used. The event is the doc (json document) and schemaid, streamid define the stream for which the event should be put. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails. Please see stream section to know more about events.
char *put(long schemaid, long streamid, FDT *k, FDT *v);
This is to simply put the document into any given stream for a given schema. Please note there is no stream or event processing that takes place for this api. This is similar to table API. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails.
int get(long schemaid, long streamid, FDT *key, FDT **val);
This is to simply get the document from any given stream for a give schema. This is similar to table api. It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails.
ResultSet *scanDoc(
long schemaid,
long streamid,
ResultSet *prev_rs,
FDT *k1 = NULL,
FDT *k2 = NULL,
const char *idx_filter_json = NULL,
scan_filter *sf = NULL
);
This is to scan the stream for given filters. This is very similar to the table API. It just takes additional schemaid and streamid to identify the given stream.
ResultSet *scanProcDoc(
long schemaid,
long streamid,
const char *attr_names_json,
ResultSet *prev_rs,
scan_filter *sf = NULL
);
This is to scan aggregate, grouby and entity streams. The attr_names_json
defines what to do and for whom this API is being called. The structure of the json is as follows:
for aggr = query_json = {"proc-type": 6,"attrs":["a", "b", ...], "option" : 1, "skey:"sk", "ekey":"ek", "rollup":1}
for entity = query_json = {"proc-type": 6,"attrs":["a", "b", ...], "option" : 1, "skey:"sk", "ekey":"ek", "enty-stream-id":1234}
for gpby = query_json = {"attrs":["a", "b", ...], "option" : 1, "skey:"sk", "ekey":"ek", "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
for gpby if gpby-attrid is provided then "attrs", "gpby-val", "gpby-name" not required, basically last 3 are to compute "gpby-attrid" only
enum BANGDB_STREAM_TYPE {
BANGDB_STREAM_TYPE_NORMAL = 1, // defined in the schema
BANGDB_STREAM_TYPE_FILTER, // defined in the schema
BANGDB_STREAM_TYPE_JOINED, // defined in the schema
BANGDB_STREAM_TYPE_ENTITY, // defined in the schema
BANGDB_STREAM_TYPE_GPBY, // not explicitly defined in the schema
BANGDB_STREAM_TYPE_AGGR, // not explicitly defined in the schema
BANGDB_STREAM_TYPE_INVALID
};
only BANGDB_STREAM_TYPE_ENTITY
, BANGDB_STREAM_TYPE_GPBY
and BANGDB_STREAM_TYPE_AGGR
are supported For aggr, we can also define rollup (1 means ON 0 means OFF) It returns ResultSet for success or NULL for error. Please see stream section to get more info on these.
long getGpbyName(long schemaid, long streamid, const char *gpby_attr_list, char **out_json);
Groupby name is mangled by the stream manager. Therefore to get the actual name, we can call this API. The gpby_attr_list
provides the necessary information for the computation of the name. It looks something like this:
gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
If gpby-attrid
field is already present, it returns same long value with NULL in out_json
. It returns 0 for success and -1 for error. Please see stream section to get more info on these.
long count(long schemaid, long streamid);
This returns count of events for given schema and stream ids, else -1 for error.
long getSchemaid(const char *schema_name, bool check_valid = true);
This returns schemaid for the given schema_name. If check_valid
is false then it will return potential name else the actual existing name. For success it returns the id else -1 for error.
long getStreamid(const char *schema_name, const char *stream_name, bool check_valid = true);
This returns streamid for the given schema_name and stream_name. If check_valid is false then it will return potential name else the actual existing name. For success it returns the id else -1 for error.
char *getSchemaStr(const char *schema_name);
This returns the entire schema or app json else errcode as -1 with the reason for failure.
char *getSchemaFromMetadata(const char *schema_name);
This returns the entire schema or app json else errcode as -1 with the reason for failure. Please note this is same as previous API, except it reads from table and then returns whereas previous API will return from the cache.
char *getSchemaDepGraph(long schema_id, bool bfs = true);
The schema is structured as graph within the stream manager. This api will return the graph for the given schema. The bfs defines breadth first travel if set as true and it will use dfs (depth first). As of now only bfs as true is supported. Upon error it will return json string with errcode as -1 with the reasons for failure. Please see stream section to know more on the graph.
char *getStreamDepGraph(long schema_id, long stream_id, bool only_dep = false);
This API will return the graph for the given stream for a schema.only_dep = true
means return the depth part else returns the node information. Upon error it will return json string with errcode as -1 with the reasons for failure. Please see stream section to know more on the graph.
void closeBangDBStreamManager(bool force = false);
This will close the stream manager. Please note if force is set to be false then it will simply reduce the reference count and if the count is 0 then it will close the stream manager. But if force is set to be as false then it will simply close the stream manager irrespective of how many references are there or not.