This is useful for computing an extra set of attributes from the event stream as we ingest data from the source. For example: if we get attributes a, b, c, etc. and we wish to compute a3 based on some logic, then we need to define that here. This is how it looks:

[
   {
      "name":"m",
      "type":9,
      "opnm":"MUL",
      "stat":3,
      "iatr":[
         "b",
         "c",
         "d"
      ],
      "seq":1,
      "order":0
   },
   {
      "name":"n",
      "type":11,
      "stat":1,
      "opnm":"comp_int",
      "iatr":[
         "g",
         "h"
      ],
      "seq":0,
      "order":1
   },
   {
      "name":"o",
      "type":5,
      "opnm":"string_add",
      "iatr":[
         "a",
         "b"
      ],
      "order":2
   },
   {
      "name":"p",
      "type":5,
      "opid":3,
      "opnm":"myudf3",
      "iatr":[
         "c",
         "b"
      ],
      "order":3
   },
   {
      "name":"mexp",
      "type":9,
      "opnm":"MATH_EXP",
      "iatr":[
         "((($g+$h)*2)+($g*$h))"
      ],
      "order":4
   },
   {
      "name":"x",
      "type":11,
      "opnm":"PRED",
      "model":"mymodel1",
      "algo":"SVM",
      "attr_type":"HYB",
      "iatr":[
         "a",
         "b",
         "c"
      ],
      "order":5
   }
]

Let's go through them one by one.

{"name":"m", "type":9, "opnm":"MUL", "stat": 3, "iatr":["b", "c", "d"],"seq":1,"order":0}

It says: compute a new attribute "m" of type 9 (long) from (b, c, d) using "opnm": "MUL" (multiply), and enable "stat" as well (type 3, running stats).
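For instance, with made-up values, an incoming event

{"b":2, "c":3, "d":4}

would end up in the stream as

{"b":2, "c":3, "d":4, "m":24}

since m = b * c * d = 2 * 3 * 4 = 24.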

Here are a few default operations that can be used, or the user may upload a UDF (user defined function, as explained separately in the UDF section) and use that. The following default operations are available within the db. When we wish to use "opid" instead of "opnm", we may use the following:

enum BANGDB_DEFAULT_UDF {
    // following are for computations of value of different attribute 
    BANGDB_DEFAULT_UDF_COPY = 1, 
    BANGDB_DEFAULT_UDF_ADD,
    BANGDB_DEFAULT_UDF_MUL,
    BANGDB_DEFAULT_UDF_DIV,
    BANGDB_DEFAULT_UDF_PERCENT,
    BANGDB_DEFAULT_UDF_SUB,
    BANGDB_DEFAULT_UDF_UPPER, // for string, it's upper case, for double it's ceiling, long doesn't care
    BANGDB_DEFAULT_UDF_LOWER, // for string, it's lower case, for double it's floor, long doesn't care 
    BANGDB_DEFAULT_UDF_COPY_VAL, // copies the value, doesn't use the val as attribute to read it from event 
    BANGDB_DEFAULT_UDF_LOG_BASE_E, 
    BANGDB_DEFAULT_UDF_LOG_BASE_2,
    BANGDB_DEFAULT_UDF_LOG_BASE_10,
    BANGDB_DEFAULT_UDF_MATH_EXP,
    BANGDB_DEFAULT_DATE_TS,
    BANGDB_DEFAULT_DATE_YEAR_ACTUCAL,
    BANGDB_DEFAULT_DATE_YEAR_EPOCH,
    BANGDB_DEFAULT_DATE_MONTH_ACTUAL,
    BANGDB_DEFAULT_DATE_MONTH_EPOCH,
    BANGDB_DEFAULT_DATE_WEEK_ACTUAL,
    BANGDB_DEFAULT_DATE_WEEK_MONTH,
    BANGDB_DEFAULT_DATE_WEEK_EPOCH,
    BANGDB_DEFAULT_DATE_DAY_YEAR,
    BANGDB_DEFAULT_DATE_DAY_WEEK,
    BANGDB_DEFAULT_DATE_DAY_MONTH,
    BANGDB_DEFAULT_DATE_DAY_EPOCH,
    BANGDB_DEFAULT_DATE_HOUR_ACTUAL,
    BANGDB_DEFAULT_DATE_HOUR_EPOCH,
    BANGDB_DEFAULT_DATE_MIN_ACTUAL,
    BANGDB_DEFAULT_DATE_MIN_EPOCH,
    BANGDB_DEFAULT_DATE_SEC_ACTUAL,
    BANGDB_DEFAULT_ABS_VAL,
    BANGDB_DEFAULT_GEOHASH,
    BANGDB_DEFAULT_UDF_INVALID = 1024
};
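For example, BANGDB_DEFAULT_UDF_MUL has the value 3 in this enum, so the "m" attribute above could equivalently be defined with "opid" instead of "opnm":

{"name":"m", "type":9, "opid":3, "stat":3, "iatr":["b", "c", "d"], "seq":1, "order":0}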

When using "opnm", however, we would use one of the following names:

"COPY" // simply copy the attribute val
"ADD" // add two attributes values
"MUL" // multiply the attributes values 
"DIV" // divide the left attribute with the right one 
"PERCENT" // compute percentage, left of right 
"SUB" // subtract right one from left one 
"UPPER" // convert attribute value to upper case 
"LOWER" // convert attribute values to lower case 
"COPY_VAL" // this is to copy the value, not the attribute value, but whatever value is provided "LOG_E" // log to the base e 
"LOG_2" // log to the base 2, ln "LOG_10" // log to the base 10 
"MATH_EXP" // math expression, involving attributes and fixed values 
"PRED", // for prediction 
"TS", // timestamp given a date (ec; dd//mm/yy hr:mn:sc, etc...) 
"YEAR", // year from the date 
"YEAR_EPOCH", // num of year since epoch 
"MONTH", // month from the date 
"MONTH_EPOCH", // month since epoch 
"WEEK", // week from the date 
"WEEK_MONTH", // week of the month 
"WEEK_EPOCH", // week since epoch 
"DAY", // day from the date 
"DAY_WEEK", // day of the week 
"DAY_MONTH", // day of the month 
"DAY_EPOCH", // num of days since epoch 
"HOUR", // hour from the date 
"HOUR_EPOCH", // hours since epoch 
"MINUTE", // min from the date 
"MINUTE_EPOCH", // min since epoch 
"SECOND", // second from the date 
"ABS", // abs value of as it is 
"GEOHASH" // hash value for geo loc (lat, lon)

Another one

{"name":"n", "type":11, "stat":1, "opnm":"comp_int", "iatr":["g", "h"]}

It says: compute attribute "n" of type 11 (double) from input attributes (g, h) using a UDF named comp_int (implemented and uploaded by the user) and enable "stat": 1 (counting). Since "comp_int" is not the name of any of the default UDFs, the db would look for a custom UDF implemented and supplied by the user.

{"name":"o", "type":5, "opnm":"string_add", "iatr":["a", "b"]}

It computes an attribute "o" of type 5 (string) from input attributes (a, b) using the UDF string_add.

{"name":"p", "type":5, "opid":3, "opnm":"myudf3", "iatr":["c", "b"]

This is along similar lines, but it has both "opid" and "opnm"; in such a case, the db uses the "opid" (here 3).

{"name":"myexp", "type":9, "opid":13, "iatr":["((($g+$h)*2)+($g*$h))"]}

This is a bit different: here it computes attribute "mexp" of type 9 (long) using "opid": 13 (BANGDB_DEFAULT_UDF_MATH_EXP), which says to evaluate the math expression defined in "iatr" over the given input attributes. Here it adds the values of g and h, multiplies the sum by 2, and then adds the product of g and h.
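For example, for an event with g = 3 and h = 4, mexp = ((3 + 4) * 2) + (3 * 4) = 14 + 12 = 26.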

This should be simple enough, but it adds a lot of value, as we can create new attributes and associate them with the stream before further processing.

Now, let's look at how to do prediction on stream.

{
   "name":"x",
   "type":11,
   "opnm":"PRED",
   "model":"mymodel1",
   "algo":"SVM",
   "attr_type":"HYB",
   "iatr":[
      "a",
      "b",
      "c"
   ]
}

Let's say that when we ingest events into a stream, we wish to take a set of attributes from the event, run them through a pre-trained model, and store the prediction output in the stream itself in some attribute. Here we again use "catr", with pretty much the same structure as defined above, except for a few additions:

"model" // name of the pre-trained model to be used for prediction
"algo" // algorithm with which the model was trained (here SVM)
"attr_type" // type of the input attributes (here HYB)

The rest is all the same: we can use "stat" on this attribute, this attribute can further participate in other "catr" computations, etc.
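As an illustration (with made-up values), ingesting an event such as

{"a":"u21", "b":7, "c":"mobile"}

would run (a, b, c) through the pre-trained model mymodel1 and store whatever prediction it returns in "x", so the event in the stream would look like

{"a":"u21", "b":7, "c":"mobile", "x":1.0}

where 1.0 stands for the model's output.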

A Few Examples

Let's say we have a stream of data with attributes (a, b, c, m); here is the schema for the same.

{
   "schema":"myapp",
   "streams":[
      {
         "name":"product",
         "type":1,
         "swsz":86400,
         "inpt":[
            
         ],
         "attr":[
            {
               "name":"a",
               "type":5,
               "sidx":1,
               "stat":2,
               "ridx":1
            },
            {
               "name":"b",
               "type":9,
               "stat":3
            },
            {
               "name":"m",
               "type":11,
               "stat":3
            },
            {
               "name":"c",
               "type":5,
               "kysz":24,
               "stat":2
            }
         ]
      }
   ]
}
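For reference, the computed attributes we define next would go into a "catr" array alongside "attr" in the stream definition; a minimal sketch (assuming this placement, with the "attr" array elided):

{
   "schema":"myapp",
   "streams":[
      {
         "name":"product",
         "type":1,
         "swsz":86400,
         "attr":[ ... ],
         "catr":[
            {"name":"a","type":5,"opnm":"LOWER","iatr":["a"],"stat":2,"fnr":2}
         ]
      }
   ]
}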

Now, let's compute several other attributes as required.

  1. Lower the attribute a, i.e. replace 'Sachin' with 'sachin'. Here we would like to replace the attribute a with the lower-case form of itself, therefore both "name" and "iatr" will be the same. To replace the attribute rather than add a new one, we set the "fnr" (find and replace) tag: "fnr":2 replaces the attribute value in place, while "fnr":3 (used in the next example) adds the value only if the attribute is missing. Therefore, we can do the following:

     {"name":"a","type":5,"opnm":"LOWER","iatr":["a"],"stat":2,"fnr":2}

  2. Add a missing fixed value: if the attribute is missing then add it, else ignore.

     {"name":"i","type":5,"opnm":"COPY_VAL","iatr":["sachin"],"fnr":3}

  3. Compute a new attribute using a math expression.

     {"name":"b","type":9,"opnm":"MATH_EXP","iatr":["($b*10)"]}

  The sketch below shows the combined effect on a sample event.