Kafka compaction tuning

Kafka offers two cleanup policies, which seems simple enough: “delete”, where data is deleted after a certain amount of time, and “compact”, where only the most recent value is kept for each key. But what if data is not deleted or compacted as expected?

You should never expect a topic to be compacted instantly, because compaction is an asynchronous process. Still, sometimes a topic contains far more values for a key than you would expect, or very old keys that should already be gone. How do you debug this? What is happening in the background?

Segments

As you probably know, Kafka data is separated into partitions. Internally, a partition is stored in segments: the active segment and several older, inactive segments. It is interesting to look at the actual files on your Kafka broker, shown here for partition 0 of “my-topic”:

./var/lib/kafka/data/kafka-log2/my-topic-0:
total 2484656
   1802632 Feb 23 12:50 000011131187.index
1073740950 Feb 23 12:50 000011131187.log
   2703720 Feb 23 12:50 000011131187.timeindex

   1806376 Feb 26 09:24 000012291610.index
1073741407 Feb 26 09:24 000012291610.log
        10 Feb 23 12:50 000012291610.snapshot
   2709444 Feb 26 09:24 000012291610.timeindex

  10485760 Feb 27 10:27 000013374640.index
 386077722 Feb 27 10:02 000013374640.log
        10 Feb 26 09:24 000013374640.snapshot
  10485756 Feb 27 10:27 000013374640.timeindex

        10 Feb 27 10:27 000013787681.snapshot
       112 Feb 27 10:27 leader-epoch-checkpoint
        43 Jan 29 16:35 partition.metadata

Look at the files with the ending .log:

./var/lib/kafka/data/kafka-log2/my-topic-0:
1073740950 Feb 23 12:50 000011131187.log  [inactive]
1073741407 Feb 26 09:24 000012291610.log  [inactive]
 386077722 Feb 27 10:02 000013374640.log  [active]

There is one segment containing events starting with the offset 11131187, one later segment starting with offset 12291610 and the newest segment starting with offset 13374640.

The newest segment must be the active segment, where new events are stored. You can verify this by checking the file size: it should still be growing. All older segments are inactive.

Sometimes you will find artefacts like the lonely snapshot file 000013787681.snapshot above. These can be ignored.
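
If you want to look inside a segment, the Kafka distribution ships with a dump tool. A minimal sketch (the installation path and the segment file name here are just the examples from the listing above):

# Print the records (offsets, timestamps, payloads) of one segment file
/opt/kafka/bin/kafka-dump-log.sh \
  --files /var/lib/kafka/data/kafka-log2/my-topic-0/000011131187.log \
  --print-data-log | head -n 20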

Segment rolling

We configured

segment.bytes: 1073741824 (1 GB, which is the default)
segment.ms: 604800000 (1 week, which is the default)

Whenever the active segment reaches a size of 1 GB, it becomes inactive and a new active segment is created; this happens no later than 1 week after the segment was created. Note that segments are per partition, so we are talking about 1 GB per partition, which might be a lot or very little depending on your traffic.

As you can see, both inactive segments in the partition are roughly 1 GB in size.
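
segment.bytes and segment.ms can also be changed per topic at runtime, for example with the kafka-configs tool (the broker address and the values here are just examples, not a recommendation):

# Roll the active segment after ~100 MB or 1 day, whichever comes first
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config segment.bytes=104857600,segment.ms=86400000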

The rolling of segments is important because all cleanup is done in inactive segments only: The active segment is never cleaned or compacted. If it takes a while until your segment is rolled, this limits your cleanup.

Performance impact

If we want deletion or compaction to happen more often, we might use smaller segments, so the active segment is rolled more frequently. Note that smaller segments increase the number of open file descriptors, which are a limited resource on most operating systems. More frequent cleanups also increase the CPU load and the disk I/O, especially during compaction. It is always advised to keep this in mind and monitor it.

You can tune the number of log cleaner threads with log.cleaner.threads and throttle the I/O with log.cleaner.io.max.bytes.per.second. See Optimizing Kafka broker configuration (Strimzi) for more details.
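
Both are broker-level settings. On recent brokers they can even be changed dynamically with kafka-configs.sh; a sketch under that assumption (broker address and values are examples, otherwise set them in server.properties and restart):

# Give the log cleaner more threads and cap its I/O at ~10 MB/s for all brokers
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-default --alter \
  --add-config log.cleaner.threads=2,log.cleaner.io.max.bytes.per.second=10485760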

Cleanup policy “delete”

We configured a policy of

cleanup.policy: delete
retention.ms: 604800000 (1 week, which is the default)

All events older than a week can be deleted, with the additional constraints that

a) a segment is only deleted if all data in it is older than retention.ms (the cleanup always deletes whole segments), and

b) only inactive segments are deleted, never the active segment.

The two inactive segments 000011131187 and 000012291610 in the listing above are not yet old enough and are therefore kept until all their data has passed retention.ms.
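
For completeness, this is how a topic with exactly these settings could be created (topic name, partition count and broker address are just examples):

# Time-based retention: keep events for roughly one week
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic my-topic --partitions 3 --replication-factor 3 \
  --config cleanup.policy=delete \
  --config retention.ms=604800000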

Compacted wood segments

Cleanup policy “compact”

We now look at the topic config

cleanup.policy: compact

Such a compacted topic will eventually contain only one value for any key, the newest value. Note that compaction happens asynchronously, and at any moment there might be multiple values for a key in the topic.

The offsets will not change, and the order will be maintained. By default, keys are kept in a compacted topic forever, but we will discuss deletion from a compacted topic later.

I recommend this video starting at minute 0:34:40 to learn more about the internals of compaction: Deep Dive Into Apache Kafka - Jun Rao (Confluent)

Given that you don’t care about older values for a key, compaction is useful when you read a topic from the very beginning: you can see it as a performance optimization that lets you skip most outdated values.

As Jun Rao explains in the video, with a compacted topic you can combine the “consumer catchup mode” (realtime updates) with the “consumer bootstrap mode” (reading all existing keys) using the same listener; it just depends on whether you consume only the new updates or the whole topic from the beginning.
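
The “bootstrap mode” is simply a consumer that starts at the earliest offset. A quick way to try this is the console consumer (the broker address is an example):

# Read the whole compacted topic from the beginning, printing the keys as well
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning \
  --property print.key=true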

A compacted topic

./var/lib/kafka/data/kafka-log2/another-topic-0:
total 33424
     992 Feb 27 04:56 00000188679.index
22711925 Feb 27 04:56 00000188679.log
    1488 Feb 27 04:56 00000188679.timeindex
    2072 Feb 27 05:56 00002639225.index
 1176762 Feb 27 05:56 00002639225.log
     102 Feb 27 04:56 00002639225.snapshot
    3120 Feb 27 05:56 00002639225.timeindex
    2456 Feb 27 06:56 00002641367.index
 1406491 Feb 27 06:56 00002641367.log
     102 Feb 27 05:56 00002641367.snapshot
    3696 Feb 27 06:56 00002641367.timeindex
    3736 Feb 27 07:56 00002643940.index
 2169167 Feb 27 07:56 00002643940.log
     102 Feb 27 06:56 00002643940.snapshot
    5616 Feb 27 07:56 00002643940.timeindex
10485760 Feb 27 13:56 00002647746.index
 6589951 Feb 27 13:56 00002647746.log
     102 Feb 27 07:56 00002647746.snapshot
10485756 Feb 27 08:30 00002647746.timeindex
     102 Feb 27 08:30 00002649416.snapshot
     393 Feb 27 10:38 leader-epoch-checkpoint
      43 Jan 29 16:35 partition.metadata

Let’s again focus on the *.log files:

./var/lib/kafka/data/kafka-log2/another-topic-0:
22711925 Feb 27 04:56 00000188679.log          [inactive]
 1176762 Feb 27 05:56 00002639225.log          [inactive]
 1406491 Feb 27 06:56 00002641367.log          [inactive]
 2169167 Feb 27 07:56 00002643940.log          [inactive]
 6589951 Feb 27 13:56 00002647746.log          [active]

Here we use a configuration of

segment.bytes: 53687088 (~50 MB)
segment.ms: 3600000 (1 hour)
min.cleanable.dirty.ratio: 0.5 (the default)

The active segment is rolled every hour, so we have more and smaller segments than in the first example. This is on purpose: we want compaction to happen early, and since the active segment is never compacted, we need to roll faster.

The last segment 00002647746 is the active segment, as we can tell by its growing size. It is obvious from the timestamps that segments are rolled every hour. But why do we have four inactive segments, and which of them are already compacted?

It is not trivial to tell whether a segment is compacted (or “clean”). In our case we looked at the data inside the log files and could tell from it whether a segment was clean (the last event for a key contained special information, so a segment containing only “last events” was obviously compacted).

We found out that the oldest segment was compacted:

./var/lib/kafka/data/kafka-log2/another-topic-0:
22711925 Feb 27 04:56 00000188679.log          [inactive, compacted (clean)]
 1176762 Feb 27 05:56 00002639225.log          [inactive, dirty]
 1406491 Feb 27 06:56 00002641367.log          [inactive, dirty]
 2169167 Feb 27 07:56 00002643940.log          [inactive, dirty]
 6589951 Feb 27 13:56 00002647746.log          [active]

During compaction, the Kafka log cleaner combines the inactive segments into a new, compacted segment (or several, see below), keeping the newest value for each key and dropping all outdated values.

00000188679 is the compacted segment from the last compaction run: it contains all keys between offset 188679 and offset 2639224 with their latest values. The original segments that once contained these events were deleted when the compacted segment was created.

The min.cleanable.dirty.ratio

What about the other three segments in our example? The segments 00002639225, 00002641367 and 00002643940 have not yet been compacted, because we configured a dirty ratio of 0.5:

min.cleanable.dirty.ratio: 0.5 (the default)

With a value of 0.5, compaction is only triggered once more than half of the inactive log is dirty, i.e. there are more dirty bytes than clean bytes.

In our case, sum(dirty_segments) / sum(clean_segments + dirty_segments) is (1176762 + 1406491 + 2169167) / (22711925 + 1176762 + 1406491 + 2169167) = 0.17 < 0.5.

With a lower min.cleanable.dirty.ratio, compaction is triggered more often: with a value of 0.1, compaction already starts once a bit more than 10% of the inactive log is dirty.
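
You can redo this arithmetic from the segment sizes on disk, e.g. with a quick awk one-liner (the byte counts are taken from the listing above; treating the oldest segment as the clean part is our observation, not something the broker reports):

awk 'BEGIN {
  clean = 22711925                      # 00000188679.log, already compacted
  dirty = 1176762 + 1406491 + 2169167   # the three uncompacted segments
  printf "dirty ratio = %.2f\n", dirty / (clean + dirty)   # prints 0.17
}'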

More compaction tuning

The config max.compaction.lag.ms (the default is very high: 9223372036854775807) lets you specify a maximum time after which an event must be compacted. It actually forces a segment roll and overrides the min.cleanable.dirty.ratio, so it is a bit more “brute force”. This is especially helpful if you are using tombstones (see below) and need to comply with privacy regulations; for more details see Kafka Improvement Proposal 354 (KIP-354).

The config min.compaction.lag.ms (the default is 0) specifies a minimum time before a record may be compacted. This is helpful if you have consumers that need to see all events before compaction happens; their lag should never be larger than this value.

Multiple clean segments

Will there always be only one compacted segment? No: when the compacted segment reaches a certain size, it is split into multiple clean segments. In my experiments this threshold seemed to be around the configured segment.bytes, which would make sense. Here’s an example:

/var/lib/kafka/data/kafka-log0/some-topic-8:
total 14492
     336 Feb 28 15:32 00000214014.index
10989151 Feb 28 15:32 00000214014.log
     504 Feb 28 15:32 00000214014.timeindex
     112 Mar  1 13:33 00000357160.index
 1579674 Mar  1 13:33 00000357160.log
     102 Feb 28 15:32 00000357160.snapshot
     168 Mar  1 13:33 00000357160.timeindex
     194 Feb 29 06:54 00000426189.snapshot
10485760 Mar  1 14:14 00000573223.index
 2142485 Mar  1 14:14 00000573223.log
     102 Mar  1 13:33 00000573223.snapshot
10485756 Mar  1 14:14 00000573223.timeindex
      49 Mar  1 10:52 leader-epoch-checkpoint
      43 Feb 27 17:24 partition.metadata

It is not easy to tell, but from the contents we found that we have two cleaned and one active segment.

/var/lib/kafka/data/kafka-log0/some-topic-8:
total 14492
10989151 Feb 28 15:32 00000214014.log  [inactive, clean]
 1579674 Mar  1 13:33 00000357160.log  [inactive, clean]
 2142485 Mar  1 14:14 00000573223.log  [active]

One hint that 00000357160 is compacted: you can tell from the timestamps of 00000357160.snapshot (Feb 28 15:32) vs. 00000357160.log (Mar 1 13:33) that this segment was written over a longer period than the configured rolling interval of 1 hour.

The segment from offset 214014 contains all the older keys, while the segment from 357160 contains the keys that were updated more recently. This becomes important when we talk about the cleanup.policy "compact,delete": it will delete the older compacted segment once all the data in that segment is outdated.

Compacted vegetables (photo © Eva Jaeger-Nilius)

Deletion and compacted topics

By default, compacted topics get bigger and bigger, as each compaction adds new keys with their latest values. Let’s assume we want to get rid of some outdated keys: How do we delete them?

(Note that compacted events that are no longer useful still count as clean data when the dirty ratio is calculated, so in addition to wasting space they also make compaction run less frequently.)

Cleanup policy “compact,delete”

It is possible to combine our two policies, as described in Kafka Improvement Proposal 71 (KIP-71), with a config of

cleanup.policy: compact,delete

Basically, deletion will happen in addition to compaction, based on the configured retention.ms described above.

Still, all data in the segment must be outdated, and as we have seen, every compaction adds new data to the compacted segment, so this segment is never completely outdated. Only once your clean segment has been split into multiple clean segments (see above) can the older part finally be deleted.
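
One practical note when setting the combined policy with kafka-configs.sh: the comma-separated value has to be wrapped in square brackets (broker address and topic name are examples):

# Combine compaction with time-based retention
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name another-topic --alter \
  --add-config 'cleanup.policy=[compact,delete],retention.ms=604800000'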

Tombstones

An alternative to the cleanup policy compact,delete are tombstones: when a key becomes obsolete, e.g. an entity is deleted, you can send a tombstone to your topic. This is simply a Kafka event for that key with a value of null. Keys with tombstones will be removed during compaction. You can configure delete.retention.ms (not to be confused with retention.ms!), which defines a minimum time that tombstones are kept before they are compacted away; this gives consumers a chance to see the tombstones.
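
A sketch of producing a tombstone from the command line, assuming you have kcat (formerly kafkacat) installed; with -Z an empty value after the key separator is sent as a real null (topic, key and broker address are examples):

# Key "user-42" with a null value = tombstone
echo "user-42:" | kcat -P -b localhost:9092 -t another-topic -K: -Z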

The creation of tombstones can be triggered by business events and is therefore a bit less generic than deletion by time. You could build a generic cleaner which sends tombstones for old events, but this process would need to remember (or find out) for which keys tombstones have already been sent.

So, why does my topic not compact?

Coming back to the initial question: why does my topic not compact? One common reason is that your segments are rolled only rarely. This is especially common with the defaults of 1 GB / 1 week for rolling the active segment.

It could also be influenced by configs like the min.cleanable.dirty.ratio or min.compaction.lag.ms.

Some rarer causes have been compiled here: Possible Reasons Why a Kafka Topic Is Not Being Compacted.

If deletion is not working as expected, it could be that your oldest segment also contains newer data - especially with the combined cleanup.policy compact,delete.

For debugging, it is very helpful to look at the sizes, timestamps, offsets and contents of the segments on your Kafka broker; then you should be able to figure out the problem.
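
It also helps to double-check which of the configs mentioned above are actually set on the topic (the broker address is an example):

# Show the configuration overrides of the topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --describe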

Thanks!

Besides the resources already mentioned, here are two that helped in writing this blog post:

Deep dive into Apache Kafka storage internals: segments, rolling and retention - (Paolo Patierno, strimzi)

Understanding Kafka Compaction (Ted Naleid)

Thanks for reading, and I am happy to hear your questions, corrections and insights.
