How to read data from a file in a multithreading loop

Hi Julia community,

My name is Nikolaj, I am not an expert on Julia, but I really like the syntax and speed of Julia.
I am using Julia to determine the wavelength dependent absorption coefficient of CO2, based on line-by-line data HITEMP2010 (calculated from quantum mechanics) available from HITRANonline data: HITEMP .

I am trying to read data from a file in a multithreading loop. The file consists of many lines of 160 characters each. After reading one line, I need to index into specific entries in the string. When I use only 1 thread I get the same result every time. When I use 2 or more threads, I get a different result for each run. I suspect that this happens because the different threads get ‘mixed up’ and access each others strings?

I have attached a minimal reproducible example (you also need the file “02_2250-2500_HITEMP2010.par” to run the example which can be downloaded from Index of /hitemp/data/HITEMP-2010/CO2_line_list).

using Plots
pyplot()
using LaTeXStrings

# set the number of threads
nthreads = 1 # [-] number of threads to use in parallel

# inputs
T = 1200.0 # [K] temperature
P_bar = 1.0 # [bar] pressure
P_atm = P_bar/1.01325 # [atm] pressure
P_Pa = P_atm*101325.0 # [Pa] pressure
CO2_molFrac = 0.1 # [-] mole fraction of CO2
nu_lower = 2300.0 # [1/cm] lower limit on wavenumber
nu_upper = 2305.0 # [1/cm] upper limit on wavenumber
nu_step = 0.01 # [1/cm] stepsize for wavenumber range

# initial calculations
nu_range = nu_lower:nu_step:nu_upper # [1/cm] wavenumber range
PpartCO2_atm = P_atm*CO2_molFrac # [atm] partial pressure of CO2 in atm
PpartCO2_Pa = P_Pa*CO2_molFrac # [Pa] partial pressure of CO2 in Pa
T_ref = 296.0 # [K] reference temperature of HITEMP2010
QT_ref_CO2 = 2.8609E+02 # QT at 296 K for carbon dioxide
c2 = 1.4387769 # [cm*K] constant for calculations
V_cm3 = 1e-6 # [m^3] = 1 cm^3
R = 8.314 # [J/mol-K] universal gas constant
mol_per_cm3 = PpartCO2_Pa*V_cm3/(R*T) # (mol*cm^-3) moles of CO2 per cm^3
molecules_per_cm3 = mol_per_cm3*6.02214129e23 # (molecules*cm^-3) molecules per cm^3
QT_value_CO2 = 4738.8 # The Q(1200) value has been looked up for this case

# connect to the spectral data of carbon dioxide
f_CO2 = Vector{IOStream}() # create the variable for storage
f_CO2 = push!(f_CO2, open("02_2250-2500_HITEMP2010.par")) # HITEMP2010 data file
# read the data from the file
rangeCO2 = String[] # empty string
templines = readlines(f_CO2[1]) # get data
for n = eachindex(templines) # loop over lines
    push!(rangeCO2, templines[n]) # put the lines into the CO2 range variable
end
# get the range (these values are really looked up)
index_low = 219893 # the lower index of the data we're interested in
index_upper = 234622 # the upper index of the data we're interested in
index_rangeWidth = index_upper-index_low # the range that we need

# get ready for parallel for-loop
widthNumber = 80.0 # the number of halfwidths to consider on each side of peak
kappaCO2_per_cm = zeros(length(nu_range)) # preallocate
sumKappaCO2 = zeros(length(nu_range)) # preallocate
indexCount = zeros(Int, nthreads) # preallocate
sumKappaCO2 = zeros(length(nu_range), nthreads) # preallocate
kappaCO2_per_cm = zeros(length(nu_range)) # preallocate

