BangDB Stream Manager provides the constructs needed to deal with streams in a time-series manner. It allows users to create streams, ingest data, and define processing logic so that continuous ingestion and analysis can proceed in an automated manner.

C++
Java

To get an instance of BangDB Stream Manager, call the constructor. It takes a BangDBEnv object reference; see BangDBEnv for more details.

BangDBStreamManager(BangDBEnv *env);

BangDB Stream works on a schema, which the user must define and register with the stream manager in order to receive data in the stream and to process events as defined in the schema. See BangDB Stream for more information.

To register a schema

char *registerSchema(const char *schema_json);

The schema/app is in JSON format and contains the details of the stream operations. It returns NULL for a serious error, or a JSON doc with errcode less than 0 along with information on why the registration failed. If successful, errcode is set to 0 in the returned doc. Users should call delete[] to free the memory.
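The return contract above (NULL for serious failure, otherwise a JSON doc carrying errcode, freed with delete[]) can be sketched caller-side as follows. parse_errcode and consume_response are illustrative helpers of our own, not part of the BangDB API:

```cpp
#include <cstring>
#include <cstdlib>

// Hypothetical helper: pull the integer "errcode" field out of a returned
// JSON doc with plain string scanning (a real caller would use a JSON parser).
static int parse_errcode(const char *json) {
    if (!json) return -2;                      // NULL: serious error
    const char *p = std::strstr(json, "\"errcode\"");
    if (!p) return -2;
    p = std::strchr(p, ':');
    return p ? std::atoi(p + 1) : -2;
}

// Checks the response and releases the buffer with delete[], since the
// caller owns memory returned by APIs such as registerSchema().
static bool consume_response(char *resp) {
    int ec = parse_errcode(resp);
    delete[] resp;
    return ec == 0;
}
```

The same check-then-free pattern applies to every API in this section that returns a char* doc.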

To de-register / delete an existing schema

char *deregisterSchema(const char *schema_name, bool cleanclose = true);

schema_name is the name given to the schema by the user. If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1. Users should call delete[] to free the memory.

To add streams to an existing schema

char *addStreams(long schemaid, const char *streams);

The streams input here is a JSON string that contains an array of streams to be added. It takes schemaid as input, identifying the schema to which the set of streams should be added; schemaid is the unique id associated with a particular schema. If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1. Users should call delete[] to free the memory.
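As a sketch, the streams argument could be assembled like this; the wrapper key "streams" and the per-stream fields ("name", "type") are placeholders of ours — the authoritative stream-definition fields come from the stream schema documentation:

```cpp
#include <string>
#include <vector>

// Builds a JSON string holding an array of stream definitions.
// Field names here are illustrative placeholders, not the authoritative schema.
static std::string make_streams_json(const std::vector<std::string> &names) {
    std::string out = "{\"streams\":[";
    for (std::size_t i = 0; i < names.size(); ++i) {
        if (i) out += ",";
        out += "{\"name\":\"" + names[i] + "\",\"type\":1}";
    }
    out += "]}";
    return out;
}
```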

To delete streams from an existing schema

char *deleteStreams(long schemaid, const char *streams);

If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1. Users should call delete[] to free the memory.

To set stream state

char *setStreamState(const char *schema, const char *stream, short st);

If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1. Users should call delete[] to free the memory.

To get stream state

int getStreamState(const char *schema, const char *stream);

The state of a stream can be ON or OFF; hence it returns 1 or 0 respectively. For error it returns -1.

To add user-defined functions (udfs) for computing in the schemas

char *addUdfs(long schema_id, const char *udfs);

If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1. Users should call delete[] to free the memory.

To delete a udf from a given schema using the udf name and schema id

char *delUdfs(long schema_id, const char *udfs);

If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1. Users should call delete[] to free the memory.

This lists all the user-defined functions present in the database

char *getUdfList();

If successful, it returns the list, else NULL. Users should call delete[] to free the memory.

To get the list of all registered notifications. These are notification templates used to send event notifications, not actual notifications. Please see Notification to know more about notification templates and how to deal with them.

For more information on how to scan the table, see the DataQuery section.

ResultSet * scanRegisteredNotif(
  ResultSet * prev_rs,
  FDT * pk_skey = NULL,
  FDT * pk_ekey = NULL,
  const char * idx_filter_json = NULL,
  ScanFilter * sf = NULL
);

If successful, it returns a resultset reference which can be iterated to read keys and values. It returns NULL for error.

To get the list of generated notifications, users may scan in the usual way. Note that the query filter can still be used for the scan. See DataQuery to know more about scan.

ResultSet * scanNotification(
   ResultSet * prev_rs,
   FDT * pk_skey = NULL,
   FDT * pk_ekey = NULL,
   const char * idx_filter_json = NULL,
   ScanFilter * sf = NULL
);

If successful, it returns a resultset reference which can be iterated to read keys and values. It returns NULL for error.

To insert events into the stream. The event is a doc (JSON document)

char *put(long schemaid, long streamid, const char *doc);

streamid is the unique numerical id associated with a particular stream. It returns JSON with errcode set to -1 for error, else 0 for success. The user should check for NULL as well, and should free the returned data by calling delete[].
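An event doc is an ordinary JSON document. A minimal sketch of composing one before calling put() — the field names "temp" and "ts" are our own examples, not required by BangDB:

```cpp
#include <cstdio>
#include <string>

// Formats a toy sensor event as a JSON doc suitable for put().
static std::string make_event(double temp, long ts) {
    char buf[96];
    std::snprintf(buf, sizeof(buf), "{\"temp\":%.1f,\"ts\":%ld}", temp, ts);
    return std::string(buf);
}
```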

To put an event with a given key into any stream of a given schema. Note that no stream or event processing takes place for this API; it works like the table put API.

char *put(long schemaid, long streamid, long k, const char *v);

It returns JSON with errcode set to -1 for error, else 0 for success. The user should check for NULL as well, and should free the returned data by calling delete[].

To scan the stream for a given filter condition, users may scan the stream in the usual way. Note that the query filter (idx_filter_json) can still be used for the scan. See DataQuery to know more about scan.

ResultSet * scanDoc(
   long schemaid,
   long streamid,
   ResultSet * prev_rs,
   FDT * pk_skey = NULL,
   FDT * pk_ekey = NULL,
   const char * idx_filter_json = NULL,
   ScanFilter * sf = NULL
);
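The scan is recursive: the ResultSet returned by one call is passed back as prev_rs to fetch the next batch, until the scan is exhausted. The loop shape can be shown with a toy stand-in for ResultSet — MockResultSet and scan_batch below are ours, not BangDB types; see DataQuery for the real iteration API:

```cpp
#include <cstddef>
#include <vector>

// Toy stand-in for a ResultSet batch of events.
struct MockResultSet { std::vector<int> keys; };

static std::vector<std::vector<int>> g_batches = {{1, 2}, {3}};
static std::size_t g_next = 0;

// Mimics the scanDoc() contract: pass the previous result set back in;
// a NULL return means the scan is complete.
static MockResultSet *scan_batch(MockResultSet *prev) {
    delete prev;                                // done with the previous batch
    if (g_next >= g_batches.size()) return nullptr;
    return new MockResultSet{g_batches[g_next++]};
}

// Drives the recursive scan loop and counts every event seen.
static int count_all_events() {
    int total = 0;
    MockResultSet *rs = nullptr;
    while ((rs = scan_batch(rs)) != nullptr)
        total += static_cast<int>(rs->keys.size());
    return total;
}
```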

To scan aggregate, groupby, and entity streams, users should call this API. It takes a special argument, attr_names_json, which defines what kind of data is being scanned. This again is a recursive scan and is used similarly to the other (db) scans. See DataQuery to know more about how to use scan effectively.

ResultSet * scanProcDoc(
   long schemaid,
   long streamid,
   const char * attr_names_json,
   ResultSet * prev_rs,
   ScanFilter * sf = NULL
);

The attr_names_json defines what to do and for whom this is being called. The structure of the json is

for aggr = query_json = {
"proc-type": 6,
"attrs": ["a", "b", ...],
"option": 1,
"skey": "sk",
"ekey": "ek",
"rollup": 1
}

for entity = query_json = {
"proc-type": 6,
"attrs": ["a", "b", ...],
"option": 1,
"skey": "sk",
"ekey": "ek",
"enty-stream-id": 1234
}

for gpby = query_json = {
"attrs": ["a", "b", ...],
"option": 1,
"skey": "sk",
"ekey": "ek",
"gpby-val": "x",
"gpby-name": "a1_b2_c3",
"gpby-attrid": 123
}

It returns a ResultSet, which can be iterated to go over the events; it returns NULL for error.
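For instance, the gpby variant of attr_names_json could be composed as below; the attribute list, "x", and the ids are placeholder values, and the key names mirror the gpby structure shown above:

```cpp
#include <string>

// Composes the attr_names_json argument for a groupby scan.
// Values are illustrative placeholders.
static std::string make_gpby_query(const std::string &skey,
                                   const std::string &ekey,
                                   long gpby_attrid) {
    return "{\"attrs\":[\"a1\",\"b2\",\"c3\"],\"option\":1,"
           "\"skey\":\"" + skey + "\",\"ekey\":\"" + ekey + "\","
           "\"gpby-val\":\"x\",\"gpby-name\":\"a1_b2_c3\","
           "\"gpby-attrid\":" + std::to_string(gpby_attrid) + "}";
}
```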

To count the number of events for a given filter condition

long countProc(long schemaid, long streamid, const char *attr_names_json, ScanFilter *sf = NULL);

It returns -1 for error.

To get the groupby operation name. Check out more about it in stream

long getGpbyName(long schemaid, long streamid, const char *gpby_attr_list, char **out_json);

The gpby name is the mangled name given by the stream manager to a particular groupby operation. The gpby_attr_list provides the necessary information for the computation of the name.

gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}

It returns -1 for error, else 0 for success. The out_json contains the name of the groupby.
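The out_json out-parameter follows the same ownership rule as the other APIs here: the function allocates it and the caller frees it with delete[]. fake_getGpbyName below is a stub of ours with the same shape, used only to illustrate the calling pattern; it is not the real API:

```cpp
#include <cstring>

// Stub mirroring getGpbyName()'s out-parameter contract: on success it
// allocates *out_json with new[] and returns 0; the caller must delete[] it.
static long fake_getGpbyName(const char *gpby_attr_list, char **out_json) {
    if (!gpby_attr_list || !out_json) return -1;
    const char *name = "{\"gpby-name\":\"a1_b2_c3\"}";
    *out_json = new char[std::strlen(name) + 1];
    std::strcpy(*out_json, name);
    return 0;
}
```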

To count total number of events present in the given stream

long count(long schemaid, long streamid);

It returns -1 for error else the count.

To get number of events present for a given condition or filter query (idx_filter_json)

long count(long schemaid, long streamid, FDT *pk_skey, FDT *pk_ekey, const char *idx_filter_json = NULL, ScanFilter *sf = NULL);

It returns -1 for error else the count.

To get the count of events pushed into the raw streams

ResultSet * scanUsage(
   ResultSet * prev_rs,
   long fromts,
   long tots,
   int rollup,
   ScanFilter * sf = NULL
);

It returns a ResultSet for success or NULL for error.

To get the schema id for an existing schema

long getSchemaid(const char *schema_name, bool check_valid = true);

It returns -1 for error else the schemaid.

To get the stream id for a stream in an existing schema

long getStreamid(const char *schema_name, const char *stream_name, bool check_valid = true);

It returns -1 for error or streamid.

To get the entire schema (json structure). This API returns it from the stream memory and not from the stored metadata.

char *getSchemaStr(const char *schema_name);

It returns NULL for error, or json with errcode set non-zero for other errors. For success it returns the schema. The user should free the returned data by calling delete[].

To get the entire schema (json structure) from the metadata. Usually both this and the previous schema would be the same, but in some cases they could differ.

char *getSchemaFromMetadata(const char *schema_name);

User should delete the memory of returned data by calling delete[].

To get the dependency graph for a given schema, users may call this API. It returns a json doc defining the entire dependency graph for the schema.

char *getSchemaDepGraph(long schema_id, bool bfs = true);

The schema is structured as a graph within the stream manager. This api will return the graph for the given schema. It returns NULL for error. User should delete the memory of returned data by calling delete[].

To get dependency graph for a given stream

char *getStreamDepGraph(long schema_id, long stream_id, bool only_dep = false);

This API will return the graph for the given stream for a schema. Please see stream section to know more on the graph. It returns NULL for error. User should delete the memory of returned data by calling delete[].

To get list of all schemas present in the database

char *getSchemaList();

This returns a json doc with the list of all the schemas, or NULL for error. It may also set errcode to -1 for some errors. The user should free the returned data by calling delete[].

To close the stream manager at the end

void closeBangDBStreamManager(CloseType ctype = DEFAULT_AT_CLIENT);

To get the instance of BangDB Stream Manager, call the constructor. It takes BangDB Env object reference, see BangDBEnv for more details.

public BangDBStreamManager(BangDBEnv bdbenv)

BangDB Stream works on a schema which user must define and register with the Stream manager in order to be able to receive data in the stream and also process the events as defined in the schema. See BangDB Stream for more information.

To register a schema

public String registerSchema(String schema_json)

The schema/app is in JSON format and contains the details of the stream operations. It returns NULL for a serious error, or a json doc with errcode less than 0 along with information on why the registration failed. If successful, errcode is set to 0 in the returned doc.

To de-register / delete an existing schema

public String deregisterSchema(String schema_name)

To de-register an app or schema, simply pass the name of the schema. If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1.

To drop or delete an existing schema

public String deregisterSchema(String schema_name, boolean cleanclose)

To add streams to an existing schema

public String addStreams(long schemaid, String streams)

The DB assigns a unique id to every registered schema, represented by schemaid, and the streams input here is a json string that contains an array of streams to be added. If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1.

To delete streams from an existing schema

public String deleteStreams(long schemaid, String streams)

This deletes the set of streams defined in the streams json containing the names of the streams. It takes schemaid as input, identifying the schema from which the set of streams should be deleted. If successful, errcode is set to 0 in the returned doc; on error it could return NULL or set errcode to -1.

To check if the schema is currently ready for taking events

public int getSchemaDDLState(long schemaid)

This returns whether the schema is currently ready for taking events: 1 if it is not ready and 0 if it is ready.

To get schemaid for the given schema_name

public long getSchemaid(String schema_name, boolean check_valid)

This returns the schemaid for the given schema_name. If check_valid is false it returns a potential id without validating that the schema exists; else it returns the id of the actually existing schema. For success it returns the id, else -1 for error.

To get stream id for a given schema_name and stream_name

public long getStreamid(String schema_name, String stream_name, boolean check_valid)

This returns the streamid for the given schema_name and stream_name. If check_valid is false it returns a potential id without validating that the stream exists; else it returns the id of the actually existing stream. For success it returns the id, else -1 for error.

To get status of a stream

public int getStreamState(String schema_name, String stream_name)

The state of the stream could be ON or OFF, hence it returns 1 or 0 respectively. For error it returns -1.

To put events into the stream

public String put(long schemaid, long streamid, String doc)

The event is the doc (json document) and schemaid, streamid define the stream for which the event should be put. It returns json string with errcode as 0 for success or -1 for error. It also contains reasons for failure when it fails.

To put events into any given stream for a given schema

public String put(long schemaid, long streamid, long k, String v)

This is to simply put the document into any given stream for a given schema. Please note there is no stream or event processing that takes place for this api. This is similar to table API. It returns a json string with errcode as 0 for success or -1 for error. It also contains the reason for failure when it fails.

To scan a stream for a given filter

public ResultSet scanDoc(
   long schemaid, 
   long streamid, 
   ResultSet prev_rs, 
   long k1, 
   long k2, 
   String idx_filter_json, 
   ScanFilter sf
)

To scan aggregate, groupby and entity stream

public ResultSet scanProcDoc(
   long schemaid, 
   long streamid,
   String attr_names_json,
   ResultSet prev_rs, 
   ScanFilter sf
)

The attr_names_json defines what to do and for whom this API is being called. The structure of the JSON is as follows:

for aggr = query_json = {
"proc-type": 6,
"attrs": ["a", "b", ...],
"option": 1,
"skey": "sk",
"ekey": "ek",
"rollup": 1
}

for entity = query_json = {
"proc-type": 6,
"attrs": ["a", "b", ...],
"option": 1,
"skey": "sk",
"ekey": "ek",
"enty-stream-id": 1234
}

for gpby = query_json = {
"attrs": ["a", "b", ...],
"option": 1,
"skey": "sk",
"ekey": "ek",
"gpby-val": "x",
"gpby-name": "a1_b2_c3",
"gpby-attrid": 123
}

In gpby, if gpby-attrid is provided then "attrs", "gpby-val", and "gpby-name" are not required; the last three are basically used to compute gpby-attrid only.

To get the list of all registered notifications. These are notification templates used to send event notifications, not actual notifications. Please see notification to know more about notification templates and how to deal with them.

public ResultSet scanRegisteredNotif(
   ResultSet prev_rs,
   long k1, 
   long k2, 
   String idx_filter_json, 
   ScanFilter sf
)

If successful, it returns resultset reference which could be iterated to read key and value. It returns NULL for error.

To get count of events in the raw streams

public ResultSet scanUsage(
   ResultSet prev_rs,
   long fromts,
   long tots,
   int rollup,
   ScanFilter sf
)

It returns Resultset for success and NULL for error.

To count the number of events for a given filter condition

public long countProc(long schemaid, long streamid, String attr_names_json, ScanFilter sf)

This returns count of events for given schema and stream ids, else -1 for error.

To get the name of a groupby operation. Check out more about it in stream.

public String getGpbyName(long schemaid, long streamid, String gpby_attr_list)
public long getGpbyId(long schemaid, long streamid, String gpby_attr_list)

Groupby names are mangled by the stream manager; therefore, to get the actual name, we can call this API. The gpby_attr_list provides the necessary information for the computation of the name. It looks something like this:

gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}

getGpbyName returns the name of the groupby, or NULL for error; getGpbyId returns the corresponding id, or -1 for error.

To get count of events for a given schema and stream

public long count(long schemaid, long streamid)

It returns -1 for error else the count.

To get count of events based on filter condition for a given schema and stream

public long count(long schemaid, long streamid, long psk, long pek, String filter_json, ScanFilter sf)

It returns -1 for error else the count.

To add UDFs for performing computations on streams

public String addUdfs(long schema_id, String udfs)

The stream manager may leverage user-defined functions for many computations as defined in the schema. Using this API, one can add udfs for a given schemaid. It returns a json string with errcode as 0 for success or -1 for error. It also contains the reason for failure when it fails.

To delete a UDF

public String delUdfs(long schema_id, String udfs)

This API deletes udfs for a given schemaid. It returns a json string with errcode as 0 for success or -1 for error. It also contains the reason for failure when it fails.

To compile a UDF

public String compileUdf(String code)

To get the list of all UDFs present

public String getUDFList()

If successful, errcode is set to 0 and the result is a json containing the list of udfs; for error, errcode is set to -1.

To get the list of all registered schemas

public String getSchemaList()

This returns a json doc with the list of all the schemas, or NULL for error. It may also set errcode to -1 for some errors.

To get a dependency graph for a given schema, users may call this API. This returns json doc defining the entire dependency graph for the schema.

public String getSchemaDepGraph(long schema_id, boolean bfs)

The schema is structured as a graph within the stream manager; this API returns the graph for the given schema. If bfs is set to true, breadth-first traversal is used, else depth-first (dfs). It returns NULL for error.

To get the entire schema (json structure), either from memory or from the stored metadata. Usually both would be the same, but in some cases they could differ.

public String getSchemaStr(String schemaName, short from_meta)

Here, from_meta = 0 means read from the stream memory, while 1 means read from the meta store.

To get graphical structure for a given stream

public String getStreamDepGraph(long schema_id, long stream_id, boolean only_dep)

This api will return the graph for the given stream for a schema. Please see stream section to know more on the graph. It returns NULL for error.

To reset the ml helper at run time

public void resetMlHelper(BangDBMLHelper bmlh)

To close the stream manager

public synchronized void closeBangdbStreamManager(CloseType closetype)