Dagger + Dates = snakemake?

I have a folder data with samples (CSV files); I want to compute some statistics for each file and then make a summary. In Snakemake you can define rules for how to produce the output files from the inputs, and Snakemake will handle the execution, running things only when needed (e.g. when a file has been modified). There’s no equivalent in Julia, but I think most of the building blocks are already in Dagger and we just need a nicer frontend. Here’s an example I made quickly to illustrate:

I define a rule that takes a sample as input and creates a new transformed CSV file:

rule_square_df(i) = Rule(
    "data/sample_$(i).csv" => "data/sample_squared_$(i).csv",
    (input, output) -> begin
        df = CSV.read(input[1], DataFrame)
        df.xsquared = df.x.^2
        CSV.write(output[1], df)
        output
    end
)

I make another rule that takes all the transformed sample files and makes a summary:

make_summary = Rule(
    ["data/sample_squared_$(i).csv" for i in 1:10] => "data/samples_summary.csv",
    (inputs, output) -> begin
        dfs = CSV.read.(inputs, DataFrame)
        mean_squared = DataFrame(sample = inputs, mean_squared = [mean(df.xsquared) for df in dfs])
        CSV.write(output[1], mean_squared)
        output
    end
)

Now I can build my DAG and run it:

inputs = 1:10
squared = [Dagger.@spawn rule_square_df(i)(1) for i in inputs]
summary = Dagger.@spawn make_summary(squared...)

This executes the rules on different threads and then makes the summary:

[ Info: running Rule(["data/sample_1.csv"], ["data/sample_squared_1.csv"], var"#147#148"()) on thread 7
[ Info: running Rule(["data/sample_3.csv"], ["data/sample_squared_3.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_6.csv"], ["data/sample_squared_6.csv"], var"#147#148"()) on thread 6
[ Info: running Rule(["data/sample_7.csv"], ["data/sample_squared_7.csv"], var"#147#148"()) on thread 3
DTask (running)
[ Info: running Rule(["data/sample_5.csv"], ["data/sample_squared_5.csv"], var"#147#148"()) on thread 5
[ Info: running Rule(["data/sample_9.csv"], ["data/sample_squared_9.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_2.csv"], ["data/sample_squared_2.csv"], var"#147#148"()) on thread 4
[ Info: running Rule(["data/sample_8.csv"], ["data/sample_squared_8.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_10.csv"], ["data/sample_squared_10.csv"], var"#147#148"()) on thread 8
[ Info: running Rule(["data/sample_4.csv"], ["data/sample_squared_4.csv"], var"#147#148"()) on thread 2
[ Info: running Rule(["data/sample_squared_1.csv", "data/sample_squared_2.csv", "data/sample_squared_3.csv", "data/sample_squared_4.csv", "data/sample_squared_5.csv", "data/sample_squared_6.csv", "data/sample_squared_7.csv", "data/sample_squared_8.csv", "data/sample_squared_9.csv", "data/sample_squared_10.csv"], ["data/samples_summary.csv"], var"#150#153"()) on thread 4

If I run it again, nothing happens because all the outputs are there and up-to-date:

DTask (running)

If I update an input file, things get updated correctly:

shell> touch data/sample_9.csv

[ Info: running Rule(["data/sample_9.csv"], ["data/sample_squared_9.csv"], var"#247#248"()) on thread 2
DTask (running)
[ Info: running Rule(["data/sample_squared_1.csv", "data/sample_squared_2.csv", "data/sample_squared_3.csv", "data/sample_squared_4.csv", "data/sample_squared_5.csv", "data/sample_squared_6.csv", "data/sample_squared_7.csv", "data/sample_squared_8.csv", "data/sample_squared_9.csv", "data/sample_squared_10.csv"], ["data/samples_summary.csv"], var"#250#253"()) on thread 5

My implementation is not very good and there are probably use cases that wouldn’t work, but something a bit like this would already be useful, even with limited functionality. I’m not sure I’m handling inputs/outputs in a good way (Dagger needs tasks to link to each other via arguments), and it’s easy to do something wrong: e.g. omitting the … in the summary call will produce wrong results, because the summary will not wait for the first tasks to finish.
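One way to make this harder to get wrong could be a small wrapper that always forwards the upstream task results as arguments. Here’s a minimal sketch, assuming Dagger.spawn (the function form of Dagger.@spawn) and a hypothetical spawn_rule helper:

# Hypothetical helper: splat upstream results into the call so Dagger records
# the dependency, even when the caller passes a Vector of tasks.
spawn_rule(rule::Rule, upstream::AbstractVector) = Dagger.spawn(rule, upstream...)
spawn_rule(rule::Rule, upstream...) = Dagger.spawn(rule, upstream...)

squared = [spawn_rule(rule_square_df(i)) for i in 1:10]
summary = spawn_rule(make_summary, squared) # no splat needed; the summary still waits on its producers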

Here’s the full code:

using Pkg; Pkg.activate(".")
using Dagger, Statistics
using CSV, DataFrames, Dates

##

if true
    mkpath("data")  # make sure the data folder exists
    for i in 1:10
        CSV.write("data/sample_$(i).csv", DataFrame(x=rand(10)))
    end
end

##

struct Rule
    inputs::Vector{String}
    outputs::Vector{String}
    action::Function  # Function to generate outputs from inputs
end

Rule(inputs::String, outputs::String, action::Function) = Rule([inputs], [outputs], action)
Rule(inputs::Vector{String}, outputs::String, action::Function) = Rule(inputs, [outputs], action)
Rule(inputs::String, outputs::Vector{String}, action::Function) = Rule([inputs], outputs, action)

Rule(io::Pair, action::Function) = Rule(io.first, io.second, action)

function needs_update(task::Rule)
    @assert all(isfile, task.inputs)
    any(!isfile, task.outputs) && return true

    # Get the latest modification time of inputs
    input_mtime = maximum(mtime.(task.inputs))
    # Get the earliest modification time of outputs
    output_mtime = minimum(mtime.(task.outputs))
    # Run if any input is newer than any output
    input_mtime > output_mtime
end

function (task::Rule)(inputs...) # inputs are not used; they only let Dagger build the DAG
    if needs_update(task)
        @info "running $(task) on thread $(Threads.threadid())"
        task.action(task.inputs, task.outputs)
    end
    task.outputs
end

##

rule_square_df(i) = Rule(
    "data/sample_$(i).csv" => "data/sample_squared_$(i).csv",
    (input, output) -> begin
        df = CSV.read(input[1], DataFrame)
        df.xsquared = df.x.^2
        CSV.write(output[1], df)
        output
    end
)

make_summary = Rule(
    ["data/sample_squared_$(i).csv" for i in 1:10] => "data/samples_summary.csv",
    (inputs, output) -> begin
        dfs = CSV.read.(inputs, DataFrame)
        mean_squared = DataFrame(sample = inputs, mean_squared = [mean(df.xsquared) for df in dfs])
        CSV.write(output[1], mean_squared)
        output
    end
)

##

inputs = 1:10
squared = [Dagger.@spawn rule_square_df(i)(1) for i in inputs] # the 1 is a dummy argument; leaf rules have no upstream tasks
summary = Dagger.@spawn make_summary(squared...) # splatting makes the summary task depend on all the squared tasks

This is a cool idea! It seems like a simpler alternative to Waluigi.jl (https://github.com/mrufsvold/Waluigi.jl), supporting just file inputs/outputs.

One improvement I would suggest is to use the FileWatching stdlib to automatically run the Dagger.@spawn calls when an input changes, which would make it easy to design an auto-updating pipeline with rules. Maybe this could be an optional feature, since sometimes you want to run the pipeline manually, e.g. if multiple files need to be updated first.
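For instance, something along these lines might work. This is only a rough sketch; watch_and_run and run_pipeline are hypothetical names, and it relies on FileWatching.watch_file blocking until the file is modified:

using FileWatching

# Hypothetical auto-update loop: re-run the pipeline whenever any watched
# input changes. run_pipeline would re-issue the Dagger.@spawn calls, and the
# mtime checks in needs_update keep unnecessary re-runs cheap.
function watch_and_run(paths, run_pipeline)
    run_pipeline()
    changed = Channel{String}(length(paths))
    for p in paths
        @async while true
            watch_file(p)   # blocks until p is modified
            put!(changed, p)
        end
    end
    while true
        p = take!(changed)  # wake up on the first change
        @info "input changed: $p"
        run_pipeline()
    end
end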

Also, another quality-of-life improvement would be switching the order of arguments to put the action first, so that you can use do syntax (see the sketch below).
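A minimal sketch of what that could look like, adding a hypothetical action-first method on top of the existing Rule constructors:

# Hypothetical action-first constructor, so the anonymous function can be
# written with do syntax; it just forwards to the existing constructors.
Rule(action::Function, io::Pair) = Rule(io.first, io.second, action)

rule = Rule("data/sample_1.csv" => "data/sample_squared_1.csv") do input, output
    df = CSV.read(input[1], DataFrame)
    df.xsquared = df.x .^ 2
    CSV.write(output[1], df)
    output
end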

If you want to contribute it, I would be happy to have an implementation like this directly available in Dagger, as it’s pretty useful for a lot of use cases, and has minimal dependencies. Let me know what you think!


Sounds good; if I have the time, I’ll try to clean it up and test typical use cases to make sure it can do the job.

Interesting. I have been thinking a lot about creating something like Snakemake in Julia over the past few years. We use Snakemake in our data processing and analysis pipelines in KM3NeT, and there are quite a few things that are hard or even impossible with Snakemake for our use cases. The other annoying thing is that Snakemake is really slow when you have tens of thousands of files (the DAG solver is super slow). For some workflows, Snakemake takes half an hour just to solve the DAG :laughing:
