Create a Rust workspace

  • Initialize the workspace with:
mkdir my_first_dataflow
cd my_first_dataflow
  • Create the Cargo.toml file that will configure the entire workspace:

Cargo.toml

[workspace]

members = [
    "rust-dataflow-example-node",
]

Write your first node

Let's write a node which sends a random value every time it receives a timer tick. Let's make it exit after 100 iterations. The other nodes/operators will then exit as well because all of their sources are closed.

  • Generate a new Rust binary (application):
cargo new rust-dataflow-example-node

with Cargo.toml:

[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }

with src/main.rs:

use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

/// Entry point of the example source node.
///
/// Initializes the node from the environment and then handles at most 100
/// events. For every `tick` input a random `u64` is sent on the `random`
/// output as 8 little-endian bytes, reusing the metadata parameters of the
/// tick that triggered it. The loop ends early when the event stream
/// returns `None`.
///
/// # Errors
///
/// Returns an error if the node cannot be initialized or if sending an
/// output fails.
fn main() -> eyre::Result<()> {
    println!("hello");

    let output = DataId::from("random".to_owned());

    let (mut node, mut events) = DoraNode::init_from_env()?;

    // `i` advances on every received event, not only on ticks.
    for i in 0..100 {
        // A closed event stream ends the node.
        let Some(event) = events.recv() else {
            break;
        };

        match event {
            Event::Stop => println!("Received manual stop"),
            Event::Input {
                id,
                metadata,
                data: _,
            } => {
                // Only the `tick` input triggers an output.
                if id.as_str() != "tick" {
                    let other = id.as_str();
                    eprintln!("Ignoring unexpected input `{other}`");
                    continue;
                }

                let random = rand::random::<u64>();
                println!("tick {i}, sending {random:#x}");

                // The closure fills the output buffer handed out by
                // `send_output`, which is requested with the payload length.
                let payload = random.to_le_bytes();
                node.send_output(output.clone(), metadata.parameters, payload.len(), |out| {
                    out.copy_from_slice(&payload);
                })?;
            }
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}

Write your first operator

  • Generate a new Rust library:
cargo new rust-dataflow-example-operator --lib

with Cargo.toml:

[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { workspace = true }

with src/lib.rs:


#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

// NOTE(review): presumably this macro generates the entry points through which
// the dora runtime loads `ExampleOperator` from the shared library. It must be
// invoked at crate level, not inside a function body.
register_operator!(ExampleOperator);

/// Example operator that counts `tick` inputs and reports every `random`
/// input it receives on its `status` output.
#[derive(Debug, Default)]
struct ExampleOperator {
    /// Number of `tick` inputs received so far.
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    /// Handles a single event delivered to the operator.
    ///
    /// * `tick` input: increments the tick counter.
    /// * `random` input: decodes the payload as a little-endian `u64` and
    ///   sends a human-readable status line on the `status` output.
    /// * Any other input: logged to stderr and ignored.
    /// * `InputClosed` for `random`: requests the operator to stop.
    ///
    /// Returns `Ok(DoraStatus::Continue)` in every case except the closed
    /// `random` input, which returns `Ok(DoraStatus::Stop)`.
    ///
    /// # Errors
    ///
    /// Returns an error string if the `random` payload is not exactly
    /// 8 bytes long or if sending the `status` output fails.
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    let parsed = {
                        // The payload must be exactly the 8 bytes of a `u64`.
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            // A stop event needs no handling here; the operator keeps
            // returning `Continue` until its `random` input is closed.
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }

        Ok(DoraStatus::Continue)
    }
}
  • And modify the root Cargo.toml:
[workspace]

members = [
    "rust-dataflow-example-node",
    "rust-dataflow-example-operator",
]

Write your sink node

Let's write a logger which will print incoming data.

  • Generate a new Rust binary (application):
cargo new rust-dataflow-example-sink

with Cargo.toml:

[package]
name = "rust-dataflow-example-sink"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"

with src/main.rs:

use dora_node_api::{self, DoraNode, Event};
use eyre::{bail, Context, ContextCompat};

/// Entry point of the example sink node.
///
/// Initializes the node from the environment and processes events until the
/// event stream returns `None`. Every `message` input is decoded as UTF-8,
/// printed, and checked against the expected status-line format.
///
/// # Errors
///
/// Returns an error if the node cannot be initialized, if a `message` input
/// carries no data or is not valid UTF-8, or if the text does not start with
/// `operator received random value ` and end with ` ticks`.
fn main() -> eyre::Result<()> {
    let (_node, mut events) = DoraNode::init_from_env()?;

    loop {
        // A closed event stream ends the node.
        let Some(event) = events.recv() else {
            break;
        };

        match event {
            Event::Stop => {
                println!("Received manual stop");
            }
            Event::InputClosed { id } => {
                println!("Input `{id}` was closed");
            }
            Event::Input {
                id,
                metadata: _,
                data,
            } => {
                // Only the `message` input is validated.
                if id.as_str() != "message" {
                    let other = id.as_str();
                    eprintln!("Ignoring unexpected input `{other}`");
                    continue;
                }

                let payload = data.wrap_err("no data")?;
                let text = std::str::from_utf8(&payload)
                    .wrap_err("received message was not utf8-encoded")?;
                println!("sink received message: {}", text);

                // Check both ends of the expected status-line format.
                if !text.starts_with("operator received random value ") {
                    bail!("unexpected message format (should start with 'operator received random value')");
                }
                if !text.ends_with(" ticks") {
                    bail!("unexpected message format (should end with 'ticks')");
                }
            }
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
  • And modify the root Cargo.toml:
[workspace]

members = [
    "rust-dataflow-example-node",
    "rust-dataflow-example-operator",
    "rust-dataflow-example-sink"
]

Compile everything

cargo build --all

Write a graph definition

Let's write the graph definition so that each node knows which other nodes to communicate with.

dataflow.yml

nodes:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node
      source: ../../target/debug/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
      build: cargo build -p rust-dataflow-example-sink
      source: ../../target/debug/rust-dataflow-example-sink
      inputs:
        message: runtime-node/rust-operator/status

Run it!

  • Run the dataflow:
dora-daemon --run-dataflow dataflow.yml