Strange data race problems in multi-threaded code (a possible solution)

The actual code that runs into the problem looks like the following:

s = mol.max_buf_size
Threads.@threads for ijkl = 1 : nbas^4
    # decode the flat index into the four (0-based) shell indices
    t1, l = divrem(ijkl-1, nbas)
    t2, k = divrem(t1, nbas)
    i, j = divrem(t2, nbas)
    # task-local scratch, reused across iterations handled by the same task
    shells = get!(()-> Cint[0, 0, 0, 0], task_local_storage(), :shell)::Vector{Cint}
    shells[1] = i; shells[2] = j; shells[3] = k; shells[4] = l
    is, ie, ni = _get_sen(i+1, mol)
    js, je, nj = _get_sen(j+1, mol)
    ks, ke, nk = _get_sen(k+1, mol)
    ls, le, nl = _get_sen(l+1, mol)
    buf = get!(()-> zeros(s, s, s, s), task_local_storage(), :cache)::Array{Float64, 4}
    buf .= 0.0
    # libcint writes the integrals of the current shell quartet into buf
    ccall(
        (:cint2e_sph, lib_int()), Cint,
        (
            Ptr{Cdouble}, Ptr{Cint}, Ptr{Cint}, Cint, Ptr{Cint}, Cint, Ptr{Cdouble}, Ptr{Any}
        ),
        buf, shells, mol.atm, mol.geo.n_atoms, mol.bas, nbas, mol.env, C_NULL
    )
    # accumulate into this quartet's (disjoint) block of the shared result array
    inte[is:ie, js:je, ks:ke, ls:le] .+= reshape(view(buf, 1:ni*nj*nk*nl), ni, nj, nk, nl)
end

Yes, many threads may write to the same array inte at the same time, but it is guaranteed that they write to disjoint parts of it.

I’ve printed the address of buf and found something strange in the non-function approach. Most of the time, the address of buf in a given thread stays the same throughout the loop, and each thread has its own address. Occasionally, however, one thread happens to use the buf of another thread, which produces completely wrong results.

This peculiar behaviour may explain another fact that astonishes me even more. I’ve tried using a fresh array as buf in every iteration, with code like the following, but the assertion in it may still fail:

s = mol.max_buf_size
Threads.@threads for ijkl = 1 : nbas^4
    t1, l = divrem(ijkl-1, nbas)
    t2, k = divrem(t1, nbas)
    i, j = divrem(t2, nbas)
    shells = get!(()-> Cint[0, 0, 0, 0], task_local_storage(), :shell)::Vector{Cint}
    shells[1] = i; shells[2] = j; shells[3] = k; shells[4] = l
    is, ie, ni = _get_sen(i+1, mol)
    js, je, nj = _get_sen(j+1, mol)
    ks, ke, nk = _get_sen(k+1, mol)
    ls, le, nl = _get_sen(l+1, mol)
    buf = zeros(ni, nj, nk, nl)
    buf .= 0.0
    ccall(
        (:cint2e_sph, lib_int()), Cint,
        (
            Ptr{Cdouble}, Ptr{Cint}, Ptr{Cint}, Cint, Ptr{Cint}, Cint, Ptr{Cdouble}, Ptr{Any}
        ),
        buf, shells, mol.atm, mol.geo.n_atoms, mol.bas, nbas, mol.env, C_NULL
    )
    @assert length(buf) == ni * nj * nk * nl
    inte[is:ie, js:je, ks:ke, ls:le] .+= buf
end

If a thread sometimes uses the buf of another thread, then of course the sizes will mismatch. But why does this happen?

I’ve also checked the documentation on @threadcall. If I understand it correctly, I should use ccall instead of @threadcall here, because I do want to prevent other tasks from executing until the call returns.

For those who insist on trying: I have tried @threadcall here, and it results in ConcurrencyViolationError(msg="lock must be held"). But there is nothing to lock. Only inte is shared among threads; first, different threads handle different parts of it, and second, if it were locked, the whole multi-threading would be completely useless.

What’s even stranger, if I wrap the body of the for loop into a function and run it like the following, there seems to be no problem:

function _2e_inner(inte, ijkl, mol)
    # just like above
end
Threads.@threads for ijkl = 1 : nbas^4
    _2e_inner(inte, ijkl, mol)
end

However, for other reasons I cannot use a function here, and I’d like to know the exact cause of this peculiar behaviour and possible solutions that don’t require wrapping the body in a function.

The actual code is in a quantum chemistry package trying to call libcint for electron integrals. The construction of bas and env is somewhat troublesome, and I haven’t found a simpler MWE.

julia> versioninfo()
Julia Version 1.9.4
Commit 8e5136fa297 (2023-11-14 08:46 UTC)
Build Info:
  Official https://julialang.org/ release
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: 96 × Intel(R) Xeon(R) Gold 6240R CPU @ 2.40GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-14.0.6 (ORCJIT, cascadelake)
  Threads: 8 on 96 virtual cores

A possible solution

Using Threads.@spawn instead of Threads.@threads. Any possible problems?

function _get_partition(x, N)
    # split x items into N chunks whose sizes differ by at most one
    res = zeros(Int, N)
    res .= x ÷ N
    for i = 1 : x - x ÷ N * N   # spread the remainder over the first chunks
        res[i] += 1
    end
    # res2 holds cumulative offsets, so chunk it covers res2[it]+1 : res2[it+1]
    res2 = zeros(Int, N+1)
    for i = 1 : N
        res2[i+1] = res2[i] + res[i]
    end
    return res, res2
end
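
For example (just to illustrate what the helper returns), splitting 10 items over 3 chunks gives the chunk sizes and their cumulative offsets, so chunk it covers bs2[it]+1 : bs2[it+1]:

julia> _get_partition(10, 3)
([4, 3, 3], [0, 4, 7, 10])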

bs, bs2 = _get_partition(nbas^4, Threads.nthreads())
@sync for it = 1 : Threads.nthreads()
    Threads.@spawn begin
        shells = Cint[0, 0, 0, 0]
        buf = zeros(s, s, s, s)
        for ijkl = bs2[it] + 1 : bs2[it+1]
            t1, l = divrem(ijkl-1, nbas)
            t2, k = divrem(t1, nbas)
            i, j = divrem(t2, nbas)
            shells[1] = i; shells[2] = j; shells[3] = k; shells[4] = l
            is, ie, ni = _get_sen(i+1, mol)
            js, je, nj = _get_sen(j+1, mol)
            ks, ke, nk = _get_sen(k+1, mol)
            ls, le, nl = _get_sen(l+1, mol)
            buf .= 0.0
            ccall(
                (:cint2e_sph, lib_int()), Cint,
                (
                    Ptr{Cdouble}, Ptr{Cint}, Ptr{Cint}, Cint, Ptr{Cint}, Cint, Ptr{Cdouble}, Ptr{Any}
                ),
                buf, shells, mol.atm, mol.geo.n_atoms, mol.bas, nbas, mol.env, C_NULL
            )
            # view(inte, is:ie, js:je, ks:ke, ls:le) .+= buf
            view(inte, is:ie, js:je, ks:ke, ls:le) .+= reshape(view(buf, 1:ni*nj*nk*nl), ni, nj, nk, nl)
        end
    end
end

PS

For those who are interested, the reason for not using a function is the following.
In my actual usage, the name :cint2e_sph needs to be passed as an argument, and ccall cannot accept a variable as the function symbol. Thus, in my actual code, I wrap it in a macro like the following:

macro _int_2e!(res, name, mol)
    return quote
        # something else
        ccall(
            ($(esc(name)), lib_int()), Cint,
            # something else
        )
    end
end

It seems then that the ccall part can never be put into a function, so the above solution is not practical.

The actual code as required

I’ve uploaded an MWE file. To use it, one needs to first compile the aforementioned libcint and modify Base.DL_LOAD_PATH accordingly. Set METHOD to one of (:MNT, :MT, :MST, :NT) and just run main().
:MNT is non-threaded and correct; :NT seems to be correct, as in the possible-solution part above; :MST suffers from synchronization problems as explained in the file, and the actual error if I add @sync inside my macro is

ERROR: LoadError: syntax: invalid let syntax around task.jl:475

:MT is the main problematic code, but the error only occurs occasionally, and one may need to run main() several times to trigger it. For the same reason, I’m not 100% sure whether the :NT way is really correct, or whether I just happened not to trigger the error during my runs.
mwe.jl (10.8 KB)

How exactly?

I use something like println(pointer(buf, 1))

That’s generally reasonable. Just to rule out something crazy though, can you check pointer_from_objref(buf) in addition to pointer(buf)?

The former is the identity of the array object itself; the latter accesses the underlying, replaceable `Memory` buffer. (Demo of the difference below.)
julia> x = [0];

julia> Ref(x) .|> [pointer_from_objref, pointer]
2-element Vector{Ptr}:
 Ptr{Nothing} @0x00000144a92b7e10
 Ptr{Int64} @0x00000144a92b7e00

julia> append!(x, 1:12); # allocate new Memory

julia> Ref(x) .|> [pointer_from_objref, pointer]
2-element Vector{Ptr}:
 Ptr{Nothing} @0x00000144a92b7e10
 Ptr{Int64} @0x00000144a85f8620

I’m trying to rule out the possibility that the buf itself might not be from another task but is just corrupted. If the pointer_from_objref(buf) address remains the same, then it’s the same task-locally allocated array. If not, then I think it’s pretty much proof that the variable buf is switching objects; if you want to prove that it’s another task’s array, you could cache the addresses per ijkl to find out which threads are interfering, but I’m not sure how to safely do that in parallel given how little I know about the problem.
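
One hypothetical way to attempt that safely (a sketch, not tested against the actual code): pre-allocate one slot per ijkl and let each iteration write only its own slot, so the logging itself shares nothing.

addrs = Vector{UInt}(undef, nbas^4)   # one slot per iteration, written by exactly one task
Threads.@threads for ijkl = 1 : nbas^4
    # ... the existing loop body that obtains `buf` ...
    addrs[ijkl] = UInt(pointer_from_objref(buf))
end
# afterwards, length(unique(addrs)) shows how many distinct buffers were used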

$ sort -n -k1 tmp4.log | uniq
2       Ptr{Nothing} @0x00007fe0999bc050        Ptr{Float64} @0x00007fe434525600
5       Ptr{Nothing} @0x00007fe0998fc010        Ptr{Float64} @0x00007fe42c536300
8       Ptr{Nothing} @0x00007fe0998fc010        Ptr{Float64} @0x00007fe42c536300

using println(Threads.threadid(), "\t", pointer_from_objref(buf), "\t", pointer(buf, 1))

Besides, another problem occurred once. The output was

sort -n -k1 tmp3.log | uniq
1       Ptr{Nothing} @0x00007fe098a98f90        Ptr{Float64} @0x000000000dcec900
2       Ptr{Nothing} @0x00007fe0999bc010        Ptr{Float64} @0x00007fe434520a80
3       Ptr{Nothing} @0x00007fe09802c010        Ptr{Float64} @0x00007fe4242dcc80
7       Ptr{Nothing} @0x00007fe09989c010        Ptr{Float64} @0x00007fe41c018200

but one piece of data in inte is doubled.

Can you show the surrounding code?

If any of buf, ni, nj, nk, nl happen to be visible symbols outside the loop, you may inadvertently capture them in the closure you’re implicitly creating with Threads.@threads. That of course creates a splendid race condition.

So you should tell us whether that code runs at top-level / module scope (please don’t do that), or at function scope (ie whether there exists an enclosing function). If an enclosing function exists, show it to us. If the enclosing function has an enclosing function, show that as well, etc etc.
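
To illustrate the capture issue with a minimal, hypothetical sketch (not your code): if buf already exists as a binding in the enclosing scope, the closure created by Threads.@threads captures it, every task reassigns the same shared variable, and an assertion like yours can fail.

function capture_demo()
    buf = nothing                  # pre-existing binding, captured by the @threads closure
    Threads.@threads for i in 1:32
        buf = zeros(i)             # every task writes to the same shared (boxed) variable
        yield()                    # give another task a chance to reassign it
        @assert length(buf) == i   # may fail: buf can now hold another task's array
    end
end
capture_demo()   # with more than one thread this may throw an AssertionError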

Don’t do this: that’s the ID of the thread a task may run on, not the task itself (each iteration per ijkl value), and tasks can change which thread they run on. For all we know, the repeated array on thread IDs 5 and 8 is from the same task/iteration. I’m not familiar with it, but are sort and uniq throwing away a lot of iterations? It seems weird that there are so few for nbas^4.

It’s not clear where in your code that println line is or how you’re writing the log; please provide the relevant code. You can put it in a collapsible Hide Details block if you don’t want to take up too much space. It’s also worth printing both before AND after the potentially unsafe ccall line to catch possible corruption; you could distinguish the two printouts per iteration with another column. Since you caught the buffer length mismatching before, you should also print the lengths out. In summary, something like println(ijkl, "\t", before_or_after_ccall, "\t", pointer_from_objref(buf), "\t", pointer(buf)).

We can’t tell why that happens or how it is significant without an MWE.

sort and uniq are shell commands; I just put the output in a file and look for the unique lines. Theoretically, if the program runs correctly, there will be at most one unique output per thread.

That’s not what your program is doing, and it’s probably not something you’d want to do. Your program should be correct regardless of the number of threads specified for the Julia process, even the default of 1, and that doesn’t mean the @threads for loop would be limited to one iteration and one buf allocation. The @threads for loop makes tasks out of iterations and schedules them among the Threads.nthreads() threads; don’t mix those up:

julia> Threads.@threads for i in 1:8
         task_local_storage("[i]ID", pointer_from_objref([i]))
         task_local_storage("threadID", Threads.threadid())
         println(task_local_storage())
       end
IdDict{Any, Any}("threadID" => 4, "[i]ID" => Ptr{Nothing} @0x000002a69e0b2e10)
IdDict{Any, Any}("threadID" => 2, "[i]ID" => Ptr{Nothing} @0x000002a6dc3b9830)
IdDict{Any, Any}("threadID" => 3, "[i]ID" => Ptr{Nothing} @0x000002a6df21eaf0)
IdDict{Any, Any}("threadID" => 4, "[i]ID" => Ptr{Nothing} @0x000002a69e0b37b0)
IdDict{Any, Any}("threadID" => 2, "[i]ID" => Ptr{Nothing} @0x000002a6dc3ba1d0)
IdDict{Any, Any}("threadID" => 1, "[i]ID" => Ptr{Nothing} @0x000002a6dde252d0)
IdDict{Any, Any}("threadID" => 3, "[i]ID" => Ptr{Nothing} @0x000002a6df21f490)
IdDict{Any, Any}("threadID" => 1, "[i]ID" => Ptr{Nothing} @0x000002a6dde25c70)

EDIT: Tasks aren’t threads, but this example is terrible, see next comment.

I understand I made the false assumption that tasks and threads are equal, but I thought it didn’t matter much and simplified the discussion, so I just went on with it. Let’s be more rigorous, then.

However many threads I’m using (in the actual case, 8), the for loop will be scheduled onto some task runners (probably 8 in my case). As I understand it, task_local_storage keeps a separate dictionary for each task runner. If that is true, in my case the probably-8 task runners will have at most 8 such dictionaries, and each dict may hold a buf array. buf arrays in different dicts should not be the same object and should not overlap.

If, in each iteration, I output the address of the buf used in that iteration, then, if the program runs correctly, all iterations processed by the same task runner should output exactly the same address, and two iterations processed by different task runners should never output the same address. So I just post-process the output to extract the unique lines.

In my actual runs, to reduce the number of outputs, I return early for large index values. You can see it in the attached file. Because of this, even though there are probably 8 task runners, only some of them actually do useful work. This can be seen in the first result I posted:

$ sort -n -k1 tmp4.log | uniq
2       Ptr{Nothing} @0x00007fe0999bc050        Ptr{Float64} @0x00007fe434525600
5       Ptr{Nothing} @0x00007fe0998fc010        Ptr{Float64} @0x00007fe42c536300
8       Ptr{Nothing} @0x00007fe0998fc010        Ptr{Float64} @0x00007fe42c536300

According to the above discussion, it may be explained as follows: there are only two task runners doing actual work, one of which always runs on thread 2, while the other may run on thread 5 or 8. This does not necessarily lead to problems, but I’ve run the program several times, and it seems that whenever a task runner runs on more than one thread, an error occurs.

The second result I posted is a different problem entirely. It seems that 4 task runners each run on one thread, but one iteration is processed by two different task runners, so the corresponding result is doubled.

Based on what you’ve said, I went back to fact-check myself. With properly distinct keys, we can see that task_local_storage() does in fact match each thread here. Tasks are indeed different from threads, but it seems that each iteration isn’t a distinct task or thread (see the fixed example below). That actually makes sense, because when nthreads() is the default of 1, Threads.@threads for doesn’t need to make multiple tasks to run a for loop.

julia> Threads.nthreads() # just to show the threads for the process
2

julia> Threads.@threads for i in 1:4
         task_local_storage("[$i]ID", pointer_from_objref([i]))
         task_local_storage("threadID", Threads.threadid())
         println(pointer_from_objref(task_local_storage()), ": ", task_local_storage())
       end
Ptr{Nothing} @0x000001c4a66fbf30: IdDict{Any, Any}("threadID" => 1, "[1]ID" => Ptr{Nothing} @0x000001c4a66fbf10)
Ptr{Nothing} @0x000001c4a4f5c2b0: IdDict{Any, Any}("threadID" => 2, "[3]ID" => Ptr{Nothing} @0x000001c4a4f5c290)
Ptr{Nothing} @0x000001c4a66fbf30: IdDict{Any, Any}("threadID" => 1, "[1]ID" => Ptr{Nothing} @0x000001c4a66fbf10, "[2]ID" => Ptr{Nothing} @0x000001c4a6708bd0)
Ptr{Nothing} @0x000001c4a4f5c2b0: IdDict{Any, Any}("threadID" => 2, "[4]ID" => Ptr{Nothing} @0x000001c4a4f5cf10, "[3]ID" => Ptr{Nothing} @0x000001c4a4f5c290)

So in the first example, buf = get!(...) may reuse an array from a previous iteration cached by task_local_storage() instead of allocating a fresh array for each iteration; since tasks are free to resume on different threads, that might be what you’re seeing. I would think that the 2nd and 3rd examples get around that for buf specifically, though it doesn’t explain the 2nd example’s AssertionError. Maybe it’s the preliminary shells = get!(...)? Otherwise I’m still suspecting some Array metadata corruption going on there.
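
A small hypothetical demo of that caching behaviour: get! stores one array per task, so iterations handled by the same task get the same object back, while different tasks get different objects.

Threads.@threads for i in 1:8
    buf = get!(() -> zeros(2), task_local_storage(), :cache)::Vector{Float64}
    println(i, " -> ", pointer_from_objref(buf))   # repeats within a task, differs across tasks
end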

Two pointers coinciding does not conclusively prove that objects are the same: If the objects have non-overlapping lifetimes, it is possible that you first allocate an object x at address xaddr, log its address, then the object is freed, and you later allocate a new object y at the same address yaddr = xaddr.
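
A hypothetical illustration of that point (the outcome depends on the allocator, so the comparison below may come out either way):

x = zeros(4)
addr = pointer_from_objref(x)
x = nothing
GC.gc()                                # the old array becomes collectible
y = zeros(4)
pointer_from_objref(y) == addr         # may be true or false: the address can be recycled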

Since the discussion has meandered too much, could you post a minimal and complete runnable example that reproduces the assertion error? Please note the full command line invocation and version. We can edit your example to remove references to deps we don’t have, like your C library (eg replace ccall by wait(…)).

Not all behavior is the same between running in a script or in the REPL.

Next, you should really give some more context, such that we can verify that the offending buf, ni, nj, nk, nl are actually local variables, and not global variables or captures (Core.Box) from lambdas / inner functions – that would be a very bad data race and explain your assertion failure.

(inner functions? lambdas? – sure, that’s what Threads.@threads does under the hood. Global variables? – loops outside of functions have weird scoping behavior that sometimes differs between REPL and script and between julia versions. Endless flamewars were had.)

I’ve attached a file to the post. You need to modify it a bit to reproduce the assertion error, though, by changing the task_local_storage line into a direct allocation and performing the check after the ccall.

Can you clean up your code to remove the macro mess?

There is no valid reason to have your main code live as a macro. As far as I understood you, your issue is that you can’t pass the C function name symbol as a parameter; you get around this by simply writing

FooLibCintWrap(buf, shells, atm, natoms, bas, nbas, env) = ccall(
    (:fooLibCintWrap, "libcint"), Cint,
    (Ptr{Cdouble}, Ptr{Cint}, Ptr{Cint}, Cint, Ptr{Cint}, Cint, Ptr{Cdouble}, Ptr{Any}),
    buf, shells, atm, natoms, bas, nbas, env, C_NULL
)

and then passing that function as a parameter. If there are too many different symbols such that this single line would be copied too often, then you can use raw function pointers / libdl or macros to define them.

But really, maintaining a clear list of < 30 repetitive one-liners is less effort than doing macro shit. If the library is sprawling with 3000 different exported symbols and machine-readable documentation, then I recommend writing a tiny little tool that generates the source code for these wrapper functions, instead of macros.
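
For instance, a minimal hypothetical sketch that defines such wrappers in a plain @eval loop (the symbol list and the "libcint" library string here are assumptions):

# Hypothetical sketch: generate one thin wrapper function per integral symbol.
# After interpolation, ccall still sees a literal (name, library) tuple.
for fname in (:cint2e_sph,)   # plus whatever other symbols share this signature
    @eval function $(Symbol(:jl_, fname))(buf, shells, atm, natoms, bas, nbas, env)
        ccall(($(QuoteNode(fname)), "libcint"), Cint,
              (Ptr{Cdouble}, Ptr{Cint}, Ptr{Cint}, Cint, Ptr{Cint}, Cint, Ptr{Cdouble}, Ptr{Any}),
              buf, shells, atm, natoms, bas, nbas, env, C_NULL)
    end
end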

I also strongly suggest putting the output of that generation into your git repo, but opinions on that differ.

Respectfully, that’s not an MWE; that is a whole program you’re in the middle of debugging. MWEs don’t ask people to make edits, and they omit as many dependencies, types, and methods as possible while demonstrating the particular issue. For example, if the libcint ccalls can be removed and the example still demonstrates the incorrect buf behaviour (which requires an explicit example of wrong vs expected results; as far as we know, you’re witnessing adequately isolated tasks migrating to different threads) or the starker @assert length mismatch, that narrows down the causes of the problem and makes it 10x easier for other people to run. Other unrelated issues should also be omitted; for example, taking a flat chunk of a multidimensional array like view(buf, 1:ni*nj*nk*nl) is usually a problem, but it’s not necessarily related to the issue at hand and may be supported by cint2e_sph anyway (though the documentation seems to contradict this).

It’s worth warning that this would also remove macro hygiene and thus change variable scoping, so it has to be done carefully.