Segfault when invoking Julia callback function from C

I am trying to pass a Julia function to a C library (aws-c-mqtt) which will use it as a callback when an asynchronous event occurs. I am running into trouble because some of the parameters passed to this callback appear corrupted on the Julia side of the execution, but appear correct on the C side. This rr backtrace illustrates what I mean:

(rr) bt
#0  0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896)
    at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
#1  0x00007efcec14051b in jlcapi_on_connection_complete_485 ()
#2  0x00007efce6c27213 in s_packet_handler_connack (connection=0x19e43c0, message_cursor=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:165
#3  0x00007efce6c27de6 in s_process_mqtt_packet (connection=0x19e43c0, packet_type=AWS_MQTT_PACKET_CONNACK, packet=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:459

In frames 2 and 3, you can see that connection=0x19e43c0. The connection pointer is originally given to the C library from Julia, and if you printed connection from Julia, you would see this address. Therefore, this is the correct address. Further inspection of the memory at this address in rr confirms that this is correct. The problem arises when passing this pointer to my callback. In frame 0, you can see that connection has changed: it is now connection=139624673185440.

Julia Code

This is the full Julia code (except the JLL wrapper). Relevant points are the definitions of on_connection_complete and on_connection_complete_cb, and the call to aws_mqtt_client_connection_connect.

# set substitute-path /workspace/srcdir ../local_sources

GC.enable(false)
GC.enable_finalizers(false)

using Test, LibAWSMQTT, CountDownLatches

const held_refs = Vector{Ref}()
const received_on_connection_complete = CountDownLatch(1)

function on_connection_interrupted(connection, error_code, userdata)
    @warn "Connection Interrupted" error_code
end

function on_connection_resumed(connection, return_code, session_present, userdata)
    @info "Connection Resumed" return_code session_present
    if !session_present
        @info "Resubscribing..."
        packet_id = aws_mqtt_resubscribe_existing_topics(connection, s_on_resubscribed, C_NULL)
        if packet_id == 0
            error("Failed to resubscribe aws_last_error=$(aws_last_error())")
        end
    end
end

function on_connection_complete(connection::Ptr{aws_mqtt_client_connection}, error_code::Cint, return_code::Cint, session_present::Cuchar, userdata::Ptr{Cvoid})
    count_down(received_on_connection_complete)
    println("on_connection_complete on_connection_complete on_connection_complete on_connection_complete")
    return nothing
end

struct UserData
    connection::Ptr{aws_mqtt_client_connection}
end

function aws_iot_client_test_main()
    endpoint = "a19vzmsjwkvm63-ats.iot.us-east-1.amazonaws.com"
    client_id_name = "test-client-id"
    subscribe_topic = "mytopic"
    will_payload = "The client has gone offline!"
    ca_filepath = joinpath(@__DIR__, "certs", "AmazonRootCA1.pem")
    cert_filepath = joinpath(@__DIR__, "certs", "5909390f9eff261d9cf92331e0e6dfc3e0e673b0005c272ee258dd3cb192fffb-certificate.pem.crt")
    pri_key_filepath = joinpath(@__DIR__, "certs", "5909390f9eff261d9cf92331e0e6dfc3e0e673b0005c272ee258dd3cb192fffb-private.pem.key")

    allocator = aws_default_allocator()

    aws_mqtt_library_init(allocator)

    el_group = aws_event_loop_group_new_default(allocator, 1, C_NULL)

    resolver_options = Ref(aws_host_resolver_default_options(8, el_group, C_NULL, C_NULL))
    resolver = aws_host_resolver_new_default(allocator, resolver_options)

    bootstrap_options = Ref(aws_client_bootstrap_options(el_group, resolver, C_NULL, C_NULL, C_NULL))
    bootstrap = aws_client_bootstrap_new(allocator, bootstrap_options)

    tls_ctx_opt = Ref(aws_tls_ctx_options(ntuple(_ -> UInt8(0), 200)))
    @test AWS_OP_SUCCESS ==
          aws_tls_ctx_options_init_client_mtls_from_path(tls_ctx_opt, allocator, cert_filepath, pri_key_filepath)
    @test tls_ctx_opt[].allocator == allocator
    @test AWS_OP_SUCCESS == aws_tls_ctx_options_set_alpn_list(tls_ctx_opt, "x-amzn-mqtt-ca")
    @test AWS_OP_SUCCESS == aws_tls_ctx_options_override_default_trust_store_from_path(tls_ctx_opt, C_NULL, ca_filepath)

    tls_ctx = aws_tls_client_ctx_new(allocator, tls_ctx_opt)
    @test tls_ctx != C_NULL

    aws_tls_ctx_options_clean_up(tls_ctx_opt)

    tls_connection_options =
        Ref(aws_tls_connection_options(C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, false, 0))
    aws_tls_connection_options_init_from_ctx(tls_connection_options, tls_ctx)

    client = aws_mqtt_client_new(allocator, bootstrap)
    @show client
    connection = aws_mqtt_client_connection_new(client)
    @show connection
    ud = UserData(connection)

    GC.@preserve connection begin
        socket_options = Ref(aws_socket_options(AWS_SOCKET_STREAM, AWS_SOCKET_IPV6, 3000, 0, 0, 0, false))

        host_name_cur = Ref(aws_byte_cursor_from_c_str(endpoint))
        @show host_name_cur
        client_id_cur = Ref(aws_byte_cursor_from_c_str(client_id_name))

        on_connection_interrupted_cb =
            @cfunction on_connection_interrupted Cvoid (Ptr{aws_mqtt_client_connection}, Int, Ptr{Cvoid})
        on_connection_resumed_cb = @cfunction on_connection_resumed Cvoid (Ptr{aws_mqtt_client_connection},aws_mqtt_connect_return_code,Bool,Ptr{Cvoid},)
        @test AWS_OP_SUCCESS == aws_mqtt_client_connection_set_connection_interruption_handlers(
            connection,
            on_connection_interrupted_cb,
            C_NULL,
            on_connection_resumed_cb,
            C_NULL,
        )

        subscribe_topic_cur = Ref(aws_byte_cursor_from_c_str(subscribe_topic))
        will_cur = Ref(aws_byte_cursor_from_c_str(will_payload))
        aws_mqtt_client_connection_set_will(connection, subscribe_topic_cur, AWS_MQTT_QOS_AT_LEAST_ONCE, false, will_cur)

        on_connection_complete_cb = @cfunction(on_connection_complete, Cvoid, (Ptr{aws_mqtt_client_connection},Cint,Cint,Cuchar,Ptr{Cvoid}))
        GC.@preserve allocator el_group resolver bootstrap client connection socket_options tls_connection_options ud begin
            @show connection
            @show on_connection_complete_cb
            push!(held_refs, Ref(on_connection_complete_cb))
            conn_options = Ref(
                aws_mqtt_connection_options(
                    host_name_cur[],
                    UInt16(8883),
                    Base.unsafe_convert(Ptr{aws_socket_options}, socket_options),
                    Base.unsafe_convert(Ptr{aws_tls_connection_options}, tls_connection_options),
                    client_id_cur[],
                    0,
                    0,
                    0,
                    on_connection_complete_cb,
                    Base.unsafe_convert(Ptr{Cvoid}, Ref(ud)), # user_data
                    true,
                ),
            )
            push!(held_refs, conn_options)
            aws_mqtt_client_connection_connect(connection, conn_options)
            await(received_on_connection_complete)
            println("connected connected connected connected connected connected")
        end
    end
end

@testset "LibAWSMQTT" begin
    @testset "aws_iot_client_test" begin
        aws_iot_client_test_main()
    end
end

C Code

mqtt/source/client_channel_handler.c:165

MQTT_CLIENT_CALL_CALLBACK_ARGS(
    connection, on_connection_complete,
    AWS_OP_SUCCESS, connack.connect_return_code, connack.session_present);

MQTT_CLIENT_CALL_CALLBACK_ARGS is a variadic macro defined like so:

#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...)                                                      \
    do {                                                                                                               \
        if ((client_ptr)->callback) {                                                                                  \
            (client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud);                            \
        }                                                                                                              \
    } while (false)

Looking at the C code, connection is passed from the s_packet_handler_connack function to the callback function without modification. I don’t understand what is causing its value to change when invoking julia_on_connection_complete_479.

Ultimately, the error I get when the callback is invoked is a segfault:

Thread 3 received signal SIGSEGV, Segmentation fault.
[Switching to Thread 70533.70674]
0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896) at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23

Debugging steps I’ve tried so far that are worth noting:

  • I can ccall the callback just fine. The segfault occurs when it’s called from the C library.
  • I have tried combinations of various Julia types and C types in the cfunction argument tuple and return type to debug possible argument passing errors.
  • I have tried creating the cfunction in the global scope.
  • I have tried preserving (GC.@preserve) all relevant data and even disabled the GC. This has no effect on the error, so this doesn’t appear to be a GC problem.

I have also uploaded a zip of the entire project here if anyone wants to reproduce it locally.

Julia functions called from C must adhere to a few rules.
They must either be called on a Julia worker thread (i.e. the backtrace needs to lead back into Julia),
or they must not interact with the Julia runtime. The only exception is uv_async_send, used together with
an AsyncCondition.
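
As a minimal sketch of that one exempt pattern, using only Base: a Julia task waits on an AsyncCondition, and the foreign side does nothing except call uv_async_send on its handle. Here the ccall is issued from the main Julia thread purely as a stand-in for the C library's event-loop thread, and the names cond, fired and waiter are illustrative assumptions.

```julia
# Condition object that may be signalled from any thread via uv_async_send.
cond = Base.AsyncCondition()
fired = Ref(false)

# Julia task that waits for the wake-up. All interaction with the Julia
# runtime (printing, allocation, locks, ...) belongs in here.
waiter = @async begin
    wait(cond)
    fired[] = true
end

# Stand-in for the foreign thread (assumption: a real C callback would make
# exactly this call and nothing else that touches the runtime).
rc = ccall(:uv_async_send, Cint, (Ptr{Cvoid},), cond.handle)

wait(waiter)   # returns once the task above has been woken
close(cond)
```

The point of the design is that the callback running on the foreign thread only sends a wake-up; the real work happens later in a task that Julia scheduled itself.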

In your code you are trying to do arbitrary IO from a “foreign” thread.

I wrote a package ForeignCallbacks.jl that aims to make the latter use-case less error prone.

Edit:
I am actually not sure whether you are running into this case, since it is not clear whether the backtrace you supplied really is that short, or whether you cut it off and it does reach back into Julia.

Here is the full backtrace for you:

(rr) c
Continuing.
client = Ptr{aws_mqtt_client} @0x0000000001a0dbf0
connection = Ptr{aws_mqtt_client_connection} @0x00000000019e43c0
host_name_cur = Base.RefValue{aws_byte_cursor}(aws_byte_cursor(0x000000000000002e, Ptr{UInt8} @0x00007efd05553ad8))
connection = Ptr{aws_mqtt_client_connection} @0x00000000019e43c0
on_connection_complete_cb = Ptr{Nothing} @0x00007efcec140490
[New Thread 70533.70675]
[New Thread 70533.70674]
[New Thread 70533.70534]

Thread 3 received signal SIGSEGV, Segmentation fault.
[Switching to Thread 70533.70674]
0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896) at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
23          end
(rr) bt
#0  0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896)
    at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
#1  0x00007efcec14051b in jlcapi_on_connection_complete_485 ()
#2  0x00007efce6c27213 in s_packet_handler_connack (connection=0x19e43c0, message_cursor=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:165
#3  0x00007efce6c27de6 in s_process_mqtt_packet (connection=0x19e43c0, packet_type=AWS_MQTT_PACKET_CONNACK, packet=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:459
#4  0x00007efce6c2830a in s_process_read_message (handler=0x19e43e8, slot=0x7efce002d3b0, message=0x7efce000dac0) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:573
#5  0x00007efce6c33416 in aws_channel_handler_process_read_message (handler=0x19e43e8, slot=0x7efce002d3b0, message=0x7efce000dac0) at /workspace/srcdir/aws-c-io/source/channel.c:1000
#6  0x00007efce6c32761 in aws_channel_slot_send_message (slot=0x7efce0001770, message=0x7efce000dac0, dir=AWS_CHANNEL_DIR_READ) at /workspace/srcdir/aws-c-io/source/channel.c:760
#7  0x00007efce6c4c2b6 in s_s2n_handler_process_read_message (handler=0x7efce0011f20, slot=0x7efce0001770, message=0x7efce00099c0)
    at /workspace/srcdir/aws-c-io/source/s2n/s2n_tls_channel_handler.c:538
#8  0x00007efce6c33416 in aws_channel_handler_process_read_message (handler=0x7efce0011f20, slot=0x7efce0001770, message=0x7efce00099c0) at /workspace/srcdir/aws-c-io/source/channel.c:1000
#9  0x00007efce6c32761 in aws_channel_slot_send_message (slot=0x7efce0011b60, message=0x7efce00099c0, dir=AWS_CHANNEL_DIR_READ) at /workspace/srcdir/aws-c-io/source/channel.c:760
#10 0x00007efce6c4fe3d in s_do_read (socket_handler=0x7efce0011e10) at /workspace/srcdir/aws-c-io/source/socket_channel_handler.c:164
#11 0x00007efce6c5013c in s_on_readable_notification (socket=0x7efce0000ca0, error_code=0, user_data=0x7efce0011e10) at /workspace/srcdir/aws-c-io/source/socket_channel_handler.c:221
#12 0x00007efce6c49252 in s_on_socket_io_event (event_loop=0x21f0810, handle=0x7efce0000d98, events=3, user_data=0x7efce0000ca0) at /workspace/srcdir/aws-c-io/source/posix/socket.c:1618
#13 0x00007efce6c41f20 in s_main_loop (args=0x21f0810) at /workspace/srcdir/aws-c-io/source/linux/epoll_event_loop.c:624
#14 0x00007efce6f2bada in thread_fn (arg=0x1dcb4c0) at /workspace/srcdir/aws-c-common/source/posix/thread.c:137
#15 0x00007efd195bf2a5 in start_thread (arg=0x7efce69c7640) at pthread_create.c:481
#16 0x00007efd194e7323 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

So yes this is a “foreign” thread and the segmentation fault you are seeing is because you are trying to interact with the runtime.

Also, I am curious: you define

on_connection_complete_cb = @cfunction(
    on_connection_complete, 
    Cvoid,
    (Ptr{aws_mqtt_client_connection},Cint,Cint,Cuchar,Ptr{Cvoid}))

and the library defines it as:

typedef void(aws_mqtt_client_on_connection_complete_fn)(
    struct aws_mqtt_client_connection *connection,
    int error_code,
    enum aws_mqtt_connect_return_code return_code,
    bool session_present,
    void *userdata);

Are you sure that enum has the correct underlying type?

Are you sure that enum has the correct underlying type?

No, I’m not sure. Cuchar is my best guess from reading the Julia documentation. Please do let me know if you think it should be something else.
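
One quick sanity check on the Julia side, sketched here under assumptions, is to compare the sizes of the candidate argument types. The enum below is a hypothetical stand-in for aws_mqtt_connect_return_code (its member names are invented for illustration); the real definition should be checked in the generated wrapper. C compilers on x86-64 Linux usually represent such an enum as a 4-byte int and bool as a single byte, but the enum size is implementation-defined, so this is a check and not a guarantee.

```julia
# Hypothetical stand-in for the C enum; the member names are invented.
@enum ConnectReturnCode::Cint begin
    CONNECT_ACCEPTED = 0
    CONNECT_REFUSED = 1
end

# Sizes in bytes of the Julia types that could appear in the @cfunction
# argument tuple for (enum, int, bool).
arg_sizes = (
    enum_arg = sizeof(ConnectReturnCode),
    cint = sizeof(Cint),
    bool = sizeof(Bool),
    cuchar = sizeof(Cuchar),
)
@show arg_sizes
```

If the enum is int-sized in the C build, Cint is a plausible match for the enum parameter, and Cuchar or Bool for the bool parameter, since both of those are one byte wide.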

So yes this is a “foreign” thread and the segmentation fault you are seeing is because you are trying to interact with the runtime.

Okay, good to know why this happens. It is also the case that if I don’t do much of anything in the callback (for example, just return nothing), the segfault does not occur, even with -O0. I guess this lines up with your explanation.

I guess my question now becomes: how else should I implement this functionality? With your ForeignCallbacks package, it’s my understanding that:

  1. I create a ForeignCallback instance.
  2. Inside my on_connection_complete callback function, I ccall the notify! function.
  3. All the logic I want to put into on_connection_complete is moved into the foreign callback closure.

Do I understand that correctly?

Yes, except that you don’t need to ccall the notify function; you can call it directly from your callback. In the demo I should probably use Julia’s @threadcall to invoke a custom callback which then constructs the message type and calls notify from there. Sadly, I got distracted from the project I needed it for.

Ok, I now have:

struct Message
    error_code::Cint
    return_code::Cint
    session_present::Cuchar
end

callback = ForeignCallbacks.ForeignCallback{Message}() do msg
    @show msg
    count_down(received_on_connection_complete)
    return nothing
end
token = ForeignCallbacks.ForeignToken(callback)

function on_connection_complete(connection::Ptr{aws_mqtt_client_connection}, error_code::Cint, return_code::Cint, session_present::Cuchar, userdata::Ptr{Cvoid})
    ForeignCallbacks.notify!(token, Message(error_code, return_code, session_present))
    return nothing
end

Which results in this segfault:

(rr) c
Continuing.
client = Ptr{aws_mqtt_client} @0x00000000046ab6a0
connection = Ptr{aws_mqtt_client_connection} @0x00000000047143e0
host_name_cur = Base.RefValue{aws_byte_cursor}(aws_byte_cursor(0x000000000000002e, Ptr{UInt8} @0x00007f0d761ebe98))
connection = Ptr{aws_mqtt_client_connection} @0x00000000047143e0
on_connection_complete_cb = Ptr{Nothing} @0x00007f0d5cdd8d70
[New Thread 180422.180467]
[New Thread 180422.180466]
[New Thread 180422.180423]

Thread 3 received signal SIGSEGV, Segmentation fault.
[Switching to Thread 180422.180466]
0x00007f0d5cdcceea in julia_on_connection_complete_488 (connection=16, error_code=0, return_code=80, session_present=0 '\000', userdata=1) at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:40
40      function on_connection_complete(connection::Ptr{aws_mqtt_client_connection}, error_code::Cint, return_code::Cint, session_present::Cuchar, userdata::Ptr{Cvoid})
(rr) bt
#0  0x00007f0d5cdcceea in julia_on_connection_complete_488 (connection=16, error_code=0, return_code=80, session_present=0 '\000', userdata=1)
    at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:40
#1  0x00007f0d5cdd8dfb in jlcapi_on_connection_complete_493 ()
#2  0x00007f0d578bf213 in s_packet_handler_connack (connection=0x47143e0, message_cursor=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:165
#3  0x00007f0d578bfde6 in s_process_mqtt_packet (connection=0x47143e0, packet_type=AWS_MQTT_PACKET_CONNACK, packet=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:459
#4  0x00007f0d578c030a in s_process_read_message (handler=0x4714408, slot=0x7f0d4802d3b0, message=0x7f0d4800dac0) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:573
#5  0x00007f0d578cb416 in aws_channel_handler_process_read_message (handler=0x4714408, slot=0x7f0d4802d3b0, message=0x7f0d4800dac0) at /workspace/srcdir/aws-c-io/source/channel.c:1000
#6  0x00007f0d578ca761 in aws_channel_slot_send_message (slot=0x7f0d48001770, message=0x7f0d4800dac0, dir=AWS_CHANNEL_DIR_READ) at /workspace/srcdir/aws-c-io/source/channel.c:760
#7  0x00007f0d578e42b6 in s_s2n_handler_process_read_message (handler=0x7f0d48011f20, slot=0x7f0d48001770, message=0x7f0d480099c0)
    at /workspace/srcdir/aws-c-io/source/s2n/s2n_tls_channel_handler.c:538
#8  0x00007f0d578cb416 in aws_channel_handler_process_read_message (handler=0x7f0d48011f20, slot=0x7f0d48001770, message=0x7f0d480099c0) at /workspace/srcdir/aws-c-io/source/channel.c:1000
#9  0x00007f0d578ca761 in aws_channel_slot_send_message (slot=0x7f0d48011b60, message=0x7f0d480099c0, dir=AWS_CHANNEL_DIR_READ) at /workspace/srcdir/aws-c-io/source/channel.c:760
#10 0x00007f0d578e7e3d in s_do_read (socket_handler=0x7f0d48011e10) at /workspace/srcdir/aws-c-io/source/socket_channel_handler.c:164
#11 0x00007f0d578e813c in s_on_readable_notification (socket=0x7f0d48000ca0, error_code=0, user_data=0x7f0d48011e10) at /workspace/srcdir/aws-c-io/source/socket_channel_handler.c:221
#12 0x00007f0d578e1252 in s_on_socket_io_event (event_loop=0x38aa850, handle=0x7f0d48000d98, events=3, user_data=0x7f0d48000ca0) at /workspace/srcdir/aws-c-io/source/posix/socket.c:1618
#13 0x00007f0d578d9f20 in s_main_loop (args=0x38aa850) at /workspace/srcdir/aws-c-io/source/linux/epoll_event_loop.c:624
#14 0x00007f0d57bc3ada in thread_fn (arg=0x468c210) at /workspace/srcdir/aws-c-common/source/posix/thread.c:137
#15 0x00007f0d8a2572a5 in start_thread (arg=0x7f0d4f657640) at pthread_create.c:481
#16 0x00007f0d8a17f323 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

I guess I misunderstand something about how to use ForeignCallbacks. Do you see the error?

It is very odd to me that connection=16 in the Julia callback invocation.

Thank you for your help so far.

Yes that is very weird. That smells like an ABI disagreement between your @cfunction and what C expects.

You can’t pass in token as a global. Do you have control of userdata? If so that would be the place to pass it through.

That smells like an ABI disagreement between your @cfunction and what C expects.

Indeed. I’ve spent a while debugging this, so if you have any insights or debugging tips, I would very much appreciate it.

You can’t pass in token as a global. Do you have control of userdata? If so that would be the place to pass it through.

Yes, I do. I’ll go that route.

@vchuravy I am quite stuck on this problem now. If you have any ideas on how I can continue debugging this, I would appreciate hearing them.

Thanks to Valentin for meeting with me offline. We arrived at a final solution to this problem:

Declare a callback this way:

struct Message
	error_code::Cint
	return_code::Cint
	session_present::Cuchar
end

callback = ForeignCallbacks.ForeignCallback{Message}() do msg
	# actual callback impl in here
	return nothing
end

function on_connection_complete(connection::Ptr{aws_mqtt_client_connection}, error_code::Cint, return_code::Cint, session_present::Cuchar, userdata::Ptr{Cvoid})
	# convert and then load the pointer because this function must be type-stable
	token = Base.unsafe_load(Base.unsafe_convert(Ptr{ForeignCallbacks.ForeignToken}, userdata))
	ForeignCallbacks.notify!(token, Message(error_code, return_code, session_present))
	return nothing
end

Store the user_data this way:

token = Ref(ForeignCallbacks.ForeignToken(callback))
Base.unsafe_convert(Ptr{Cvoid}, token) # this is the user_data

Then you can create the callback C function:

on_connection_complete_cb = @cfunction(on_connection_complete, Cvoid, (Ptr{aws_mqtt_client_connection},Cint,Cint,Cuchar,Ptr{Cvoid}))

Relevant variables like token and on_connection_complete_cb must be GC.@preserve-ed while in use.
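
The userdata round trip can be tried without the C library. The following is a minimal sketch using only Base: Token is a hypothetical isbits stand-in for ForeignCallbacks.ForeignToken, on_event stands in for on_connection_complete, and the ccall plays the role of the library invoking the callback. It runs on a Julia thread, so it only illustrates the pointer handling and GC.@preserve, not foreign-thread safety.

```julia
# Hypothetical isbits stand-in for ForeignCallbacks.ForeignToken.
struct Token
    id::Cint
end

# Callback: recover the token from the opaque userdata pointer.
# Converting to a concretely typed pointer keeps the function type-stable.
function on_event(code::Cint, userdata::Ptr{Cvoid})::Cint
    tok = unsafe_load(convert(Ptr{Token}, userdata))
    return tok.id + code
end

token = Ref(Token(40))   # the Ref owns the memory that userdata points to
cb = @cfunction(on_event, Cint, (Cint, Ptr{Cvoid}))

# Keep `token` rooted for as long as the raw pointer may be used.
result = GC.@preserve token begin
    ud = Base.unsafe_convert(Ptr{Cvoid}, token)
    # Stand-in for the C library calling the callback with our userdata.
    ccall(cb, Cint, (Cint, Ptr{Cvoid}), 2, ud)
end
```

In the real program the preserve block has to cover the whole period in which the library may still invoke the callback, not just the call that registers it.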
