[ANN]: MetidaFlows.jl - experimental workflow engine project for Julia

Hi everyone!

I’d like to share an early-stage project called MetidaFlows.jl .

The package is currently in active development and should be considered experimental .
The goal right now is to explore workflow-engine design patterns in pure Julia and collect feedback from the community.

What is MetidaFlows.jl?

MetidaFlows.jl is a lightweight graph-based workflow engine prototype for building:

  • data-processing pipelines
  • DAG-based computations
  • agent/event-driven workflows
  • typed node graphs with validation

It is designed to stay minimal, explicit, and fully Julia-native (no external runtime).

Current functionality

The package already implements:

  • typed input/output ports
  • graph-based workflow model
  • connection validation with type checking
  • DAG topological scheduler (DAW )
  • queue-based scheduler (ABW , experimental)
  • execution invalidation propagation
  • node execution state tracking
  • validation hooks (settings / result / structure)
  • incremental graph modification
  • simple serialization helpers

Execution model

Nodes are defined via multiple dispatch:

function MetidaFlows.execute_unsafe!(node::DataNode{MyNode})
    ...
end

Workflow modes

DAW — Data Analysis Workflow

Deterministic execution using topological sorting of a DAG.

ABW — Agent-Based Workflow

Queue-based execution model for dynamic/reactive workflows (still experimental).

Real working example

This example is taken directly from the test suite and actually runs:

using MetidaFlows
using CSV, DataFrames

struct CSVNode <: AbstractNodeType end
struct DataFrameNode <: AbstractNodeType end

csv_spec = NodeSpec(
    "Load CSV",
    PortSpec[],
    [PortSpec("CSV File", CSV.File, :csv)],
    [:file]
)

df_spec = NodeSpec(
    "DataFrame",
    [PortSpec("CSV File", CSV.File, :csv)],
    [PortSpec("DataFrame", DataFrame, :dataframe)]
)

function MetidaFlows.execute_unsafe!(node::DataNode{CSVNode})
    csv = CSV.File(node.settings[:file])
    setdata!(node, :csv, csv)
    return [:csv]
end

function MetidaFlows.execute_unsafe!(node::DataNode{DataFrameNode})
    csv = getinputdata(node, :csv)
    setdata!(node, :dataframe, DataFrame(csv))
    return [:dataframe]
end

workflow = Workflow(0)

id1 = add_node!(workflow, DataNode(CSVNode, csv_spec))
id2 = add_node!(workflow, DataNode(DataFrameNode, df_spec))

add_connection!(workflow, id1, :csv, id2, :csv)

setsettings!(workflow, id1, Dict(:file => "data.csv"))

scheduler!(workflow)

df = getdata(workflow, id2, :dataframe)

Current status

This is not a production-ready package .

Expect breaking changes while the architecture evolves. The main focus right now is:

  • stabilizing execution semantics
  • improving scheduler correctness
  • refining invalidation model
  • improving test coverage

Roadmap ideas

  • caching & checkpointing
  • audit/logging system
  • better serialization/export formats

Feedback welcome

I’d especially appreciate feedback from people working with:

  • DAG systems
  • ETL pipelines
  • scientific computing workflows
  • node-based editors
  • agent-based systems

GitHub:
MetidaFlows.jl

Documentation:
Documentation link

1 Like