I have a folder data with samples (CSV files). I want to compute some statistics for each file and then make a summary. In Snakemake you define rules for how to produce an output file from its inputs, and Snakemake handles the execution, running things only when needed (e.g. when a file has been modified). There’s no equivalent in Julia, but I think most of the building blocks are already in Dagger and we just need a nicer frontend. Here’s an example I made quickly to illustrate:
I define a rule that takes a sample as input and creates a new, transformed CSV file:
rule_square_df(i) = Rule(
    "data/sample_$(i).csv" => "data/sample_squared_$(i).csv",
    (input, output) -> begin
        df = CSV.read(input[1], DataFrame)
        df.xsquared = df.x.^2
        CSV.write(output[1], df)
        output
    end
)
I make another rule that takes all the transformed sample files and makes a summary:
make_summary = Rule(
    ["data/sample_squared_$(i).csv" for i in 1:10] => "data/samples_summary.csv",
    (inputs, output) -> begin
        dfs = CSV.read.(inputs, DataFrame)
        mean_squared = DataFrame(sample = inputs, mean_squared = [mean(df.xsquared) for df in dfs])
        CSV.write(output[1], mean_squared)
        output
    end
)
Now I can build my DAG and run it:
inputs = 1:10
squared = [Dagger.@spawn rule_square_df(i)(1) for i in inputs]
summary = Dagger.@spawn make_summary(squared...)
This executes the rules on different threads and then makes the summary:
[ Info: running Rule(["data/sample_1.csv"], ["data/sample_squared_1.csv"], var"#147#148"()) on thread 7
[ Info: running Rule(["data/sample_3.csv"], ["data/sample_squared_3.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_6.csv"], ["data/sample_squared_6.csv"], var"#147#148"()) on thread 6
[ Info: running Rule(["data/sample_7.csv"], ["data/sample_squared_7.csv"], var"#147#148"()) on thread 3
DTask (running)
[ Info: running Rule(["data/sample_5.csv"], ["data/sample_squared_5.csv"], var"#147#148"()) on thread 5
[ Info: running Rule(["data/sample_9.csv"], ["data/sample_squared_9.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_2.csv"], ["data/sample_squared_2.csv"], var"#147#148"()) on thread 4
[ Info: running Rule(["data/sample_8.csv"], ["data/sample_squared_8.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_10.csv"], ["data/sample_squared_10.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_4.csv"], ["data/sample_squared_4.csv"], var"#147#148"()) on thread 2
[ Info: running Rule(["data/sample_squared_1.csv", "data/sample_squared_2.csv", "data/sample_squared_3.csv", "data/sample_squared_4.csv", "data/sample_squared_5.csv", "data/sample_squared_6.csv", "data/sample_squared_7.csv", "data/sample_squared_8.csv", "data/sample_squared_9.csv", "data/sample_squared_10.csv"], ["data/samples_summary.csv"], var"#150#153"()) on thread 4
If I run it again, nothing happens because all the outputs are there and up to date:
DTask (running)
If I update an input file, things get updated correctly:
shell> touch data/sample_9.csv
[ Info: running Rule(["data/sample_9.csv"], ["data/sample_squared_9.csv"], var"#247#248"()) on thread 2
DTask (running)
[ Info: running Rule(["data/sample_squared_1.csv", "data/sample_squared_2.csv", "data/sample_squared_3.csv", "data/sample_squared_4.csv", "data/sample_squared_5.csv", "data/sample_squared_6.csv", "data/sample_squared_7.csv", "data/sample_squared_8.csv", "data/sample_squared_9.csv", "data/sample_squared_10.csv"], ["data/samples_summary.csv"], var"#250#253"()) on thread 5
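The rerun behaviour above comes down to a comparison of file modification times. Here is a minimal sketch of that check in isolation, using only Base functions; the names is_stale and files_are_stale are made up for this illustration and are not part of Dagger or of the full code in this post.

```julia
# Hypothetical helper: pure comparison on timestamps (seconds, as returned by mtime).
# Stale when the newest input is newer than the oldest output.
# Equal timestamps count as up to date.
is_stale(input_mtimes, output_mtimes) = maximum(input_mtimes) > minimum(output_mtimes)

# Hypothetical file-based wrapper: a missing output always forces a rerun.
function files_are_stale(inputs, outputs)
    all(isfile, inputs) || error("missing input file")
    all(isfile, outputs) || return true
    return is_stale(mtime.(inputs), mtime.(outputs))
end

is_stale([100.0, 105.0], [110.0])  # false: every input is older than the output
is_stale([100.0, 120.0], [110.0])  # true: one input was modified after the output
```

Keeping the timestamp comparison separate from the file system access makes the rule easy to check by hand. Note that a strict > means an output written within the same timestamp as its input is treated as up to date.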
My implementation is not very good, and there are probably use cases that wouldn’t work, but something a bit like this would already be useful, even with limited functionality. I’m not sure I’m handling inputs/outputs in a good way (Dagger needs tasks to link to each other via arguments), and it’s easy to get something wrong: for example, omitting the splat (...) in the summary call produces wrong results, because the summary will not wait for the first tasks to finish.
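One way a frontend might avoid this kind of mistake is to derive the links between rules from the file names instead of passing tasks by hand. The following is only a sketch of that idea in plain Julia, without Dagger; SimpleRule, producers and dependencies are hypothetical names, and wiring the result into Dagger tasks is left out.

```julia
# Hypothetical, stripped-down rule type: only the file lists matter here.
struct SimpleRule
    name::String
    inputs::Vector{String}
    outputs::Vector{String}
end

# Map every output file to the name of the rule that produces it.
function producers(rules)
    table = Dict{String,String}()
    for r in rules, out in r.outputs
        haskey(table, out) && error("$(out) is produced by more than one rule")
        table[out] = r.name
    end
    return table
end

# For each rule, list the rules whose outputs it reads.
# Inputs that no rule produces (the raw data files) are skipped.
function dependencies(rules)
    table = producers(rules)
    return Dict(
        r.name => unique(String[table[f] for f in r.inputs if haskey(table, f)])
        for r in rules
    )
end

# Same layout as in this post, reduced to three samples.
rules = [SimpleRule("square_$(i)", ["data/sample_$(i).csv"], ["data/sample_squared_$(i).csv"]) for i in 1:3]
push!(rules, SimpleRule("summary", ["data/sample_squared_$(i).csv" for i in 1:3], ["data/samples_summary.csv"]))

deps = dependencies(rules)
deps["summary"]   # ["square_1", "square_2", "square_3"]
deps["square_1"]  # empty: it only reads a raw data file
```

With a table like this, a frontend could spawn each rule with the tasks of its dependencies as arguments, so the user never writes the splat manually.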
Here’s the full code:
using Pkg; Pkg.activate(".")
using Dagger, Statistics
using CSV, DataFrames, Dates
import Dagger.@par
##
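if true # set to false to skip regenerating the sample data
    mkpath("data") # make sure the folder exists before writing into it
    for i in 1:10
        CSV.write("data/sample_$(i).csv", DataFrame(x=rand(10)))
    end
end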
##
struct Rule
    inputs::Vector{String}
    outputs::Vector{String}
    action::Function # Function to generate outputs from inputs
end
Rule(inputs::String, outputs::String, action::Function) = Rule([inputs], [outputs], action)
Rule(inputs::Vector{String}, outputs::String, action::Function) = Rule(inputs, [outputs], action)
Rule(inputs::String, outputs::Vector{String}, action::Function) = Rule([inputs], outputs, action)
Rule(io::Pair, action::Function) = Rule(io.first, io.second, action)
function needs_update(task::Rule)
    @assert all(isfile.(task.inputs))
    any(!isfile, task.outputs) && return true
    # Get the latest modification time of inputs
    input_mtime = maximum(mtime.(task.inputs))
    # Get the earliest modification time of outputs
    output_mtime = minimum(mtime.(task.outputs))
    # Run if any input is newer than any output
    input_mtime > output_mtime
end
function (task::Rule)(inputs...) # inputs is not used; it is only there so Dagger can build the DAG
    if needs_update(task)
        @info "running $(task) on thread $(Threads.threadid())"
        task.action(task.inputs, task.outputs)
    end
    task.outputs
end
##
rule_square_df(i) = Rule(
    "data/sample_$(i).csv" => "data/sample_squared_$(i).csv",
    (input, output) -> begin
        df = CSV.read(input[1], DataFrame)
        df.xsquared = df.x.^2
        CSV.write(output[1], df)
        output
    end
)
make_summary = Rule(
    ["data/sample_squared_$(i).csv" for i in 1:10] => "data/samples_summary.csv",
    (inputs, output) -> begin
        dfs = CSV.read.(inputs, DataFrame)
        mean_squared = DataFrame(sample = inputs, mean_squared = [mean(df.xsquared) for df in dfs])
        CSV.write(output[1], mean_squared)
        output
    end
)
##
inputs = 1:10
squared = [Dagger.@spawn rule_square_df(i)(1) for i in inputs]
summary = Dagger.@spawn make_summary(squared...)