Failing to write a connection pool for Mongo

tl;dr: How does one write a connection pool for a database like MongoDB?

As a side project to learn more about Julia, I have a script that pulls a list of IP addresses from a MongoDB collection (database table), queries a GeoIP API for each one, and inserts the result into another MongoDB collection. Right now, it’s written in Ruby. Read all about the performance differences. At the risk of embarrassment, I can’t seem to write a connection pool that lets multiple threads share connections to the database.

Julia 1.5.4 is started as julia -t 48 (48 threads), using 4 packages:

import Mongoc
import HTTP
import ProgressMeter
import DataStructures

I connect to the db using a pool:

pool = Mongoc.ClientPool("mongodb://researchdb.lan", max_size=55)
client = Mongoc.Client(pool)

It turns out it’s faster to pull the data back from MongoDB into two sets and do the set math in Julia than to do it in MongoDB itself (because of hardware, memory, etc.). I have two functions that pull the IP addresses from two different collections. Each function takes about 2.5 minutes to run, versus 55-65 minutes for doing the set math in MongoDB:

function get_fill_ipaddr()
  ipaddr_pb = ProgressMeter.Progress(length(unique_ipaddr), 0.1, "filling total_ipaddr...", 100)
  fill_ipaddr = Set{String}()  # typed: a bare Set() is a Set{Any}
  for ipaddr in Mongoc.find(unique_ipaddr)
    push!(fill_ipaddr, ipaddr["_id"])
    ProgressMeter.next!(ipaddr_pb)
  end
  return fill_ipaddr
end

The other is identical, except that it reads from a different collection.

I then simply subtract one set from the other and drop the original sets to free memory:

total_ipaddr = get_fill_ipaddr()
total_geoip = get_fill_geoip()
todo_list = setdiff(total_ipaddr, total_geoip)
total_geoip = nothing
total_ipaddr = nothing

The set difference takes under a second. total_ipaddr and total_geoip each hold about 35 million IP addresses as strings.
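Julia defines no - method for two Sets, so the difference goes through setdiff, and setdiff! does the same thing in place, which avoids allocating a third large set. A toy version with made-up addresses:

```julia
total_ipaddr = Set(["10.0.0.1", "10.0.0.2", "10.0.0.3"])   # made-up addresses
total_geoip  = Set(["10.0.0.2"])

# setdiff returns a new Set holding the elements of the first argument
# that are not in the second one.
todo_list = setdiff(total_ipaddr, total_geoip)

# setdiff! removes those elements from its first argument in place,
# so total_ipaddr itself becomes the to-do set.
setdiff!(total_ipaddr, total_geoip)
```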

Then I walk the set and run a function for each address:

for ipaddr in todo_list
  @async get_geoip_results(ipaddr)
end
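Two details of that loop matter for threading. @async schedules each task on the thread that created it, so these tasks never use the other threads started with julia -t 48, and without an enclosing @sync the loop returns before the requests have finished. A small Base-only illustration, with arithmetic standing in for get_geoip_results:

```julia
# @async schedules a task on the thread that created it and returns at once;
# @sync waits for every task started (lexically) inside its block.
squares = Int[]
@sync for i in 1:5
    @async push!(squares, i^2)   # no lock needed: these tasks share one thread
end

# Threads.@spawn may run each task on any thread started with `julia -t N`,
# so shared state needs a lock.
lk = ReentrantLock()
cubes = Int[]
@sync for i in 1:5
    Threads.@spawn begin
        lock(lk)
        try
            push!(cubes, i^3)
        finally
            unlock(lk)
        end
    end
end
```

Once tasks really run on several threads, anything they share (like a Mongo client) has to be protected or handed out one task at a time.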

where get_geoip_results() is:

function get_geoip_results(ipaddr)
  response = HTTP.get("$endpoint?apiKey=$apikey&ip=$ipaddr&lang=$language&include=security"; status_exception=false)
  if response.status == 200
    Mongoc.insert_one(geoip_infos, Mongoc.BSON(String(response.body)))
  end
end

The HTTP response is already JSON, so it is inserted into the db as-is. And this is where it all falls apart.

The MongoDB C driver that Mongoc.jl wraps is not thread-safe, with one exception: the client pool (Mongoc.ClientPool) is the only thread-safe object. When two threads use the same client (client, as defined above), Julia segfaults inside the Mongoc.jl package. I expected this.

I tried splitting the database insert into its own function and using a DataStructures.jl Deque to push/pop connections from the pool defined above, but that seems to reuse the same client connection rather than create a new one. The idea was to create 55 “clients” for the pool to use. First I tried a Dict so that each key is clearly unique, filling it in a loop over 1:55, and then calling:

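for x in collect(keys(client_pool))  # copy the keys: don't mutate a Dict while iterating it
  client_x = pop!(client_pool, x)    # remove and return the client stored under key x
end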

It didn’t work because I can’t guarantee that two threads don’t pull the same key. So then I just tried:

using DataStructures
client_pool = Deque()
for i in 1:55
  push!(client_pool, i)
end

And then I did a basic pop/push from/to the queue around each insert:

client_one = pop!(client_pool)
geoip_coll = client[:geoip_infos]
Mongoc.insert_one(geoip_coll, Mongoc.BSON(document))
push!(client_pool, client_one)
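In the Deque attempt above, the deque holds the integers 1:55 rather than clients, and the insert still goes through the global client, so every insert really does share one client. A DataStructures.jl Deque is also not thread-safe, and pop! on an empty Deque fails instead of waiting for a client to come back. Base’s Channel covers both problems: take! blocks until an item is available and gives each item to exactly one task. A minimal sketch, assuming a hypothetical FakeClient type in place of real clients (with Mongoc.jl the channel would presumably be filled with clients created by Mongoc.Client(pool)); make_client_pool and with_client are illustrative names, not library functions:

```julia
# Hypothetical stand-in for a client that must never be used by two tasks
# at once (for example a Mongoc client). `inuse` lets us detect sharing.
struct FakeClient
    id::Int
    inuse::Threads.Atomic{Int}
end
FakeClient(id::Int) = FakeClient(id, Threads.Atomic{Int}(0))

# Fill a buffered Channel with n clients. take! blocks while the channel is
# empty, and each client is handed to exactly one waiting task.
function make_client_pool(n::Int)
    ch = Channel{FakeClient}(n)
    for i in 1:n
        put!(ch, FakeClient(i))
    end
    return ch
end

# Check a client out, run f with it, and always hand it back.
function with_client(f, ch::Channel{FakeClient})
    client = take!(ch)
    try
        return f(client)
    finally
        put!(ch, client)
    end
end

clients = make_client_pool(4)
violations = Threads.Atomic{Int}(0)

@sync for job in 1:100
    Threads.@spawn begin
        with_client(clients) do client
            # atomic_add! returns the previous value; nonzero means shared use.
            if Threads.atomic_add!(client.inuse, 1) != 0
                Threads.atomic_add!(violations, 1)
            end
            sleep(0.001)                     # stand-in for Mongoc.insert_one
            Threads.atomic_sub!(client.inuse, 1)
        end
    end
end
```

The try/finally in with_client returns the client even if the body throws, so a failed insert cannot leak a client out of the pool.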

And then I thought about something like:

Mongoc.insert_one((Mongoc.Client(pool)[:geoip_infos]), Mongoc.BSON(String(response.body)))

Then I got more ambitious and thought I could create one deque shared across two threads/processes. The goal is 50 threads polling the HTTP API and calling pushfirst! on the queue, and a single thread emptying it. If the queue is empty, the db-writing thread waits until the queue has data (while isempty(queue) sleep(5) end).

Given those timings, the db thread will be far faster (1-5 ms per insert) than the 50 HTTP threads (50-75 ms per request). The two sides of the queue would have to run independently, so maybe @async or @spawn.
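Base’s Channel is already a thread-safe blocking queue, so this producer/consumer plan needs neither a shared Deque nor a sleep-polling loop: iterating over a Channel blocks while it is empty and ends once it is closed and drained. A minimal Base-only sketch, where fetch_geoip and run_pipeline are hypothetical names (fetch_geoip stands in for the HTTP.get call, and the single writer task stands in for the one task that would own a Mongo client and call Mongoc.insert_one):

```julia
# Hypothetical stand-in for the HTTP request; returns a JSON string.
fetch_geoip(ip::String) = "{\"ip\": \"$ip\"}"

function run_pipeline(ips::Vector{String}; nproducers::Int = 50)
    jobs    = Channel{String}(length(ips))   # work queue of IP addresses
    results = Channel{String}(1000)          # bounded: producers wait if the writer lags
    foreach(ip -> put!(jobs, ip), ips)
    close(jobs)                              # iteration over jobs ends once it is drained

    written = String[]
    # Single consumer: `for doc in results` blocks while the channel is empty
    # and stops after it is closed and drained, so no sleep/poll loop is needed.
    writer = Threads.@spawn for doc in results
        push!(written, doc)                  # stand-in for Mongoc.insert_one
    end

    # Producers share the jobs channel; each IP is taken by exactly one task.
    @sync for _ in 1:nproducers
        Threads.@spawn for ip in jobs
            put!(results, fetch_geoip(ip))
        end
    end
    close(results)                           # all producers are done: let the writer finish
    wait(writer)
    return written
end
```

Because only the writer task ever touches the database client in this design, no client is used by two tasks at once.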

So far, this is all beyond me in Julia. I’ve read the parallel computing and multi-threading sections of the manual a few times, and I’m clearly not grokking it yet.

Pointers? Advice? Help?

Thanks!

I did this recently with LibPQ and a Channel.


using LibPQ  # LibPQ.Connection, execute, status

mutable struct ConnectionPool
    pool::Channel{LibPQ.Connection}
    count::Int
    max::Int
    connect::String
    protect::ReentrantLock
end

function ConnectionPool()
    max     = parse(Int, ENV["DB_POOL"])
    connect = join([
        "dbname=$(ENV["DB_NAME"])",
        "user=$(ENV["DB_USER"])",
        "password=$(ENV["DB_PASS"])",
        "host=$(ENV["DB_HOST"])",
        "port=$(ENV["DB_PORT"])"
    ], " ");

    return ConnectionPool(
        Channel{LibPQ.Connection}(max),
        0, max,
        connect,
        ReentrantLock()
    )
end

function Base.close(
            p::ConnectionPool
        )::Nothing

    close(p.pool)

    while isready(p.pool)
        close(take!(p.pool))
    end

end

function test(
            conn::LibPQ.Connection
        )::Bool
    retval = false
    result = execute(conn, "select 1"; throw_error=false)

    if status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK
        retval = true
    end

    close(result)

    return retval
end

function get_valid_connection(
            p::ConnectionPool
        )::LibPQ.Connection

    conn = nothing

    while conn === nothing

        # -------------------------------------------------------------------------------------
        # If the pool is not full operate under special conditions.
        lock(p.protect) do
            if p.count < p.max
                if isready(p.pool) == true
                    conn = take!(p.pool)
                else
                    conn = LibPQ.Connection(p.connect)
                    p.count += 1
                end
            end
            nothing
        end

        # -------------------------------------------------------------------------------------
        # If conn is nothing then the pool is full, grab the next available connection.
        if conn === nothing
            conn = take!(p.pool)
        end

        # -------------------------------------------------------------------------------------
        # Check the connection.
        if test(conn) == false
            close(conn)

            conn = nothing

            lock(p.protect) do
                p.count -= 1
            end
        end

    end

    return conn
end

function get_conn(
            f::Function,
            p::ConnectionPool
        )::Nothing

    conn = get_valid_connection(p)

    try
        f(conn)
    finally
        try
            put!(p.pool, conn)
        catch
            close(conn)
        end
    end

    nothing
end

In short, a Channel (pool) is created with a capacity equal to the maximum number of connections I want to have. get_valid_connection does the interesting work (“full” here means the maximum number of connections has already been created):

  • If the pool isn’t full and there is an idle connection in the pool, grab it.
  • If the pool isn’t full and there are NO connections in the pool, create a new one.
  • If the pool is full, wait for a connection to be returned to the pool.

All my DB calls were made via callbacks (get_conn), so that a single piece of code takes connections from the pool and returns them. You don’t need to do that.

EDIT: Reviewing the code, there may be a small issue that I introduced when I added the connection test. If an invalid connection is closed and removed, the pool goes back to the “not full” state, which means a thread could block inside the lock() block: another thread, taking from the channel outside the lock, may empty it between the isready() call and the take!() call. It only blocks until another thread returns a connection, so briefly you might have max-1 connections in use.

EDIT2: I should probably also move the connection creation out of the critical section. :-/

Okay, I rewrote my code; this version doesn’t have the issues the first one did.

using Base.Threads
using LibPQ  # LibPQ.Connection, execute, status
import DataStructures: CircularDeque

mutable struct ConnectionPool
    pool::CircularDeque{LibPQ.Connection}
    count::Int
    max::Int
    connect::String
    event::Threads.Condition
    closed::Bool
end

function ConnectionPool()
    max     = parse(Int, ENV["DB_POOL"])
    connect = join([
        "dbname=$(ENV["DB_NAME"])",
        "user=$(ENV["DB_USER"])",
        "password=$(ENV["DB_PASS"])",
        "host=$(ENV["DB_HOST"])",
        "port=$(ENV["DB_PORT"])"
    ], " ");

    return ConnectionPool(
        CircularDeque{LibPQ.Connection}(max),
        0, max,
        connect,
        Threads.Condition(ReentrantLock()),
        false
    )
end

function Base.close(
            p::ConnectionPool
        )::Nothing
    more = 0

    lock(p.event) do
        p.closed = true
        more = p.count
        notify(p.event)
    end

    while more > 0
        lock(p.event) do
            if isempty(p.pool) == true
                wait(p.event)
            end
            if isempty(p.pool) == false
                close(popfirst!(p.pool))
                p.count -= 1
            end
            more = p.count
        end
    end
end

function test(
            conn::LibPQ.Connection
        )::Bool
    local retval = false
    local result = execute(conn, "select 1"; throw_error=false)

    if status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK
        retval = true
    end

    close(result)

    return retval
end

function get_valid_connection(
            p::ConnectionPool
        )::LibPQ.Connection

    local conn = nothing

    while conn === nothing

        # ---------------------------------------------------------------------------------
        # Get a connection
        connect = false

        lock(p.event) do
            while conn === nothing && connect == false
                if p.closed == true
                    error("Connection pool is closed.")
                end
                if isempty(p.pool) == false
                    conn = popfirst!(p.pool)
                elseif p.count < p.max
                    connect = true
                    p.count += 1
                else
                    wait(p.event)
                end
            end
        end

        # ---------------------------------------------------------------------------------
        # Create a connection if we can.
        if connect == true
            try
                conn = LibPQ.Connection(p.connect)
            catch ex
                lock(p.event) do
                    p.count -= 1
                    notify(p.event)
                end
                rethrow(ex)
            end
        end

        # ---------------------------------------------------------------------------------
        # Check the connection.
        if test(conn) == false
            close(conn)

            conn = nothing

            lock(p.event) do
                p.count -= 1
                notify(p.event)
            end
        end

    end

    return conn
end

function get_conn(
            f::Function,
            p::ConnectionPool
        )::Nothing

    local conn = get_valid_connection(p)

    try
        f(conn)
    finally
        lock(p.event) do
            push!(p.pool, conn)
            notify(p.event)
        end
    end

    nothing
end
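To exercise the logic of the rewritten pool without a database, here is a generic sketch of the same idea: reserve a slot while holding the lock, create the connection outside it, and wait on a Threads.Condition when the pool is exhausted. LazyPool, acquire!, release!, and with_item are hypothetical names, and make is an assumed zero-argument function standing in for LibPQ.Connection(p.connect). Unlike the code above, it reuses idle connections LIFO from a plain Vector and omits the connection test and close():

```julia
mutable struct LazyPool{T}
    make::Function             # hypothetical constructor for one connection
    idle::Vector{T}            # created connections waiting to be reused
    count::Int                 # connections created (or being created)
    max::Int
    cond::Threads.Condition    # guards every field above
end
LazyPool{T}(make, max::Int) where {T} =
    LazyPool{T}(make, T[], 0, max, Threads.Condition())

function acquire!(p::LazyPool)
    item = nothing
    create = false
    lock(p.cond)
    try
        while item === nothing && !create
            if !isempty(p.idle)
                item = pop!(p.idle)          # reuse an idle connection
            elseif p.count < p.max
                p.count += 1                 # reserve a slot while holding the lock
                create = true
            else
                wait(p.cond)                 # releases the lock until notified
            end
        end
    finally
        unlock(p.cond)
    end
    create || return item
    try
        return p.make()                      # slow work happens outside the lock
    catch
        lock(p.cond)
        try
            p.count -= 1                     # give the reserved slot back
            notify(p.cond)
        finally
            unlock(p.cond)
        end
        rethrow()
    end
end

function release!(p::LazyPool{T}, item::T) where {T}
    lock(p.cond)
    try
        push!(p.idle, item)
        notify(p.cond)                       # wake tasks blocked in acquire!
    finally
        unlock(p.cond)
    end
    return nothing
end

function with_item(f, p::LazyPool)
    item = acquire!(p)
    try
        return f(item)
    finally
        release!(p, item)
    end
end

# Usage: each "connection" is just an Int id; atomic_add! returns the old value.
created = Threads.Atomic{Int}(0)
lazy = LazyPool{Int}(() -> Threads.atomic_add!(created, 1) + 1, 3)

total = Threads.Atomic{Int}(0)
@sync for i in 1:50
    Threads.@spawn begin
        with_item(lazy) do conn
            sleep(0.001)                     # stand-in for a query
            Threads.atomic_add!(total, i)
        end
    end
end
```

With max set to 3, at most three ids are ever created even though 50 tasks compete for them; the rest wait on the condition until a connection is released.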

Wow, thank you for sharing your code! It sounds like this is more complicated than I expected, but along the lines of where I was heading. I’ll work through the code and try to understand it.

For learning purposes, I’ve limited myself to the docs and reading the code of other packages. I’m sure my code samples can be vastly improved.

Thank you.