Hello, I want to build the julia version of Futu API referring to this python version, and asked the Grok, ChatGPT, etc and got the function below:
sock = Sockets.TCPSocket()
# Set receive buffer
setsockopt(sock, Sockets.SOL_SOCKET, Sockets.SO_RCVBUF, rcvbuf_size)
ERROR: LoadError: UndefVarError: `setsockopt` not defined in `Main.FutuAPI.CommonModule.Connection`
Suggestion: check for spelling errors or missing imports.
and also error with Sockets.SOL_SOCKET
, Sockets.SO_RCVBUF
any suggestions would be appreciate.
Module Connection
module Connection
using OpenSSL, Sockets, ProtoBuf, Dates, Base.Threads
include("SysConfig.jl")
include("ProtoID.jl")
include("../proto/Common.jl")
include("../proto/InitConnect.jl")
using .SysConfig, .ProtoID, .Common, .InitConnect
# ============================== Enums ================================
@enum ConnStatus begin
Start = 0
Connecting = 1
Connected = 2
Closed = 3
end
@enum ConnectErr begin
ConnectOk = 0
Fail = 1
ConnectErr_Timeout = 2
end
@enum CloseReason begin
Close = 0
RemoteClose = 1
ReadFail = 2
CloseReason_SendFail = 3
ConnectFail = 4
end
@enum PacketErr begin
PacketOk = 0
PacketErr_Timeout = 1
Invalid = 2
PacketErr_SendFail = 3
Disconnect = 4
end
# ============================== Structs =================================
mutable struct SyncReqRspInfo
event::Condition
ret::Int32
msg::String
data::Union{Nothing, Any}
SyncReqRspInfo() = new(Condition(), Int32(RET_OK), "", nothing)
end
mutable struct ConnectInfo
start_time::Union{Nothing, DateTime}
event::Union{Nothing, Condition}
conn_id::UInt64
err::ConnectErr
msg::String
ConnectInfo(is_sync::Bool) = begin
ci = new(nothing, nothing, 0, ConnectOk, "")
if is_sync
ci.event = Condition()
end
ci
end
end
is_sync(ci::ConnectInfo) = ci.event !== nothing
function set_result(ci::ConnectInfo, err::ConnectErr, msg::String)
ci.err = err
ci.msg = msg
ci.event !== nothing && notify(ci.event)
end
wait(ci::ConnectInfo; timeout::Float64 = 5.0) = ci.event !== nothing && wait(ci.event; timeout)
mutable struct SendInfo
send_time::Union{Nothing, DateTime}
proto_id::Int
serial_no::Int
header_dict::Union{Nothing, Dict}
err::PacketErr
msg::String
event::Union{Nothing, Condition}
rsp::Union{Nothing, Any}
SendInfo(is_sync::Bool) = begin
si = new(nothing, 0, 0, nothing, PacketOk, "", nothing, nothing)
if is_sync
si.event = Condition()
end
si
end
end
is_sync(si::SendInfo) = si.event !== nothing
function set_result(si::SendInfo, err::PacketErr, msg::String, rsp::Union{Nothing, Any})
si.err = err
si.msg = msg
si.rsp = rsp
si.event !== nothing && notify(si.event)
end
wait(si::SendInfo; timeout::Float64 = 5.0) = si.event !== nothing && wait(si.event; timeout)
mutable struct FutuConnection
client::TCPSocket
conn_id::UInt64
aes_key::String
keep_alive_interval::Int32
aes_cipher::Union{Nothing, OpenSSL.EvpCipher}
opend_conn_id::UInt64
status::ConnStatus
timeout::Union{Nothing, Float64}
readbuf::Vector{UInt8}
writebuf::Vector{UInt8}
send_info_dict::Dict{Tuple{Int, Int}, SendInfo}
connect_info::ConnectInfo
function FutuConnection(client, conn_id, aes_key, keep_alive_interval, aes_cipher, is_sync=true)
new(client, conn_id, aes_key, keep_alive_interval, aes_cipher, 0, Start,
nothing, UInt8[], UInt8[], Dict{Tuple{Int, Int}, SendInfo}(), ConnectInfo(is_sync))
end
end
mutable struct NetworkController
next_conn_id::UInt64
connections::Dict{UInt64, FutuConnection}
sync_req_timeout::Float64
last_activate_time::DateTime
last_check_req_time::DateTime
running::Bool
NetworkController() = new(1, Dict(), 12.0, now(), now(), false)
end
# ===================== Encryption/Decryption Utilities ==========================
function rsa_encrypt_body(body::Vector{UInt8}, evp_pkey)
max_plain_len = 100
encrypted_segments = Vector{UInt8}[]
rsa = OpenSSL.RSA(evp_pkey)
pubkey = RSA.public_key(rsa)
for i in 1:max_plain_len:length(body)
chunk = body[i:min(i + max_plain_len - 1, end)]
encrypted = RSA.encrypt(pubkey, chunk; padding=OpenSSL.PKCS1v15())
push!(encrypted_segments, encrypted)
end
return vcat(encrypted_segments...)
end
function aes_encrypt_body(body::Vector{UInt8}, key::Vector{UInt8})
mod_len = length(body) % 16
padded_body = mod_len == 0 ? body : vcat(body, zeros(UInt8, 16 - mod_len))
cipher = OpenSSL.Cipher("aes-128-ecb", key, nothing, true)
encrypted = OpenSSL.encrypt(cipher, padded_body)
padding_block = zeros(UInt8, 16)
padding_block[end] = UInt8(mod_len)
encrypted_padding = OpenSSL.encrypt(cipher, padding_block)
return vcat(encrypted, encrypted_padding)
end
function rsa_decrypt_body(encrypted_body::Vector{UInt8}, evp_pkey)
segment_len = 128
decrypted_segments = Vector{UInt8}[]
rsa = OpenSSL.RSA(evp_pkey)
for i in 1:segment_len:length(encrypted_body)
chunk = encrypted_body[i:min(i + segment_len - 1, end)]
decrypted = RSA.decrypt(rsa, chunk; padding=OpenSSL.PKCS1v15())
push!(decrypted_segments, decrypted)
end
return vcat(decrypted_segments...)
end
function aes_decrypt_body(encrypted_body::Vector{UInt8}, key::Vector{UInt8})
mod_len = encrypted_body[end]
data_len = length(encrypted_body) - 16
encrypted_data = encrypted_body[1:data_len]
cipher = OpenSSL.Cipher("aes-128-ecb", key, nothing, false)
decrypted = OpenSSL.decrypt(cipher, encrypted_data)
return mod_len == 0 ? decrypted : decrypted[1:end-(16-mod_len)]
end
function parse_head(data::Vector{UInt8})
Dict(
"proto_id" => ntoh(reinterpret(UInt16, data[3:4])[1]),
"body_len" => ntoh(reinterpret(UInt32, data[5:8])[1]),
"serial_no" => 1 # Placeholder
)
end
# ========================== Network Management =============================
function start(nc::NetworkController)
nc.running = true
Threads.@spawn poll(nc)
end
function stop(nc::NetworkController)
nc.running = false
end
function poll(nc::NetworkController)
pollfds = Dict{UInt64, Vector{Pollfd}}()
while nc.running
now_time = now()
for (conn_id, conn) in nc.connections
if !haskey(pollfds, conn_id)
pollfds[conn_id] = [Pollfd(conn.client, POLLIN | POLLOUT)]
end
ready = poll_fd(pollfds[conn_id], 0.05)
if !isempty(ready) && ready[1].revents & POLLIN != 0
on_read(nc, conn)
end
if !isempty(ready) && ready[1].revents & POLLOUT != 0 && length(conn.writebuf) > 0
on_write(nc, conn)
end
if conn.status == Connecting
check_connect_timeout(nc, conn, now_time)
elseif conn.status == Connected
check_req_timeout(nc, conn, now_time)
end
end
nc.last_activate_time = now_time
nc.last_check_req_time = now_time
sleep(0.05)
end
end
function connect_to_futu(
nc::NetworkController, host::String = "127.0.0.1", port::Int = 11111;
timeout::Float64 = 5.0, is_encrypt::Bool = true, is_sync::Bool = true,
rcvbuf_size::Int = 1024*1024, sndbuf_size::Int = 1024*1024)
conn_id = nc.next_conn_id
nc.next_conn_id += 1
sock = Sockets.TCPSocket()
# Set receive buffer
setsockopt(sock, Sockets.SOL_SOCKET, Sockets.SO_RCVBUF, rcvbuf_size)
actual_rcvbuf = Sockets.getsocketopt(sock, Sockets.SOL_SOCKET, Sockets.SO_RCVBUF)
if actual_rcvbuf < rcvbuf_size
SysConfig.config.debug && println("Warning: Receive buffer set to $actual_rcvbuf instead of $rcvbuf_size")
end
# Set send buffer
Sockets.setsockopt(sock, Sockets.SOL_SOCKET, Sockets.SO_SNDBUF, sndbuf_size)
actual_sndbuf = Sockets.getopt(sock, Sockets.SOL_SOCKET, Sockets.SO_SNDBUF)
if actual_sndbuf < sndbuf_size
SysConfig.config.debug && println("Warning: Send buffer set to $actual_sndbuf instead of $sndbuf_size")
end
conn = FutuConnection(sock, conn_id, "", 10, nothing, is_sync)
conn.status = Connecting
conn.timeout = timeout
conn.connect_info.conn_id = conn_id
nc.connections[conn_id] = conn
Threads.@spawn begin
conn.connect_info.start_time = now()
try
connect(sock, host, port; nonblocking = true)
Threads.@spawn begin
sleep(timeout)
if conn.status == Connecting
set_result(conn.connect_info, ConnectErr_Timeout, "Connection timeout")
close_connection(nc, conn_id, ConnectFail, "Timeout")
end
end
catch e
set_result(conn.connect_info, Fail, string(e))
close_connection(nc, conn_id, ConnectFail, string(e))
end
Threads.@spawn keep_alive_task(conn)
end
return RET_OK, conn.connect_info
end
function send(nc::NetworkController, conn_id::UInt64, data::Vector{UInt8}, is_sync::Bool=false)
header = parse_head(data[1:8])
send_info = SendInfo(is_sync)
send_info.proto_id = header["proto_id"]
send_info.serial_no = header["serial_no"]
send_info.header_dict = header
Threads.@spawn begin
send_info.send_time = now()
do_send(nc, conn_id, send_info, data)
end
return RET_OK, send_info
end
function keep_alive_task(conn::FutuConnection)
while conn.status == Connected
sleep(conn.keep_alive_interval)
SysConfig.config.debug && println("Heartbeat for conn_id=$(conn.conn_id)")
end
end
function check_connect_timeout(nc::NetworkController, conn::FutuConnection, now_time::DateTime)
if conn.timeout !== nothing && conn.connect_info.start_time !== nothing
elapsed = (now_time - conn.connect_info.start_time).value / 1000
if elapsed >= conn.timeout
set_result(conn.connect_info, ConnectErr_Timeout, "Connection timeout")
close_connection(nc, conn.conn_id, ConnectFail, "Timeout")
end
end
end
function check_req_timeout(nc::NetworkController, conn::FutuConnection, now_time::DateTime)
for (proto_info, send_info) in collect(conn.send_info_dict)
if send_info.send_time !== nothing
elapsed = (now_time - send_info.send_time).value / 1000
if elapsed >= nc.sync_req_timeout
set_result(send_info, PacketErr_Timeout, "Request timeout", nothing)
delete!(conn.send_info_dict, proto_info)
end
end
end
end
# ============================== I/O Handling ================================
function do_send(nc::NetworkController, conn_id::UInt64, send_info::SendInfo, data::Vector{UInt8})
conn = get(nc.connections, conn_id, nothing)
if conn === nothing || conn.status != Connected
set_result(send_info, PacketErr_SendFail, "Connection lost or not connected", nothing)
return
end
proto_info = (send_info.proto_id, send_info.serial_no)
conn.send_info_dict[proto_info] = send_info
if length(conn.writebuf) > 0
append!(conn.writebuf, data)
else
write(conn.client, data)
end
end
function on_read(nc::NetworkController, conn::FutuConnection)
try
data = readavailable(conn.client)
append!(conn.readbuf, data)
while length(conn.readbuf) >= 8
head = parse_head(conn.readbuf[1:8])
body_len = head["body_len"]
if length(conn.readbuf) < 8 + body_len
break
end
body = conn.readbuf[9:8+body_len]
deleteat!(conn.readbuf, 1:8+body_len)
on_packet(nc, conn, head, PacketOk, "", body)
end
catch e
close_connection(nc, conn.conn_id, ReadFail, string(e))
end
end
function on_write(nc::NetworkController, conn::FutuConnection)
try
if length(conn.writebuf) > 0
sent = write(conn.client, conn.writebuf)
deleteat!(conn.writebuf, 1:sent)
end
catch e
close_connection(nc, conn.conn_id, CloseReason_SendFail, string(e))
end
end
function close_connection(nc::NetworkController, conn_id::UInt64, reason::CloseReason, msg::String)
conn = get(nc.connections, conn_id, nothing)
if conn === nothing || conn.status == Closed
return
end
close(conn.client)
conn.status = Closed
for send_info in values(conn.send_info_dict)
set_result(send_info, Disconnect, "", nothing)
end
delete!(nc.connections, conn_id)
SysConfig.config.debug && println("Closed conn_id=$conn_id, reason=$reason, msg=$msg")
end
function on_packet(nc::NetworkController, conn::FutuConnection, header::Dict, err::PacketErr, msg::String, rsp_body::Vector{UInt8})
proto_info = (header["proto_id"], header["serial_no"])
rsp_pb = nothing
if err == PacketOk
if header["proto_id"] == ProtoID.InitConnect
decrypted = rsa_decrypt_body(rsp_body, SysConfig.get_rsa_obj())
rsp_pb = ProtoBuf.decode(IOBuffer(decrypted), InitConnect.Response)
if rsp_pb.retType == RET_OK
conn.opend_conn_id = rsp_pb.s2c.connID
conn.aes_key = rsp_pb.s2c.connAESKey
conn.aes_cipher = OpenSSL.Cipher("aes-128-ecb", Vector{UInt8}(conn.aes_key), nothing, true)
end
else
rsp_pb = aes_decrypt_body(rsp_body, Vector{UInt8}(conn.aes_key))
end
end
send_info = get(conn.send_info_dict, proto_info, nothing)
if send_info !== nothing
set_result(send_info, err, msg, rsp_pb)
delete!(conn.send_info_dict, proto_info)
end
end
export NetworkController, FutuConnection, SyncReqRspInfo, SendInfo, ConnectInfo,
start, stop, connect_to_futu, send, rsa_encrypt_body, aes_encrypt_body,
rsa_decrypt_body, aes_decrypt_body, parse_head
end