Table disappearing from duckdb database?

I have an esoteric bug. I see this:

"""
    with_db(f::Function)

Open a connection to the persistent, file-backed database at `_DB_PATH`, call
`f(con)` with it, and return whatever `f` returns.

The connection is closed in a `finally` clause, so it is released whether `f`
returns normally or throws. Taking the function as the first argument allows
`do`-block syntax: `with_db() do con ... end`.
"""
function with_db(f::Function)
    con = DBInterface.connect(DuckDB.DB, _DB_PATH)
    try
        return f(con)
    finally
        # `close!` is the close function documented by DBInterface.jl.
        DBInterface.close!(con)
    end
end

julia> JPE.db_df("form_arrivals")
4×13 DataFrame
 Row │ timestamp                journal    paper_id  title   firstname_of_author  surname_of_author  email_of_author           email_of_second_author   ⋯
     │ DateTime                 String     String    String  String               String             String                    Union{Missing, Bool}     ⋯
─────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
   1 │ 2025-06-21T12:42:12.412  JPE        12345679  Test 1  James                TEST1              florian.oswald@gmail.com                 missing   ⋯
   2 │ 2025-06-22T10:00:23.767  JPE-Macro  1233456   Test    Mary                 TEST2              florian.oswald@gmail.com                 missing 
   3 │ 2025-06-22T10:00:54.729  JPE-Micro  654321    test 3  Peter                TEST3              florian.oswald@gmail.com                 missing 
   4 │ 2025-06-24T16:11:47.802  JPE        11111111  Test 4  Mike                 Test4              florian.oswald@gmail.com                 missing 
                                                                                                                                        5 columns omitted


julia> JPE.db_table_exists_and_not_empty("form_arrivals")
false

julia> JPE.db_df("form_arrivals")
ERROR: Catalog Error: Table with name form_arrivals does not exist!
Did you mean "pragma_database_list"?

LINE 1: SELECT * FROM form_arrivals
                      ^

where

"""
    db_df(table::String)

Run `SELECT * FROM table` through `with_db` and return the result as a `DataFrame`.

# Throws
- `ArgumentError` if `table` is not a plain identifier (a letter or underscore
  followed by letters, digits or underscores).
- Whatever the database raises, e.g. when the table does not exist.
"""
function db_df(table::String)
    # The name is spliced into the SQL text, so only plain identifiers are accepted.
    if !occursin(r"^[A-Za-z_][A-Za-z0-9_]*\z", table)
        throw(ArgumentError("Invalid table name: $table"))
    end
    return with_db() do con
        DBInterface.execute(con, "SELECT * FROM $(table)") |> DataFrame
    end
end


"""
    db_table_exists_and_not_empty(table::String)

Return `true` when `SELECT 1 FROM table LIMIT 1` yields at least one row.

Returns `false` when `table` is not a plain identifier (a warning is logged),
when the query returns no rows, or when running the query throws for any reason.
In the last case the exception is recorded at debug level, so a failure that is
not "table missing" (for example, the database could not be opened) can still be
diagnosed by enabling debug logging.
"""
function db_table_exists_and_not_empty(table::String)
    # Only allow alphanumeric and underscore table names to prevent SQL injection
    if !occursin(r"^[A-Za-z_][A-Za-z0-9_]*$", table)
        @warn "Invalid table name: $table"
        return false
    end

    query = "SELECT 1 FROM $table LIMIT 1"
    try
        return with_db() do con
            nrow(DataFrame(DBInterface.execute(con, query))) > 0
        end
    catch e
        # Deliberately best-effort: every failure maps to `false`, but the cause is
        # kept at debug level instead of being dropped.
        @debug "Table does not exist or cannot be queried." table exception = (e, catch_backtrace())
        return false
    end
end

Am I doing anything wrong here? It seems that this table just disappears! How is this possible?

Is this specific to the database you are looking at?

Yes. I spent some quality time with this. It seems super brittle with file locks and whatnot, and I got many, many database corruptions. I am currently working with something like the code below. Those tests run if I just include the file, but they fail if I execute them via `] test`, which I can live with. I just want to be sure that I won't lose my database again!

