Threadsafe DBInterface.jl-like connection pool interface for thread-unsafe connections

In light of this post on database connection pools and concurrency, I’m trying making a threadsafe DBInterface.jl-like connection pool interface for thread-unsafe connections.
It’s based on DBInterface.jl and ConcurrentUtilities.jl, currently just simple 60 lines of code covering only some equivalents of DBInterface.connect, DBInterface.execute, DBInterface.close!. It seems to work so far.

Before putting any more effort into this, I have some problems in mind to be solved:

  1. Are there any packages or workarounds that already does what I’m trying to do? I tried googling and didn’t find much info on it, and I’d be grateful if you could share me some info.

  2. Prepared statements don’t seem to get along with connection pools, as statements aren’t shared and live only inside a specific connection where it was defined. I suppose you can kind-of solve the problem by creating the same statement on all the connections of a pool, if you really need one, but I fear that it might be a bad practice that can lead to exceptions and inefficient use of connections. What would be the best practice for this?

  3. How much of the DBInterface.jl functionalities do you think should be covered for general purpose usage? I think DBInterface.connect, DBInterface.execute, DBInterface.close! is kind of enough, but I’m no expert in this field and might be missing something critical.

Here are the codes so far.

module PooledDBInterface

using DBInterface, ConcurrentUtilities, Base

mutable struct ConnectionPool{T<:DBInterface.Connection}
    pool::Pool{Nothing, T}
    args
    keyargs
end

struct AcquiredConnection{T<:DBInterface.Connection}
    conn::T
    pool::Pool{Nothing, T}
end

function connect(args...;limit::Int=4096, numbers::Int=1, keyargs...)
    if numbers < 1
        err("There must be more than 1 connection to a pool.")
    elseif numbers > limit
        err("Number of initial connections can't exceed the limit")
    end
    conn=DBInterface.connect(args...;keyargs...)
    pool=Pool{typeof(conn)}(limit)
    Base.release(pool, Base.acquire(()->conn, pool, forcenew=true))
    foreach(1:numbers-1) do _
        Base.release(pool, Base.acquire(()->DBInterface.connect(args...;keyargs...), pool, forcenew=true))
    end
    return ConnectionPool{typeof(conn)}(pool, args, keyargs)
end

function execute(pool::ConnectionPool, args...;keyargs...)
    conn=Base.acquire(()->DBInterface.connect(pool.args...;pool.keyargs...), pool.pool)
    try
        return DBInterface.execute(conn, args...;keyargs...)
    finally
        Base.release(pool.pool, conn)
    end
end

function close!(pool::ConnectionPool)
    pool=pool.pool
    Base.@lock pool.lock begin
        if ConcurrentUtilities.Pools.iskeyed(pool)
            for objs in values(pool.keyedvalues)
                foreach(x->DBInterface.close!(x), objs) 
                empty!(objs)
            end
        else
            foreach(x->DBInterface.close!(x), pool.values)
            empty!(pool.values)
        end
    end
end

function Base.acquire(pool::ConnectionPool{T}) where {T<:DBInterface.Connection}
    AcquiredConnection{T}(Base.acquire(()->DBInterface.connect(pool.args...;pool.keyargs...), pool.pool), pool.pool)
end

function Base.release(conn::AcquiredConnection)
    Base.release(conn.pool, conn.conn)
end

end # module PooledDBInterface
1 Like

Is it open sourced somewhere so I can try out?

I just created a minimal working example using SQLite.jl.
There was no much progress after the initial post (I decided to move to js for my specific needs), but might be worth a look?

1 Like

Thanks. Tried on MySQL and it works fine.

1 Like