Working with Stream
Stream API
This section covers the most important stream APIs for ingesting and scanning (retrieving) data. Please refer to Stream Manager for the list of all Stream APIs.
The following few Stream APIs are the most critical ones; most run-time data processing goes through them.
public String put(long schemaid, long streamid, String doc)
public ResultSet scanDoc(long schemaid, long streamid, ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf)
public ResultSet scanProcDoc(long schemaid, long streamid, String query_json, ResultSet prev_rs, ScanFilter sf)
public long count(long schemaid, long streamid)
public long getGpbyId(long schemaid, long streamid, String gpby_attr_list)
The stream types are defined as follows:
enum BANGDB_STREAM_TYPE {
BANGDB_STREAM_TYPE_NORMAL = 1, // defined in the schema
BANGDB_STREAM_TYPE_FILTER, // defined in the schema
BANGDB_STREAM_TYPE_JOINED, // defined in the schema
BANGDB_STREAM_TYPE_ENTITY, // defined in the schema
BANGDB_STREAM_TYPE_GPBY, // not explicitly defined in the schema
BANGDB_STREAM_TYPE_AGGR, // not explicitly defined in the schema
BANGDB_STREAM_TYPE_INVALID
};
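The enum above is written C-style, where only the first value is numbered. As a minimal sketch, a Java mirror could carry the numeric codes explicitly. The name StreamType and its helper methods are hypothetical and not part of the BangDB API; the codes assume the values simply count up from 1, which agrees with the proc-type values 4, 5 and 6 used in the queries later in this section.

```java
/** Hypothetical Java mirror of the stream type codes; not a BangDB class. */
enum StreamType {
    NORMAL(1),   // defined in the schema
    FILTER(2),   // defined in the schema
    JOINED(3),   // defined in the schema
    ENTITY(4),   // defined in the schema
    GPBY(5),     // not explicitly defined in the schema
    AGGR(6),     // not explicitly defined in the schema
    INVALID(7);  // assumed to follow AGGR, as in a C enum

    private final int code;

    StreamType(int code) {
        this.code = code;
    }

    /** Numeric code, assumed to count up from 1 in declaration order. */
    int code() {
        return code;
    }

    /** True for the types that the comments above mark as defined in the schema. */
    boolean definedInSchema() {
        return this == NORMAL || this == FILTER || this == JOINED || this == ENTITY;
    }

    /** Maps a numeric code back to a type; unknown codes map to INVALID. */
    static StreamType fromCode(int code) {
        for (StreamType t : values()) {
            if (t.code == code) {
                return t;
            }
        }
        return INVALID;
    }
}
```

With this mapping, StreamType.GPBY.code() gives the value that a groupby query would carry as its proc-type.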
Ingesting data
To ingest data into BangDB, we call the put() function. It is very simple, and all processing defined in the schema is abstracted behind this API. Ingestion always happens on the normal (main) stream, BANGDB_STREAM_TYPE_NORMAL in the enum shown above.
When we call this API, every computation defined for the event takes place before the call returns. For a single event put, several dozen processing steps may run on average: computing new attributes, referring to other existing attributes, groupbys, filters, joins, entity computation, complex event processing, notifications and a few other necessary ones. It is therefore critical that the API is efficient. Here is how the API is called.
public String put(long schemaid, long streamid, String doc)
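The calling pattern can be sketched as follows. Since this section does not show how a stream object is obtained, the sketch uses a hypothetical interface StreamIngest that exposes only the put() signature above, plus an in-memory fake so that it runs without a server. The event attributes (sensor, temp), the class names and the returned string are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in that exposes only the put() signature shown above. */
interface StreamIngest {
    String put(long schemaid, long streamid, String doc);
}

/** In-memory fake so the sketch runs without a server; this is not BangDB code. */
final class RecordingStream implements StreamIngest {
    final List<String> docs = new ArrayList<>();

    @Override
    public String put(long schemaid, long streamid, String doc) {
        docs.add(doc);
        return "ok"; // placeholder; the real return format is not described here
    }
}

final class IngestSketch {
    private IngestSketch() {}

    /** Builds one event document; the attribute names are made up for illustration. */
    static String eventDoc(String sensor, double temp) {
        return "{\"sensor\":\"" + sensor + "\", \"temp\":" + temp + "}";
    }

    /** Sends each reading with one put() call on the normal stream and counts the calls. */
    static int ingestAll(StreamIngest stream, long schemaid, long streamid, double[] temps) {
        int sent = 0;
        for (double t : temps) {
            stream.put(schemaid, streamid, eventDoc("s1", t));
            sent++;
        }
        return sent;
    }
}
```

The streamid passed here is always that of the normal stream; the derived streams are filled as a side effect of the same call.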
Scanning data from main stream
Scanning data means retrieving data from streams. The stream can be the normal stream or one of the derived ones. BANGDB_STREAM_TYPE_NORMAL identifies the normal stream, and all events/data are ingested into this stream only. However, we can scan data from the normal stream as well as from all the derived streams.
Filter, joined and referral streams can also be scanned using the following method.
public ResultSet scanDoc(long schemaid, long streamid, ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf)
Scanning data from derived streams
Only three stream types matter here: groupby, aggregate and entity.
To scan data from these streams, we use the following API.
public ResultSet scanProcDoc(long schemaid, long streamid, String query_json, ResultSet prev_rs, ScanFilter sf)
schemaid : schema id for the schema to which the stream belongs.
streamid : stream id of the main / normal stream. Since the aggregate, entity and groupby streams are abstracted, we use the id of the stream to which they belong.
query_json : query_json has the following basic structure, but it differs between stream types:
{"proc-type": 5, "gpby-attrid": 123, "from_ts":123456, "to_ts": 234567, "skey": "*:a1:b1", "ekey": "*:a1:b7","rollup":0}
proc-type -> indicates the kind of derived stream; 5 means gpby.
gpby-attrid -> id of the groupby.
The user may call the following function to get the gpby-attrid:
public String getGpbyName(long schemaid, long streamid, String gpby_attr_list)
Here gpby_attr_list has the following structure:
{"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
The user may provide just attrs and gpby-val, and the call returns the gpby-attrid; alternatively, the user may simply provide the gpby-name if it is known. Most often this function is called with the following gpby_attr_list to get the gpby-attrid:
{"attrs":["a1", "b2", "c3"], "gpby-val":"x"}
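The two groupby JSON strings described above can be assembled with plain string concatenation. The following is a minimal sketch; the class GpbyJson and its method names are hypothetical helpers and not part of the BangDB API. It assumes the values contain no characters that need JSON escaping, and a JSON library would be the safer choice for arbitrary input.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the groupby JSON strings; not a BangDB class. */
final class GpbyJson {
    private GpbyJson() {}

    /** Builds the short gpby_attr_list form: attrs plus gpby-val. */
    static String attrList(List<String> attrs, String gpbyVal) {
        // Quote each attribute name; assumes no characters that need escaping.
        String quoted = attrs.stream()
                .map(a -> "\"" + a + "\"")
                .collect(Collectors.joining(", "));
        return "{\"attrs\":[" + quoted + "], \"gpby-val\":\"" + gpbyVal + "\"}";
    }

    /** Builds the query_json for a groupby scan (proc-type 5). */
    static String query(long gpbyAttrId, long fromTs, long toTs,
                        String skey, String ekey, int rollup) {
        return "{\"proc-type\":5, \"gpby-attrid\":" + gpbyAttrId
                + ", \"from_ts\":" + fromTs
                + ", \"to_ts\":" + toTs
                + ", \"skey\":\"" + skey + "\""
                + ", \"ekey\":\"" + ekey + "\""
                + ", \"rollup\":" + rollup + "}";
    }
}
```

A typical flow would build the attribute list first, pass it to the lookup function to obtain the gpby-attrid, and then pass the result of query(...) as query_json to scanProcDoc.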
For aggregate:
{"proc-type": 6,"attrs":["a"], "from_ts":sk_ts, "to_ts":ek_ts, "rollup":1}
This is very similar to the gpby query, except that attrs[] contains the name of the attribute for which the scan is being done.
For entity:
{"proc-type": 4, "skey":"sk", "ekey":"ek", "enty-stream-id":1234}
There is no from_ts or to_ts here, because the entity stream does not use a timestamp as its primary key. It is also not a sliding-window-based table or stream. The enty-stream-id is simply the stream id of the entity stream.
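The aggregate and entity forms of query_json can be built the same way. This is a minimal sketch under the same assumptions as before: DerivedQueryJson and its methods are hypothetical names, not BangDB API, and the inputs are assumed to need no JSON escaping.

```java
/** Hypothetical helper for the aggregate and entity query_json forms; not a BangDB class. */
final class DerivedQueryJson {
    private DerivedQueryJson() {}

    /** Aggregate scan (proc-type 6): one attribute name and a timestamp range. */
    static String aggregate(String attr, long fromTs, long toTs, int rollup) {
        return "{\"proc-type\":6, \"attrs\":[\"" + attr + "\"]"
                + ", \"from_ts\":" + fromTs
                + ", \"to_ts\":" + toTs
                + ", \"rollup\":" + rollup + "}";
    }

    /** Entity scan (proc-type 4): a key range and the entity stream id, no timestamps. */
    static String entity(String skey, String ekey, long entityStreamId) {
        return "{\"proc-type\":4, \"skey\":\"" + skey + "\""
                + ", \"ekey\":\"" + ekey + "\""
                + ", \"enty-stream-id\":" + entityStreamId + "}";
    }
}
```

Either string is then passed as query_json to scanProcDoc, together with the schema id and the id of the normal stream.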