I am trying to pass a Julia function to a C library (aws-c-mqtt), which will use it as a callback when an asynchronous event occurs. I am running into trouble because some of the parameters passed to this callback appear corrupted on the Julia side of the execution, even though they are correct on the C side. This `rr` backtrace illustrates what I mean:
```
(rr) bt
#0  0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896)
    at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
#1  0x00007efcec14051b in jlcapi_on_connection_complete_485 ()
#2  0x00007efce6c27213 in s_packet_handler_connack (connection=0x19e43c0, message_cursor=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:165
#3  0x00007efce6c27de6 in s_process_mqtt_packet (connection=0x19e43c0, packet_type=AWS_MQTT_PACKET_CONNACK, packet=...) at /workspace/srcdir/aws-c-mqtt/source/client_channel_handler.c:459
```
In frames 2 and 3, you can see that `connection=0x19e43c0`. The `connection` pointer is originally given to the C library from Julia, and if you printed `connection` from Julia, you would see this address, so this is the correct address. Inspecting the memory at this address in `rr` confirms it. The problem arises when this pointer is passed to my callback: in frame 0, `connection` has changed to `connection=139624673185440`.
Julia Code
This is the full Julia code (except the JLL wrapper). The relevant parts are the definitions of `on_connection_complete` and `on_connection_complete_cb`, and the call to `aws_mqtt_client_connection_connect`.
```julia
# set substitute-path /workspace/srcdir ../local_sources
GC.enable(false)
GC.enable_finalizers(false)
using Test, LibAWSMQTT, CountDownLatches
const held_refs = Vector{Ref}()
const received_on_connection_complete = CountDownLatch(1)
function on_connection_interrupted(connection, error_code, userdata)
    @warn "Connection Interrupted" error_code
end
function on_connection_resumed(connection, return_code, session_present, userdata)
    @info "Connection Resumed" return_code session_present
    if !session_present
        @info "Resubscribing..."
        packet_id = aws_mqtt_resubscribe_existing_topics(connection, s_on_resubscribed, C_NULL)
        if packet_id == 0
            error("Failed to resubscribe aws_last_error=$(aws_last_error())")
        end
    end
end
function on_connection_complete(connection::Ptr{aws_mqtt_client_connection}, error_code::Cint, return_code::Cint, session_present::Cuchar, userdata::Ptr{Cvoid})
    count_down(received_on_connection_complete)
    println("on_connection_complete on_connection_complete on_connection_complete on_connection_complete")
    return nothing
end
struct UserData
    connection::Ptr{aws_mqtt_client_connection}
end
function aws_iot_client_test_main()
    endpoint = "a19vzmsjwkvm63-ats.iot.us-east-1.amazonaws.com"
    client_id_name = "test-client-id"
    subscribe_topic = "mytopic"
    will_payload = "The client has gone offline!"
    ca_filepath = joinpath(@__DIR__, "certs", "AmazonRootCA1.pem")
    cert_filepath = joinpath(@__DIR__, "certs", "5909390f9eff261d9cf92331e0e6dfc3e0e673b0005c272ee258dd3cb192fffb-certificate.pem.crt")
    pri_key_filepath = joinpath(@__DIR__, "certs", "5909390f9eff261d9cf92331e0e6dfc3e0e673b0005c272ee258dd3cb192fffb-private.pem.key")
    allocator = aws_default_allocator()
    aws_mqtt_library_init(allocator)
    el_group = aws_event_loop_group_new_default(allocator, 1, C_NULL)
    resolver_options = Ref(aws_host_resolver_default_options(8, el_group, C_NULL, C_NULL))
    resolver = aws_host_resolver_new_default(allocator, resolver_options)
    bootstrap_options = Ref(aws_client_bootstrap_options(el_group, resolver, C_NULL, C_NULL, C_NULL))
    bootstrap = aws_client_bootstrap_new(allocator, bootstrap_options)
    tls_ctx_opt = Ref(aws_tls_ctx_options(ntuple(_ -> UInt8(0), 200)))
    @test AWS_OP_SUCCESS ==
          aws_tls_ctx_options_init_client_mtls_from_path(tls_ctx_opt, allocator, cert_filepath, pri_key_filepath)
    @test tls_ctx_opt[].allocator == allocator
    @test AWS_OP_SUCCESS == aws_tls_ctx_options_set_alpn_list(tls_ctx_opt, "x-amzn-mqtt-ca")
    @test AWS_OP_SUCCESS == aws_tls_ctx_options_override_default_trust_store_from_path(tls_ctx_opt, C_NULL, ca_filepath)
    tls_ctx = aws_tls_client_ctx_new(allocator, tls_ctx_opt)
    @test tls_ctx != C_NULL
    aws_tls_ctx_options_clean_up(tls_ctx_opt)
    tls_connection_options =
        Ref(aws_tls_connection_options(C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, C_NULL, false, 0))
    aws_tls_connection_options_init_from_ctx(tls_connection_options, tls_ctx)
    client = aws_mqtt_client_new(allocator, bootstrap)
    @show client
    connection = aws_mqtt_client_connection_new(client)
    @show connection
    ud = UserData(connection)
    GC.@preserve connection begin
        socket_options = Ref(aws_socket_options(AWS_SOCKET_STREAM, AWS_SOCKET_IPV6, 3000, 0, 0, 0, false))
        host_name_cur = Ref(aws_byte_cursor_from_c_str(endpoint))
        @show host_name_cur
        client_id_cur = Ref(aws_byte_cursor_from_c_str(client_id_name))
        on_connection_interrupted_cb =
            @cfunction on_connection_interrupted Cvoid (Ptr{aws_mqtt_client_connection}, Int, Ptr{Cvoid})
        on_connection_resumed_cb = @cfunction on_connection_resumed Cvoid (Ptr{aws_mqtt_client_connection},aws_mqtt_connect_return_code,Bool,Ptr{Cvoid},)
        @test AWS_OP_SUCCESS == aws_mqtt_client_connection_set_connection_interruption_handlers(
            connection,
            on_connection_interrupted_cb,
            C_NULL,
            on_connection_resumed_cb,
            C_NULL,
        )
        subscribe_topic_cur = Ref(aws_byte_cursor_from_c_str(subscribe_topic))
        will_cur = Ref(aws_byte_cursor_from_c_str(will_payload))
        aws_mqtt_client_connection_set_will(connection, subscribe_topic_cur, AWS_MQTT_QOS_AT_LEAST_ONCE, false, will_cur)
        on_connection_complete_cb = @cfunction(on_connection_complete, Cvoid, (Ptr{aws_mqtt_client_connection},Cint,Cint,Cuchar,Ptr{Cvoid}))
        GC.@preserve allocator el_group resolver bootstrap client connection socket_options tls_connection_options ud begin
            @show connection
            @show on_connection_complete_cb
            push!(held_refs, Ref(on_connection_complete_cb))
            conn_options = Ref(
                aws_mqtt_connection_options(
                    host_name_cur[],
                    UInt16(8883),
                    Base.unsafe_convert(Ptr{aws_socket_options}, socket_options),
                    Base.unsafe_convert(Ptr{aws_tls_connection_options}, tls_connection_options),
                    client_id_cur[],
                    0,
                    0,
                    0,
                    on_connection_complete_cb,
                    Base.unsafe_convert(Ptr{Cvoid}, Ref(ud)), # user_data
                    true,
                ),
            )
            push!(held_refs, conn_options)
            aws_mqtt_client_connection_connect(connection, conn_options)
            await(received_on_connection_complete)
            println("connected connected connected connected connected connected")
        end
    end
end
@testset "LibAWSMQTT" begin
    @testset "aws_iot_client_test" begin
        aws_iot_client_test_main()
    end
end
```
C Code
`mqtt/source/client_channel_handler.c:165`:

