A problem: writing results to a file in parallel

I’m a newbie to the Julia language. I’ve written parallel code using Distributed, and I want to write the intermediate results of the program’s calculations to a file. This is my code:

    using Distributed
    # Set up parallel workers
    addprocs(4)
    @everywhere begin
        function my_function(n)
            # Compute the result
            result = n
            # Open the file in append mode
            f = open("output.txt", "a")
            # Write the result to the file
            println(f, "Result: ", result)
            close(f)
            # open("output.txt","a",lock=true) do io
            #     write(io,"Result: "*string(result))
            #     write(io,"\r\n")
            # end
            return result
        end
    end
    @distributed for i in 1:10000
        result = my_function(i)
        println("Result for iteration $i: $result")
    end

I want output.txt to look like this:
Result: 1
Result: 2501
Result: 2
Result: 3
Result: 4
Result: 5
Result: 6
Result: 7
Result: 8

but I got:
Result: 2566
Result: 85
Result: 2567
Result: 86
Result: 2568
Result: 87
Result: 2569
Result: 88
0
Result: 2571
Result: 89
Result: 90
2
Result: 2573
Result: 7501
Result: 105

Result: 2587
Result: 107

I can’t understand what is going wrong.

You are creating a race condition when multiple processes try to write to the same file simultaneously and end up overwriting one another.

You’ll either need to

  • use a lock file to control access (so that only one process writes to it at a time), e.g. using Pidfile
  • use a different file format that supports parallel I/O (e.g. parallel HDF5, though that requires you to use MPI).
  • send all of the data you want to output to a single process, and write from that.

The third option is usually the simplest and most robust.
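For concreteness, here is a minimal sketch of the third option, reusing `my_function` and `output.txt` from the original post (the channel buffer size and the hard-coded 10000 iterations are just illustrative choices). Workers only compute and push their results onto a RemoteChannel; a single task on the main process drains the channel into the file, so only one process ever touches it:

    using Distributed
    addprocs(4)

    # Stand-in for the real computation; note it does no file I/O itself.
    @everywhere my_function(n) = n

    results = RemoteChannel(() -> Channel{Int}(1000))

    # Writer task on the main process: the only place the file is opened.
    writer = @async begin
        open("output.txt", "a") do io
            for _ in 1:10000
                println(io, "Result: ", take!(results))
            end
        end
    end

    # Workers compute and send their results to the main process.
    @sync @distributed for i in 1:10000
        put!(results, my_function(i))
    end

    wait(writer)

Because only the main process writes, no file locking is needed at all.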


Thank you very much. I will try the third option; the first two are a bit difficult for me.
In the documentation for the open function, I see this: “The lock keyword argument controls whether the operation will be locked to ensure secure multithreaded access.” But it doesn’t seem to work. Thanks again.

Given that your output is pretty simple, I think a fourth way to do this would be to use Logging and then LoggingExtras.jl to pipe the messages to a file.
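A minimal sketch of that idea, assuming the LoggingExtras.jl package is installed (the file name results.log, and using pmap instead of @distributed, are just illustrative choices). All logging happens on the driver process, so there is no concurrent file access; note that each line will carry the usual Info prefix rather than the bare “Result: N” format:

    using Distributed, Logging, LoggingExtras
    addprocs(4)

    # Stand-in for the real computation.
    @everywhere my_function(n) = n

    # Only the driver process writes to the log file.
    with_logger(FileLogger("results.log")) do
        for r in pmap(my_function, 1:10000)   # workers compute, driver logs
            @info "Result: $r"
        end
    end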

@zhpf0530 I hope this is said in a nice manner: you are doing distributed processing. Multithreading applies to running on a single server.

@johnh Thanks. Multithreading is really good for running on a single server. I tried it, and the result was the same: the output file was also messy. Following Steward’s suggestion, I now use channels to collect the data, and then a separate process writes the data.

I think there are two mistakes regarding the use of lock=true for open in the code above (which is the default by the way):

  • it’s meant for multithreading rather than multiprocess
  • it’s meant for multiple accesses to the io value returned by open

The second is the biggest mistake: if you do io = open(..., lock=true) in different threads then each thread will use its own io object with its own lock, so the locks are useless. Instead, you should call open only once, and share the io between threads.
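For concreteness, a minimal multithreaded sketch of “open once, share the io” (this is the threads case, not the Distributed setup from the original question; start Julia with several threads, e.g. julia -t 4):

    using Base.Threads

    # Open the file once; lock=true (the default) protects this single io object.
    open("output.txt", "a"; lock = true) do io
        @threads for i in 1:10000
            # Each println locks io for the whole call, so lines don't interleave.
            println(io, "Result: ", i)
        end
    end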


You’re not doing multi-threaded access. You’re doing multi-process access. Google processes vs threads.

I have a case where I want to do something like what @zhpf0530 did. Instead, I am calling the @distributed macro from within a function that iterates over a collection of files, and I do not need to use the @everywhere macro. My function is shown below, but I wonder if the lock is properly done to avoid data races; if it is not, could you elaborate on how I could do what you mean in the snippet I added here?

    using Distributed
    import DelimitedFiles

    function simulatedannealing_solvebatch_threaded(dir; decayrate=0.1, maxiter=150, mintemp=10E-15, seed=1, savetofile="output.csv", overwrite=false)
        addprocs(3)
        files = joinpath(dir, "instances.txt") |> readlines
        # nthreads = Threads.nthreads()    # number of threads when the julia repl was launched
        if isfile(joinpath(dir, savetofile)) && !overwrite
            error("output file already exists, set `overwrite=true` if you really want to add new data to it")
        end
        if !isfile(joinpath(dir, savetofile))
            println("output file does not exist, it will be created")
            open(joinpath(dir, savetofile), "w") do io
                DelimitedFiles.writedlm(io, [["capacity", "instance", "seed", "binscount", "fitness", "seenneighbors", "visiteddelta", "visitedpr", "notfeasible", "deltatime"]], ',')
            end
        end
        @sync @distributed for file in files   # @sync so the function waits for the workers
            instancename, _ = splitext(file)
            instancename = split(instancename, '_')
            classname, capacity, number = instancename
            println("Class: $classname \t Instance: $capacity \t No. $number by $(myid())")
            # println("processing seeds...")
            # println("\t ↳ at seed: $seed")
            _, data = joinpath(dir, file) |> x -> simulated_annealing(x; decayrate=decayrate, maxiter=maxiter, mintemp=mintemp, seed=seed)
            open(joinpath(dir, savetofile), "a", lock=true) do io
                DelimitedFiles.writedlm(io, [[capacity, number, seed, data...]], ',')
            end
        end
        return nothing
    end

lock=true only prevents races for multi-threaded access (single process with multiple threads, shared memory), not for distributed-memory access (multi-process, disjoint memory). These are two totally different parallel-computing paradigms.

(See my answer above for what to do with distributed-memory parallelism.)

Another option is for each thread/process to write into its own separate file and then combine them afterwards.
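A minimal sketch of that approach for the Distributed case, reusing my_function from the original post (the output_<pid>.txt naming is just an illustrative choice). Each worker appends only to its own file, so there is no contention, and the pieces are concatenated at the end:

    using Distributed
    addprocs(4)

    @everywhere function my_function(n)
        result = n
        # Each worker writes only to its own file, named after its process id.
        open("output_$(myid()).txt", "a") do io
            println(io, "Result: ", result)
        end
        return result
    end

    @sync @distributed for i in 1:10000
        my_function(i)
    end

    # Combine the per-worker files into a single output.txt on the main process.
    open("output.txt", "w") do out
        for p in workers()
            write(out, read("output_$(p).txt"))
        end
    end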


Or use a database.

(I would maybe categorize this under “file format whose library supports parallel I/O”.)


Many databases support connecting remotely and saving records concurrently so you don’t even need a shared file system.


I would categorize that under “send all of the data you want to output to a single process, and write from that.”
