Real-Time Anomaly Detection on the Stream

Ofri Raviv  •  4 min read  • 1 Feb 2017  • 

Monitoring KPIs in real-time is a must in today’s data-driven, competitive environment. However, the technical implementation of a system that tracks and responds to changes in KPIs can be challenging. In this blog post we explain how Alooma’s Code Engine can be used to track metrics in real-time, to detect when the metrics deviate from their previous values, and respond to these changes.

We’ve already covered the basics of stateful stream processing with an example of sessionization and an example of stream enrichment, so we’ll jump right into the details.

For this use case, we’ll monitor several metrics found in the events, and alert when they deviate from their previous values too much.

The Code Engine transform function splits the incoming events, and returns a list of events. One event in the list is the original event, that will continue the regular flow to the data warehouse. The rest of the list is comprised of an event for each metric we monitor. Only the metrics events are assigned a context key, so only they are passed to transform_with_context.

In transform_with_context we keep a list of the last VALUES_LEN values for each metric, and generate a notification if the current value is outside the range of mean ± 3 standard deviations.

import notifications  # This is an Alooma package

METRICS = ['response_time', 'response_size_bytes', 'total_order_amount']

def mean(values):
    n = len(values)
    return float(sum(values)) / n

def stdev(values):
    m = mean(values)
    ss = sum((x - m)**2 for x in values)
    return (ss / len(values)) ** 0.5

def transform(event):
    events_to_return = [event]
    for metric in METRICS:
        if metric in event:
            # Generate a new event, with a single attribute, called 'value'
            # and use the metric name as the event_type and context_key
            e = {
                'value': event[metric], 
                '_metadata': {
                    'event_type': metric,
                    'context_key': metric

    return events_to_return

def transform_with_context(event, context, context_key):
    # First time this metric is found. Initialize context
    if not context:
        context['values'] = []

    # Only check after we have enough samples
    if len(context['values']) == VALUES_LEN:
        m = mean(context['values'])
        sd = stdev(context['values'])
        v = event['value']
        if not(-3 * sd < v - m < 3 * sd):
                'Metric %s deviation' % context_key,
                'Current value: %.3f\nMean: %.3f\nSD: %.3f' % (v, m, sd))

    # Add the current value to the values list in the context object

    # Trim the values list if it is too long
    context['values'] = context['values'][-VALUES_LEN:]

    return None

Of course, this is a very basic snippet, that can be extended in many ways. For example, the logic of defining a deviation can be extended to compare the value of the metric to the prediction of several models. As another example, the reference list of values to which the current value is compared can be the values in a certain time window, instead of a constant number of last values.

The current output of the anomaly detection process is a dashboard notification in Alooma’s UI. We’re currently working on enabling the Code Engine to make REST requests to external services. Using REST requests will allow alerting a monitoring system or triggering some custom flow when an anomaly is detected.

Currently, our stateful processing feature is in a closed beta, so if you’re interested in experimenting with it, please contact us. We’d love have you join as well as hear what other data applications you would like to build using this!

Get your data flowing today!
Contact us for a demo or free trial.