Ingestion pipeline

In its simplest form, the PostHog ingestion pipeline is a collection of services which listen for events as they are sent in, process them, and then store them for later analysis. This document gives an overview of how data ingestion works, as well as some of the caveats to be aware of when sending events to PostHog.

[Diagram: overview of the ingestion pipeline, showing the Client Library, /decide API, Capture API, Plugin server, PostgreSQL (persons table), two Kafka stages, and ClickHouse.]

Capture API

The Capture API represents the user-facing side of the ingestion pipeline, and is exposed as a number of API routes where events can be sent. Before an event reaches the rest of the ingestion pipeline, we perform a few preliminary checks and actions so that we can return a response immediately to the client.

These consist of:

  • Validating API keys
  • Anonymizing IPs according to project settings
  • Decompressing and normalizing the shape of event data for the rest of the system
  • Sending processed data to the events_plugin_ingestion Kafka topic
  • If any of these operations fail (other than checking for data validity), logging events to the Kafka dead_letter_queue table

The goal of this step is to be as simple as possible, so that we can reliably get events into the ingestion pipeline, where Kafka can persist them until they are able to be processed. Events are written to the events_plugin_ingestion Kafka topic, which is then consumed by the plugin-server.
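The checks listed above can be sketched roughly as follows. This is a minimal illustration and not the actual Capture API code: the `capture` and `produce` functions, the `PROJECTS` and `TOPICS` dictionaries, the example API key, and the returned status codes are all hypothetical stand-ins, and Kafka is replaced by in-memory lists.

```python
import gzip
import json
import zlib

# Hypothetical in-memory stand-ins for project settings and Kafka topics.
PROJECTS = {"phc_example": {"anonymize_ips": True}}
TOPICS = {"events_plugin_ingestion": [], "dead_letter_queue": []}


def produce(topic, message):
    """Stand-in for a Kafka producer: append the message to a list."""
    TOPICS[topic].append(message)


def capture(api_key, body, client_ip, compressed=False, produce=produce):
    """Run the preliminary checks and return a (made-up) HTTP status code."""
    project = PROJECTS.get(api_key)
    if project is None:
        return 401  # invalid API key, nothing is queued

    try:
        # Decompress if needed, then normalize the shape of the event.
        raw = gzip.decompress(body) if compressed else body
        payload = json.loads(raw)
        event = {
            "event": payload["event"],
            "distinct_id": str(payload["distinct_id"]),
            "properties": payload.get("properties", {}),
            # Drop the IP when the project asks for anonymization.
            "ip": None if project["anonymize_ips"] else client_ip,
        }
    except (OSError, EOFError, zlib.error, ValueError, KeyError, TypeError):
        return 400  # invalid data is rejected, not dead-lettered

    try:
        produce("events_plugin_ingestion", event)
    except Exception as exc:
        # Any other failure is logged to the dead letter queue.
        TOPICS["dead_letter_queue"].append({"event": event, "error": str(exc)})
        return 503
    return 200
```

Keeping this step down to validation, normalization, and a single produce call is what lets the client get its response before any heavier processing happens.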

Plugin server

Within the plugin server, events go through a number of different steps. Here is an overview:

[Diagram: events flow from the Kafka events_plugin_ingestion topic through the plugin-server steps (Event buffer with a 60s delay, Apps - processEvent, Person processing, Event processing, Writing to ClickHouse, Apps - onEvent). Other nodes shown: GeoIP, Property flattener, BigQuery Export, Snowflake Export, PostgreSQL, and the Kafka clickhouse_events_json topic.]

In the sections below we will dive deeper into each step:

  1. Event buffer
  2. Apps - processEvent
  3. Person processing
  4. Event processing
  5. Writing to ClickHouse
  6. Apps - onEvent

If you would like to dive even deeper, the related source code can be found here.

1. Event buffer

The event buffer sits right at the beginning of the ingestion pipeline, and gives us the ability to selectively delay the processing of certain events. This is only enabled for teams who have requested to have Persons on Events queries enabled. For more information, take a look at the "All about the event buffer" section in the appendix.

2. Apps - processEvent

After the event buffer, we start the first of a few steps that augment or transform our raw event before it gets written into ClickHouse. This first step runs any workloads that come from apps that you have installed and that export a processEvent function. This is the only chance for apps to transform or exclude an event before it is written into ClickHouse.

An example of an app that uses the processEvent hook is the GeoIP Enricher. This app uses the $ip property to retrieve and add geographic information to each event as it is ingested.
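To illustrate how this step can transform or exclude an event, here is a small Python model of a processEvent chain. Real apps export processEvent from JavaScript, so this is only a sketch of the behavior: the `run_process_event` helper, the `drop_internal` app, the `internal` property, the lookup table, and the `$geoip_country_code` property name are assumptions made for the example, as is the convention that returning `None` excludes the event.

```python
def geoip_enricher(event):
    """Hypothetical stand-in for the GeoIP Enricher: adds a country from $ip."""
    # Made-up lookup table; a real app would consult a GeoIP database.
    lookup = {"203.0.113.7": "AU"}
    ip = event["properties"].get("$ip")
    if ip in lookup:
        event["properties"]["$geoip_country_code"] = lookup[ip]
    return event


def drop_internal(event):
    """Hypothetical app that excludes events flagged as internal."""
    if event["properties"].get("internal"):
        return None  # assumption: returning None excludes the event
    return event


def run_process_event(event, apps):
    """Pass the event through each app in order; stop if one excludes it."""
    for process_event in apps:
        event = process_event(event)
        if event is None:
            return None
    return event
```

For example, an event with `$ip` set to `203.0.113.7` comes out with the country code added, while an event carrying `internal: True` never reaches the later steps.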

3. Person processing

The next step in the ingestion pipeline is processing the Person who sent the event, which is determined by the distinct_id field. A number of different actions can happen here, depending both on whether we've seen this distinct_id before and on which type of event is being sent.

This is one of the most complex steps in the entire pipeline, so to help make things easier we'll break down this step into a number of smaller sections:

  1. Associate the event with a person
    1. $identify events
    2. $create_alias events
    3. All other events
  2. Update person properties

Note that if there were any changes to a person, we will update that person's info in ClickHouse too.

3.1. Associate the event with a person

Based on which type of event is currently being processed, we perform a number of different steps.

3.1.1 - $identify events

In the case of an $identify event, the first step is to use the $distinct_id and $anon_distinct_id fields that are sent with the event to determine what actions we will need to take.

  • $anon_distinct_id - The UUID associated with the client device that sent the event (Only included for events sent from client-side libraries)
  • $distinct_id - The distinct identifier for whichever user sent the event (Email, UUID, etc.). This can be set by the sender or is defaulted to $anon_distinct_id if it is not set

To determine what to do at this stage, we need to make a call to PostgreSQL to determine which scenario we are in:

1. Neither $anon_distinct_id nor $distinct_id has been associated with a Person: Create a new Person and add a mapping in PostgreSQL to associate this $distinct_id with the new person_id.
2. Only one of $anon_distinct_id and $distinct_id has been associated with a Person: Create a new mapping to associate the $distinct_id and $anon_distinct_id with the already existing person_id.
3. Both $anon_distinct_id and $distinct_id have been associated with a Person: We will merge these two people and associate all future events with the person_id that was associated with the $distinct_id.

Note: In the case the $anon_distinct_id is missing (e.g. events from backend libraries), we will treat this event like all other events.
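The three scenarios can be modelled with a small in-memory mapping. This is a sketch under stated assumptions, not the plugin server's implementation: `PersonStore` and `handle_identify` are hypothetical names, a dictionary stands in for the PostgreSQL mapping, and in scenario 1 we assume that both IDs are mapped to the new person.

```python
import itertools


class PersonStore:
    """In-memory stand-in for the PostgreSQL distinct_id -> person_id mapping."""

    def __init__(self):
        self.person_of = {}
        self._ids = itertools.count(1)

    def create_person(self):
        return f"person-{next(self._ids)}"


def handle_identify(store, distinct_id, anon_distinct_id=None):
    """Return the person_id the $identify event ends up associated with."""
    known = store.person_of.get(distinct_id)
    if anon_distinct_id is None:
        # No $anon_distinct_id: treat it like any other event.
        if known is None:
            known = store.person_of[distinct_id] = store.create_person()
        return known

    anon = store.person_of.get(anon_distinct_id)
    if known is None and anon is None:
        # Scenario 1: nobody is known yet, so create a new Person.
        # Assumption: both IDs are mapped to the new person.
        person = store.create_person()
    elif known is None or anon is None:
        # Scenario 2: reuse whichever person already exists.
        person = known or anon
    else:
        # Scenario 3: merge the anonymous person into the identified one.
        person = known
        for key, value in list(store.person_of.items()):
            if value == anon:
                store.person_of[key] = known
    store.person_of[distinct_id] = person
    store.person_of[anon_distinct_id] = person
    return person
```

In scenario 3, every distinct_id that pointed at the anonymous person is re-pointed at the identified one, which is why all future events for those IDs land on the same person_id.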

Merging two Persons

In the third scenario, where we have inadvertently created two Persons for the same user, we will need to merge them. Note that PostHog has a few built-in protections, and when one of them applies the merge will be aborted (more info).

In the case of an $identify call, we will merge the person tied to $anon_distinct_id (person_2) into the person identified by $distinct_id (person_1). This means that we'll associate $anon_distinct_id with person_1 and delete person_2, and all future events for $anon_distinct_id will be associated with person_1.

If there are any conflicts when merging Person properties for these two persons, the values from the non-anonymous person (person_1) will take precedence. We choose to prioritize the history of the non-anonymous person (person_1), as it is far more likely that this person will have a history of previous events associated with the user that we want to preserve. For more information on exactly how the merging of properties is done, check out this overview of user properties.
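As a simplified illustration of that precedence rule, the sketch below merges two property dictionaries so that the non-anonymous person wins on conflicts. The function name and the sample properties are made up, and the real merge logic has more rules than this.

```python
def merge_person_properties(identified, anonymous):
    """Merge two persons' properties; the identified person wins on conflicts."""
    merged = dict(anonymous)   # start from the anonymous person's values
    merged.update(identified)  # values from person_1 override any conflicts
    return merged
```

With `{"plan": "paid"}` for person_1 and `{"plan": "free", "browser": "Firefox"}` for person_2, the merged person keeps `plan` as `paid` and gains the `browser` property.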

3.1.2 - $create_alias events

The process of handling $create_alias events is almost identical to the process for $identify events, except that instead of merging $anon_distinct_id into $distinct_id, we allow you to pass in two arbitrary $distinct_id values that you would like to combine, and we merge the second one (alias) into the first (distinct_id).

3.1.3 - All other events

For all other types of events, the process is much more straightforward. If we have determined that this is a new $distinct_id, then we will create a new Person within PostgreSQL and associate them with this $distinct_id. Otherwise, we will retrieve the person associated with this $distinct_id.
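This is a plain get-or-create lookup. A minimal sketch, assuming a dictionary in place of the PostgreSQL mapping and a random UUID as the new person_id (both are illustrative choices):

```python
import uuid


def get_or_create_person(person_of, distinct_id):
    """Return the person_id for distinct_id, creating a Person if it is new."""
    if distinct_id not in person_of:
        # New distinct_id: create a Person and remember the association.
        person_of[distinct_id] = str(uuid.uuid4())
    return person_of[distinct_id]
```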

3.2. Update person properties

Now, once we have finished determining the Person who is associated with the event we are processing, we can finish by updating their properties within PostgreSQL. This step takes into account any $set, $set_once or $unset arguments provided on the event, and merges these with any existing values for the Person.

For more information on exactly how this updating is done, check out this overview of user properties.
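As a rough sketch of how the three arguments combine, consider the function below. The name `update_person_properties` and the order in which the arguments are applied are assumptions for illustration.

```python
def update_person_properties(current, event_properties):
    """Return a new property dict after applying $set_once, $set and $unset."""
    updated = dict(current)
    # $set_once only fills in properties that do not exist yet.
    for key, value in event_properties.get("$set_once", {}).items():
        updated.setdefault(key, value)
    # $set always overwrites the existing value.
    updated.update(event_properties.get("$set", {}))
    # $unset removes properties; unknown keys are ignored.
    for key in event_properties.get("$unset", []):
        updated.pop(key, None)
    return updated
```

The function returns a new dictionary and leaves its input untouched, which makes it easy to compare the before and after states of a person.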

4. Event processing

Finally, now that we have our event and person all ready, we perform a few last processing steps before we write the event to ClickHouse. This is our last chance to change anything about the event, which can include:

  • Adding group properties if the event has been assigned to a Group
  • Anonymizing IPs, if needed

5. Writing to ClickHouse

We combine the fully-processed event and the person from Step 3 and send it to a separate Kafka topic that ClickHouse will consume from and then write to the events table.

For more information on exactly how data is stored in ClickHouse, check out this reference.

6. Apps - onEvent

The final step in the ingestion pipeline is calling the onEvent handler from any apps that we have enabled. This includes all of our export apps as well as some of our alerting/monitoring apps. It's worth noting that since this event has already been written to ClickHouse, it is effectively immutable at this point as we do not allow apps to directly update events. Any apps that need to transform events should use the processEvent handler.

Appendix

All about the event buffer

Determining if an event should be buffered

The buffer is only enabled for teams who have requested to have Persons on Events queries enabled.

After an event is consumed from Kafka, the plugin server will check a number of things in order to determine whether or not to buffer an event. If any of these conditions are true, the event will not be buffered and will immediately move on to the next step.

  • If the distinct_id on the event is already associated with an existing Person
  • If the event is anonymous (In this case this means the distinct_id matches the device_id generated by the library that sent the event)
  • If the event is an $identify or $create_alias call that merges ids (note that before version 1.42, non-merging events were also processed immediately)
  • If the event is coming from one of our mobile libraries, as it is not easy to determine if an event is anonymous and we don't want to delay all events from mobile

If an event coming in satisfies none of these checks, then it will be added to the buffer and sent back through the ingestion pipeline after 60s. Since this means that events would be delayed, our goal is to use the buffer as sparingly as possible.
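The checks above can be written as a single predicate. This is a sketch under assumptions and not the plugin-server code: the `$device_id`, `$lib`, `$anon_distinct_id` and `alias` property names, the contents of `MOBILE_LIBRARIES`, and the way a merging call is detected are all illustrative.

```python
# Illustrative list of mobile library names; the real list may differ.
MOBILE_LIBRARIES = {"posthog-ios", "posthog-android"}


def should_buffer(event, known_distinct_ids):
    """Return True when none of the 'process immediately' conditions hold."""
    props = event.get("properties", {})
    distinct_id = event["distinct_id"]

    if distinct_id in known_distinct_ids:
        return False  # already associated with an existing Person
    if distinct_id == props.get("$device_id"):
        return False  # anonymous event
    if event["event"] == "$identify" and props.get("$anon_distinct_id"):
        return False  # $identify call that merges ids
    if event["event"] == "$create_alias" and props.get("alias"):
        return False  # $create_alias call that merges ids
    if props.get("$lib") in MOBILE_LIBRARIES:
        return False  # mobile libraries are never delayed
    return True
```

A backend signup event for a distinct_id we have never seen is the typical case that returns `True` and is delayed by 60s.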

Choosing to use the event buffer is a trade-off between events arriving quickly and events being associated with the desired person.

For more detail on this step, check out this file from the plugin-server codebase.

Why are events buffered?

Since events coming into PostHog can arrive in a suboptimal order, we sometimes decide to delay the processing of specific events. There are two scenarios where events can arrive in suboptimal order that the event buffer is designed to handle:

  1. Events that are sent from the same library and session may arrive out of order due to network uncertainty
  2. During the initial signup flow, the backend event might arrive before the frontend has identified the user

It will become more obvious why it is better to buffer events in these two cases as we dive deeper into the example below.

Let's look at an initial sign-up flow (the identify event is used to sign up or log in).

In the case that we're sending a signup event from the backend, this event will typically arrive before the identify event on the frontend. If we did not use the buffer, here is the events table we would see.

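ID | Event          | distinct_id                         | person_id | Details
---|----------------|-------------------------------------|-----------|-------------------------------------
1  | pageview       | anon-1                              | user-1    | New person with id user-1 is created
2  | backend signup | Alice                               | user-2    | New person with id user-2 is created
3  | identify       | Alice (anon_distinct_id = anon-1)   | user-2    | Merge user-1 into user-2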

As you can see, we end up with two different person_id values in the events table for the same user. See the impact this has on queries here.

Now, let's take a look at how the event buffer can help to prevent this.

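ID | Event          | distinct_id                         | person_id | Details
---|----------------|-------------------------------------|-----------|--------------------------------------------------------
1  | pageview       | anon-1                              | user-1    | New person with id user-1 is created
3  | identify       | Alice (anon_distinct_id = anon-1)   | user-1    | distinct_id Alice is associated with user-1
2  | backend signup | Alice                               | user-1    | This event is now processed last because it was buffered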

As you can see, since we delayed the processing of event 2, we were able to avoid creating an unnecessary Person.
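The two outcomes can be reproduced with a toy simulation. This is only a sketch: `run_pipeline` and the event dictionaries are hypothetical, person ids are generated as `user-N` to match the example, and the 60s delay is modelled by processing buffered events after everything else.

```python
import itertools

SIGNUP_FLOW = [
    {"id": 1, "event": "pageview", "distinct_id": "anon-1", "anonymous": True},
    {"id": 2, "event": "backend signup", "distinct_id": "Alice"},
    {"id": 3, "event": "identify", "distinct_id": "Alice", "anon_distinct_id": "anon-1"},
]


def run_pipeline(events, use_buffer):
    """Return (event id, person_id) pairs in the order events are processed."""
    person_of = {}  # distinct_id -> person_id
    rows = []
    delayed = []
    ids = itertools.count(1)

    def handle(event):
        distinct_id = event["distinct_id"]
        anon_id = event.get("anon_distinct_id")
        known = person_of.get(distinct_id)
        anon = person_of.get(anon_id) if anon_id else None
        # Reuse an existing person if there is one, otherwise create one.
        person = known or anon or f"user-{next(ids)}"
        if known and anon and known != anon:
            # Merge the anonymous person into the identified one.
            for key, value in list(person_of.items()):
                if value == anon:
                    person_of[key] = known
        person_of[distinct_id] = person
        if anon_id:
            person_of[anon_id] = person
        rows.append((event["id"], person))

    for event in events:
        skip_buffer = (
            event.get("anonymous")
            or event["event"] == "identify"
            or event["distinct_id"] in person_of
        )
        if use_buffer and not skip_buffer:
            delayed.append(event)  # re-processed once the delay has passed
        else:
            handle(event)
    for event in delayed:
        handle(event)
    return rows
```

Calling `run_pipeline(SIGNUP_FLOW, use_buffer=False)` yields two different person ids across the three events, while `use_buffer=True` processes event 2 last and assigns `user-1` to all of them.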

A similar scenario can occur when events with a non-anonymous distinct_id arrive before an identify call due to network uncertainty.

The buffer is designed to help with the initial sign-up flow, and has no effect on events that are sent after this point. As an example, let's take a look at the following series of events:

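ID | Event    | distinct_id                         | person_id | Details
---|----------|-------------------------------------|-----------|-------------------------------------------------------------------
1  | pageview | anon-1                              | user-1    | New person with id user-1 is created
2  | identify | Alice (anon_distinct_id = anon-1)   | user-1    | distinct_id Alice is associated with user-1
3  | pageview | anon-2                              | user-2    | Imagine this was the same user opening the page in incognito mode.
4  | identify | Alice (anon_distinct_id = anon-2)   | user-1    | Alice logs in. Merge user-2 into user-1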

Since event 3 is anonymous, we do not buffer it, and a new Person with id user-2 is created.