Anyone with more experience with this, please comment on whether what I'm doing here is OK or not. Thanks!

using Test
using DataFrames
using DuckDB  

# Lock taken by `with_db` around every use of the shared connection.
const DB_LOCK = ReentrantLock()

# Holds the currently open database handle, or `nothing` before the first connect.
# `Union{Nothing, Any}` is the same type as `Any`, so the slot is narrowed to the
# handle type that `DBInterface.connect(DuckDB.DB, path)` produces.
const DB_CONNECTION = Ref{Union{Nothing, DuckDB.DB}}(nothing)

# Relative path: resolved against the working directory of the running process.
const DB_PATH = "testduck.duckdb"

"""
    with_db(f::Function)

Call `f` with the shared database connection while holding `DB_LOCK`, and return
the result of `f`.

The connection stored in `DB_CONNECTION` is created on first use and re-created
when the stored one is no longer open; it is not closed here.
"""
function with_db(f::Function)
    return lock(DB_LOCK) do
        handle = DB_CONNECTION[]
        if isnothing(handle) || !isopen(handle)
            handle = DBInterface.connect(DuckDB.DB, DB_PATH)
            DB_CONNECTION[] = handle
        end
        f(handle)
    end
end


@testset "DuckDB Connection Stress Tests" begin
    
    # Alternates single-row inserts with COUNT(*) reads, each in its own `with_db`
    # call, and checks that the row count follows the number of inserts so far.
    @testset "Rapid Operations Test" begin
        @test_nowarn begin
            println("Creating base table...")
            with_db() do db
                DBInterface.execute(db, "DROP TABLE IF EXISTS stress_test")
                DBInterface.execute(db, "CREATE TABLE stress_test (id INTEGER, iteration INTEGER, timestamp TIMESTAMP)")
            end

            println("Running rapid operations...")
            for i in 1:50
                with_db() do db
                    DBInterface.execute(db, "INSERT INTO stress_test VALUES ($i, $i, NOW())")
                end

                with_db() do db
                    counts = DataFrame(DBInterface.execute(db, "SELECT COUNT(*) as count FROM stress_test"))
                    seen = counts.count[1]
                    @test seen == i  # one row per completed iteration
                    println("Iteration $i: $(seen) rows")
                end

                # Short pause between iterations to mimic other work
                sleep(0.01)
            end
        end

        # Totals after the loop has finished
        totals = with_db() do db
            DataFrame(DBInterface.execute(db, "SELECT COUNT(*) as total, MAX(id) as max_id FROM stress_test"))
        end
        row_total = totals.total[1]

        @test row_total == 50
        @test totals.max_id[1] == 50
        println("✓ Rapid operations test passed: $(row_total) rows inserted")
    end
    
    # Runs three `@async` tasks that interleave inserts and per-task counts through
    # `with_db`, then checks that every task's 20 rows were stored.
    @testset "Simulated Concurrent Access Test" begin
        @test_nowarn begin
            println("Setting up concurrent access simulation...")

            # Create table
            with_db() do con
                DBInterface.execute(con, "DROP TABLE IF EXISTS concurrent_test")
                DBInterface.execute(con, "CREATE TABLE concurrent_test (process_id INTEGER, operation_id INTEGER, data TEXT)")
            end

            # Simulate multiple "processes" by rapidly switching between different operations.
            # Typed container: `@async` yields `Task`s, so avoid an untyped `[]`.
            tasks = Task[]

            for process_id in 1:3
                task = @async begin
                    for op_id in 1:20
                        try
                            with_db() do con
                                DBInterface.execute(con, "INSERT INTO concurrent_test VALUES ($process_id, $op_id, 'data_$(process_id)_$(op_id)')")
                            end

                            # Random small delay
                            sleep(rand() * 0.05)

                            with_db() do con
                                result = DBInterface.execute(con, "SELECT COUNT(*) as count FROM concurrent_test WHERE process_id = $process_id") |> DataFrame
                                println("Process $process_id: $(result.count[1]) rows")
                            end
                        catch e
                            println("Error in process $process_id, operation $op_id: $e")
                            # Re-raise the exception currently being handled.
                            rethrow()
                        end
                    end
                end
                push!(tasks, task)
            end

            # Wait for all tasks; a failed task makes `wait` throw here.
            foreach(wait, tasks)
        end

        # Final verification
        result = with_db() do con
            DBInterface.execute(con, "SELECT process_id, COUNT(*) as count FROM concurrent_test GROUP BY process_id ORDER BY process_id") |> DataFrame
        end

        println("Final counts by process:")
        println(result)

        @test nrow(result) == 3  # Should have 3 processes
        @test all(result.count .== 20)  # Each process should have 20 operations
        @test sum(result.count) == 60  # Total should be 60
        println("✓ Concurrent access test passed: $(sum(result.count)) total operations")
    end
    
    # Inserts 1000 rows inside one explicit transaction on a single connection and
    # checks the resulting row count and the average of the random column.
    @testset "Large Transaction Test" begin
        @test_nowarn begin
            println("Testing large transaction...")

            with_db() do con
                DBInterface.execute(con, "DROP TABLE IF EXISTS large_test")
                DBInterface.execute(con, "CREATE TABLE large_test (id INTEGER, data TEXT, random_num FLOAT)")
            end

            # Insert a lot of data in one go
            with_db() do con
                DBInterface.execute(con, "BEGIN TRANSACTION")
                try
                    # Loop-invariant filler, built once instead of on every iteration.
                    padding = "x"^100
                    for i in 1:1000
                        data = "long_string_" * padding * "_$i"  # Make it somewhat large
                        random_num = rand()
                        DBInterface.execute(con, "INSERT INTO large_test VALUES ($i, '$data', $random_num)")

                        if i % 100 == 0
                            println("Inserted $i rows...")
                        end
                    end
                    DBInterface.execute(con, "COMMIT")
                    println("Transaction committed successfully")
                catch e
                    DBInterface.execute(con, "ROLLBACK")
                    println("Transaction rolled back due to error: $e")
                    # Re-raise the exception currently being handled.
                    rethrow()
                end
            end
        end

        # Verify the data
        result = with_db() do con
            DBInterface.execute(con, "SELECT COUNT(*) as count, AVG(random_num) as avg_random FROM large_test") |> DataFrame
        end

        @test result.count[1] == 1000
        @test 0.0 <= result.avg_random[1] <= 1.0  # Random average should be reasonable
        println("✓ Large transaction test passed: $(result.count[1]) rows, avg random: $(result.avg_random[1])")
    end
    
    # Repeatedly drops, creates, fills and reads back ten small tables, using a
    # separate `with_db` call for the write phase and the read phase.
    @testset "Corruption Scenarios Test" begin
        @test_nowarn begin
            println("Testing scenarios that previously caused corruption...")

            for i in 1:10
                tbl = "temp_table_$i"

                # Write phase: recreate the table and store its index
                with_db() do db
                    DBInterface.execute(db, "DROP TABLE IF EXISTS $tbl")
                    DBInterface.execute(db, "CREATE TABLE $tbl (id INTEGER)")
                    DBInterface.execute(db, "INSERT INTO $tbl VALUES ($i)")
                end

                # Read phase: the stored value must come back unchanged
                with_db() do db
                    rows = DataFrame(DBInterface.execute(db, "SELECT * FROM $tbl"))
                    stored = rows.id[1]
                    @test stored == i
                    println("Table $i: $(stored)")
                end
            end

            # Remove all temporary tables again
            with_db() do db
                foreach(1:10) do i
                    DBInterface.execute(db, "DROP TABLE IF EXISTS temp_table_$i")
                end
            end
        end

        println("✓ Corruption scenarios test passed!")
    end
    
    # Drops the tables created by the earlier testsets, all within one `with_db` call.
    @testset "Cleanup Test Tables" begin
        @test_nowarn begin
            with_db() do db
                for tbl in ("stress_test", "concurrent_test", "large_test")
                    DBInterface.execute(db, "DROP TABLE IF EXISTS $tbl")
                end
            end
        end
        println("✓ Test cleanup completed")
    end
end