# enter the parallel for-loop
Threads.@threads for logicalCore in 1:nthreads
    # here I try to split the range into subranges
    indexCount[logicalCore] = trunc(Int, index_low+(index_rangeWidth*((logicalCore-1)/nthreads)))
    @label newRun # I use goto/label instead of for-loops here
    indexCount[logicalCore] += 1 # increase counter for this thread
    # now see if the current index for this thread is below the maximum for this thread
    if indexCount[logicalCore] <= trunc(Int, index_low+(index_rangeWidth*(logicalCore/nthreads)))
        line = rangeCO2[indexCount[logicalCore]] # get a single line from the data
        # indices refer to positions in the HITEMP2010 string
        vacuumWavenumber = parse(Float64, line[(2+1+1):(2+1+12)]) # 12
        intensity = parse(Float64, line[(2+1+12+1):(2+1+12+10)]) # 10
        airBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+1):(2+1+12+10+10+5)]) # 5
        selfBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+5+1):(2+1+12+10+10+5+5)]) # 5
        lowerStateEnergy = parse(Float64, line[(2+1+12+10+10+5+5+1):(2+1+12+10+10+5+5+10)]) # 10
        temperatureDependenceCoef = parse(Float64, line[(2+1+12+10+10+5+5+10+1):(2+1+12+10+10+5+5+10+4)]) # 4
        airPressureInducedLineShift = parse(Float64, line[(2+1+12+10+10+5+5+10+4+1):(2+1+12+10+10+5+5+10+4+8)]) # 8
        # calculate the halfwidth
        gamma = (T_ref/T)^temperatureDependenceCoef * (airBroadenedHalfWidth*(P_atm-PpartCO2_atm)+selfBroadenedHalfWidth*PpartCO2_atm)
        # calculate the intensity
        S = (intensity*(QT_ref_CO2/QT_value_CO2)*((exp(-c2*lowerStateEnergy/T))/(exp(-c2*lowerStateEnergy/T_ref)))*
                            ((1-exp(-c2*vacuumWavenumber/T))/(1-exp(-c2*vacuumWavenumber/T_ref))))
        # calculate the shifted peak position
        waveNumberShifted = vacuumWavenumber+airPressureInducedLineShift*P_atm
        nuCount = 1 # set the counter for wavenumber
        @label newNu
        if nuCount <= length(nu_range)
            nu = nu_range[nuCount]
            # now we check if we are within 80 halfwidths of the 'middle' wavenumber of the current index
            # we only do the calculations if we are within this range
            # otherwise we keep the value at zero, which should be sufficiently accurate (based on HITRAN Online)
            if (nu >= (waveNumberShifted-(widthNumber*gamma))) && (nu <= (waveNumberShifted+(widthNumber*gamma)))   
                # this is really the spectral absorption cross section (1/(cm^-2*molecule))
                kappaCO2 = (S/pi)*((gamma)/(gamma^2 +(nu-waveNumberShifted)^2))
                kappaCO2_per_cm[nuCount] = molecules_per_cm3 *kappaCO2 # (molecule*cm^-2)*(1/(cm^-2*molecule)) = (1/cm)
            elseif (nu > waveNumberShifted+(widthNumber*gamma))
                @goto endOfRun
            end
            nuCount += 1
            @goto newNu
        end
        @label endOfRun
        sumKappaCO2[:,logicalCore] += kappaCO2_per_cm[:] # add vector to sum
        @goto newRun
    end
end
# sum the contributions for each thread to get the total
sumSumKappaCO2 = sum(sumKappaCO2, dims=2)
plot() # now plot the sum
plot!(nu_range, sumSumKappaCO2 ,legend=false)
xlabel!(latexstring("Wavenumber \n [cm\$^{-1}\$]"))
ylabel!(latexstring("Spectral Absorption Coefficient \n [cm\$^{-1}\$]"))

It’s a bit much code to dig into. I would try to make your MWE even more minimal, try to remove everything that is not critically needed for your issue to occur.

2 Likes

Ok, I will do what I can. :slight_smile:

Ok, I have tried to simplify my code as much as possible, while still keeping some meaningful functionality.
When I run it on 1 thread it consistently produces the correct results, but when I run it with more than 1 thread it produces different and wrong results each time I run them.
The main loop is the following:

