Ah, I see. First of all, a quick reminder of what Kafka is made of.
Apache Kafka keeps data in topics, where each topic is split into a set of partitions. Roughly speaking, each partition is an append-only log: a long sequence of messages, each with a unique offset.
Unlike most other queues, Kafka doesn’t remove messages from topics when all consumers have read them. In fact, at its core, Kafka doesn’t know about consumers at all. Instead, Kafka removes messages after a preconfigured retention period, and consumers themselves are responsible for keeping track of offsets. If a consumer stops and doesn’t read anything from Kafka for longer than the retention period, that’s the consumer’s problem: Kafka doesn’t even have a way to check whether a consumer is up to date.
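For reference, the retention period is configured on the broker side. A typical fragment of server.properties might look like this (the values below are illustrative defaults, not something this article's setup requires):

```properties
# Delete log segments older than 7 days (168 hours is the broker default)
log.retention.hours=168
# No size-based retention limit (the default)
log.retention.bytes=-1
```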
All a Kafka broker knows is which offset range is available in a particular topic and partition, and this is exactly what earliest_offset and latest_offset refer to. The only reason you may want to use these functions is to check that the offset you are about to read falls within that range.
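For instance, a quick sanity check that a saved offset is still within the retained range might look like this (kc is a connected client, as in the examples below; the range check itself is my own sketch, not part of the API):

```julia
lo = take!(earliest_offset(kc, "my_topic", 0))
hi = take!(latest_offset(kc, "my_topic", 0))
if !(lo <= offset <= hi)
    # the messages at this offset have already been deleted by retention,
    # or haven't been written yet
    error("offset $offset is outside the available range [$lo, $hi]")
end
```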
So what about consumer offsets, i.e. a pointer to the latest message that the consumer has read? It turns out there’s no single algorithm that works for all cases. Consider the following example:
- You start reading a particular topic partition from offset 1000 and read a batch of 100 messages.
- During processing of this batch, an error occurs.
You recover from the error, but should you now resume reading from offset 1000 or from 1100? It depends on the client logic: if every message must be processed, you re-read from 1000 and tolerate duplicates; if duplicates are unacceptable, you continue from 1100 and tolerate losses. So the punchline is that only the client can decide how to handle offsets.
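The two recovery strategies from this example can be sketched in a few lines of plain Julia (the variable names are mine, for illustration only):

```julia
# Hypothetical failed batch: offsets 1000..1099 were fetched,
# but processing crashed partway through.
batch_start = 1000
batch = collect(batch_start:batch_start + 99)

# At-least-once: re-read the whole failed batch, accepting duplicates.
resume_at_least_once = batch_start          # 1000

# At-most-once: skip past the failed batch, accepting possible losses.
resume_at_most_once = batch[end] + 1        # 1100
```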
Let’s have some practice. Let’s say we want to read a topic “my_topic” and partition 0 (don’t forget that there might be other partitions as well!) and we want to start reading from the very first available message:
offset = take!(earliest_offset(kc, "my_topic", 0))
or we want to skip all the previous messages and read only new ones:
offset = take!(latest_offset(kc, "my_topic", 0))
or maybe we have a saved offset and want to start from there:
offset = load_from_some_storage()
Anyway, we now know our starting point. Let’s read a bunch of data from Kafka:
ch = fetch(kc, "my_topic", 0, offset)
messages = take!(ch)
What did we just do? We sent a single fetch request to a Kafka broker, and the broker returned a set of messages. Note that we can’t control exactly how many messages we read in a single request, but we can specify the buffer size and the maximum wait time (see the options).
We can iterate over this batch:
for msg in messages
    println(msg)
end
This won’t read the next batch automatically, though. Here’s an example of what we can get as a result:
(0,UInt8[],UInt8[0x68,0x65,0x6c,0x6c,0x6f,0x20,0x77,0x6f,0x72,0x6c,0x64])
(1,UInt8[],UInt8[0x73,0x61,0x73,0x64,0x66])
(2,UInt8[],UInt8[0x61,0x73,0x64,0x66,0x61,0x73,0x64,0x66])
(3,UInt8[],UInt8[0x61,0x73,0x64,0x66,0x61,0x73,0x64,0x66,0x61,0x73,0x64,0x66])
(4,UInt8[],UInt8[0x73,0x61,0x64,0x67,0x64])
(5,UInt8[],UInt8[0x61,0x66,0x67])
(6,UInt8[],UInt8[0x64,0x66,0x67])
(7,UInt8[],UInt8[0x64,0x67,0x68,0x66])
...
Each message here is a tuple of 3 elements: an offset, a key and a value. The key and the value are what their names suggest, but why do we need an offset? It turns out Kafka can sometimes skip particular offsets, so if you start at offset 1000 and read 100 messages, you won’t necessarily end up at offset 1100. It’s not a very frequent case, but it’s worth taking into account. So to read the next batch you should do something like this:
next_offset = messages[end][1] + 1
next_messages = take!(fetch(kc, "my_topic", 0, next_offset))
or, more practically:
offset = ...
while true
    messages = take!(fetch(kc, "my_topic", 0, offset))
    process_batch(messages)
    if !isempty(messages)  # if there are no messages during the timeout period, an empty array is returned
        offset = messages[end][1] + 1
    end
end
Note that process_batch() handles the whole batch of messages and may itself look something like:
function process_batch(messages)
    for msg in messages
        process_single_message(msg)
    end
end
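For text payloads, process_single_message might simply decode the value bytes into a string. Here’s a minimal sketch using the first tuple from the output above (the copy is needed because Julia’s String constructor takes ownership of the byte array):

```julia
# An (offset, key, value) tuple, as returned in a fetched batch;
# the value holds the UTF-8 bytes of "hello world".
msg = (0, UInt8[], UInt8[0x68,0x65,0x6c,0x6c,0x6f,0x20,
                         0x77,0x6f,0x72,0x6c,0x64])
offset, key, value = msg
text = String(copy(value))
println(text)   # prints "hello world"
```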
The final copy-paste example:
kc = KafkaClient("localhost", 9092)
offset = take!(earliest_offset(kc, "my_topic", 0))
while true
    messages = take!(fetch(kc, "my_topic", 0, offset))
    for msg in messages
        println(msg)
    end
    if !isempty(messages)
        offset = messages[end][1] + 1
    end
end
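In a real consumer you would also persist the offset, so that a restart picks up where the previous run stopped. A rough sketch of that idea, where save_offset and load_offset are hypothetical helpers (e.g. backed by a file or a database), not part of the client API:

```julia
offset = load_offset()
while true
    messages = take!(fetch(kc, "my_topic", 0, offset))
    process_batch(messages)
    if !isempty(messages)
        offset = messages[end][1] + 1
        # save only after successful processing, so a crash
        # re-reads the failed batch (at-least-once)
        save_offset(offset)
    end
end
```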