In light of this post on database connection pools and concurrency, I’m trying to make a thread-safe, DBInterface.jl-like connection pool interface for connections that are not themselves thread-safe.
It’s based on DBInterface.jl and ConcurrentUtilities.jl and is currently just about 60 simple lines of code, covering only equivalents of DBInterface.connect, DBInterface.execute, and DBInterface.close!. It seems to work so far.
Before putting any more effort into this, I have a few questions I’d like to resolve:
- Are there any packages or workarounds that already do what I’m trying to do? I tried googling and didn’t find much information on it, so I’d be grateful if you could share anything you know.
- Prepared statements don’t seem to get along with connection pools: a statement isn’t shared and lives only inside the specific connection where it was defined. I suppose you can kind of solve the problem by creating the same statement on every connection of a pool, if you really need one, but I fear that it might be a bad practice that leads to exceptions and inefficient use of connections. What would be the best practice for this?
- How much of the DBInterface.jl functionality do you think should be covered for general-purpose usage? I think DBInterface.connect, DBInterface.execute, and DBInterface.close! are more or less enough, but I’m no expert in this field and might be missing something critical.
Here is the code so far.
module PooledDBInterface
using DBInterface, ConcurrentUtilities, Base
# A pool of database connections of type `T`, together with the arguments
# needed to open further connections of the same kind.
mutable struct ConnectionPool{T<:DBInterface.Connection}
# Underlying non-keyed ConcurrentUtilities pool holding the connections.
pool::Pool{Nothing, T}
# Positional arguments splatted into `DBInterface.connect` when a new
# connection has to be opened for this pool.
args
# Keyword arguments splatted into `DBInterface.connect` alongside `args`.
keyargs
end
# A connection checked out of a pool, paired with the pool it came from so
# that it can later be released back to that same pool.
struct AcquiredConnection{T<:DBInterface.Connection}
# The checked-out connection.
conn::T
# The underlying pool that `conn` is released back to.
pool::Pool{Nothing, T}
end
"""
    connect(args...; limit::Int=4096, numbers::Int=1, keyargs...)

Create a `ConnectionPool` whose connections are opened with
`DBInterface.connect(args...; keyargs...)`.

# Keywords
- `limit`: value handed to the `Pool` constructor.
- `numbers`: how many connections to open eagerly and place in the pool;
  must satisfy `1 <= numbers <= limit`.
- Every other keyword is forwarded to `DBInterface.connect` and stored on the
  returned pool so later connections are opened the same way.

# Throws
- `ArgumentError` if `numbers < 1` or `numbers > limit`.
"""
function connect(args...; limit::Int=4096, numbers::Int=1, keyargs...)
    # Validate before opening anything so that a bad call never opens a connection.
    if numbers < 1
        throw(ArgumentError("There must be at least 1 connection in a pool."))
    elseif numbers > limit
        throw(ArgumentError("Number of initial connections can't exceed the limit"))
    end
    # The first connection is opened up front because its concrete type is
    # needed to parameterize the pool.
    conn = DBInterface.connect(args...; keyargs...)
    pool = Pool{typeof(conn)}(limit)
    # Acquire-with-forcenew followed by release is how a freshly opened
    # connection gets stored in the pool.
    Base.release(pool, Base.acquire(() -> conn, pool, forcenew=true))
    foreach(1:numbers-1) do _
        Base.release(
            pool,
            Base.acquire(() -> DBInterface.connect(args...; keyargs...), pool, forcenew=true),
        )
    end
    return ConnectionPool{typeof(conn)}(pool, args, keyargs)
end
"""
    execute(pool::ConnectionPool, args...; keyargs...)

Borrow a connection from `pool`, call `DBInterface.execute` on it with `args`
and `keyargs`, and return whatever that call returns. The connection is
released back to the pool whether or not the call throws.

The factory given to `Base.acquire` opens a connection with the arguments
stored on `pool`.
"""
function execute(pool::ConnectionPool, args...; keyargs...)
    inner = pool.pool
    opener = () -> DBInterface.connect(pool.args...; pool.keyargs...)
    borrowed = Base.acquire(opener, inner)
    try
        result = DBInterface.execute(borrowed, args...; keyargs...)
        return result
    finally
        # NOTE(review): if a driver returns a lazy cursor that still reads from
        # the connection, it would outlive this release — confirm per driver.
        Base.release(inner, borrowed)
    end
end
# Close and remove every connection in `objs`, one at a time; returns `objs`.
# Each connection is popped *before* it is closed, so if a `DBInterface.close!`
# call throws, no already-closed connection is left behind in `objs`.
function _closeidle!(objs)
    while !isempty(objs)
        DBInterface.close!(pop!(objs))
    end
    return objs
end

"""
    close!(pool::ConnectionPool)

Close every connection stored in the underlying pool's containers and remove
it from the pool, while holding the pool's lock.

If closing one connection throws, the exception propagates; connections closed
up to that point have already been removed from the pool.

NOTE(review): only connections stored in the pool are visited; connections that
are checked out at the time of the call are presumably not among them — confirm
against ConcurrentUtilities.
"""
function close!(pool::ConnectionPool)
    inner = pool.pool
    Base.@lock inner.lock begin
        if ConcurrentUtilities.Pools.iskeyed(inner)
            for objs in values(inner.keyedvalues)
                _closeidle!(objs)
            end
        else
            _closeidle!(inner.values)
        end
    end
end
"""
    Base.acquire(pool::ConnectionPool)

Check a connection out of `pool` and return it wrapped in an
`AcquiredConnection` that also carries the underlying pool. The factory given
to the underlying `Base.acquire` opens a connection with the arguments stored
on `pool`.
"""
function Base.acquire(pool::ConnectionPool{T}) where {T<:DBInterface.Connection}
    inner = pool.pool
    conn = Base.acquire(inner) do
        DBInterface.connect(pool.args...; pool.keyargs...)
    end
    return AcquiredConnection{T}(conn, inner)
end
"""
    Base.release(conn::AcquiredConnection)

Release the wrapped connection back to the pool stored alongside it.
"""
Base.release(conn::AcquiredConnection) = Base.release(conn.pool, conn.conn)
end # module PooledDBInterface