Kafka.jl usage


#1

Hi there! Does anyone use the Kafka.jl package? I have a problem connecting to one of my Kafka topics. Below is my code:

using Kafka
kc = KafkaClient("my_host", 6667)

consumer = fetch(kc, "my_topic", 0, 0)

while true                 
        for each in take!(consumer)
              println(each)
        end         
end

The code stops at the line header = recv_header(sock) in the file “client.jl” (part of the Kafka.jl package), and I do not know why. Does anyone have an idea?

function handle_response(kc::KafkaClient, sock::TCPSocket)
       header = recv_header(sock)

Might it be because I have 4 Kafka brokers?

Thanks!


#2

fetch in Kafka.jl is a rough equivalent of sending a single FetchRequest, i.e. reading a single chunk of data from a particular topic and partition. Thus the line:

consumer = fetch(kc, "my_topic", 0, 0)

is semantically incorrect: instead of a consumer object, fetch returns a one-time channel with a single chunk of response data that you can read using:

channel = fetch(kc, "my_topic", 0, 0)
message = take!(channel)

The advantage of channels over a plain response is that they are asynchronous, and this is exactly what Kafka is about. For example, instead of waiting for a message to appear in the channel, you can continue async processing using map:

map(response -> do_smth(response), channel)
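
To illustrate the "one-time channel" idea without a broker, here is a minimal plain-Julia sketch. It does not use Kafka.jl at all: fake_fetch is a hypothetical stand-in for fetch, and the message contents are made up. It only shows that the call returns immediately while the response arrives later in the channel.

```julia
# Hypothetical stand-in for fetch(): returns a channel right away and
# fills it with a single batch from a background task.
function fake_fetch()
    ch = Channel{Vector{String}}(1)
    @async begin
        sleep(0.1)                       # pretend to wait for the broker
        put!(ch, ["msg-0", "msg-1"])     # the single chunk of response data
    end
    return ch
end

channel = fake_fetch()                   # returns immediately
println("request sent, doing other work")
batch = take!(channel)                   # blocks only until the batch arrives
println(batch)
```

The channel holds exactly one batch, so a second take! on it would block forever; a new request is needed for each batch.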

Kafka supports many kinds of workflows. Three quite different examples are built-in consumer groups, which let several machines read all topics and coordinate offsets automatically; Apache Storm, which processes messages asynchronously as they arrive; and Apache Spark, which handles messages in batches. Kafka.jl doesn’t assume any specific workflow, but instead provides the flexibility to build any of them. The downside of the current approach is that only a relatively low-level API is provided.

Do you have a specific task at hand? If you describe it, I’ll try to come up with the best solution for you and maybe add it to Kafka.jl as a more high-level API.


#3

Hi Andrei, thank you so much for your support. I’ve discovered that the problem might be related to the Kafka version: on 0.10 Kafka.jl works very nicely, but I have a problem on 0.9. Do you know if Kafka.jl works with earlier releases of Kafka?

Thanks!


#4

Do you mean the code you posted in your first message? As I said, this is incorrect usage of the API, and if it works with Kafka 0.10, it must be a coincidence. What are you trying to achieve?


#5

Hi, here is the code I use now:

using Kafka

kc = KafkaClient("host", 6667)
md_channel = metadata(kc, ["topic"])
md = take!(md_channel)
start_offset = take!(earliest_offset(kc, "topic", 0))             

for each in take!(fetch(kc, "topic", 0, start_offset))
            println(each)
            start_offset = (take!(latest_offset(kc, "topic", 0)))
 end

It works perfectly on a cluster with Kafka 0.10, but not on another cluster with Kafka 0.9, which is why I was wondering whether the Kafka version is the cause.


#6

Ah, I see. First of all, a quick reminder of what Kafka is made of.

Apache Kafka keeps data in topics, where each topic is split into a set of partitions. Roughly speaking, each partition is a file consisting of a long list of messages, each with a unique offset.

Unlike most other queues, Kafka doesn’t remove messages from topics once all consumers have read them. In fact, at its core, Kafka doesn’t know about consumers at all. Instead, Kafka removes messages after a preconfigured retention period, and consumers themselves are responsible for keeping track of offsets. If a consumer stops and doesn’t read anything from Kafka for longer than the retention period, that is the consumer’s problem: Kafka doesn’t even have a way to check whether a consumer is up to date.

All Kafka brokers know is which offset range is available in a particular topic and partition, and this is exactly what earliest_offset and latest_offset refer to. The only reason you may want to use these functions is to check that an offset you are trying to read is within that range.
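
A minimal sketch of such a range check, assuming the two bounds have already been read with take!(earliest_offset(...)) and take!(latest_offset(...)). The helper choose_start_offset is hypothetical and not part of Kafka.jl, and falling back to the earliest offset is just one possible policy. Whether the latest offset itself is readable is also an assumption here.

```julia
# Hypothetical helper, not part of Kafka.jl.
# `earliest` and `latest` stand in for the values returned by
# take!(earliest_offset(...)) and take!(latest_offset(...)).
function choose_start_offset(saved, earliest, latest)
    if earliest <= saved <= latest
        return saved          # the saved offset is still available
    end
    return earliest           # assumed policy: restart from the oldest message
end

println(choose_start_offset(1500, 1000, 2000))   # saved offset is in range
println(choose_start_offset(200, 1000, 2000))    # saved offset was already removed
```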

So what about consumer offsets, i.e. a pointer to the latest message that the consumer has read? It turns out there’s no way to make a single algorithm that works for all cases. Consider this example:

  1. You start reading a particular topic partition from offset 1000 and read a batch of 100 messages.
  2. During processing of this batch, an error occurs.

You recover from the error, but should you now start reading from offset 1000 or 1100? In practice, it depends on the client logic: sometimes you should reprocess the failed batch, sometimes you should skip it. So the punchline is that only the client can decide how to handle offsets.
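
The two choices can be compared in a small self-contained simulation. This is only a sketch: there is no Kafka connection, the "partition" is a plain vector, and all names (read_batch, run_consumer, commit_first, fail_at) are hypothetical. Saving the offset after processing re-reads the failed batch (duplicates are possible), while saving it before processing skips the rest of the failed batch (messages can be lost).

```julia
# Simulated partition: the message with offset o lives at index o + 1.
read_batch(partition, offset, n) =
    [(o, partition[o + 1]) for o in offset:min(offset + n - 1, length(partition) - 1)]

function run_consumer(partition; commit_first::Bool, fail_at::Int, batch_size::Int = 3)
    saved_offset = 0           # what the client would keep in its own storage
    processed = Int[]          # offsets that were actually processed
    failed_once = false
    while saved_offset < length(partition)
        batch = read_batch(partition, saved_offset, batch_size)
        next_offset = batch[end][1] + 1
        commit_first && (saved_offset = next_offset)      # save before processing
        try
            for (o, _) in batch
                if o == fail_at && !failed_once
                    failed_once = true
                    error("simulated processing failure")
                end
                push!(processed, o)
            end
            commit_first || (saved_offset = next_offset)  # save after processing
        catch
            # "recover" and continue from whatever offset was saved
        end
    end
    return processed
end

partition = ["m$i" for i in 0:5]
println(run_consumer(partition; commit_first = false, fail_at = 1))  # offset 0 appears twice
println(run_consumer(partition; commit_first = true,  fail_at = 1))  # offsets 1 and 2 are missing
```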

Let’s practice. Say we want to read partition 0 of the topic “my_topic” (don’t forget that there might be other partitions as well!) and we want to start reading from the very first available message:

offset = take!(earliest_offset(kc, "my_topic", 0))

or we want to skip all the previous messages and read only new ones:

offset = take!(latest_offset(kc, "my_topic", 0))

or maybe we have a saved offset and want to start from there:

offset = load_from_some_storage()

Anyway, we now know our starting point. Let’s read a bunch of data from Kafka:

ch = fetch(kc, "my_topic", 0, offset)
messages = take!(ch)

What have we just done? We sent a single fetch request to a Kafka broker, and the broker returned a set of messages. Note that we can’t control how many messages are read in a single request, but we can specify the buffer size and the maximum wait time (see the options).

We can iterate over this batch:

for msg in messages
    println(msg)
end

But of course this won’t read the next batch automatically. Here’s an example of what we might get as a result:

(0,UInt8[],UInt8[0x68,0x65,0x6c,0x6c,0x6f,0x20,0x77,0x6f,0x72,0x6c,0x64])
(1,UInt8[],UInt8[0x73,0x61,0x73,0x64,0x66])
(2,UInt8[],UInt8[0x61,0x73,0x64,0x66,0x61,0x73,0x64,0x66])
(3,UInt8[],UInt8[0x61,0x73,0x64,0x66,0x61,0x73,0x64,0x66,0x61,0x73,0x64,0x66])
(4,UInt8[],UInt8[0x73,0x61,0x64,0x67,0x64])
(5,UInt8[],UInt8[0x61,0x66,0x67])
(6,UInt8[],UInt8[0x64,0x66,0x67])
(7,UInt8[],UInt8[0x64,0x67,0x68,0x66])
...

Each message here is a tuple of 3 elements: an offset, a key and a value. The key and the value are what their names suggest, but why do we need the offset? It turns out that Kafka can sometimes skip particular offsets, so if you start at offset 1000 and read 100 messages, you will not necessarily end up at offset 1100. This doesn’t happen often, but it’s worth taking into account. So to read the next batch you should do something like this:

next_offset = messages[end][1] + 1
next_messages = take!(fetch(kc, "my_topic", 0, next_offset))

or, more practically:

offset = ...
while true
    messages = take!(fetch(kc, "my_topic", 0, offset))
    process_batch(messages)
    if !isempty(messages)        # if no messages arrive during the timeout period, an empty array is returned
        offset = messages[end][1] + 1
    end
end

Note that process_batch() handles the whole batch of messages and may itself look something like:

function process_batch(messages)
    for msg in messages
        process_single_message(msg)
    end
end

The final copy-paste example:

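using Kafka

kc = KafkaClient("localhost", 9092)
# start from the earliest offset still available in partition 0
offset = take!(earliest_offset(kc, "my_topic", 0))
while true
    # one fetch request returns one batch of (offset, key, value) tuples
    messages = take!(fetch(kc, "my_topic", 0, offset))
    for msg in messages
        println(msg)
    end
    # advance past the last message read; an empty batch leaves the offset unchanged
    if !isempty(messages)
        offset = messages[end][1] + 1
    end
end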
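
To see why the next offset is taken from the last message rather than computed from the batch size, here is a small sketch with a made-up batch in which offsets 1003 and 1004 are missing. No broker is involved; the tuples only imitate the (offset, key, value) shape shown above, and the code assumes a current Julia version.

```julia
# Made-up batch of (offset, key, value) tuples; offsets 1003 and 1004 were skipped.
messages = [(o, UInt8[], Vector{UInt8}("msg $o")) for o in (1000, 1001, 1002, 1005, 1006)]

start_offset = 1000
naive_next   = start_offset + length(messages)   # counts messages: lands on 1005 again
correct_next = messages[end][1] + 1               # follows the last offset actually read

println("naive: ", naive_next, ", correct: ", correct_next)
```

With the naive calculation the next fetch would start at 1005 and return messages 1005 and 1006 a second time.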

#7

Hi Andrei, thanks for the amazing explanation! It’s much clearer to me now, and thankfully my code runs :wink: However, I do not know why. It seems the problem was caused by an additional line I had in the function handle_response

function handle_response(kc::KafkaClient, sock::TCPSocket)
       header = recv_header(sock)
       println("ok") # my added line

because when I add the line again, the code stops.

Thank you again for your help!


#8

Hi, just one more case: my program works for about 3 minutes and then I get an error, which looks like this, but the key’s numbers are different. Do you know what could be wrong?


#9

What tag / commit are you on? Do you still have any local modifications to the project? Neither master nor the v0.1.0 tag corresponds to the lines in your output.


#10

I shouldn’t have any changes compared to the master branch. Line 33 of client.jl in my file is handle_response(kc, sock), because I’ve deleted two empty lines at the beginning :wink:


#11

Can you post the code you are using? Also, what is the approximate rate of messages in your Kafka?


#12

On “my_topic” there are 160 thousand new records every 30 seconds.

OK, I think I’ve fixed something a little: now it stops without an error or any message. Below is my code:

using Kafka
using DataFrames
kc = KafkaClient("localhost", 6667)

offset_message = take!(earliest_offset(kc, "my_topic", 0))

while true
        messages = take!(fetch(kc, "my_topic", 0, offset_message))
        for each in messages

             time = string(now())           
             keys = [convert(Vector{UInt8}, "1")]
             values = [convert(Vector{UInt8}, time)]
             messages2 = collect(zip(keys, values))
             offset = produce(kc, "my_second_topic", 0, messages2)
    
        end
        
        if !isempty(messages)
            offset_message = messages[end][1] + 1
        end

end

Thanks!


#13

It looks like some messages are deserialized incorrectly. I’ll take a look at it in the evening. Can you tell me the exact Kafka version you are using in this test?


#14

We are using 0.9.0.2.4. Thanks a lot!


#15

This looks like a version of the Java library, so I used Kafka server 2.11-0.9.0.1. However, I wasn’t able to reproduce your error even after 20 minutes of reading from a local Kafka. Could you please completely reset the package code on your machine? On Linux, you can do it like this:

cd ~/.julia/v0.5/Kafka
git reset --hard HEAD
git pull origin master

Also, note that the code you have posted is very inefficient:

for each in messages
    ...
    offset = produce(kc, "my_second_topic", 0, messages2)
end

You read the whole batch of messages in a single request to Kafka, but make a separate produce request for each of these messages. In my test, I got 25k messages in a batch, so I had to make 25k produce requests. Kafka can handle it, but a much more performant option would be to do something like this:

messages = take!(fetch(kc, "my_topic", 0, offset))
new_messages = process_consumed_messages(messages)
take!(produce(kc, "my_second_topic", 0, new_messages))

In this case each fetch request will be followed by only one produce request.
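
For illustration, process_consumed_messages could look roughly like the sketch below. It is only a guess at what your loop body does (key "1", current time as the value) and it returns a vector of (key, value) pairs, the same shape your collect(zip(keys, values)) produces. The sample batch is made up, and the code assumes a current Julia version, where now() lives in the Dates standard library.

```julia
using Dates

# Hypothetical helper: turn a consumed batch of (offset, key, value) tuples
# into (key, value) pairs that can be sent in a single produce request.
function process_consumed_messages(messages)
    stamp = Vector{UInt8}(string(now()))            # one timestamp for the whole batch
    return [(Vector{UInt8}("1"), copy(stamp)) for _ in messages]
end

# Made-up batch standing in for take!(fetch(...)).
consumed = [(0, UInt8[], Vector{UInt8}("hello")), (1, UInt8[], Vector{UInt8}("world"))]
new_messages = process_consumed_messages(consumed)
println(length(new_messages), " messages ready for one produce request")
```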


#16

Hi, thanks. I worked on this bug during the week, but I didn’t make any progress. I think it might be something inside my Kafka broker. If I find out what’s wrong, I’ll let you know.