tldr; how does one write a connection pool to a database like mongo?
As a side project to learn more about julia, I have a script which pulls a list from a mongo collection (database table), hits a GEOIP API, and inserts the result into another mongo collection. Right now, it’s written in Ruby. Read all about the performance differences. At the risk of embarrassment, I can’t seem to write a connection pool that works to thread the connections to the database.
Julia 1.5.4 is started as julia -t 48
using 4 packages:
import Mongoc
import HTTP
import ProgressMeter
import DataStructures
I connect to the db using a pool:
pool = Mongoc.ClientPool("mongodb://researchdb.lan", max_size=55)
client = Mongoc.Client(pool)
It turns out, it’s faster to pull the data back from mongo into two vectors than it is to do the set math in mongo itself (because of hardware, memory, etc). I have 2 functions that pull back the ip addresses from 2 different mongo collections. It takes about 2.5 minutes to run each function versus in mongo it takes 55-65 minutes:
function get_fill_ipaddr()
ipaddr_pb = ProgressMeter.Progress(length(unique_ipaddr), 0.1, "filling total_ipaddr...", 100)
fill_ipaddr = Set()
for ipaddr in Mongoc.find(unique_ipaddr)
push!(fill_ipaddr, ipaddr["_id"])
ProgressMeter.next!(ipaddr_pb)
end
return fill_ipaddr
end
The other is just like it, except a different collection.
I then simply subtract one set from the other and zero the original sets to save memory:
total_ipaddr = get_fill_ipaddr()
total_geoip = get_fill_geoip()
todo_list = setdiff(total_ipaddr - total_geoip)
total_geoip = nothing
total_ipaddr = nothing
All of this happens in under a second. total_geoip and total_ipaddr are about 35 million IPs as strings, on average.
And then i walk the set and run a function:
for ipaddr in todo_queue
@async get_geoip_results(ipaddr)
end
where get_geoip_results() is:
function get_geoip_results(ipaddr)
response = HTTP.get("$endpoint?apiKey=$apikey&ip=$ipaddr&lang=$language&include=security"; status_exception=false)
if response.status == 200
Mongoc.insert_one(geoip_infos, Mongoc.BSON(String(response.body)))
end
end
The http response is already in JSON, so it is inserted as is into the db. And this is where it all falls apart.
The mongo c driver on which Mongoc.jl is based is not thread-safe, except for one call, Mongoc.ClientPool is the only thread-safe call. When two threads try to use the same client, where client is defined above, julia segfaults in the mongoc.jl package. I expected this.
I tried breaking the insert to db as a different function and using DataStructures.jl Deque to push/pop new connections from the pool (as defined above), but that seems to re-use the same client connection, not create a new one. Something like this to create 55 “clients” for the pool to use. I tried using a Dict so each symbol is clearly unique, like for i in 1:55 push!(client_pool[i]) end
and call
for x in keys(client_pool)
clientx = pop!(client_pool["$first(x)"]
end
it didn’t work because i can’t guarantee two thread don’t pull the same key. So then i just tried:
Using DataStructures
client_pool = Deque()
for i in 1:55
push!(client_pool, i)
end
And then do a basic pop/push to/from the queue:
client_one = pop!(client_pool)
geoip_coll = client[:geoip_infos]
Mongoc.insert_one(geoip_coll, Mongoc.BSON(document))
push!(client_pool, client_one)
And then I thought about something like:
Mongoc.insert_one((Mongoc.Client(pool)[:geoip_infos]), Mongoc.BSON(String(response.body)))
And then I got more advanced into thinking i could create 1 dequeue shared across 2 threads/processes. The goal is 50 threads to poll the HTTP API and pushfirst! to the queue, and one single thread to empty it. If the queue is empty, the db writing thread waits until the queue has data (while !isempty(queue) sleep(5)
).
The timing here means the db thread will be far faster (1-5ms) versus the 50 http threads (50-75ms). Each queue would have to run independently, so maybe @async or @spawn.
So far, this is all beyond me in julia. I’ve read the parallel and multi-threading sections a few times. I’m clearly not grokking it yet.
Pointers? Advice? Help?
Thanks!