Hi everyone!
I’d like to share an early-stage project called MetidaFlows.jl.
The package is currently in active development and should be considered experimental.
The goal right now is to explore workflow-engine design patterns in pure Julia and collect feedback from the community.
What is MetidaFlows.jl?
MetidaFlows.jl is a lightweight graph-based workflow engine prototype for building:
- data-processing pipelines
- DAG-based computations
- agent/event-driven workflows
- typed node graphs with validation
It is designed to stay minimal, explicit, and fully Julia-native (no external runtime).
Current functionality
The package already implements:
- typed input/output ports
- graph-based workflow model
- connection validation with type checking
- DAG topological scheduler (DAW)
- queue-based scheduler (ABW, experimental)
- execution invalidation propagation
- node execution state tracking
- validation hooks (settings / result / structure)
- incremental graph modification
- simple serialization helpers
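To give a feel for what "connection validation with type checking" can mean in practice, here is a minimal standalone sketch. The names `Port` and `can_connect` are hypothetical and are not part of the MetidaFlows.jl API; the package itself may validate connections differently.

```julia
# Hypothetical port description: a name plus the Julia type it carries.
# (Illustration only; not a MetidaFlows.jl type.)
struct Port
    name::Symbol
    dtype::Type
end

# Accept a connection when every value the output can produce is a valid
# value for the input, i.e. the output type is a subtype of the input type.
can_connect(out::Port, inp::Port) = out.dtype <: inp.dtype

out_int = Port(:count, Int)
in_real = Port(:value, Real)
in_str  = Port(:label, String)

can_connect(out_int, in_real)  # true:  Int <: Real
can_connect(out_int, in_str)   # false: Int is not a subtype of String
```

Using the subtype relation rather than strict type equality lets a concrete output feed a more general input, at the cost of deferring some mismatches to run time when ports are declared with abstract types.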
Execution model
Nodes are defined via multiple dispatch:
function MetidaFlows.execute_unsafe!(node::DataNode{MyNode})
    ...
end
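For readers less familiar with this pattern, the following standalone sketch shows how dispatching on a type parameter selects the behaviour of a node. All names here (`AbstractKind`, `Node`, `run!`, `Doubler`, `Negator`) are made up for illustration and are not MetidaFlows.jl types or functions.

```julia
# Hypothetical stand-ins; the real MetidaFlows.jl types may look different.
abstract type AbstractKind end
struct Doubler <: AbstractKind end
struct Negator <: AbstractKind end

# A generic node container parameterised by its kind.
mutable struct Node{K<:AbstractKind}
    input::Int
    output::Union{Int,Nothing}
end

# Convenience constructor: the kind is passed as a type, output starts empty.
Node(::Type{K}, input::Int) where {K<:AbstractKind} = Node{K}(input, nothing)

# One method per kind; Julia picks the method from the type parameter.
run!(n::Node{Doubler}) = (n.output = 2 * n.input; n)
run!(n::Node{Negator}) = (n.output = -n.input; n)

run!(Node(Doubler, 21)).output  # 42
run!(Node(Negator, 21)).output  # -21
```

The container stays generic while each node kind only contributes a method, so adding a new kind does not require touching the engine code.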
Workflow modes
DAW — Data Analysis Workflow
Deterministic execution using topological sorting of a DAG.
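As an illustration of the idea (not the package's actual scheduler), topological ordering of a DAG can be sketched with Kahn's algorithm. The function `toposort` and the `node => downstream nodes` edge representation are assumptions made for this example.

```julia
# Kahn's algorithm over a DAG given as `node => downstream nodes`.
# `toposort` is a hypothetical helper, not a MetidaFlows.jl function.
function toposort(edges::Dict{Int,Vector{Int}})
    # Collect every node that appears as a source or as a target.
    nodes = Set{Int}(keys(edges))
    for targets in values(edges)
        union!(nodes, targets)
    end

    # Count incoming edges per node.
    indegree = Dict(n => 0 for n in nodes)
    for targets in values(edges), t in targets
        indegree[t] += 1
    end

    # Start from nodes without inputs; sorting makes the order reproducible.
    ready = sort!([n for n in nodes if indegree[n] == 0])
    order = Int[]
    while !isempty(ready)
        n = popfirst!(ready)
        push!(order, n)
        for t in get(edges, n, Int[])
            indegree[t] -= 1
            indegree[t] == 0 && push!(ready, t)
        end
    end

    # Any node left unprocessed sits on a cycle.
    length(order) == length(nodes) || error("graph contains a cycle")
    return order
end

# Diamond: node 1 feeds 2 and 3, and both feed 4.
toposort(Dict(1 => [2, 3], 2 => [4], 3 => [4]))  # [1, 2, 3, 4]
```

A node is only emitted once all of its upstream nodes have been emitted, which is what makes the execution order safe for data dependencies.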
ABW — Agent-Based Workflow
Queue-based execution model for dynamic/reactive workflows (still experimental).
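To contrast this with the DAG mode, here is a rough sketch of a queue-driven loop in which handling one event may enqueue further events. It is not the ABW scheduler itself; `run_queue!`, the `handlers` table, and the `maxsteps` guard are all hypothetical names chosen for the example.

```julia
# Hypothetical queue-driven loop; not the MetidaFlows.jl ABW scheduler.
# Each handler receives a message and returns follow-up `target => message` pairs.
function run_queue!(handlers::Dict{Symbol,Function}, start::Pair{Symbol,Int};
                    maxsteps::Int = 100)
    queue = Pair{Symbol,Int}[start]
    trace = Pair{Symbol,Int}[]
    steps = 0
    # The step limit guards against handlers that re-enqueue work forever.
    while !isempty(queue) && steps < maxsteps
        target, msg = popfirst!(queue)         # FIFO: oldest event first
        push!(trace, target => msg)
        append!(queue, handlers[target](msg))  # handlers may schedule more work
        steps += 1
    end
    return trace
end

handlers = Dict{Symbol,Function}(
    # The counter re-enqueues itself until it reaches 3, then notifies the sink.
    :counter => n -> n < 3 ? [:counter => n + 1] : [:sink => n],
    # The sink consumes the message and produces no further events.
    :sink => n -> Pair{Symbol,Int}[],
)

run_queue!(handlers, :counter => 1)
# [:counter => 1, :counter => 2, :counter => 3, :sink => 3]
```

Because a handler can target itself, this model allows loops that a topological order cannot express, which is why the sketch needs an explicit termination guard.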
Real working example
This example is taken directly from the test suite and actually runs:
using MetidaFlows
using CSV, DataFrames

# Marker types that select the execute_unsafe! method via dispatch
struct CSVNode <: AbstractNodeType end
struct DataFrameNode <: AbstractNodeType end

# "Load CSV": no input ports, one CSV.File output port, one setting (:file)
csv_spec = NodeSpec(
    "Load CSV",
    PortSpec[],
    [PortSpec("CSV File", CSV.File, :csv)],
    [:file]
)

# "DataFrame": one CSV.File input port, one DataFrame output port
df_spec = NodeSpec(
    "DataFrame",
    [PortSpec("CSV File", CSV.File, :csv)],
    [PortSpec("DataFrame", DataFrame, :dataframe)]
)

# Read the file named in the node settings and store it on the :csv port
function MetidaFlows.execute_unsafe!(node::DataNode{CSVNode})
    csv = CSV.File(node.settings[:file])
    setdata!(node, :csv, csv)
    return [:csv]
end

# Convert the incoming CSV.File into a DataFrame
function MetidaFlows.execute_unsafe!(node::DataNode{DataFrameNode})
    csv = getinputdata(node, :csv)
    setdata!(node, :dataframe, DataFrame(csv))
    return [:dataframe]
end

# Build the graph: CSVNode.:csv -> DataFrameNode.:csv
workflow = Workflow(0)
id1 = add_node!(workflow, DataNode(CSVNode, csv_spec))
id2 = add_node!(workflow, DataNode(DataFrameNode, df_spec))
add_connection!(workflow, id1, :csv, id2, :csv)
setsettings!(workflow, id1, Dict(:file => "data.csv"))

# Run the workflow and fetch the result from the second node
scheduler!(workflow)
df = getdata(workflow, id2, :dataframe)
Current status
This is not a production-ready package.
Expect breaking changes while the architecture evolves. The main focus right now is:
- stabilizing execution semantics
- improving scheduler correctness
- refining the invalidation model
- improving test coverage
Roadmap ideas
- caching & checkpointing
- audit/logging system
- better serialization/export formats
Feedback welcome
I’d especially appreciate feedback from people working with:
- DAG systems
- ETL pipelines
- scientific computing workflows
- node-based editors
- agent-based systems
GitHub: MetidaFlows.jl
Documentation: Documentation link