Are database connection pools typically used with concurrency?

When creating hundreds of tasks using @spawn (with multiple threads), where each task has a connection to an embedded database, is it necessary to use a connection pool structure or can each task maintain its own DBInterface.connect()? From my tests, it seems like there is no issue with hundreds of DBInterface connections, and I do not see a package for connection pools. Thanks.

I wouldn’t assume database connections are thread-safe (as the author of SQLite.jl and MySQL.jl, I know theirs aren’t, though I have been thinking of making them so soon). I believe LibPQ.jl connections are, last I checked.

I use the Pool structure in the ConcurrentUtilities.jl package (currently light on docs, but plentiful on docstrings: ConcurrentUtilities.jl/src/pools.jl at main · JuliaServices/ConcurrentUtilities.jl · GitHub). It’s fairly straightforward to use to set a max # of connections and then use an acquire/release pattern from various threads. I typically write a utility like the following to enforce proper usage:

function withconnection(f)
    conn = acquire(newconnection, POOL; isvalid=isopen)
    try
        return f(conn)
    finally
        release(POOL, conn)
    end
end

where newconnection is your own function that calls DBInterface.connect, and POOL is the pool object shared by all tasks.
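To illustrate the acquire/release idea without depending on any package, here is a minimal sketch of a bounded pool built on a plain `Channel`. This is not how ConcurrentUtilities.jl implements its `Pool`; `SimplePool`, `withconn`, `FakeConn`, and `newconn` are hypothetical names made up for this example, and `FakeConn` stands in for a real connection so the snippet runs on its own.

```julia
# Stand-in for a real database connection (assumption for this sketch).
struct FakeConn
    id::Int
end

# Hypothetical bounded pool: a Channel holding `limit` slots.
# A slot is either `nothing` (no connection created yet) or an idle connection.
struct SimplePool{T}
    slots::Channel{Union{Nothing,T}}
end

function SimplePool{T}(limit::Integer) where {T}
    slots = Channel{Union{Nothing,T}}(limit)
    for _ in 1:limit
        put!(slots, nothing)  # connections are created lazily on first use
    end
    return SimplePool{T}(slots)
end

# Acquire a slot, run `f` on its connection, and always give the slot back.
function withconn(f, make, pool::SimplePool)
    slot = take!(pool.slots)  # blocks while all `limit` slots are in use
    conn = slot
    try
        conn = slot === nothing ? make() : slot
        return f(conn)
    finally
        put!(pool.slots, conn)  # returns the slot even if `make` or `f` throws
    end
end

# Count how many connections actually get created.
const CREATED = Threads.Atomic{Int}(0)
newconn() = FakeConn(Threads.atomic_add!(CREATED, 1) + 1)

pool = SimplePool{FakeConn}(4)
results = Vector{Int}(undef, 100)
@sync for i in 1:100
    Threads.@spawn results[i] = withconn(newconn, pool) do c
        c.id  # a real task would run its query here
    end
end
println("connections created: ", CREATED[])
```

With 100 tasks and a limit of 4, at most 4 connections are ever created; the other tasks wait in `take!` until a slot is free. In real use, `newconn` would call `DBInterface.connect`.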

In my case, the DuckDB library is thread-safe across connections, and when I replicate their multithreaded Python example in Julia (using julia -t 4), I have no issues with thousands of tasks reading from and writing to the database concurrently. I will try the pool if necessary, but it seems to work as is.

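using Dates, DataFrames, DuckDB
db = DuckDB.DB()  # in-memory database shared by all tasks
# keep the setup connection instead of discarding it, and create the table through it
setup_conn = DBInterface.connect(db)
DBInterface.execute(setup_conn, "CREATE OR REPLACE TABLE data (date TIMESTAMP, id INT)")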

function run_reader(db)
    conn = DBInterface.connect(db)  # each task opens its own connection
    try
        while true
            println(DBInterface.execute(conn, "SELECT id, count(date) as count, max(date) as max_date FROM data group by id order by id") |> DataFrames.DataFrame)
            sleep(1)
        end
    finally
        DBInterface.close!(conn)  # reached only if the loop is interrupted or throws
    end
end
# spawn one reader task
Threads.@spawn run_reader(db)

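function run_writer(db, id)
    conn = DBInterface.connect(db)  # each task opens its own connection
    try
        while true
            sleep(0.001)
            # bind `id` as a positional parameter for the `?` placeholder
            DBInterface.execute(conn, "INSERT INTO data VALUES (current_timestamp, ?)", [id])
        end
    finally
        DBInterface.close!(conn)  # reached only if the loop is interrupted or throws
    end
end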
# spawn writer tasks
for i in 1:1000
    Threads.@spawn run_writer(db, i)  # pass the task index so the reader's GROUP BY id is meaningful
end
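
Because the loops above never terminate, it is hard to confirm that every insert actually landed. A finite variant along the following lines is one way to check. This is only a sketch, assuming DuckDB.jl's DBInterface methods behave as in the example above; `write_rows`, `ntasks`, and `nrows` are hypothetical names introduced here.

```julia
using DataFrames, DuckDB

db = DuckDB.DB()  # in-memory database
setup = DBInterface.connect(db)
DBInterface.execute(setup, "CREATE OR REPLACE TABLE data (date TIMESTAMP, id INT)")

# Hypothetical helper: insert `n` rows tagged with `id` over a task-local connection.
function write_rows(db, id, n)
    conn = DBInterface.connect(db)
    try
        for _ in 1:n
            DBInterface.execute(conn, "INSERT INTO data VALUES (current_timestamp, ?)", [id])
        end
    finally
        DBInterface.close!(conn)
    end
end

ntasks, nrows = 100, 10
# @sync waits for every spawned writer and rethrows if any of them failed
@sync for i in 1:ntasks
    Threads.@spawn write_rows(db, i, nrows)
end

# Count the rows that were committed across all writers
total = DataFrame(DBInterface.execute(setup, "SELECT count(*) AS n FROM data")).n[1]
println("rows written: ", total, " (expected ", ntasks * nrows, ")")
DBInterface.close!(setup)
```

If a writer task hits an error, `@sync` surfaces it instead of letting the task fail silently, which the fire-and-forget `Threads.@spawn` calls above would not do.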