Reading PostgreSQL from multiple processes

I use LibPQ as follows to read from PostgreSQL inside a Docker container with multiple cores (using the Distributed package):

```julia
using LibPQ: Connection
conn = Connection("""host = wrds-pgdata.wharton.upenn.edu port = port
                     user='username' password='password'
                     sslmode = 'require' dbname = wrds
                  """)
```

When I run queries on the workers, I get errors like this:

    nested task error: On worker 2:
    UnknownError: 
    Stacktrace:
      [1] error
        @ ~/.julia/packages/Memento/Qk5GZ/src/loggers.jl:458
      [2] #handle_result#50
        @ ~/.julia/packages/LibPQ/aC11V/src/results.jl:238
      [3] #_multi_execute#52
        @ ~/.julia/packages/LibPQ/aC11V/src/results.jl:299
      [4] _multi_execute
        @ ~/.julia/packages/LibPQ/aC11V/src/results.jl:295 [inlined]
      [5] #execute#51
        @ ~/.julia/packages/LibPQ/aC11V/src/results.jl:288 [inlined]
      [6] execute
        @ ~/.julia/packages/LibPQ/aC11V/src/results.jl:285 [inlined]

Has anyone queried PostgreSQL from multiple processes before? Thank you!

Please show a minimal example …

This code works for me:

```julia
using ProgressMeter, LibPQ, DataFrames, Distributed
addprocs(4)

# Load the packages and define processSQL on every process, including the workers.
@everywhere begin
    using ProgressMeter, LibPQ, DataFrames
    function processSQL(typelike)
      # Open the connection inside the function, i.e. on the worker that runs it.
      conn = LibPQ.Connection("dbname=$(ENV["PGDATABASE"]) user=$(ENV["PGUSER"])")
      result = execute(
          conn,
          "SELECT typname FROM pg_type where typname like '$typelike';";
          throw_error=true,
      )
      df = DataFrame(result)
      close(conn)  # release the connection before returning
      return df
    end
end

sqltypes=["time%","float%","int%"]
nsqltypes=length(sqltypes)
returndf = @showprogress pmap(1:nsqltypes) do i
    processSQL(sqltypes[i])
end
```

Log:

```text
# julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.8.0-beta3 (2022-03-29)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using ProgressMeter, LibPQ, DataFrames, Distributed

julia> addprocs(4)
4-element Vector{Int64}:
 2
 3
 4
 5

julia> @everywhere begin
           using ProgressMeter, LibPQ, DataFrames
           function processSQL(typelike)
             conn = LibPQ.Connection("dbname=$(ENV["PGDATABASE"]) user=$(ENV["PGUSER"])")
             result = execute(
                 conn,
                 "SELECT typname FROM pg_type where typname like '$typelike';";
                 throw_error=true,
             )
             df = DataFrame(result)
             close(conn)
             return df
           end
       end

julia> sqltypes=["time%","float%","int%"]
3-element Vector{String}:
 "time%"
 "float%"
 "int%"

julia> nsqltypes=length(sqltypes)
3

julia> returndf = @showprogress pmap(1:nsqltypes) do i
           processSQL(sqltypes[i])
       end
Progress: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| Time: 0:00:12
3-element Vector{DataFrame}:
 5×1 DataFrame
│ Row │ typname     │
│     │ String?     │
├─────┼─────────────┤
│ 1   │ time        │
│ 2   │ time_stamp  │
│ 3   │ timestamp   │
│ 4   │ timestamptz │
│ 5   │ timetz      │
 2×1 DataFrame
│ Row │ typname │
│     │ String? │
├─────┼─────────┤
│ 1   │ float4  │
│ 2   │ float8  │
 10×1 DataFrame
│ Row │ typname        │
│     │ String?        │
├─────┼────────────────┤
│ 1   │ int2           │
│ 2   │ int2vector     │
│ 3   │ int4           │
│ 4   │ int4multirange │
│ 5   │ int4range      │
│ 6   │ int8           │
│ 7   │ int8multirange │
│ 8   │ int8range      │
│ 9   │ internal       │
│ 10  │ interval       │

julia>
```
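A possible variation, sketched under stated assumptions rather than taken from the thread: when `pmap` runs many small queries, opening a fresh connection for every call repeats the connection setup each time (including the TLS handshake when `sslmode = 'require'`). The sketch keeps one lazily opened connection per worker process in a worker-local global. `WORKER_CONN`, `worker_conn` and `fetch_types` are hypothetical names; the `$1` placeholder passes `typelike` as a query parameter through LibPQ's `execute(conn, query, parameters)` method instead of interpolating it into the SQL text. Like the answer, it assumes `PGDATABASE` and `PGUSER` are set in the workers' environment.

```julia
using Distributed
addprocs(4)

@everywhere begin
    using LibPQ, DataFrames

    # Hypothetical worker-local slot for this process's connection.
    # @everywhere gives each process its own copy of this global.
    const WORKER_CONN = Ref{Union{Nothing, LibPQ.Connection}}(nothing)

    # Open the connection on first use, then reuse it for later calls
    # handled by the same worker.
    function worker_conn()
        if WORKER_CONN[] === nothing
            WORKER_CONN[] = LibPQ.Connection("dbname=$(ENV["PGDATABASE"]) user=$(ENV["PGUSER"])")
        end
        return WORKER_CONN[]
    end

    # Same query as processSQL, but typelike is sent as parameter $1
    # rather than being spliced into the SQL string.
    function fetch_types(typelike)
        result = execute(
            worker_conn(),
            "SELECT typname FROM pg_type WHERE typname LIKE \$1;",
            [typelike];
            throw_error=true,
        )
        return DataFrame(result)
    end
end

sqltypes = ["time%", "float%", "int%"]
returndf = pmap(fetch_types, sqltypes)

# Close every process's connection once all the work is done.
@everywhere begin
    if WORKER_CONN[] !== nothing
        close(WORKER_CONN[])
        WORKER_CONN[] = nothing
    end
end
```

Each worker only ever touches the connection it opened itself, so no connection object crosses a process boundary; the trade-off is that connections stay open between tasks until the final `@everywhere` block closes them.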


Thank you @ImreSamu!
Yes, I will include a minimal example next time, my bad…
The solution worked, and I realized what I did wrong: I created `conn` on the main process, outside of the `@distributed` loop, like this:

```julia
using LibPQ, DataFrames, Distributed
addprocs(4)
conn = create_db_connection()  # opened on the main process only

@everywhere function fetch_db(conn, typelike)
    result = execute(
        conn,
        "SELECT typname FROM pg_type where typname like '$typelike';";
        throw_error=true,
    )
    df = DataFrame(result)
    return df
end

sqltypes=["time%","float%","int%"]
output = @distributed (vcat) for typelike in sqltypes
    df = fetch_db(conn, typelike)  # conn, a global in Main, is copied to each worker
end
```

and, obviously, I didn’t close the connection either. Many thanks for the answer!
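For reference, one way to repair the snippet above, offered as a sketch: a `LibPQ.Connection` wraps a handle to a libpq session that exists only in the process that opened it, so a connection created on the main process is not usable after `@distributed` copies it to a worker. That is a plausible cause of the `UnknownError` in the original post, although the thread does not confirm it. The version below moves connection creation into `fetch_db`, so each call opens, uses and closes its own connection on the worker that runs it, and `try`/`finally` ensures `close` runs even if the query throws. `create_db_connection` here is a hypothetical stand-in for the poster's helper (its body was not shown), reading the same environment variables as the answer's example.

```julia
using Distributed
addprocs(4)

@everywhere begin
    using LibPQ, DataFrames

    # Hypothetical stand-in for the poster's create_db_connection().
    create_db_connection() =
        LibPQ.Connection("dbname=$(ENV["PGDATABASE"]) user=$(ENV["PGUSER"])")

    # The connection is opened inside the function, on whichever worker
    # executes it, instead of being passed in from the main process.
    function fetch_db(typelike)
        conn = create_db_connection()
        try
            result = execute(
                conn,
                "SELECT typname FROM pg_type WHERE typname LIKE \$1;",
                [typelike];
                throw_error=true,
            )
            return DataFrame(result)
        finally
            close(conn)  # runs even if execute throws
        end
    end
end

sqltypes = ["time%", "float%", "int%"]
# vcat stacks the per-pattern DataFrames into a single DataFrame.
output = @distributed (vcat) for typelike in sqltypes
    fetch_db(typelike)
end
```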