Threads.@threads for thread = 1:nthreads
    ilow = trunc(Int, index_low+(index_rangeWidth*((thread-1)/nthreads)))
    iup = trunc(Int, index_low+(index_rangeWidth*(thread/nthreads)))
    for i = ilow:1:iup
        gamma[i], S[i], waveShifted[i] = foo(rangeCO2, i, T, T_ref, P_atm, PpartCO2_atm, QT_ref_CO2, QT_value_CO2, c2)
        for nuCount = eachindex(nu_range)
            nu = nu_range[nuCount]
            if (nu >= (waveShifted[i]-(widthNumber*gamma[i]))) && (nu <= (waveShifted[i]+(widthNumber*gamma[i])))   
                kappaCO2 = (S[i]/pi)*((gamma[i])/(gamma[i]^2 +(nu-waveShifted[i])^2))
                kappaCO2_per_cm[nuCount] = molecules_per_cm3 *kappaCO2
            elseif (nu > waveShifted[i]+(widthNumber*gamma[i]))
                break
            end
        end
        sumKappaCO2[:] += kappaCO2_per_cm # add vector to sum
    end
    sumSumKappaCO2[:] += sumKappaCO2[:]
end

The function that is called within the loop reads data from the “rangeCO2”-variable and does a few calculations with those data:

function foo(rangeCO2, i, T, T_ref, P_atm, PpartCO2_atm, QT_ref_CO2, QT_value_CO2, c2)
    line = rangeCO2[i] # get a single line from the data
    # indices refer to positions in the HITEMP2010 string
    vacuumWavenumber = parse(Float64, line[(2+1+1):(2+1+12)]) # 12
    intensity = parse(Float64, line[(2+1+12+1):(2+1+12+10)]) # 10
    airBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+1):(2+1+12+10+10+5)]) # 5
    selfBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+5+1):(2+1+12+10+10+5+5)]) # 5
    lowerStateEnergy = parse(Float64, line[(2+1+12+10+10+5+5+1):(2+1+12+10+10+5+5+10)]) # 10
    temperatureDependenceCoef = parse(Float64, line[(2+1+12+10+10+5+5+10+1):(2+1+12+10+10+5+5+10+4)]) # 4
    airPressureInducedLineShift = parse(Float64, line[(2+1+12+10+10+5+5+10+4+1):(2+1+12+10+10+5+5+10+4+8)]) # 8
    # calculate the halfwidth
    gamma = (T_ref/T)^temperatureDependenceCoef * (airBroadenedHalfWidth*(P_atm-PpartCO2_atm)+selfBroadenedHalfWidth*PpartCO2_atm)
    # calculate the intensity
    S = (intensity*(QT_ref_CO2/QT_value_CO2)*((exp(-c2*lowerStateEnergy/T))/(exp(-c2*lowerStateEnergy/T_ref)))*
                        ((1-exp(-c2*vacuumWavenumber/T))/(1-exp(-c2*vacuumWavenumber/T_ref))))
    # calculate the shifted peak position
    waveShifted = vacuumWavenumber+airPressureInducedLineShift*P_atm
    return gamma, S, waveShifted
end

The “rangeCO2”-variable has been read from a file:

# connect to the spectral data of carbon dioxide
f_CO2 = Vector{IOStream}() # create the variable for storage
f_CO2 = push!(f_CO2, open("02_2250-2500_HITEMP2010.par")) # HITEMP2010 data file
# read the data from the file
rangeCO2 = String[] # empty string
templines = readlines(f_CO2[1]) # get data
for n = eachindex(templines) # loop over lines
push!(rangeCO2, templines[n]) # put the lines into the CO2 range variable
end

The rest of the code are just values that are set.
Full code example (which requires the file from HITEMP2010, mentioned in my original post):

using Plots

# set the number of threads
nthreads = 1 # [-] number of threads to use in parallel

