I’m confused about why the following code throws an error. I don’t see any obvious race conditions, especially since each task should have its own Pipe
to work with. (I’m assuming Julia’s logging is thread-safe, which I think is correct; regardless, I still get the error if I put a lock around the @info
statement.) Does anyone have any ideas for getting this to work?
My use case is that I have code where I optimize! a JuMP model. The output from the optimization is printed to stdout, but I want to log it, so I wrap the call with log_stdout.
# main.jl
function log_stdout(f; label = "")
    p = Pipe()
    local result
    try
        result = redirect_stdio(f; stdout = p)
    finally
        close(p.in)
        out = read(p, String)
        isempty(out) || @info label * "\n" * out
    end
    return result
end

function main()
    @show Threads.nthreads()
    tasks = map(1:3) do i
        Threads.@spawn log_stdout(; label = "Run $i") do
            @show i # Commenting out this line results in a different error.
            println("A log message for i = ", i)
        end
    end
    fetch.(tasks)
end

main()
julia> include("main.jl")
Threads.nthreads() = 1
┌ Info: Run 3
│ i = 3
└ A log message for i = 3
┌ Info: Run 1
└ i = 1
┌ Info: Run 2
└ i = 2
ERROR: LoadError: TaskFailedException
Stacktrace:
[1] wait(t::Task)
@ Base ./task.jl:370
[2] fetch
@ ./task.jl:390 [inlined]
[3] _broadcast_getindex_evalf
@ ./broadcast.jl:678 [inlined]
[4] _broadcast_getindex
@ ./broadcast.jl:651 [inlined]
[5] getindex
@ ./broadcast.jl:610 [inlined]
[6] copy
@ ./broadcast.jl:911 [inlined]
[7] materialize
@ ./broadcast.jl:872 [inlined]
[8] main()
@ Main ~/tmp/redirect_stdio_race/main.jl:26
[9] top-level scope
@ ~/tmp/redirect_stdio_race/main.jl:30
[10] include(fname::String)
@ Main ./sysimg.jl:38
[11] top-level scope
@ REPL[1]:1
nested task error: IOError: stream is closed or unusable
Stacktrace:
[1] check_open
@ ./stream.jl:388 [inlined]
[2] uv_write_async(s::Base.PipeEndpoint, p::Ptr{UInt8}, n::UInt64)
@ Base ./stream.jl:1108
[3] uv_write(s::Base.PipeEndpoint, p::Ptr{UInt8}, n::UInt64)
@ Base ./stream.jl:1069
[4] unsafe_write(s::Base.PipeEndpoint, p::Ptr{UInt8}, n::UInt64)
@ Base ./stream.jl:1154
[5] write
@ ./strings/io.jl:248 [inlined]
[6] print(io::Base.PipeEndpoint, s::String)
@ Base ./strings/io.jl:250
[7] print(::Base.PipeEndpoint, ::String, ::Int64, ::Vararg{Any})
@ Base ./strings/io.jl:46
[8] println(::Base.PipeEndpoint, ::String, ::Vararg{Any})
@ Base ./strings/io.jl:75
[9] println(::String, ::Int64)
@ Base ./coreio.jl:4
[10] #4
@ ~/tmp/redirect_stdio_race/main.jl:23 [inlined]
[11] redirect_stdio(f::var"#4#7"{Int64}; stdin::Nothing, stderr::Nothing, stdout::Pipe)
@ Base ./stream.jl:1448
[12] log_stdout(f::Function; label::String)
@ Main ~/tmp/redirect_stdio_race/main.jl:6
[13] log_stdout
@ ~/tmp/redirect_stdio_race/main.jl:1 [inlined]
[14] (::var"#3#6"{Int64})()
@ Main ~/tmp/redirect_stdio_race/main.jl:21
in expression starting at ~/tmp/redirect_stdio_race/main.jl:30
I’ve seen this code error on Julia 1.11.7 and 1.10.5, with 1, 2, and 12 threads.
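One possible explanation, offered as an assumption rather than a confirmed diagnosis: redirect_stdio swaps the process-wide stdout rather than a per-task one, so while one task is inside its redirect, another task can install or restore a different stream, and a later println ends up writing to a Pipe that has already been closed. That would also explain why a lock around only the @info statement does not help. Under that assumption, a minimal sketch of a workaround is to serialize the whole redirect, capture, and restore cycle. The names STDOUT_REDIRECT_LOCK and log_stdout_serialized are hypothetical and only for illustration.

```julia
# Hypothetical lock guarding every redirect of the global stdout.
const STDOUT_REDIRECT_LOCK = ReentrantLock()

# Same logic as log_stdout above, but the entire redirect/capture/restore
# cycle runs while holding the lock, so only one task at a time can
# change what stdout points to.
function log_stdout_serialized(f; label = "")
    lock(STDOUT_REDIRECT_LOCK) do
        p = Pipe()
        local result
        try
            result = redirect_stdio(f; stdout = p)
        finally
            close(p.in)              # close the write end so the read sees EOF
            out = read(p, String)    # collect everything f printed
            isempty(out) || @info label * "\n" * out
        end
        return result
    end
end
```

The trade-off is that the wrapped calls no longer run concurrently, and anything that prints to stdout without going through this wrapper can still end up in another task's captured output. Output is also only read after f returns, so this sketch is suited to small amounts of output.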