Thread-safe alternative to `redirect_stdio`?

I’m confused about why the following code throws an error. I don’t see any obvious race conditions, since each task has its own `Pipe` to work with. (I’m assuming Julia’s logging is thread-safe, which I believe is correct; in any case, I still get an error if I put a lock around the `@info` statement.) Does anyone have ideas for getting this to work?

My use case: I have code that calls `optimize!` on a JuMP model. The output from the optimization is printed to stdout, but I want to log it, so I wrap the call with `log_stdout`.

# main.jl
function log_stdout(f; label = "")

    p = Pipe()
    local result
    try
        result = redirect_stdio(f; stdout = p)
    finally
        close(p.in)
        out = read(p, String)
        isempty(out) || @info label * "\n" * out
    end

    return result

end

function main()

    @show Threads.nthreads()
    tasks = map(1:3) do i
        Threads.@spawn log_stdout(; label = "Run $i") do
            @show i # Commenting out this line results in a different error.
            println("A log message for i = ", i)
        end
    end
    fetch.(tasks)

end

main()

julia> include("main.jl")
Threads.nthreads() = 1
┌ Info: Run 3
│ i = 3
└ A log message for i = 3
┌ Info: Run 1
└ i = 1
┌ Info: Run 2
└ i = 2
ERROR: LoadError: TaskFailedException
Stacktrace:
  [1] wait(t::Task)
    @ Base ./task.jl:370
  [2] fetch
    @ ./task.jl:390 [inlined]
  [3] _broadcast_getindex_evalf
    @ ./broadcast.jl:678 [inlined]
  [4] _broadcast_getindex
    @ ./broadcast.jl:651 [inlined]
  [5] getindex
    @ ./broadcast.jl:610 [inlined]
  [6] copy
    @ ./broadcast.jl:911 [inlined]
  [7] materialize
    @ ./broadcast.jl:872 [inlined]
  [8] main()
    @ Main ~/tmp/redirect_stdio_race/main.jl:26
  [9] top-level scope
    @ ~/tmp/redirect_stdio_race/main.jl:30
 [10] include(fname::String)
    @ Main ./sysimg.jl:38
 [11] top-level scope
    @ REPL[1]:1

    nested task error: IOError: stream is closed or unusable
    Stacktrace:
      [1] check_open
        @ ./stream.jl:388 [inlined]
      [2] uv_write_async(s::Base.PipeEndpoint, p::Ptr{UInt8}, n::UInt64)
        @ Base ./stream.jl:1108
      [3] uv_write(s::Base.PipeEndpoint, p::Ptr{UInt8}, n::UInt64)
        @ Base ./stream.jl:1069
      [4] unsafe_write(s::Base.PipeEndpoint, p::Ptr{UInt8}, n::UInt64)
        @ Base ./stream.jl:1154
      [5] write
        @ ./strings/io.jl:248 [inlined]
      [6] print(io::Base.PipeEndpoint, s::String)
        @ Base ./strings/io.jl:250
      [7] print(::Base.PipeEndpoint, ::String, ::Int64, ::Vararg{Any})
        @ Base ./strings/io.jl:46
      [8] println(::Base.PipeEndpoint, ::String, ::Vararg{Any})
        @ Base ./strings/io.jl:75
      [9] println(::String, ::Int64)
        @ Base ./coreio.jl:4
     [10] #4
        @ ~/tmp/redirect_stdio_race/main.jl:23 [inlined]
     [11] redirect_stdio(f::var"#4#7"{Int64}; stdin::Nothing, stderr::Nothing, stdout::Pipe)
        @ Base ./stream.jl:1448
     [12] log_stdout(f::Function; label::String)
        @ Main ~/tmp/redirect_stdio_race/main.jl:6
     [13] log_stdout
        @ ~/tmp/redirect_stdio_race/main.jl:1 [inlined]
     [14] (::var"#3#6"{Int64})()
        @ Main ~/tmp/redirect_stdio_race/main.jl:21
in expression starting at ~/tmp/redirect_stdio_race/main.jl:30

I’ve reproduced this error on Julia 1.11.7 and 1.10.5, with 1, 2, and 12 threads.

It looks like `redirect_stdio` (via `redirect_stdout`) replaces the global `stdout` variable, which explains why my code is not thread-safe: every task redirects and restores the same global, regardless of which `Pipe` it created.
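
To make the point above concrete, here is a minimal sketch showing that the redirection is visible from another task while `f` runs. It redirects to `devnull` rather than a `Pipe` purely to keep the example short, and the variable names are illustrative.

```julia
# Sketch: `stdout` is a single process-wide binding, not a per-task one.
original = stdout

redirect_stdio(stdout = devnull) do
    # While the block runs, the global no longer points at the original stream.
    @assert stdout !== original

    # A different task observes the same redirected stream.
    seen_by_other_task = fetch(Threads.@spawn identity(stdout))
    @assert seen_by_other_task === stdout
end

# Once the block returns, the previous stream is restored.
@assert stdout === original
```

With several tasks doing this concurrently, one task can restore or replace the global while another is still printing, which is consistent with the "stream is closed or unusable" error in the stack trace.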

What’s the recommended way to make this thread-safe? Is it even possible without changing `f`?
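
One option that needs no change to `f`, at the cost of concurrency, is to hold a single lock for the entire redirect rather than just around `@info`. The following is a sketch under that assumption; `STDOUT_LOCK` and `log_stdout_locked` are hypothetical names, not part of Base or any package.

```julia
# Hypothetical global lock guarding every redirect of stdout.
const STDOUT_LOCK = ReentrantLock()

function log_stdout_locked(f; label = "")
    # Only one task at a time may redirect, run `f`, and restore stdout.
    lock(STDOUT_LOCK) do
        p = Pipe()
        local result
        try
            result = redirect_stdio(f; stdout = p)
        finally
            close(p.in)              # signal end-of-stream to the reader
            out = read(p, String)    # collect whatever `f` printed
            isempty(out) || @info label * "\n" * out
        end
        return result
    end
end

# Usage: same shape as `main()` in the post, but the redirects are serialized.
tasks = map(1:3) do i
    Threads.@spawn log_stdout_locked(; label = "Run $i") do
        println("A log message for i = ", i)
        i
    end
end
results = fetch.(tasks)
```

Because the redirects are serialized, the wrapped calls no longer overlap, and anything another task prints to stdout outside the lock during that window would still end up in the pipe. It is a workaround, not a true per-task redirect.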

I haven’t used it myself, but maybe you want ScopedStreams.jl?

It looks like ScopedStreams.jl (suggested by @JamesNZ) does the job! I just had to update log_stdout as follows:

using ScopedStreams
@gen_scoped_stream_methods

function log_stdout(f; label = "")

    io = IOBuffer()
    local result
    try
        result = redirect_stream(f, io)
    finally
        out = String(take!(io))
        isempty(out) || @info label * "\n" * out
    end

    return result

end