Code Engine Common Tasks

Abstract

A listing of various common uses for the Code Engine, including sample Python code, and how to create and use code modules.

Here are some of the many common tasks where the Code Engine can help you transform and enrich your events:

Creating new fields

One common use for the Code Engine is to modify events by adding new fields. These fields can store the results of lookups or external data, concatenations of existing fields, or other data. 

Regardless of how the new fields are created or populated, they're added as new keys in the event object. New keys added via the Code Engine will be replicated as new fields in the Mapper and then added to the table in the data destination.

Suppose we have an event that has an address and we'd like to add the postal code from another table or perhaps from a Geo-IP lookup service. When we add a new field (postal_code in our example below), the Mapper automatically adds that column to our table in the data destination.

First our transform adds the field to the event:

def transform(event):
    ...
    # Placeholder value: in practice this comes from a Geo-IP service or from a table
    event['postal_code'] = '75001'
    return event

And here's the new field in the event:

{
    ...
    "country": "France",
    "postal_code": "75001",
    "_metadata": {
        "@timestamp": "2018-10-16T15:26:57.027Z",
        "@uuid": "cf722353-370a-4cc8-9463-51d6",
        ...
    }
}

Take care when enriching events with data from external sites as this can cause lag if the service you're querying slows down or stops responding. Often, connecting to external/third party services can be more efficient when performed within the data destination.

Discarding events

Suppose that an input sets a field for the user’s login status, and you wish to only record events from users who are logged in. The following code could be used to discard events where the user is not logged in.

def transform(event):
    if event['login_status'] == False:
        return None
    else:
        return event

Removing/Blacklisting specific data

If you have a data source that includes, for example, a table you want to blacklist/remove, you can do something like this:

#
# Removing Data from Alooma and Blacklisting Certain Data
#
# Input <-> table blacklist mapping
BLACKLIST_MAPPING = {
  '<input_label>': ["ugly_table"]
}

def is_blacklisted(event, input_label):
  if input_label in BLACKLIST_MAPPING:
    event_type = event['_metadata']['event_type']
    if event_type in BLACKLIST_MAPPING[input_label]:
      return True
  return False

# Placeholder: prefix to prepend to the event type of events that are kept
dataset_name = '<dataset_name>'

# The transform checks to see if the event/input_label is blacklisted
def transform(event):
  input_label = event['_metadata']['input_label']
  if is_blacklisted(event, input_label):
    return None

  event['_metadata']['event_type'] = "%s.%s" % (dataset_name, event['_metadata']['event_type'])

  return event

Splitting events

An event can be split into multiple events. For example, suppose incoming events each include a list of websites visited by a user, and you want a separate event for every website that each user visits.

This sample function returns a list of event dictionaries, where each dictionary is composed of a site and the user from the original single event.

def transform(event):
    event_list = []
    for site in event['sites']:
        site_visit = {}
        site_visit['site'] = site
        site_visit['user'] = event['user']
        event_list.append(site_visit)
    return event_list

After returning multiple events, each event is automatically packaged with a _metadata dictionary corresponding to its parent event. However, the metadata fields on such events are not available for access in the Code Engine. Thus, the _metadata fields cannot be transformed unless explicitly copied to each event object. The following code example amends the previous example with an explicit metadata copy and field assignment:

from copy import deepcopy

def transform(event):
    event_list = []
    for site in event['sites']:
        site_visit = {}
        site_visit['site'] = site
        site_visit['user'] = event['user']
        site_visit['_metadata'] = deepcopy(event['_metadata'])
        site_visit['_metadata']['event_type'] = "transform_code"
        event_list.append(site_visit)
    return event_list

Regardless of whether the _metadata dictionary is added automatically or explicitly, the dictionary will appear in the Mapper. The _metadata dictionary and its fields are discussed here.

Flattening JSON

If you are importing JSON data that includes nested fields (typically from a webhook, SDK, or REST API data source), you may want to flatten the JSON before it's loaded into your data destination.

When your JSON data is imported, Alooma creates a column in the target data warehouse for every top-level key (except for _metadata). This can be an issue if a key contains nested JSON, because that JSON becomes the contents of the column.

Here's an example of a basic JSON flattening function as it might appear in the Code Engine:

import collections

def flatten(d, parent_key='', sep='_'):
  items = []
  for k, v in d.items():
    if k != '_metadata':
      new_key = parent_key + sep + k if parent_key else k
      if isinstance(v, collections.MutableMapping):
          items.extend(flatten(v, new_key, sep=sep).items())
      else:
          items.append((new_key, v))
  return dict(items)

def transform(event):
  metadata = event['_metadata']
  event = flatten(event)
  event['_metadata'] = metadata
  return event

Note

This is not intended to be a one-size-fits-all example of how to flatten JSON. Your data will vary, and you will likely need to modify, perhaps heavily, the example above. That said, the example should help you on your way. If you have questions, please reach out.

Here's some very simple sample data, before flattening (the _metadata is just copied over so it's not important for this example):

{
    "name": "Inventory",
    "EastLot": {
        "SUV": 19,
        "Van": 5,
        "Sedan": 33,
        "Coupe": 20
    },
    "_metadata": {
        ( ... )
    }
}

If we do not flatten the JSON, the resulting import will include a column named EastLot and the JSON fields are included as the value of EastLot.

Here is the same JSON after flattening via the transform above:

{
    "name": "Inventory",
    "EastLot_SUV": 19,
    "EastLot_Coupe": 20,
    "EastLot_Van": 5,
    "EastLot_Sedan": 33,
    "_metadata": {
        ( ... )
    }
}

Now each field is a column.
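
The basic function above only descends into dictionaries. If your events also contain lists, one possible adaptation is to expand list items by position. This is a sketch under that assumption (the tags_0, tags_1 naming is an illustrative choice, not an Alooma convention), and it also imports MutableMapping in a way that works on both Python 2 and Python 3:

```python
try:
    from collections.abc import MutableMapping  # Python 3.3+
except ImportError:
    from collections import MutableMapping      # Python 2

def flatten(d, parent_key='', sep='_'):
    items = {}
    for k, v in d.items():
        if k == '_metadata':
            continue  # metadata is re-attached by the caller
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.update(flatten(v, new_key, sep))
        elif isinstance(v, list):
            # Assumption: expand lists by position, e.g. tags_0, tags_1
            indexed = dict((str(i), item) for i, item in enumerate(v))
            items.update(flatten(indexed, new_key, sep))
        else:
            items[new_key] = v
    return items
```

Note that empty dictionaries and empty lists produce no keys at all in this sketch, and that long lists will create many columns, so serializing a list to a single string may suit some data better.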

Figure 1. Flattening JSON via the Code Engine

Geo-IP resolution (enriching events)

The Alooma Code Engine supports direct extraction of geographical information from IP addresses. This is an example of how to use the Code Engine to enrich existing events with supplemental data. Simply import the geoip library and call the geoip.lookup function on an IP address. The function returns an object containing the country, country code, region, city, and postal (zip) code.

Given the following sample event:

 {
    "_metadata": {
        "input_label": "REST_Endpoint",
        "event_type": "REST_Endpoint",
        "client_ip": "194.153.110.160",
        "@version": "1",
        "@timestamp": "2015-10-16T15:26:57.027Z",
        "host": "172.17.0.73",
        "@uuid": "af721753-370a-4cc8-9463-5f62c82988e2",
        "@parent_uuid": ""
    }
} 

And the following transform code:

import geoip
def transform(event):
    addr = event['_metadata']['client_ip']
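    geoip_info = geoip.lookup(addr)
    # lookup returns None for an invalid IP or when no country is found
    if geoip_info is None:
        return event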
    event['country'] = geoip_info.country
    event['country_code'] = geoip_info.country_code
    event['region'] = geoip_info.region
    event['city'] = geoip_info.city
    event['postal_code'] = geoip_info.postal_code
    return event

The transformed event appears below. Note the new country, country_code, region, city and postal_code fields are added to the table in the data destination automatically by the Mapper:

{
    "country": "France",
    "country_code": "FR",
    "region": "J",
    "city": "Paris",
    "postal_code": "75001",
    "_metadata": {
        "@timestamp": "2015-10-16T15:26:57.027Z",
        "@uuid": "af721753-370a-4cc8-9463-5f62c82988e2",
        "@version": "1",
        "host": "172.17.0.73",
        "client_ip": "194.153.110.160",
        "input_label": "REST_Endpoint",
        "event_type": "REST_Endpoint",
        "@parent_uuid": ""
    }
}

Alooma uses IP2Location for Geo-IP resolution and we update to the latest version each month. Geo-IP resolution works on both IPv4 and IPv6 addresses. If an IP address is invalid, or in the rare case that a country cannot be found, then the lookup function returns None. City and postal code data is less comprehensive, and may be None if there is no information for a given IP address.
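
Because the lookup can return None, and individual fields such as city or postal code can be None as well, it may help to copy the fields defensively. The following sketch assumes a helper named add_geo_fields (hypothetical) and uses a namedtuple called FakeGeo (also hypothetical) only as a stand-in for the object returned by geoip.lookup, so that the example runs outside the Code Engine:

```python
from collections import namedtuple

GEO_FIELDS = ('country', 'country_code', 'region', 'city', 'postal_code')

def add_geo_fields(event, geoip_info):
    # geoip_info is the result of geoip.lookup(); it may be None
    for field in GEO_FIELDS:
        # getattr with a default covers both a None result and a missing attribute
        event[field] = getattr(geoip_info, field, None)
    return event

# Stand-in for a lookup result, used only so the sketch runs
# outside the Code Engine (the real object comes from geoip.lookup)
FakeGeo = namedtuple('FakeGeo', GEO_FIELDS)

enriched = add_geo_fields({}, FakeGeo('France', 'FR', 'J', 'Paris', None))
unresolved = add_geo_fields({}, None)
```

Inside a transform, the call would look like add_geo_fields(event, geoip.lookup(addr)). Setting every field, even to None, keeps the set of keys the same across events; skipping unresolved events entirely is an equally reasonable choice.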

Notification generation

Alooma provides an API to generate notifications that appear in the notification pane of the Dashboard page. You can generate notifications to display information, warnings, and errors. A notification has two string arguments: a title and a description.

import notifications
def transform(event):
    product = event['product']
    inventory = event['inventory']

    if 0 < inventory < 5:
        description = product + ": " + str(inventory) + " remaining"
        notifications.info("Inventory running low", description)
    if inventory == 0:
        description = product + " is out of inventory"
        notifications.warn("No inventory left", description)
    if inventory < 0:
        description = product + " has negative inventory"
        notifications.error("Inventory accounting error", description)
    return event

Multiple notifications are aggregated by their title when received within 15 minutes of one another. Aggregated notifications can be expanded in the notification pane of the Dashboard page in order to see the separate descriptions for each notification.

Note that when running code in the Code Engine, notifications from the execution will not appear in the notification pane.

User-agent parsing

The Alooma Code Engine supports user-agent parsing using the ua-parser library.

Given the following sample event:

{
    "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36",
    "_metadata": {
        ...
    }
}

And the following transform code:

from ua_parser import user_agent_parser
def transform(event):
    result = user_agent_parser.Parse(event['user_agent'])
    event['browser'] = result['user_agent']['family']
    event['OS'] = result['os']['family']
    return event

The transformed event appears below.

{
    "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36",
    "browser": "Chrome",
    "OS": "Mac OS X",
    "_metadata": {
        ...
    }
}

Flexible date parsing

Flexible date/time string parsing is provided via the Python dateutil module.

Given the following sample event:

{
    "user_time": "Mon Oct 14 07:13:46 UTC 2013",
    "_metadata": {
        ...
    }
}

And the following transform code:

from dateutil.parser import parse
def transform(event):
    timestamp = parse(event['user_time']) # type(timestamp) => <type 'datetime.datetime'>
    event['user_time'] = timestamp.isoformat()
    return event

The transformed event appears below. Notice user_time is now ISO-format.

{
    "user_time": "2013-10-14T07:13:46+00:00",
    "_metadata": {
        ...
    }
}

Retrieving elements from nested dictionaries

Transform code often has long or nested conditional statements to check for the presence of nested dictionary elements in the event object. This convention can result in cumbersome code, but is necessary to avoid KeyError exceptions when accessing a dictionary.

The following get function provides a shortcut to retrieve values if they exist, and avoids KeyError exceptions if values do not exist.

def get(dictobj, *path):
    '''
    Get path from dictobj.
    Returns None if path does not exist.

    e.g.: dictobj = {'parent': {'child': {'grandchild': 'foo'}}}
          get(dictobj, 'parent', 'child', 'grandchild')
          > 'foo'
          get(dictobj, 'some', 'other', 'path')
          > None

    Can be used to simplify code like:

    if ('data' in event and 'url' in event['data'] and event['data']['url'] == 'xxx'):

    to:

    if get(event, 'data', 'url') == 'xxx':

    '''
    element = dictobj
    for path_element in path:
        # Stop if this level is missing or is not a dictionary
        if not isinstance(element, dict) or path_element not in element:
            return None
        element = element[path_element]
    return element

For example, accessing event['_metadata']['client_ip'] results in a KeyError if the event is missing either the _metadata dictionary or client_ip key.

In contrast, get(event, '_metadata', 'client_ip') gracefully returns None if any dictionary elements are missing or if the returned value is equal to None.

Now that you've seen the basics of the Code Engine, continue to learn about testing your code in the UI or programmatically.

Handling surrogates in data

If your data includes UTF-16 characters that have surrogates, the mapping can fail as the event is processed and the output may become corrupted. The solution for this is to strip out (or replace) those UTF-16 characters. In our example below, we're replacing any such UTF-16 characters with a question mark (?) as specified in the discard_surrogates() function.

import re

HAS_UTF8 = re.compile(r'[\x80-\xff]')
ESCAPE_ASCII = re.compile(r'([\\"]|[^\ -~])')
SURROGATES = {
    u'\ufdd0', u'\ufdd1', u'\ufdd2', u'\ufdd3', u'\ufdd4', u'\ufdd5', u'\ufdd6',
    u'\ufdd7', u'\ufdd8', u'\ufdd9', u'\ufdda', u'\ufddb', u'\ufddc', u'\ufddd',
    u'\ufdde', u'\ufddf', u'\ufde0', u'\ufde1', u'\ufde2', u'\ufde3', u'\ufde4',
    u'\ufde5', u'\ufde6', u'\ufde7', u'\ufde8', u'\ufde9', u'\ufdea', u'\ufdeb',
    u'\ufdec', u'\ufded', u'\ufdee', u'\ufdef', u'\ufdf0', u'\ufdf1', u'\ufdf2',
    u'\ufdf3', u'\ufdf4', u'\ufdf5', u'\ufdf6', u'\ufdf7', u'\ufdf8', u'\ufdf9',
    u'\ufdfa', u'\ufdfb', u'\ufdfc', u'\ufdfd', u'\ufffe', u'\uffff'
}

def replace_surrogate(match):
    s = match.group(0)
    return '?' if s in SURROGATES else s

def discard_surrogates(s):
    if s is None:
        return s

    elif not isinstance(s, basestring):
        raise ValueError('Only handles strings')

    elif isinstance(s, str):
        if not HAS_UTF8.search(s):
            print 'Does not need cleaning'
            return s

        s = s.decode('utf-8')

    return ESCAPE_ASCII.sub(replace_surrogate, s)

You should not run this code on every field in every event as that may slow processing of events in large volume environments. Rather, create a map of the event types and fields that you do wish to parse and have the transform only check fields in the map.
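
A map-driven transform along these lines might look like the following sketch. The event types and field names in FIELDS_TO_CLEAN are made up for illustration, and clean is a simplified stand-in for discard_surrogates() that handles only two characters so the example stays self-contained:

```python
# Assumption: the event types and field names below are made up for illustration
FIELDS_TO_CLEAN = {
    'comments': ['body', 'title'],
    'profiles': ['bio'],
}

def clean(value):
    # Stand-in for discard_surrogates(); replaces two characters only
    if not isinstance(value, type(u'')):
        return value  # leave None and non-text values untouched
    return value.replace(u'\ufffe', u'?').replace(u'\uffff', u'?')

def transform(event, cleaner=clean):
    event_type = event['_metadata']['event_type']
    # Event types that are not in the map are passed through unchanged
    for field in FIELDS_TO_CLEAN.get(event_type, []):
        if field in event:
            event[field] = cleaner(event[field])
    return event
```

In practice you would pass discard_surrogates as the cleaner, or call it directly in place of clean.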

Hashing information

One way to avoid having Personally Identifiable Information (PII) in your data warehouse is to hash it as it flows through the Code Engine. Here's an example of a basic hash function and transform as it might appear in the Code Engine. In this example, we're looking for events in the "Customers" table, and we'll hash the values of the "Address" and "Income" fields.

import uuid
import hashlib

events_to_hash = ['Customers'] # list of tables to transform
fields_to_hash = ['Address', 'Income'] # fields in those tables to hash

def hash_password(password):
  # uuid is used to generate a random number
  salt = uuid.uuid4().hex
  return hashlib.sha256(salt.encode() + password.encode()).hexdigest() + ':' + salt

def hash_event(event, fields):
  for field in fields:
    if isinstance(event[field], str):
      event[field] = hash_password(event[field])
    else:            # str() if value is not a string
      event[field] = hash_password(str(event[field]))
  return event    # return only after all fields are hashed


def transform(event):
  if event['_metadata']['event_type'] in events_to_hash:
    event = hash_event(event, fields_to_hash)
  return event

So the idea is to specify the table that holds the data and the fields within that data to hash. Here's some very simple sample data, prior to hashing (the _metadata is not important for this example):

{
    "CustID": "12345",
    "Address": "123 Elm St.",
    "Income": 59000,
    "CustGroup": 1122,
    "_metadata": {
        ( ... )
    }
}

And here's that data with the "Address" and "Income" fields hashed as a result of the transform:

{
    "CustID": "12345",
    "Address": "76fde0e163256e319cb3e18cdd238d9903d1b6a3aa24f32074191",
    "Income": "def00770e163256e319cb3e18cdd238d99031b6a3aa24f320656c",
    "CustGroup": 1122,
    "_metadata": {
        ( ... )
    }
}
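
Because hash_password above uses a fresh random salt for every call and appends it after a colon, a stored value can later be checked against a candidate by recomputing the digest with the stored salt. The sketch below restates the same scheme under the hypothetical names hash_value and matches so that it runs on its own:

```python
import hashlib
import uuid

def hash_value(value):
    # Same scheme as hash_password above: sha256(salt + value) + ':' + salt
    salt = uuid.uuid4().hex
    digest = hashlib.sha256(salt.encode() + value.encode()).hexdigest()
    return digest + ':' + salt

def matches(value, stored):
    # Recompute the digest with the stored salt and compare
    digest, salt = stored.split(':')
    return hashlib.sha256(salt.encode() + value.encode()).hexdigest() == digest

first = hash_value('123 Elm St.')
second = hash_value('123 Elm St.')
```

Here first and second differ even though the input is identical, which means columns hashed this way cannot be used for joins or grouping. If you need equal inputs to produce equal outputs, a fixed secret salt is one alternative to consider, at the cost of weaker protection.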

Prepending the schema onto the event type

When it comes to mapping, there are several options for designing your data destination. For some configurations, using the OneClick mapping makes sense. In others, creating the target schemas based on the source schemas is the right approach. In Alooma, when a value is prepended to an event, and automapping is on, we automatically create a new schema based on that prepended value.

For MySQL inputs, you can take advantage of this by adding the schema to the event type name (so event_type becomes schema.event_type). This can be helpful when sending events from MySQL to schemas in the data warehouse that match the source schemas.

Here's an example of how to prepend the schema onto the event type:

def prefix_event_type(event):
    schema = event['_metadata'].get('schema')
    event_type = event['_metadata'].get('event_type')
    if schema:
        event['_metadata']['event_type'] = schema + "." + event_type
    return event

Working with secrets and alooma.py

Because there are times you will need to pass sensitive information (things like tokens, keys, usernames, and passwords) from the Code Engine, you can define these as "secrets" via the Alooma API and then reference them in the Code Engine without having to know or show the values of the secrets.

For example, using alooma.py, you can set (and get and delete) secrets:

api.set_secrets({"my_user":"joe@example.com", "my_password": "12345678"})

In this case, we've set two secrets: one called "my_user" with the value of "joe@example.com" and one called "my_password" with the value of "12345678". Now, once you import alooma_secrets, you can reference those secrets in the Code Engine:

import alooma_secrets


def use_credentials(user, password):
    # code that uses credentials here
    return True

def transform(event):
    credentials = {
        'user': alooma_secrets.get("my_user"),
        'password': alooma_secrets.get("my_password"),
    }
    
    event['new_data'] = use_credentials(user=credentials['user'],
                                        password=credentials['password'])
    return event

Code responsibly :)

As any good developer knows, you don't just go and deploy code without testing it! Learn more about testing your Code Engine code using alooma.py.
