Saving to a file during parallel computation

Hello!
I have a parallel program, and when I try to save data, I encounter the following error. Should I use channels when saving data to a file? Or is the problem solved in some other way?

Thank you for your help

Fragment of code:

Threads.@threads for p2_index_range in eachindex(p2_range)
    u0_start_2d = SVector{6}(matrix_u0_end[1, p2_index_range, :]);
    for p1_index_range in eachindex(p1_range)
        p1_index_range == 1 && continue

        loc_params = copy(params);
        loc_params[index_p1] = p1_range[p1_index_range];
        loc_params[index_p2] = p2_range[p2_index_range];

        prob = ODEProblem(sys, u0_start_2d, tspan, loc_params);
        sol = solver(prob, integ_set);

        if sol.retcode == ReturnCode.MaxIters
            exitcode = 1;
        end

        u0_end_2d = sol[end];

        matrix_u0_start[p1_index_range, p2_index_range, :] = u0_start_2d;
        matrix_u0_end[p1_index_range, p2_index_range, :] = u0_end_2d;
        matrix_params[p1_index_range, p2_index_range, :] = [loc_params[index_p1], loc_params[index_p2]];

        u0_start_2d = u0_end_2d;

        if exitcode == 1
            exit();
        end
    end

    jldsave(filename_matrix_u0_start; matrix_u0_start);
    jldsave(filename_matrix_u0_end; matrix_u0_end);
    jldsave(filename_matrix_params; matrix_params);
end

Errors:

ERROR: TaskFailedException

    nested task error: ArgumentError: attempted to truncate a file that was already open
    Stacktrace:
      [1] jldopen(fname::String, wr::Bool, create::Bool, truncate::Bool, iotype::Type{JLD2.MmapIO}; fallback::Type{IOStream}, compress::Bool, mmaparrays::Bool, typemap::Dict{String, Any}, parallel_read::Bool)
        @ JLD2 ~/.julia/packages/JLD2/IsVcY/src/JLD2.jl:334
      [2] jldopen(fname::String, mode::String; iotype::Type, kwargs::Base.Pairs{Symbol, Bool, Tuple{Symbol}, NamedTuple{(:compress,), Tuple{Bool}}})
        @ JLD2 ~/.julia/packages/JLD2/IsVcY/src/JLD2.jl:444
      [3] jldopen(::Function, ::String, ::Vararg{String}; kws::Base.Pairs{Symbol, Any, Tuple{Symbol, Symbol}, NamedTuple{(:compress, :iotype), Tuple{Bool, DataType}}})
        @ JLD2 ~/.julia/packages/JLD2/IsVcY/src/loadsave.jl:2
      [4] jldopen
        @ ~/.julia/packages/JLD2/IsVcY/src/loadsave.jl:1 [inlined]
      [5] #jldsave#81
        @ ~/.julia/packages/JLD2/IsVcY/src/loadsave.jl:243 [inlined]
      [6] jldsave (repeats 2 times)
        @ ~/.julia/packages/JLD2/IsVcY/src/loadsave.jl:240 [inlined]
      [7] macro expansion
        @ ~/work/repo_ds/dynamical-systems/TM6_glial_ECM/dia of LSE/inheritance/includes/parallel_map_u0s.jl:132 [inlined]
      [8] (::var"#16#threadsfor_fun#27"{var"#16#threadsfor_fun#26#28"{typeof(TM6_glial_ECM), Vector{Float64}, Tuple{Float64, Float64}, NamedTuple{(:alg, :adaptive, :abstol, :reltol), Tuple{Vern9{typeof(OrdinaryDiffEq.trivial_limiter!), typeof(OrdinaryDiffEq.trivial_limiter!), Static.False}, Bool, Float64, Float64}}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, String, String, String, Array{Float64, 3}, Array{Float64, 3}, Array{Float64, 3}, Int64, Int64, Base.OneTo{Int64}}})(tid::Int64; onethread::Bool)
        @ Main ./threadingconstructs.jl:200
      [9] #16#threadsfor_fun
        @ ./threadingconstructs.jl:167 [inlined]
     [10] (::Base.Threads.var"#1#2"{var"#16#threadsfor_fun#27"{var"#16#threadsfor_fun#26#28"{typeof(TM6_glial_ECM), Vector{Float64}, Tuple{Float64, Float64}, NamedTuple{(:alg, :adaptive, :abstol, :reltol), Tuple{Vern9{typeof(OrdinaryDiffEq.trivial_limiter!), typeof(OrdinaryDiffEq.trivial_limiter!), Static.False}, Bool, Float64, Float64}}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, String, String, String, Array{Float64, 3}, Array{Float64, 3}, Array{Float64, 3}, Int64, Int64, Base.OneTo{Int64}}}, Int64})()
        @ Base.Threads ./threadingconstructs.jl:139

...and 3 more exceptions.

Stacktrace:
 [1] threading_run(fun::var"#16#threadsfor_fun#27"{var"#16#threadsfor_fun#26#28"{typeof(TM6_glial_ECM), Vector{Float64}, Tuple{Float64, Float64}, NamedTuple{(:alg, :adaptive, :abstol, :reltol), Tuple{Vern9{typeof(OrdinaryDiffEq.trivial_limiter!), typeof(OrdinaryDiffEq.trivial_limiter!), Static.False}, Bool, Float64, Float64}}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, String, String, String, Array{Float64, 3}, Array{Float64, 3}, Array{Float64, 3}, Int64, Int64, Base.OneTo{Int64}}}, static::Bool)
   @ Base.Threads ./threadingconstructs.jl:157
 [2] macro expansion
   @ ./threadingconstructs.jl:205 [inlined]
 [3] paralel_2pmap(sys::Function, params::Vector{Float64}, u0_initial::SVector{6, Float64}, tspan::Tuple{Float64, Float64}, integ_set::NamedTuple{(:alg, :adaptive, :abstol, :reltol), Tuple{Vern9{typeof(OrdinaryDiffEq.trivial_limiter!), typeof(OrdinaryDiffEq.trivial_limiter!), Static.False}, Bool, Float64, Float64}}, control_params::Vector{Any}, p1_range::StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64}, p2_range::StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}, Int64})
   @ Main ~/work/repo_ds/dynamical-systems/TM6_glial_ECM/dia of LSE/inheritance/includes/parallel_map_u0s.jl:103
 [4] top-level scope
   @ ~/work/repo_ds/dynamical-systems/TM6_glial_ECM/dia of LSE/inheritance/src/main.jl:49

Ideally, yes, you’d have separate threads for writing to the file system. Even if you fixed the concurrency issues, there’s no sense in making your parallel workers wait on disk I/O. That is assuming you can’t just wait until all workers are done and then write to disk.
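
A minimal sketch of that pattern, using a Channel and a single writer task (variable names are taken from the fragment above where possible; everything else is hypothetical, and only matrix_u0_end is checkpointed for brevity):

# workers only send results; one writer task owns all disk I/O
results = Channel{Tuple{Int,Int,Vector{Float64}}}(Inf)

writer = Threads.@spawn begin
    for (i, j, u0_end) in results                        # ends when the channel is closed
        matrix_u0_end[i, j, :] = u0_end
        jldsave(filename_matrix_u0_end; matrix_u0_end)   # checkpoint after every result
        # the other two matrices would be handled the same way
    end
end

Threads.@threads for p2_index_range in eachindex(p2_range)
    for p1_index_range in eachindex(p1_range)
        # ... solve the ODE as in the fragment above, producing u0_end_2d ...
        put!(results, (p1_index_range, p2_index_range, Vector(u0_end_2d)))
    end
end

close(results)   # no more results are coming
wait(writer)     # make sure the last checkpoint is on disk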

Could you please show an example of how to implement channels for saving data to files? I’m new to parallel computing and haven’t worked with channels yet.

I think I managed to fix this error. But I don’t know how much this affects the performance of the program.

        sl = Threads.SpinLock()
        lock(sl)
        jldsave(filename_matrix_u0_start; matrix_u0_start);
        jldsave(filename_matrix_u0_end; matrix_u0_end);
        jldsave(filename_matrix_params; matrix_params);
        unlock(sl)

UPD: It didn’t help

Are you trying to save to the same file from multiple tasks?

It would be better to have a single task coordinate all file saving.

In this case every thread needs to see the same lock in order to synchronise. So you need to create it outside the threaded loop, like this:

sl = Threads.SpinLock()
Threads.@threads for p2_index_range in eachindex(p2_range)
    u0_start_2d = SVector{6}(matrix_u0_end[1, p2_index_range, :]);
    for p1_index_range in eachindex(p1_range)
        # rest of code
        lock(sl)
        jldsave(filename_matrix_u0_start; matrix_u0_start);
        jldsave(filename_matrix_u0_end; matrix_u0_end);
        jldsave(filename_matrix_params; matrix_params);
        unlock(sl)
    end
end

But this will overwrite the same files over and over again. So the data is lost. What is it you want to accomplish?

EDIT: Actually, I just read the code again. Why do you want to save within the loops? Just move the saving out of the loops and save the data once at the end.
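
A minimal sketch of that variant, reusing the names from the fragment at the top: the threads only fill the shared arrays, and the files are written once from the main task, where no lock is needed.

Threads.@threads for p2_index_range in eachindex(p2_range)
    for p1_index_range in eachindex(p1_range)
        # ... solve and fill matrix_u0_start / matrix_u0_end / matrix_params ...
    end
end

# single-threaded from here on, so no synchronisation is required
jldsave(filename_matrix_u0_start; matrix_u0_start);
jldsave(filename_matrix_u0_end; matrix_u0_end);
jldsave(filename_matrix_params; matrix_params);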

Also, it’s not correct to use SpinLock for synchronizing blocking operations like disk I/O, as documented.
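
If a lock is kept for the checkpointing, a ReentrantLock created once outside the threaded loop is the safer choice for blocking I/O. A rough sketch, again with the names from the original fragment:

io_lock = ReentrantLock()          # one lock, shared by every thread

Threads.@threads for p2_index_range in eachindex(p2_range)
    for p1_index_range in eachindex(p1_range)
        # ... computation as before ...
    end
    lock(io_lock) do               # the do-block form releases the lock even on error
        jldsave(filename_matrix_u0_start; matrix_u0_start);
        jldsave(filename_matrix_u0_end; matrix_u0_end);
        jldsave(filename_matrix_params; matrix_params);
    end
end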

I would like to save the data to a file after completing the inner loop, because the calculations take several days and the electricity is often cut off, which shuts the computer down and interrupts the calculation.

Yes, I save data from different threads into one matrix. Each thread writes the results of its calculations to a specific row of the matrix.

Is there another way to use lock?

You might want to use jldopen to open the file once, and then use Channels to build the matrix in a memory buffer before writing it out on every return of results.
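
For the jldopen part, the do-block form keeps the file handling in one place and lets several arrays go into the same file. A minimal sketch (the file name checkpoint.jld2 is made up), meant to be called from one task at a time:

jldopen("checkpoint.jld2", "w") do file
    # each dataset gets its own name inside the one file
    file["matrix_u0_start"] = matrix_u0_start
    file["matrix_u0_end"]   = matrix_u0_end
    file["matrix_params"]   = matrix_params
end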

Thank you, I’ll try to use jldopen

Do you really need the global shared state in the array if you’re writing the results to files?

Alternatively, you could write out one file per loop iteration and merge them at the end.
Also note that you can save multiple things into the same file, so you could avoid the need for synchronization like this:

jldsave(filename*"_iteration_$(p2_index_range).jld2"; matrix_u0_start, matrix_u0_end, matrix_params)
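
Merging the per-iteration files afterwards could then look roughly like this (assuming the naming scheme above; each file holds full copies of the matrices, so only the slice belonging to that iteration is taken from each):

for p2_index_range in eachindex(p2_range)
    fname = filename * "_iteration_$(p2_index_range).jld2"
    isfile(fname) || continue      # skip iterations that never got saved
    jldopen(fname, "r") do file
        matrix_u0_start[:, p2_index_range, :] = file["matrix_u0_start"][:, p2_index_range, :]
        matrix_u0_end[:, p2_index_range, :]   = file["matrix_u0_end"][:, p2_index_range, :]
        matrix_params[:, p2_index_range, :]   = file["matrix_params"][:, p2_index_range, :]
    end
end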

On the first iteration of the inner loop each thread must receive data from the matrix, and this matrix is stored as a file. I used Threads.@threads because it turned out to be the simplest approach; while using it I didn’t think much about the global shared state.

Thank you
I will try to save to different files

Here’s an example with a mutex (AKA lock) using Threads.@threads for, and an alternative example, also with a mutex, but with explicit task handling (@spawn and wait). I think the best practice would be to use Channels, but I’m too rusty:

function worker(mutex, i)
  # do useful work
  result = (sind ∘ Float32)(i)

  # report on the results of the useful work
  lock(mutex)
  try
    println((i, result))
  finally
    unlock(mutex)
  end
end

function parallel_example_with_threads_for(f::F, worker_count::Int) where {F}
  let n = worker_count - 1, mutex = ReentrantLock()
    Threads.@threads for i ∈ 0:n
      f(mutex, i)
    end
  end
end

function parallel_example_with_foreach_wait(f::F, worker_count::Int) where {F}
  tasks = let n = worker_count - 1, mutex = ReentrantLock()
    [(Threads.@spawn f(mutex, i)) for i ∈ 0:n]
  end
  foreach(wait, tasks)
end

for example ∈ (
  parallel_example_with_threads_for,
  parallel_example_with_foreach_wait,
)
  println("Trying example: $example")
  example(worker, 7)
  println()
end

EDIT: in case it’s not clear, you’d need to replace the println line with saving your results to a file and the sind line with your optimization work (a sketch adapted to the original problem follows the output below).

The above outputs:

Trying example: parallel_example_with_threads_for
(0, 0.0f0)
(6, 0.104528464f0)
(2, 0.034899496f0)
(5, 0.087155744f0)
(4, 0.06975647f0)
(3, 0.052335955f0)
(1, 0.017452406f0)

Trying example: parallel_example_with_foreach_wait
(4, 0.06975647f0)
(6, 0.104528464f0)
(3, 0.052335955f0)
(0, 0.0f0)
(1, 0.017452406f0)
(2, 0.034899496f0)
(5, 0.087155744f0)
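
Adapted to the original problem, the locked section could look roughly like this (matrix_* and filename_* are assumed to exist as in the fragment at the top; everything else is hypothetical):

function save_checkpoint(mutex)
    # only one task at a time touches the checkpoint files
    lock(mutex)
    try
        jldsave(filename_matrix_u0_start; matrix_u0_start)
        jldsave(filename_matrix_u0_end; matrix_u0_end)
        jldsave(filename_matrix_params; matrix_params)
    finally
        unlock(mutex)
    end
end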

I think it would be cleaner to just do return or something, instead of exit().

This doesn’t matter for your specific example if each worker runs for days anyway, assuming I understood that part of your problem correctly?