```c
MQTT_CLIENT_CALL_CALLBACK_ARGS(
    connection, on_connection_complete,
    AWS_OP_SUCCESS, connack.connect_return_code, connack.session_present);
```
`MQTT_CLIENT_CALL_CALLBACK_ARGS` is a variadic macro defined like so:

```c
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...)                              \
    do {                                                                                       \
        if ((client_ptr)->callback) {                                                          \
            (client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud);    \
        }                                                                                      \
    } while (false)
```
Looking at the C code, `connection` is passed from the `s_packet_handler_connack` function to the callback function without modification. I don't understand what is causing its value to change when `julia_on_connection_complete_479` is invoked.
Ultimately, the error I get when the callback is invoked is a segfault:

```
Thread 3 received signal SIGSEGV, Segmentation fault.
[Switching to Thread 70533.70674]
0x00007efcec134770 in julia_on_connection_complete_479 (connection=139624673185440, error_code=0, return_code=1, session_present=0 '\000', userdata=139624665866896) at /home/salmon/Documents/code/aws-c-mqtt-generator/test/runtests.jl:23
```
Debugging steps I've tried so far that are worth noting:

- I can `ccall` the callback just fine. The segfault occurs only when it's called from the C library.
- I have tried combinations of various Julia types and C types in the `cfunction` argument tuple and return type to rule out argument-passing errors.
- I have tried creating the `cfunction` in the global scope.
- I have tried preserving (`GC.@preserve`) all relevant data and even disabled the GC. This has no effect on the error, so it doesn't appear to be a GC problem.
I have also uploaded a zip of the entire project here if anyone wants to reproduce it locally.