When creating hundreds of tasks using @spawn (with multiple threads), where each task has a connection to an embedded database, is it necessary to use a connection pool structure or can each task maintain its own DBInterface.connect()? From my tests, it seems like there is no issue with hundreds of DBInterface connections, and I do not see a package for connection pools. Thanks.
I wouldn’t assume database connections are thread-safe. As the author of SQLite.jl and MySQL.jl, I know theirs aren’t, though I have been thinking of making them so soon. I believe LibPQ.jl connections are, last I checked.
I use the Pool structure in the ConcurrentUtilities.jl package (currently light on docs, but plentiful on docstrings: see src/pools.jl on the main branch of JuliaServices/ConcurrentUtilities.jl on GitHub). It’s fairly straightforward to use: you set a maximum number of connections and then follow an acquire/release pattern from various threads. I typically write a utility like the following to enforce proper usage:
function withconnection(f)
    conn = acquire(newconnection, POOL; isvalid=isopen)
    try
        return f(conn)
    finally
        release(POOL, conn)
    end
end
where newconnection is your own function that calls DBInterface.connect.
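To make the acquire/release idea concrete without depending on any package, here is a rough sketch of the same pattern using only Base. It is not the ConcurrentUtilities.jl Pool; it swaps in a buffered Channel as a fixed-size pool, and FakeConn, make_pool and with_pooled are hypothetical names made up for illustration. A real version would store actual connections created via DBInterface.connect.

```julia
# Hypothetical stand-in for a real database connection.
mutable struct FakeConn
    id::Int
    uses::Int
end

# Build a fixed-size pool: a buffered Channel pre-filled with n connections.
function make_pool(n::Integer)
    pool = Channel{FakeConn}(n)
    for i in 1:n
        put!(pool, FakeConn(i, 0))
    end
    return pool
end

# Borrow a connection, run f on it, and always hand it back.
function with_pooled(f, pool::Channel{FakeConn})
    conn = take!(pool)      # blocks while every connection is borrowed
    try
        return f(conn)
    finally
        put!(pool, conn)    # returned even if f throws
    end
end

pool = make_pool(4)

# 100 tasks share 4 connections; each records one use and reports the id it got.
tasks = map(1:100) do _
    Threads.@spawn with_pooled(c -> (c.uses += 1; c.id), pool)
end
ids = fetch.(tasks)

# Drain the pool to total the uses once all tasks have finished.
total_uses = sum(take!(pool).uses for _ in 1:4)
println("connections used: ", sort(unique(ids)), ", total uses: ", total_uses)
```

Because a connection is only ever held by one task at a time, the connection itself does not need to be thread-safe. The trade-off is that tasks wait in take! whenever all connections are in use, which is what caps the number of open connections.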
In my example, the DuckDB library is thread-safe across connections, and when I replicate their multithreaded Python example in Julia (using julia -t 4), I have no issues with thousands of tasks reading from and writing to the database concurrently. I will try the pool if necessary, but it seems to work as is.
using Dates, DataFrames, DuckDB
db = DuckDB.DB()
DBInterface.connect(db)
DBInterface.execute(db, "CREATE OR REPLACE TABLE data (date TIMESTAMP, id INT)")
function run_reader(db)
    # Each task opens its own connection to the shared database
    conn = DBInterface.connect(db)
    sql = "SELECT id, count(date) as count, max(date) as max_date FROM data group by id order by id"
    try
        while true
            println(DataFrame(DBInterface.execute(conn, sql)))
            sleep(1)
        end
    finally
        # Reached only if the loop errors or the task is interrupted
        DBInterface.close!(conn)
    end
end
# spawn one reader task
Threads.@spawn run_reader(db)
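function run_writer(db, id)
    # Each task opens its own connection to the shared database
    conn = DBInterface.connect(db)
    try
        while true
            sleep(0.001)
            # Insert one row tagged with the given id
            DuckDB.execute(conn, "INSERT INTO data VALUES (current_timestamp, ?)"; id)
        end
    finally
        # Reached only if the loop errors or the task is interrupted
        DBInterface.close!(conn)
    end
end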
# spawn writer tasks
for i in 1:1000
    Threads.@spawn run_writer(db, 1)
end