Why do we need sessions?
We all have event data. Lots of it. In a lot of different places. It comes from user views and click streams, from backend logs and many other sources. This data can be extremely valuable if you use it correctly.
One challenge we found ourselves facing over and over again when analyzing our event data was how to perform logic on sessions of events. One KPI our product team wanted to track over time was the “one-sitting-end-to-end”: the percentage of users who complete the full flow, from defining a new input to loading events into a data warehouse, in a single sitting. The logic behind this KPI is that it is a proxy for how easy our product is to use. A “single sitting”, or a session, was defined as a continuous usage session with no more than 30 minutes between events. Finding the events that signal the beginning of the flow (a user defining a new input) and the end of the flow (events from this input being loaded to a data warehouse) is easy. Finding whether those events occur in the same session, however, results in a very unpleasant SQL query. If, instead of independent events, we had a session_id field that was the same for all events in the same session, then the SQL to find sessions containing both the start and end events would be much nicer.
Sometimes it is possible to add this session_id at the event source itself, but in our case events arrive from multiple independent inputs, so the session ID has to be computed downstream.
We decided to solve this problem by harnessing Alooma’s Code Engine.
Writing Stateful Computations in the Code Engine
Alooma’s Code Engine allows a user to define custom Python code that is applied to all the events in the stream before they are loaded to the data warehouse. This is a very powerful and flexible tool for data cleansing and processing.
Instead of processing each event independently, we can assign a context key to each event. Events that share the same context key value then share a context object: a Python dictionary that the Code Engine can read and write. Using this context object, the Code Engine can implement stateful computations.
Writing a stateful computation involves two stages. The first stage is the transform function, which is stateless, meaning it has no access to context. This stage is where the context key is assigned, typically based on a field in the event; it can also be used for simple transformations. A simple transform function that only sets the context key would look like this:
```python
def transform(event):
    event['_metadata']['context_key'] = event['user_id']
    return event
```
The second stage is transform_with_context. This function is applied after transform and receives the event returned by transform as input, along with its corresponding context object. The context is a Python dictionary which is initially empty, and transform_with_context can edit it in place. Anything written to it will be available when processing subsequent events that share the same context key. The context key value is also passed for convenience. Here’s an example of a basic transform_with_context function that counts the number of events sharing the same context key:
```python
def transform_with_context(event, context, context_key):
    # If this is the first event in the session, the context is empty, so initialize it
    if 'count' not in context:
        context['count'] = 0
    context['count'] += 1
    event['count'] = context['count']
    return event
```
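The Code Engine manages the per-key contexts for you, but the dispatch logic can be sketched locally to see how the two stages fit together. The run_pipeline helper below is an illustrative stand-in, not part of Alooma’s API; it repeats the two functions from above so the sketch is self-contained:

```python
def transform(event):
    event['_metadata']['context_key'] = event['user_id']
    return event

def transform_with_context(event, context, context_key):
    if 'count' not in context:
        context['count'] = 0
    context['count'] += 1
    event['count'] = context['count']
    return event

def run_pipeline(events):
    """Hypothetical local simulation of the Code Engine's context dispatch."""
    contexts = {}  # context_key -> shared context dict
    out = []
    for event in events:
        event = transform(event)
        key = event['_metadata'].get('context_key')
        if key is None:
            # Events without a context key skip transform_with_context
            out.append(event)
            continue
        context = contexts.setdefault(key, {})
        out.append(transform_with_context(event, context, key))
    return out

events = [{'user_id': u, '_metadata': {}} for u in ['alice', 'bob', 'alice']]
counts = [e['count'] for e in run_pipeline(events)]
# counts == [1, 1, 2]: the second 'alice' event sees the state left by the first
```

The key point the sketch illustrates is that the context dictionary persists between calls, so each event sees whatever earlier events with the same key wrote into it.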
If an event is not assigned a context key, it is not passed to transform_with_context at all.
Now let’s look at how we can create and use sessions from streams of events using the stateful processing in the Code Engine.
Creating Sessions in Real-Time
In our use case we need to perform sessionization on events coming from two different inputs: user_actions and backend_events. Each input holds the “user ID” property, by which we want to sessionize, in a different field. The transform function detects which input the current event came from and pulls the user’s ID from the appropriate field.
```python
import datetime
import uuid

SESSION_MAX_INACTIVITY = datetime.timedelta(minutes=30)

def transform(event):
    if event['_metadata']['input_label'] == 'user_actions':
        event['_metadata']['context_key'] = event['user_id']
    elif event['_metadata']['input_label'] == 'backend_events':
        event['_metadata']['context_key'] = event['data']['uid']
    return event

def generate_new_session_id():
    return str(uuid.uuid4())

def transform_with_context(event, context, context_key):
    if not context:
        # Empty context means this is the first time we're getting an
        # event from this user, so we initialize the context. Notice that
        # you MUST NOT assign a new dictionary to the context.
        # Instead, edit the context in-place.
        context['last_active'] = event['timestamp']
        context['session_id'] = generate_new_session_id()
    if event['timestamp'] > context['last_active'] + SESSION_MAX_INACTIVITY:
        # Enough time passed since last activity. Start a new session.
        context['session_id'] = generate_new_session_id()
    # Enrich the event with session data
    event['session_id'] = context['session_id']
    # Update the context data
    context['last_active'] = event['timestamp']
    return event
```
transform_with_context adds a session ID to each event: if the event follows an earlier event from the same user within 30 minutes, the function reuses the session ID stored in the context object; if more than 30 minutes have passed, a new session ID is generated.
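Running the session logic over a handful of timestamps makes the expiry behavior concrete. The sketch below assumes event timestamps are datetime objects (as the comparison against a timedelta in the code above requires) and calls the sessionization function directly, outside the Code Engine:

```python
import datetime
import uuid

SESSION_MAX_INACTIVITY = datetime.timedelta(minutes=30)

def generate_new_session_id():
    return str(uuid.uuid4())

def transform_with_context(event, context, context_key):
    if not context:
        # First event from this user: initialize the context in-place
        context['last_active'] = event['timestamp']
        context['session_id'] = generate_new_session_id()
    if event['timestamp'] > context['last_active'] + SESSION_MAX_INACTIVITY:
        # More than 30 minutes of inactivity: start a new session
        context['session_id'] = generate_new_session_id()
    event['session_id'] = context['session_id']
    context['last_active'] = event['timestamp']
    return event

# Three events from one user: 10 minutes apart, then a 45-minute gap
t0 = datetime.datetime(2017, 1, 1, 12, 0)
times = [t0,
         t0 + datetime.timedelta(minutes=10),
         t0 + datetime.timedelta(minutes=55)]
context = {}
ids = [transform_with_context({'timestamp': t}, context, 'user-1')['session_id']
       for t in times]
# ids[0] == ids[1] (same session); ids[2] differs (new session after the gap)
```

Note that the inactivity gap is measured from the previous event, not from the session start, so a session can last arbitrarily long as long as no two consecutive events are more than 30 minutes apart.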
Now that we have our session information as a part of the event, we can easily query the data to compute our “one-sitting-end-to-end” KPI: for each timeframe (e.g., a week), simply count the number of sessions that contain the two types of events: ADD_INPUT from the user_actions input, and EVENTS_LOADED from the backend_events input. We then divide the number of these end-to-end sessions by the total number of sessions in each timeframe.
```sql
WITH session_input AS (
  SELECT
    min(t) AS session_start_time,
    max(CASE WHEN event = 'ADD_INPUT' THEN 1 ELSE 0 END) = 1
      AND max(CASE WHEN event = 'EVENTS_LOADED' THEN 1 ELSE 0 END) = 1 AS end_to_end,
    session,
    input_name
  FROM sessions_test
  GROUP BY session, input_name
)
SELECT
  week,
  count(1) AS num_sessions,
  sum(end_to_end) AS num_end_to_end_sessions,
  sum(end_to_end)::FLOAT / count(1) AS end_to_end_ratio
FROM (
  SELECT
    date_trunc('week', session_start_time) AS week,
    session,
    max(end_to_end::INT) AS end_to_end
  FROM session_input
  GROUP BY session, week
) AS weekly_sessions
GROUP BY week
ORDER BY week;
```
The session_input table describes, for each session and input, whether both a start event and an end event were observed. From the session_input table, we group by session and select max(end_to_end) as an indication of whether the session contains a start event and an end event for at least one of its inputs. In the outermost query we count the number of end-to-end sessions, and their ratio out of all sessions, grouped by week.
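As a sanity check, the same aggregation can be expressed in a few lines of Python over sessionized events. This sketch mirrors the query’s field names (session, input_name, event) but skips the weekly breakdown; it is an illustration, not how the KPI is computed in production:

```python
from collections import defaultdict

def end_to_end_ratio(events):
    """Fraction of sessions where some input saw both the start and end event."""
    # (session, input_name) -> set of event types observed
    seen = defaultdict(set)
    for e in events:
        seen[(e['session'], e['input_name'])].add(e['event'])
    sessions = {s for s, _ in seen}
    end_to_end = {s for (s, _), types in seen.items()
                  if {'ADD_INPUT', 'EVENTS_LOADED'} <= types}
    return len(end_to_end) / len(sessions)

events = [
    {'session': 's1', 'input_name': 'a', 'event': 'ADD_INPUT'},
    {'session': 's1', 'input_name': 'a', 'event': 'EVENTS_LOADED'},
    {'session': 's2', 'input_name': 'b', 'event': 'ADD_INPUT'},
]
# end_to_end_ratio(events) == 0.5: s1 is end-to-end, s2 is not
```

Like the SQL, this checks for both event types within the same (session, input) pair, so a session counts as end-to-end only if a single input completed the full flow.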
Using session information allowed us to measure important KPIs about our product usage, and drive better product decisions. Using the Code Engine’s stateful processing gave us an easy and efficient way to generate this session information.
We are just scratching the surface when it comes to all of the capabilities that stateful processing can bring to an organization. In addition to sessionization, stateful processing enables many more advanced computations in the Code Engine. In our next blog posts we’ll cover two more use cases: stream enrichment and anomaly detection.
Currently, our stateful processing feature is in a closed beta, so if you’re interested in experimenting with it, please contact us. We’d love to have you on board, and to hear what other data applications you’d like to build with it!