Enriching Streaming Data With External Sources

by Ofri Raviv  
4 min read  • 25 Jan 2017

Enriching your data is not a new need. What is new is enriching data in motion, and stateful processing is shifting that paradigm. In this post we’ll dive into another type of stateful processing: stream enrichment.

A very common challenge our customers face is joining event data (e.g., click streams) with features coming from other data sources (e.g., a users database). Alooma provides database integrations, so one solution would be to replicate the users table to the data warehouse and then use a SQL JOIN when querying the data. However, this solution has its limitations:

  1. It significantly degrades performance compared to having all the required attributes in the same table.
  2. In some applications, the values added to the streaming events must reflect a snapshot of the external data source at event time, and not update when the external data source changes later. For example, an application that enriches purchase events with currency exchange rates needs to add the rate at the moment of the purchase, not the most up-to-date rate available at the moment of query.
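To make the snapshot requirement concrete, here is a minimal Python sketch. The in-memory `rates` table and the `enrich_purchase` helper are hypothetical, not part of Alooma's API; the point is only that the rate is copied onto the event at enrichment time, so later changes to the source don't affect already-enriched events.

```python
# Hypothetical in-memory rates table; in practice this would come from
# an external source such as a rates feed.
rates = {'EUR': 1.07}

def enrich_purchase(event, rates):
    # Snapshot the rate in effect right now, stored on the event itself.
    event['exchange_rate'] = rates[event['currency']]
    return event

purchase = enrich_purchase({'currency': 'EUR', 'amount': 100}, rates)

# The rate later changes in the external source...
rates['EUR'] = 1.12

# ...but the already-enriched event still carries the original snapshot.
print(purchase['exchange_rate'])  # 1.07
```

A warehouse-side JOIN at query time would instead pick up the latest rate, which is exactly the behavior this use case needs to avoid.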

Stateful Processing to the Rescue

Here is a use case in which we enrich events coming from the mobile_events input with user data that originates in a database table describing the users. Data from the users table is sent to the Alooma Platform as another stream, via the users_data input. The Code Engine’s transform function is used only to extract the context key from the two relevant event types.

When transform_with_context processes events coming from the users_data input, it writes data from the event to the context, and then discards the event.

When transform_with_context processes events coming from the mobile_events input, it reads data from the context and enriches the event. Note that the users table is not queried once per event, or even once per batch. Instead, the relevant part of the users table is replicated to the worker process’s memory via the context, so the performance impact of this enrichment process on the database storing the users data is minimal.

def transform(event):
    if event['_metadata']['input_label'] == 'mobile_events':
        event['_metadata']['context_key'] = event['user_id']
    elif event['_metadata']['input_label'] == 'users_data':
        event['_metadata']['context_key'] = event['uid']
    return event

def transform_with_context(event, context, context_key):
    # users_data events only update the context and return None ( = discard);
    # mobile_events read from the context.

    if event['_metadata']['input_label'] == 'users_data':
        context['user_name'] = event['user_name']
        context['user_segment'] = event['user_segment']
        # Discard the event, because we don't need to load it
        # into the data warehouse.
        return None

    elif event['_metadata']['input_label'] == 'mobile_events':
        # Use context.get because the context may be empty if we get
        # mobile_events before the corresponding users_data event.
        # In that case, name and segment are simply populated with None.
        event['user'] = {
            'name': context.get('user_name'),
            'segment': context.get('user_segment')}
        return event
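To see how the two event types interact, here is a standalone sketch that runs condensed copies of the logic above against sample events. The sample events and the per-key `contexts` dict are illustrative; on the Alooma Platform the context store is managed for you and keyed by context_key.

```python
def transform_with_context(event, context, context_key):
    # Same logic as above, condensed for a standalone run.
    if event['_metadata']['input_label'] == 'users_data':
        context['user_name'] = event['user_name']
        context['user_segment'] = event['user_segment']
        return None  # discard
    elif event['_metadata']['input_label'] == 'mobile_events':
        event['user'] = {'name': context.get('user_name'),
                         'segment': context.get('user_segment')}
        return event

# The platform maintains one context per context_key; a plain dict of
# dicts stands in for that here.
contexts = {}

def process(event):
    key = event['_metadata']['context_key']
    return transform_with_context(event, contexts.setdefault(key, {}), key)

# A users_data event arrives first: it populates the context and is discarded.
users_event = {'_metadata': {'input_label': 'users_data', 'context_key': 'u1'},
               'user_name': 'Ada', 'user_segment': 'premium'}
assert process(users_event) is None

# A mobile event with the same context key is enriched from the stored state.
mobile_event = {'_metadata': {'input_label': 'mobile_events', 'context_key': 'u1'},
                'user_id': 'u1', 'action': 'open_app'}
enriched = process(mobile_event)
print(enriched['user'])  # {'name': 'Ada', 'segment': 'premium'}
```

Note that the users database is never queried here: the enrichment reads only from in-memory state built up from the users_data stream.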

The newly added fields, user.name and user.segment, are mapped to columns in the events table, allowing us to query them easily without SQL JOIN operations.

Currently, our stateful processing feature is in a closed beta, so if you’re interested in experimenting with it, please contact us. We’d love to have you join, and to hear what other data applications you would like to build with this!