# inputs
T = 1200.0 # [K] temperature
P_atm = 1.0/1.01325 # [atm] pressure
P_Pa = P_atm*101325.0 # [Pa] pressure
CO2_molFrac = 0.1 # [-] mole fraction of CO2

# initial calculations
nu_range = 2300.0:0.01:2305.0 # [1/cm] wavenumber range
PpartCO2_atm = P_atm*CO2_molFrac # [atm] partial pressure of CO2 in atm
PpartCO2_Pa = P_Pa*CO2_molFrac # [Pa] partial pressure of CO2 in Pa
T_ref = 296.0 # [K] reference temperature of HITEMP2010
QT_ref_CO2 = 2.8609E+02 # QT at 296 K for carbon dioxide
c2 = 1.4387769 # [cm*K] constant for calculations
V_cm3 = 1e-6 # [m^3] = 1 cm^3
R = 8.314 # [J/mol-K] universal gas constant
mol_per_cm3 = PpartCO2_Pa*V_cm3/(R*T) # (mol*cm^-3) moles of CO2 per cm^3
molecules_per_cm3 = mol_per_cm3*6.02214129e23 # (molecules*cm^-3) molecules per cm^3
QT_value_CO2 = 4738.8 # The Q(1200) value has been looked up for this case

# connect to the spectral data of carbon dioxide
f_CO2 = Vector{IOStream}() # create the variable for storage
f_CO2 = push!(f_CO2, open("02_2250-2500_HITEMP2010.par")) # HITEMP2010 data file
# read the data from the file
rangeCO2 = String[] # empty string
templines = readlines(f_CO2[1]) # get data
for n = eachindex(templines) # loop over lines
    push!(rangeCO2, templines[n]) # put the lines into the CO2 range variable
end
# get the range (these values are really looked up)
index_low = 219893 # the lower index of the data we're interested in
index_upper = 234622 # the upper index of the data we're interested in
index_rangeWidth = index_upper-index_low # the range that we need

# get ready for parallel for-loop
widthNumber = 100.0 # the number of halfwidths to consider on each side of peak
kappaCO2_per_cm = zeros(length(nu_range)) # preallocate
sumKappaCO2 = zeros(length(nu_range),nthreads) # preallocate
indexCount = zeros(Int, nthreads) # preallocate
sumKappaCO2 = zeros(length(nu_range)) # preallocate
sumSumKappaCO2 = zeros(length(nu_range)) # preallocate
kappaCO2_per_cm = zeros(length(nu_range)) # preallocate
gamma = zeros(index_upper) # preallocate
S = zeros(index_upper) # preallocate
waveShifted = zeros(index_upper) # preallocate

function foo(rangeCO2, i, T, T_ref, P_atm, PpartCO2_atm, QT_ref_CO2, QT_value_CO2, c2)
    line = rangeCO2[i] # get a single line from the data
    # indices refer to positions in the HITEMP2010 string
    vacuumWavenumber = parse(Float64, line[(2+1+1):(2+1+12)]) # 12
    intensity = parse(Float64, line[(2+1+12+1):(2+1+12+10)]) # 10
    airBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+1):(2+1+12+10+10+5)]) # 5
    selfBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+5+1):(2+1+12+10+10+5+5)]) # 5
    lowerStateEnergy = parse(Float64, line[(2+1+12+10+10+5+5+1):(2+1+12+10+10+5+5+10)]) # 10
    temperatureDependenceCoef = parse(Float64, line[(2+1+12+10+10+5+5+10+1):(2+1+12+10+10+5+5+10+4)]) # 4
    airPressureInducedLineShift = parse(Float64, line[(2+1+12+10+10+5+5+10+4+1):(2+1+12+10+10+5+5+10+4+8)]) # 8
    # calculate the halfwidth
    gamma = (T_ref/T)^temperatureDependenceCoef * (airBroadenedHalfWidth*(P_atm-PpartCO2_atm)+selfBroadenedHalfWidth*PpartCO2_atm)
    # calculate the intensity
    S = (intensity*(QT_ref_CO2/QT_value_CO2)*((exp(-c2*lowerStateEnergy/T))/(exp(-c2*lowerStateEnergy/T_ref)))*
                        ((1-exp(-c2*vacuumWavenumber/T))/(1-exp(-c2*vacuumWavenumber/T_ref))))
    # calculate the shifted peak position
    waveShifted = vacuumWavenumber+airPressureInducedLineShift*P_atm
    return gamma, S, waveShifted
