Ingestion lag

Important: Please ensure you have access to production so that you are able to handle incidents!

Background

Team Pipeline has committed to processing "standard" events end-to-end within 30 seconds (p95) and "buffered" events within 1m30s (p95).

It's important for customers that their data is queryable soon after being sent, so they can debug their implementation and feel confident that PostHog is working correctly.

To calculate how we're doing against this commitment, we have a Celery task that runs every 60 seconds and calculates lag based on an event that a plugin emits every minute.

As a result, our metric includes 60-second error bars, meaning we actually target 1m30s for "standard" events and 2m30s for "buffered" events.

Beyond measuring how fast we are, high ingestion lag can also be a sign that we're not processing events at all, so it's important that we treat it seriously.

Debugging

There are many things that can lead to ingestion lag being high, so it's very important to rule out parts of the system completely in order to determine the root cause.

The first thing to establish is whether we're processing events with a delay or not processing events at all. A third possibility to consider later on is that we're processing some events and dropping others.

You can do this in many ways:

  1. Look at the live events tab

  2. Send an event and see if it appears on live events

  3. Check ClickHouse for the latest event _timestamp and the ingestion rate per minute:

    SQL
    SELECT
        toStartOfMinute(timestamp) AS ts,
        count(*)
    FROM events
    WHERE timestamp > now() - INTERVAL 2 HOUR AND timestamp < now()
    GROUP BY ts
    ORDER BY ts

    Please note that timestamp reflects the time the event happened, not when it was ingested, so this metric can be skewed if users send us events dated in the past or the future. In practice it's still a good approximation. You can use _timestamp to get ingestion-time timestamps, but note that it's not part of our sorting key, so filtering on it leads to expensive (and therefore slow) queries.
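
    If you specifically want to look at ingestion time, a rough ad-hoc check along the lines of the sketch below can help. Treat it as a sketch only: as noted above, filtering on _timestamp isn't backed by the sorting key, so keep the window small and expect it to be slow, and the lag estimate mixes in user-supplied timestamps.

    SQL
    SELECT
        toStartOfMinute(_timestamp) AS ingested_minute,
        count(*) AS events_ingested,
        -- rough signal only: timestamp is user-supplied and may be skewed
        avg(dateDiff('second', timestamp, _timestamp)) AS approx_lag_seconds
    FROM events
    WHERE _timestamp > now() - INTERVAL 30 MINUTE
    GROUP BY ingested_minute
    ORDER BY ingested_minute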

If we're not processing events, then things are very serious. However, even then there are levels to the issue.

The worst-case scenario is events being dropped by the capture endpoint (posthog/api/capture.py), because those events are lost forever.

It's a smaller but still serious issue if the plugin server is unable to process events or ClickHouse is unable to consume them. These events will remain in Kafka until they become older than the retention configured for the topic.
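
If you want to check how much headroom that retention gives you, a sketch like the following can show the topic's retention overrides. It assumes you have the Kafka CLI tools available and a reachable broker; the broker address is a placeholder.

Terminal
# Placeholder broker address; shows per-topic overrides such as retention.ms
kafka-configs.sh --bootstrap-server <broker:9092> \
  --entity-type topics --entity-name events_plugin_ingestion --describe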

To determine this, there are a few useful resources:

Actions

Restarting the ingestion pods

A reasonably safe operation to perform is restarting the plugin server ingestion pods. This doesn't cause event loss and can be done by triggering a deploy or running a command like the following:

Terminal
kubectl rollout restart deployment/plugins-ingestion -n posthog

It's ok to do this when you're not fully sure what's wrong with the ingestion pipeline.
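
To confirm the restart went through cleanly, the following (same deployment and namespace as above) shows the rollout progressing and reports when it has finished:

Terminal
kubectl rollout status deployment/plugins-ingestion -n posthog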

Scaling the ingestion pods

If you suspect that we're struggling due to high load, then you can scale vertically or horizontally.

To scale vertically (i.e. give the pods more resources), update the CPU and memory requests and limits in the relevant section of the posthog-cloud-infra repo.

To scale horizontally (i.e. add more pods), update the min and max HPA values in the relevant section of the posthog-cloud-infra repo.
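
Before changing anything, it can help to look at the current autoscaler state and pod resource usage. Assuming the namespace used above and that a metrics server is installed, something like:

Terminal
# Current HPA targets/replicas, then per-pod CPU usage (highest first)
kubectl get hpa -n posthog
kubectl top pods -n posthog --sort-by=cpu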

Dealing with hot partitions

If you see from the Kafka consumer lag graph (US Cloud link) that our lag seems to stem from only one or a few partitions, that's likely because we have a hot partition.

We partition the events_plugin_ingestion topic by <team_id>:<distinct_id>, meaning we're likely getting a burst of events for a given person in a short period of time, causing a hot partition.
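
If you'd rather check per-partition lag from a shell than the graph, a sketch like the following (placeholder broker address; the consumer group name is listed in the quick reference below) prints current offset, log end offset, and lag for each partition:

Terminal
# Placeholder broker address; the LAG column is per partition
kafka-consumer-groups.sh --bootstrap-server <broker:9092> \
  --describe --group clickhouse-ingestion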

There are a few possible mitigation steps for this:

  1. Wait. If the burst is temporary we should recover in time.
  2. Discover the problematic IDs and force events sent with those IDs to be randomly partitioned.

In order to do #2, you should first try to discover what IDs could be causing problems. Here's a useful query to run on ClickHouse:

SQL
SELECT
    team_id,
    distinct_id,
    count(*) AS c
FROM events
WHERE
    timestamp > now() - INTERVAL 1 HOUR AND
    timestamp < now()
GROUP BY
    team_id,
    distinct_id
ORDER BY c DESC
LIMIT 20

If an ID stands out, you can do some further digging with:

SQL
SELECT
    toStartOfMinute(timestamp) AS ts,
    count(*)
FROM events
WHERE
    team_id = <team_id> AND
    distinct_id = <distinct_id> AND
    timestamp > now() - INTERVAL 1 DAY AND
    timestamp < now()
GROUP BY ts

Running the above query in Metabase gives you a very useful visualization, where you can see whether events for this ID have increased significantly recently or whether the volume has always been high.

This graph will often give you a clear indication of whether a given ID caused the hot partition.

If you've established that there's likely an ID causing a hot partition, it's worth peeking into the properties of the person it's associated with:

SQL
WITH p AS (
    SELECT
        id,
        properties
    FROM person
    WHERE team_id = <team_id>
),
pdi AS (
    SELECT person_id
    FROM person_distinct_id2
    WHERE
        team_id = <team_id> AND
        distinct_id = <distinct_id>
)
SELECT p.properties
FROM p
JOIN pdi ON p.id = pdi.person_id

If it looks like the properties are all / mostly generic (e.g. initial referrers, OS, properties PostHog sets by default) or the hot partition problem is very serious (a lot of lag), you can then copy the approach from this PR to force the events for the ID to be randomly partitioned.

We check the person properties because random partitioning removes the guarantee that events for the same person are processed in order; if a lot of custom properties are being set, that can leave the person's properties with inconsistent values.

Dropping and recreating ClickHouse "ingestion tables"

If you've established that ClickHouse is not ingesting data correctly (via Kafka consumer lag graphs, ClickHouse logs, etc), then it could be worth recreating the relevant Kafka table + materialized view pairs that are responsible for ingesting data.

In order to do this, first get the schema for each table:

SQL
-- save the results somewhere
SHOW CREATE TABLE kafka_events_json;
SHOW CREATE TABLE events_json_mv;

You should first drop the materialized view (remove the comment to run it on all nodes):

SQL
DROP VIEW events_json_mv -- ON CLUSTER 'posthog'

And then the Kafka table:

SQL
DROP TABLE kafka_events_json -- ON CLUSTER 'posthog'

Then recreate them in reverse order using the schemas you copied earlier, i.e. the Kafka table first, then the materialized view. Make sure to add the ON CLUSTER clause if you dropped the tables on the whole cluster.
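
Purely as an illustration of the order and of where the ON CLUSTER clause goes (the actual column list, engine settings, and SELECT must come from the SHOW CREATE TABLE output you saved above; the columns and settings shown here are placeholders):

SQL
-- 1. Kafka table first (schema comes from the saved SHOW CREATE TABLE output)
CREATE TABLE kafka_events_json ON CLUSTER 'posthog'
(
    uuid String,
    event String,
    properties String,
    timestamp DateTime64(6, 'UTC')
    -- ...rest of the saved column list
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',            -- placeholder, use the saved settings
         kafka_topic_list = 'clickhouse_events_json',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow';

-- 2. Then the materialized view, so ClickHouse starts consuming again
CREATE MATERIALIZED VIEW events_json_mv ON CLUSTER 'posthog'
TO events                                             -- placeholder target; use the saved schema
AS SELECT * FROM kafka_events_json;                   -- use the saved SELECT, not SELECT *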

Restarting the ClickHouse server on a node

Important: You should never do this unless you're comfortable doing so from experience operating ClickHouse! Please loop someone in to do this if you feel ClickHouse has gotten into a bad state (based on logs, metrics, etc.).

In extreme situations a ClickHouse node may have gotten into a bad state. In that case, restarting the ClickHouse server might help. To do this, you should SSH into the node and run:

Terminal
sudo systemctl restart clickhouse-server
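
After the restart, it's worth confirming that the server came back up and isn't logging errors on startup. Assuming the default log location, something like:

Terminal
sudo systemctl status clickhouse-server
# Default error log path; adjust if the node uses a custom location
sudo tail -n 100 /var/log/clickhouse-server/clickhouse-server.err.log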

Quick reference

A list of relevant resources / information that is useful for debugging ingestion lag.

Kafka topics

  • events_plugin_ingestion: Events sent from the Django events service to the plugin server. Consumed by a group poorly named clickhouse-ingestion.
  • clickhouse_events_json: Events sent from the plugin server to ClickHouse. Consumed by a group called group1 (and also by a group called clickhouse-plugin-server-async for the purpose of exports, but that's irrelevant here).
  • conversion_events_buffer: Events that should wait (60s by default) before being processed. Produced and consumed by the plugin server, using a consumer group called ingester.

Tools
