I am trying to pass a Julia function to a C library (aws-c-mqtt), which will use it as a callback when an asynchronous event occurs. I am running into trouble because some of the parameters passed to this callback appear corrupted on the Julia side, but appear correct on the C side. This rr backtrace illustrates what I mean:
(rr) bt
#0 0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896)
at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
#1 0x00007efcec14051b in jlcapi_on_connection_complete_485 ()
#2 0x00007efce6c27213 in s_packet_handler_connack (connection=0x19e43c0, message_cursor=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:165
#3 0x00007efce6c27de6 in s_process_mqtt_packet (connection=0x19e43c0, packet_type=AWS_MQTT_PACKET_CONNACK, packet=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:459
In frames 2 and 3, you can see that connection=0x19e43c0. The connection pointer is originally given to the C library from Julia, and printing connection from Julia shows this same address, so it is the correct one. Inspecting the memory at this address in rr confirms it. The problem arises when this pointer is passed to my callback: in frame 0, connection has changed to connection=139624673185440.
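Frames 2 and 3 print the pointer in hex while frame 0 prints it in decimal, so the two values are easier to compare once they are in the same base. A minimal sketch of that conversion follows; demo_as_hex is a hypothetical helper written for this post, not part of aws-c-mqtt or rr.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper: formats value as 0x-prefixed lowercase hex into buf
 * (at most buf_len bytes, always NUL-terminated) and returns buf. */
const char *demo_as_hex(uint64_t value, char *buf, size_t buf_len) {
    snprintf(buf, buf_len, "0x%" PRIx64, value);
    return buf;
}
```

Passing the frame 0 value 139624673185440 gives 0x7efce70c0ea0, which is clearly not 0x19e43c0, so the mismatch is not just a display difference between the frames.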
Julia Code
This is the full Julia code (except the JLL wrapper). Relevant points are the definitions of on_connection_complete and on_connection_complete_cb, and the call to aws_mqtt_client_connection_connect.
# set substitute-path /workspace/srcdir ../local_sources
GC.enable(false)
GC.enable_finalizers(false)
using Test, LibAWSMQTT, CountDownLatches
const held_refs = Vector{Ref}()
const received_on_connection_complete = CountDownLatch(1)
function on_connection_interrupted(connection, error_code, userdata)
    @warn "Connection Interrupted" error_code
end
function on_connection_resumed(connection, return_code, session_present, userdata)
    @info "Connection Resumed" return_code session_present
    if !session_present
        @info "Resubscribing..."
        packet_id = aws_mqtt_resubscribe_existing_topics(connection, s_on_resubscribed, C_NULL)
        if packet_id == 0
            error("Failed to resubscribe aws_last_error=$(aws_last_error())")
        end
    end
end
function on_connection_complete(connection::Ptr{aws_mqtt_client_connection}, error_code::Cint, return_code::Cint, session_present::Cuchar, userdata::Ptr{Cvoid})
    count_down(received_on_connection_complete)
    println("on_connection_complete on_connection_complete on_connection_complete on_connection_complete")
    return nothing
end
struct UserData
    connection::Ptr{aws_mqtt_client_connection}
end
function aws_iot_client_test_main()
    endpoint = "a19vzmsjwkvm63-ats.iot.us-east-1.amazonaws.com"
    client_id_name = "test-client-id"
    subscribe_topic = "mytopic"
    will_payload = "The client has gone offline!"
    ca_filepath = joinpath(@__DIR__, "certs", "AmazonRootCA1.pem")
    cert_filepath = joinpath(@__DIR__, "certs", "5909390f9eff261d9cf92331e0e6dfc3e0e673b0005c272ee258dd3cb192fffb-certificate.pem.crt")
    pri_key_filepath = joinpath(@__DIR__, "certs", "5909390f9eff261d9cf92331e0e6dfc3e0e673b0005c272ee258dd3cb192fffb-private.pem.key")
    allocator = aws_default_allocator()
    aws_mqtt_library_init(allocator)
    el_group = aws_event_loop_group_new_default(allocator, 1, C_NULL)
    resolver_options = Ref(aws_host_resolver_default_options(8, el_group, C_NULL, C_NULL))
    resolver = aws_host_resolver_new_default(allocator, resolver_options)
    bootstrap_options = Ref(aws_client_bootstrap_options(el_group, resolver, C_NULL, C_NULL, C_NULL))
    bootstrap = aws_client_bootstrap_new(allocator, bootstrap_options)
    tls_ctx_opt = Ref(aws_tls_ctx_options(ntuple(_ -> UInt8(0), 200)))
    @test AWS_OP_SUCCESS ==
        aws_tls_ctx_options_init_client_mtls_from_path(tls_ctx_opt, allocator, cert_filepath, pri_key_filepath)
    @test tls_ctx_opt[].allocator == allocator
    @test AWS_OP_SUCCESS == aws_tls_ctx_options_set_alpn_list(tls_ctx_opt, "x-amzn-mqtt-ca")
    @test AWS_OP_SUCCESS == aws_tls_ctx_options_override_default_trust_store_from_path(tls_ctx_opt, C_NULL, ca_filepath)
    tls_ctx = aws_tls_client_ctx_new(allocator, tls_ctx_opt)
    @test tls_ctx != C_NULL
    aws_tls_ctx_options_clean_up(tls_ctx_opt)
    tls_connection_options =
        Ref(aws_tls_connection_options(C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, false, 0))
    aws_tls_connection_options_init_from_ctx(tls_connection_options, tls_ctx)
    client = aws_mqtt_client_new(allocator, bootstrap)
    @show client
    connection = aws_mqtt_client_connection_new(client)
    @show connection
    ud = UserData(connection)
    GC.@preserve connection begin
        socket_options = Ref(aws_socket_options(AWS_SOCKET_STREAM, AWS_SOCKET_IPV6, 3000, 0, 0, 0, false))
        host_name_cur = Ref(aws_byte_cursor_from_c_str(endpoint))
        @show host_name_cur
        client_id_cur = Ref(aws_byte_cursor_from_c_str(client_id_name))
        on_connection_interrupted_cb =
            @cfunction on_connection_interrupted Cvoid (Ptr{aws_mqtt_client_connection}, Int, Ptr{Cvoid})
        on_connection_resumed_cb = @cfunction on_connection_resumed Cvoid (Ptr{aws_mqtt_client_connection},aws_mqtt_connect_return_code,Bool,Ptr{Cvoid},)
        @test AWS_OP_SUCCESS == aws_mqtt_client_connection_set_connection_interruption_handlers(
            connection,
            on_connection_interrupted_cb,
            C_NULL,
            on_connection_resumed_cb,
            C_NULL,
        )
        subscribe_topic_cur = Ref(aws_byte_cursor_from_c_str(subscribe_topic))
        will_cur = Ref(aws_byte_cursor_from_c_str(will_payload))
        aws_mqtt_client_connection_set_will(connection, subscribe_topic_cur, AWS_MQTT_QOS_AT_LEAST_ONCE, false, will_cur)
        on_connection_complete_cb = @cfunction(on_connection_complete, Cvoid, (Ptr{aws_mqtt_client_connection},Cint,Cint,Cuchar,Ptr{Cvoid}))
        GC.@preserve allocator el_group resolver bootstrap client connection socket_options tls_connection_options ud begin
            @show connection
            @show on_connection_complete_cb
            push!(held_refs, Ref(on_connection_complete_cb))
            conn_options = Ref(
                aws_mqtt_connection_options(
                    host_name_cur[],
                    UInt16(8883),
                    Base.unsafe_convert(Ptr{aws_socket_options}, socket_options),
                    Base.unsafe_convert(Ptr{aws_tls_connection_options}, tls_connection_options),
                    client_id_cur[],
                    0,
                    0,
                    0,
                    on_connection_complete_cb,
                    Base.unsafe_convert(Ptr{Cvoid}, Ref(ud)), # user_data
                    true,
                ),
            )
            push!(held_refs, conn_options)
            aws_mqtt_client_connection_connect(connection, conn_options)
            await(received_on_connection_complete)
            println("connected connected connected connected connected connected")
        end
    end
end
@testset "LibAWSMQTT" begin
    @testset "aws_iot_client_test" begin
        aws_iot_client_test_main()
    end
end
C Code
mqtt/source/client_channel_handler.c:165
MQTT_CLIENT_CALL_CALLBACK_ARGS(
    connection, on_connection_complete,
    AWS_OP_SUCCESS, connack.connect_return_code, connack.session_present);
MQTT_CLIENT_CALL_CALLBACK_ARGS is a variadic macro defined like so:
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...) \
    do { \
        if ((client_ptr)->callback) { \
            (client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud); \
        } \
    } while (false)
Looking at the C code, connection is passed from the s_packet_handler_connack function to the callback function without modification. I don’t understand what is causing its value to change when invoking julia_on_connection_complete_479.
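To double-check my reading of the macro, here is a minimal standalone sketch of the same call path in plain C. The macro body is copied from above; everything prefixed with demo_ is a hypothetical stand-in I made up for illustration (the real struct aws_mqtt_client_connection has many more fields), and the five-argument callback shape is assumed from what frame 0 shows.

```c
#include <stdbool.h>
#include <stddef.h>

struct demo_connection;

/* Assumed callback shape, modelled on the five arguments seen in frame 0. */
typedef void(demo_on_connection_complete_fn)(
    struct demo_connection *connection,
    int error_code,
    int return_code,
    bool session_present,
    void *userdata);

/* Hypothetical stand-in: only the two fields the macro touches. */
struct demo_connection {
    demo_on_connection_complete_fn *on_connection_complete;
    void *on_connection_complete_ud;
};

/* Macro body as quoted from aws-c-mqtt above. */
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...) \
    do { \
        if ((client_ptr)->callback) { \
            (client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud); \
        } \
    } while (false)

/* Records what the callback actually received. */
struct demo_seen {
    struct demo_connection *connection;
    int error_code;
    int return_code;
    bool session_present;
};

static void demo_record(
    struct demo_connection *connection,
    int error_code,
    int return_code,
    bool session_present,
    void *userdata) {
    struct demo_seen *seen = userdata;
    seen->connection = connection;
    seen->error_code = error_code;
    seen->return_code = return_code;
    seen->session_present = session_present;
}

/* Returns true when the callback saw exactly the pointer and values
 * that were handed to the macro. */
bool demo_pointer_is_forwarded_unchanged(void) {
    struct demo_seen seen = {0};
    struct demo_connection conn = {demo_record, &seen};
    struct demo_connection *connection = &conn;
    MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, 0, 1, false);
    return seen.connection == connection && seen.error_code == 0 &&
           seen.return_code == 1 && !seen.session_present;
}
```

With a plain C callback, demo_pointer_is_forwarded_unchanged() returns true: the macro hands over client_ptr as the first argument untouched. That is why I expect the difference to come from somewhere between the C call and the Julia function body rather than from the macro.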
Ultimately, the error I get when the callback is invoked is a segfault:
Thread 3 received signal SIGSEGV, Segmentation fault.
[Switching to Thread 70533.70674]
0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896) at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
Debugging steps I’ve tried so far that are worth noting:
- I can ccall the callback just fine. The segfault occurs when it’s called from the C library.
- I have tried combinations of various Julia types and C types in the cfunction argument tuple and return type to debug possible argument passing errors.
- I have tried creating the cfunction in the global scope.
- I have tried preserving (GC.@preserve) all relevant data and even disabled the GC. This has no effect on the error, so this doesn’t appear to be a GC problem.
I have also uploaded a zip of the entire project here if anyone wants to reproduce it locally.