end

Threads.@threads for thread = 1:nthreads
    ilow = trunc(Int, index_low+(index_rangeWidth*((thread-1)/nthreads)))
    iup = trunc(Int, index_low+(index_rangeWidth*(thread/nthreads)))
    for i = ilow:1:iup
        gamma[i], S[i], waveShifted[i] = foo(rangeCO2, i, T, T_ref, P_atm, PpartCO2_atm, QT_ref_CO2, QT_value_CO2, c2)
        for nuCount = eachindex(nu_range)
            nu = nu_range[nuCount]
            if (nu >= (waveShifted[i]-(widthNumber*gamma[i]))) && (nu <= (waveShifted[i]+(widthNumber*gamma[i])))   
                kappaCO2 = (S[i]/pi)*((gamma[i])/(gamma[i]^2 +(nu-waveShifted[i])^2))
                kappaCO2_per_cm[nuCount] = molecules_per_cm3 *kappaCO2
            elseif (nu > waveShifted[i]+(widthNumber*gamma[i]))
                break
            end
        end
        sumKappaCO2[:] += kappaCO2_per_cm # add vector to sum
    end
    sumSumKappaCO2[:] += sumKappaCO2[:]
end
plot(sumSumKappaCO2, legend=false)

It looks like you are trying to the add variables to the same sum in the loop. You should calculate the parts of the sum for each bit of the loop, and then sum them afterwards instead.

Also, you should put your code inside a function instead of the global outer scope, and possibly consider chunking the code into smaller functions as this usually helps performance (due to type inference being a bit easier).

1 Like

Thanks a lot, this solved my problem! :smiley:
Now the results are the same each time, regardless of the number of threads.
The reason why I was trying to sum continuously was an attempt to save memory.
But lesson learned, and many thanks!
Also thanks for the advice regarding Julia programming in general!

My code now looks like this (the function and the loop was updated, the rest is the same):

function foo(rangeCO2, i, indexlow, T, T_ref, P_atm, PpartCO2_atm, QT_ref_CO2, QT_value_CO2, c2)
    line = rangeCO2[i+indexlow-1] # get a single line from the data
    # indices refer to positions in the HITEMP2010 string
    vacuumWavenumber = parse(Float64, line[(2+1+1):(2+1+12)]) # 12
    intensity = parse(Float64, line[(2+1+12+1):(2+1+12+10)]) # 10
    airBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+1):(2+1+12+10+10+5)]) # 5
    selfBroadenedHalfWidth = parse(Float64, line[(2+1+12+10+10+5+1):(2+1+12+10+10+5+5)]) # 5
    lowerStateEnergy = parse(Float64, line[(2+1+12+10+10+5+5+1):(2+1+12+10+10+5+5+10)]) # 10
    temperatureDependenceCoef = parse(Float64, line[(2+1+12+10+10+5+5+10+1):(2+1+12+10+10+5+5+10+4)]) # 4
    airPressureInducedLineShift = parse(Float64, line[(2+1+12+10+10+5+5+10+4+1):(2+1+12+10+10+5+5+10+4+8)]) # 8
    # calculate the halfwidth
    gamma = (T_ref/T)^temperatureDependenceCoef * (airBroadenedHalfWidth*(P_atm-PpartCO2_atm)+selfBroadenedHalfWidth*PpartCO2_atm)
    # calculate the intensity
    S = (intensity*(QT_ref_CO2/QT_value_CO2)*((exp(-c2*lowerStateEnergy/T))/(exp(-c2*lowerStateEnergy/T_ref)))*
                        ((1-exp(-c2*vacuumWavenumber/T))/(1-exp(-c2*vacuumWavenumber/T_ref))))
    # calculate the shifted peak position
    waveShifted = vacuumWavenumber+airPressureInducedLineShift*P_atm
    return gamma, S, waveShifted
