Stream Manager
BangDB Stream Manager provides the constructs needed to work with streams as time series. It lets users create streams, ingest data, and define processing logic so that ingestion and analysis run continuously and automatically.
To get an instance of BangDB Stream Manager, call the constructor. It takes a pointer to a BangDBEnv object; see BangDBEnv for more details.
BangDBStreamManager(BangDBEnv *env);
A BangDB stream works on a schema, which the user must define and register with the Stream Manager before the stream can receive data and process events as defined in that schema. See BangDB Stream for more information.
To register a schema
char *registerSchema(const char *schema_json);
The schema (app) is in JSON format and describes the stream operations. The call returns NULL for a serious error, or a JSON doc with errcode less than 0 and information on why the registration failed. On success, errcode is set to 0 in the returned doc. Users should call delete[] on the returned buffer to free the memory.
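Because registerSchema and most other calls on this page return a buffer that the caller must release with delete[], one way to avoid leaks is to hand the pointer to std::unique_ptr<char[]> as soon as it comes back. The sketch below shows the idea under stated assumptions: fakeApiCall is a hypothetical stand-in for a Stream Manager call (it is not part of BangDB), and takeReply is an illustrative helper name.

```cpp
#include <cstring>
#include <memory>
#include <string>

// Owns a buffer allocated with new[] and releases it with delete[].
using ApiBuffer = std::unique_ptr<char[]>;

// Hypothetical stand-in for a call such as registerSchema():
// returns a heap buffer allocated with new[], or NULL when given NULL.
char *fakeApiCall(const char *reply) {
    if (reply == nullptr) return nullptr;
    char *out = new char[std::strlen(reply) + 1];
    std::strcpy(out, reply);
    return out;
}

// Takes ownership of the raw reply, copies it into a std::string,
// and frees the buffer on return. A NULL reply yields an empty string.
std::string takeReply(char *raw) {
    ApiBuffer owned(raw);
    return owned ? std::string(owned.get()) : std::string();
}
```

With a real Stream Manager instance, the raw pointer returned by the API call would be passed to takeReply in place of fakeApiCall.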
To de-register / delete an existing schema
char *deregisterSchema(const char *schema_name, bool cleanclose = true);
schema_name is the name given to the schema by the user. On success, errcode is set to 0 in the returned doc. On error, the call returns NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
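The return convention used by these calls (NULL, or a JSON doc carrying errcode) can be checked in one place. The following is a minimal sketch, not BangDB code: classifyReply and ReplyStatus are hypothetical names, and the naive string search only stands in for a proper JSON parser, which real code should prefer.

```cpp
#include <cstdlib>
#include <cstring>

enum class ReplyStatus { NullReply, Ok, Failed, Unknown };

// Classifies a reply by its "errcode" field.
// Assumption: errcode appears as a top-level integer field in the doc.
ReplyStatus classifyReply(const char *reply) {
    if (reply == nullptr) return ReplyStatus::NullReply;

    const char *key = std::strstr(reply, "\"errcode\"");
    if (key == nullptr) return ReplyStatus::Unknown;

    const char *colon = std::strchr(key, ':');
    if (colon == nullptr) return ReplyStatus::Unknown;

    // strtol skips leading whitespace and accepts a leading minus sign.
    char *end = nullptr;
    long code = std::strtol(colon + 1, &end, 10);
    if (end == colon + 1) return ReplyStatus::Unknown;  // no digits found

    return code == 0 ? ReplyStatus::Ok : ReplyStatus::Failed;
}
```

The helper only reads the buffer; the caller still has to release it with delete[] afterwards.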
To add streams to an existing schema
char *addStreams(long schemaid, const char *streams);
streams is a JSON string containing an array of streams to add. schemaid identifies the schema that receives them; it is a unique id associated with a particular schema. On success, errcode is set to 0 in the returned doc. On error, the call returns NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To delete streams from an existing schema
char *deleteStreams(long schemaid, const char *streams);
On success, errcode is set to 0 in the returned doc. On error, the call returns NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To set stream state
char *setStreamState(const char *schema, const char *stream, short st);
On success, errcode is set to 0 in the returned doc. On error, the call returns NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To get stream state
int getStreamState(const char *schema, const char *stream);
A stream is either ON or OFF; the call returns 1 or 0 respectively, and -1 on error.
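Since getStreamState packs both the state and the error case into one int, it can help to translate the value once into a named type. This is only a sketch: StreamState and toStreamState are hypothetical names, and any value other than 0 or 1 is treated as an error here.

```cpp
// Named view of the integer returned by getStreamState().
enum class StreamState { Off, On, Error };

// Maps 1 to On, 0 to Off, and everything else (including -1) to Error.
StreamState toStreamState(int rc) {
    if (rc == 1) return StreamState::On;
    if (rc == 0) return StreamState::Off;
    return StreamState::Error;
}
```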
To add user-defined functions (UDFs) for computation in a schema
char *addUdfs(long schema_id, const char *udfs);
On success, errcode is set to 0 in the returned doc. On error, the call returns NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To delete UDFs from a given schema, using the UDF names and the schema id
char *delUdfs(long schema_id, const char *udfs);
On success, errcode is set to 0 in the returned doc. On error, the call returns NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To list all the user-defined functions present in the database
char *getUdfList();
On success it returns the list; otherwise it returns NULL. Users should call delete[] to free the memory.
To get the list of all registered notifications. These are notification templates used to send event notifications, not the notifications themselves. See Notification to learn more about notification templates and how to work with them. For more information on how to scan the table, see the DataQuery section.
ResultSet * scanRegisteredNotif(
ResultSet * prev_rs,
FDT * pk_skey = NULL,
FDT * pk_ekey = NULL,
const char * idx_filter_json = NULL,
ScanFilter * sf = NULL
);
On success, it returns a ResultSet pointer that can be iterated to read keys and values. It returns NULL on error.
To get the list of generated notifications, users may scan in the usual way. Note that the query filter can still be used for the scan. See DataQuery to learn more about scan.
ResultSet * scanNotification(
ResultSet * prev_rs,
FDT * pk_skey = NULL,
FDT * pk_ekey = NULL,
const char * idx_filter_json = NULL,
ScanFilter * sf = NULL
);
On success, it returns a ResultSet pointer that can be iterated to read keys and values. It returns NULL on error.
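The scan calls take the previous ResultSet (prev_rs) and return the next one, so a full scan is a loop that feeds each result back in until NULL comes back. The sketch below mimics that pattern with stand-ins only: FakeResultSet, FakeStream and drainScan are hypothetical, and the assumption that the scan call releases prev_rs itself is not confirmed by this page. See DataQuery for the real iteration API.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

using Row = std::pair<std::string, std::string>;  // key, value

// Hypothetical stand-in for ResultSet: one batch of rows.
struct FakeResultSet {
    std::vector<Row> rows;
    std::size_t nextStart;  // where the following batch begins
};

// Hypothetical stand-in for a stream that is scanned in batches.
class FakeStream {
public:
    FakeStream(const std::vector<Row> &rows, std::size_t batch)
        : rows_(rows), batch_(batch == 0 ? 1 : batch) {}

    // Mimics scanX(prev_rs, ...): pass nullptr to start, the previous
    // batch to continue. Frees prev and returns nullptr when exhausted.
    FakeResultSet *scan(FakeResultSet *prev) {
        std::size_t start = prev ? prev->nextStart : 0;
        delete prev;
        if (start >= rows_.size()) return nullptr;
        std::size_t end = std::min(start + batch_, rows_.size());
        FakeResultSet *rs = new FakeResultSet;
        rs->rows.assign(rows_.begin() + static_cast<std::ptrdiff_t>(start),
                        rows_.begin() + static_cast<std::ptrdiff_t>(end));
        rs->nextStart = end;
        return rs;
    }

private:
    std::vector<Row> rows_;
    std::size_t batch_;
};

// Repeats the scan, feeding each result back in, until NULL is returned.
std::vector<Row> drainScan(FakeStream &stream) {
    std::vector<Row> all;
    FakeResultSet *rs = nullptr;
    while ((rs = stream.scan(rs)) != nullptr) {
        all.insert(all.end(), rs->rows.begin(), rs->rows.end());
    }
    return all;
}
```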
To insert an event into a stream. The event is the doc (a JSON document)
char *put(long schemaid, long streamid, const char *doc);
streamid is a unique numerical id associated with a particular stream. The call returns JSON with errcode set to -1 on error and 0 on success. Users should also check for NULL, and should free the returned data by calling delete[].
To put an event into a given stream of a given schema, supplying the key (k) and the value (v)
char *put(long schemaid, long streamid, long k, const char *v);
It returns JSON with errcode set to -1 on error and 0 on success. Users should also check for NULL, and should free the returned data by calling delete[].
To scan the stream for a given filter condition. Users may scan the stream in the usual way. Note that the query filter (idx_filter_json) can still be used for the scan. See DataQuery to learn more about scan.
ResultSet * scanDoc(
long schemaid,
long streamid,
ResultSet * prev_rs,
FDT * pk_skey = NULL,
FDT * pk_ekey = NULL,
const char * idx_filter_json = NULL,
ScanFilter * sf = NULL
);
To scan aggregate, groupby and entity streams, users should call this API. It takes a special argument, attr_names_json, which defines what kind of data is being scanned. This is again a recursive scan and is used like the other scans (db). See DataQuery to learn more about how to use scan effectively.
ResultSet * scanProcDoc(
long schemaid,
long streamid,
const char * attr_names_json,
ResultSet * prev_rs,
ScanFilter * sf = NULL
);
attr_names_json defines what to do and for which kind of stream (aggregate, entity or groupby) the call is made. The structure of the JSON is:
-for aggr = query_json = {
"proc-type": 6,
"attrs":["a", "b", ...],
"option" : 1,
"skey":"sk",
"ekey":"ek",
"rollup":1
}
-for entity = query_json = {
"proc-type": 6,
"attrs":["a", "b", ...],
"option" : 1,
"skey":"sk",
"ekey":"ek",
"enty-stream-id":1234
}
-for gpby = query_json = {
"attrs":["a", "b", ...],
"option" : 1,
"skey":"sk",
"ekey":"ek",
"gpby-val":"x",
"gpby-name":"a1_b2_c3",
"gpby-attrid":123
}
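Writing these JSON strings by hand makes it easy to drop a quote, so a small builder can help. The sketch below assembles the aggregate form only, using the field names and the proc-type value shown in the structure above. buildAggrQuery is a hypothetical helper, the meaning of option and rollup is not defined on this page so they are passed through unchanged, and the strings are not escaped, so it assumes attribute names and keys contain no quotes or backslashes.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Builds the attr_names_json string for an aggregate scan.
// No JSON escaping is performed on attrs, skey or ekey.
std::string buildAggrQuery(const std::vector<std::string> &attrs, int option,
                           const std::string &skey, const std::string &ekey,
                           int rollup) {
    std::string json = "{\"proc-type\":6,\"attrs\":[";
    for (std::size_t i = 0; i < attrs.size(); ++i) {
        if (i > 0) json += ",";
        json += "\"" + attrs[i] + "\"";
    }
    json += "],\"option\":" + std::to_string(option);
    json += ",\"skey\":\"" + skey + "\"";
    json += ",\"ekey\":\"" + ekey + "\"";
    json += ",\"rollup\":" + std::to_string(rollup) + "}";
    return json;
}
```

The resulting string's c_str() would be passed as the attr_names_json argument of scanProcDoc or countProc.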
It returns a ResultSet, which can be iterated to go over the events, or NULL on error.
To count the number of events for a given filter condition
long countProc(long schemaid, long streamid, const char *attr_names_json, ScanFilter *sf = NULL);
It returns -1 for error.
To get the groupby operation name. See stream for more about it.
long getGpbyName(long schemaid, long streamid, const char *gpby_attr_list, char **out_json);
The GpbyName is the mangled name given by the stream manager to a particular groupby operation. gpby_attr_list provides the information needed to compute the name.
gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
It returns -1 on error and 0 on success. out_json contains the name of the groupby.
To count the total number of events present in a given stream
long count(long schemaid, long streamid);
It returns -1 for error else the count.
To get the number of events matching a given condition or filter query (idx_filter_json)
long count(long schemaid, long streamid, FDT *pk_skey, FDT *pk_ekey, const char *idx_filter_json = NULL, ScanFilter *sf = NULL);
It returns -1 for error else the count.
To get the count of events pushed into the raw streams
ResultSet * scanUsage(
ResultSet * prev_rs,
long fromts,
long tots,
int rollup,
ScanFilter * sf = NULL
);
It returns a ResultSet on success or NULL on error.
To get the schema id for an existing schema
long getSchemaid(const char *schema_name, bool check_valid = true);
It returns -1 for error else the schemaid.
To get the stream id for a stream in an existing schema
long getStreamid(const char *schema_name, const char *stream_name, bool check_valid = true);
It returns -1 for error else the streamid.
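Calls such as put and scanDoc take numeric ids, so a typical first step is to resolve the schema and stream names into ids and stop if either lookup fails. The sketch below shows that flow with stand-ins: StreamRef and resolveStream are hypothetical names, and the two callables represent getSchemaid and getStreamid so that the example stays self-contained.

```cpp
#include <functional>

// Pair of ids needed by the id-based calls.
struct StreamRef {
    long schemaid;
    long streamid;
};

// Runs the schema lookup first, then the stream lookup.
// Returns false (leaving out untouched) if either lookup reports an error,
// which these APIs signal with -1.
bool resolveStream(const std::function<long()> &lookupSchemaId,
                   const std::function<long()> &lookupStreamId,
                   StreamRef &out) {
    long schemaid = lookupSchemaId();
    if (schemaid < 0) return false;

    long streamid = lookupStreamId();
    if (streamid < 0) return false;

    out.schemaid = schemaid;
    out.streamid = streamid;
    return true;
}
```

In real code the callables would wrap the getSchemaid and getStreamid calls on a Stream Manager instance.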
To get the entire schema (json structure). This API returns from the Stream memory and not from the stored metadata.
char *getSchemaStr(const char *schema_name);
It returns NULL on error, or JSON with a non-zero errcode for other errors. On success it returns the schema. Users should free the returned data by calling delete[].
To get the entire schema (JSON structure) from metadata. Usually this call and the previous one return the same schema, but in some cases they can differ.
char *getSchemaFromMetadata(const char *schema_name);
Users should free the returned data by calling delete[].
To get the dependency graph for a given schema, users may call this API. It returns a JSON doc defining the entire dependency graph for the schema.
char *getSchemaDepGraph(long schema_id, bool bfs = true);
The schema is structured as a graph within the stream manager. This API returns the graph for the given schema. It returns NULL on error. Users should free the returned data by calling delete[].
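The page does not explain the bfs flag; assuming it selects a breadth-first ordering of the graph, the sketch below illustrates what such an ordering looks like on a small dependency graph. DepGraph and bfsOrder are hypothetical, the graph is held in a plain adjacency map rather than in the JSON layout the API returns (which is not described here), and the stream names in the usage note are made up.

```cpp
#include <map>
#include <queue>
#include <set>
#include <string>
#include <vector>

// Adjacency list: each node maps to the nodes that depend on it.
using DepGraph = std::map<std::string, std::vector<std::string>>;

// Visits nodes level by level starting from root, each node once.
std::vector<std::string> bfsOrder(const DepGraph &graph, const std::string &root) {
    std::vector<std::string> order;
    std::set<std::string> seen;
    std::queue<std::string> pending;

    seen.insert(root);
    pending.push(root);
    while (!pending.empty()) {
        std::string node = pending.front();
        pending.pop();
        order.push_back(node);

        DepGraph::const_iterator it = graph.find(node);
        if (it == graph.end()) continue;  // leaf: nothing depends on it
        for (const std::string &next : it->second) {
            // insert() reports whether the node was new.
            if (seen.insert(next).second) pending.push(next);
        }
    }
    return order;
}
```

For a graph where raw feeds agg and joined, and both of those feed alerts, the order starting from raw is raw, agg, joined, alerts.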
To get dependency graph for a given stream
char *getStreamDepGraph(long schema_id, long stream_id, bool only_dep = false);
This API returns the graph for the given stream of a schema. See the stream section to learn more about the graph. It returns NULL on error. Users should free the returned data by calling delete[].
To get list of all schemas present in the database
char *getSchemaList();
This returns a JSON doc with the list of all schemas, or NULL on error. It may also set errcode to -1 for some errors. Users should free the returned data by calling delete[].
To close the stream manager in the end
void closeBangDBStreamManager(CloseType ctype = DEFAULT_AT_CLIENT);