end

kappaCO2_per_cm = zeros(length(nu_range), index_rangeWidth, nthreads) # preallocate
Threads.@threads for thread = 1:nthreads
    ilow = trunc(Int, (index_rangeWidth*((thread-1)/nthreads)))+1
    iup = trunc(Int, (index_rangeWidth*(thread/nthreads)))
    for i = ilow:1:iup
        gamma[i], S[i], waveShifted[i] = foo(rangeCO2, i, index_low, T, T_ref, P_atm, PpartCO2_atm, QT_ref_CO2, QT_value_CO2, c2)
        for nuCount = eachindex(nu_range)
            nu = nu_range[nuCount]
            if (nu >= (waveShifted[i]-(widthNumber*gamma[i]))) && (nu <= (waveShifted[i]+(widthNumber*gamma[i])))   
                kappaCO2 = (S[i]/pi)*((gamma[i])/(gamma[i]^2 +(nu-waveShifted[i])^2))
                kappaCO2_per_cm[nuCount, i, thread] = molecules_per_cm3 *kappaCO2
            elseif (nu > waveShifted[i]+(widthNumber*gamma[i]))
                break
            end
        end
    end
end
kappaPlot = sum(sum(kappaCO2_per_cm, dims=2), dims=3)
plot(nu_range, kappaPlot[:,1,1], legend=false)
2 Likes

Great!

I would suggest also putting the Threads.@threads loop inside a function too, and passing in the variables you need as arguments (i.e. don’t use any global variables).

1 Like

I actually ended up changing the code back to summing every time. But thanks to your help my loop is much nicer and easier to read. The code below also produces the correct results, but it uses only a fraction of memory compared to making enormous 3D arrays. :slight_smile:

function multiThreadLoop(kappasum_per_cm::Array{Float64}, nthreads::Int64, index_low::Int64, index_rangeWidth::Int64,
                        T::Float64, T_ref::Float64, P_atm::Float64, Ppart_atm::Float64, QT_ref::Float64, QT_value::Float64,
                        c2::Float64, range::Vector{String}, gamma::Vector{Float64}, S::Vector{Float64},
                        waveShifted::Vector{Float64}, nu_range)
    Threads.@threads for thread = 1:nthreads # run in parallel
        ilow = trunc(Int, index_low+(index_rangeWidth*((thread-1)/nthreads)))+1 # get lowest line wavenumber index for this run
        iup = trunc(Int, index_low+(index_rangeWidth*(thread/nthreads)))-1 # get highest line wavenumber index for this run
        thisRange = range[ilow:1:iup] # setup the range
        for i = eachindex(thisRange) # loop over each line, to create a Lorentz shape for each line
            # find the halfwidth, the temperature dependent intensity and the shifted wavepeak position
            gamma[i], S[i], waveShifted[i] = waveCalc(thisRange, i, T, T_ref, P_atm, Ppart_atm, QT_ref, QT_value, c2)
            for nuCount = eachindex(nu_range) # loop over all wavelengths in this range
                nu = nu_range[nuCount] # set the wavelength
                # only calculate if we are within a number of halfwidths from the peak
                # this number 'widthNumber' is an input to this function (80 should be fine)
                if (nu >= (waveShifted[i]-(widthNumber*gamma[i]))) && (nu <= (waveShifted[i]+(widthNumber*gamma[i])))   
                    kappa = (S[i]/pi)*((gamma[i])/(gamma[i]^2 +(nu-waveShifted[i])^2))
                    kappasum_per_cm[nuCount] += molecules_per_cm3 *kappa # we sum outside the loop
                elseif (nu > waveShifted[i]+(widthNumber*gamma[i]))
                    break
                end
            end
        end
    end
    return kappasum_per_cm
end
1 Like