Rust API

Operator

The operator API is a framework that you implement; dora then manages the resulting operator. This lets dora apply optimisations and provide advanced features. It is the recommended way of using dora.

An operator must be registered and must implement the DoraOperator trait. The trait consists of an on_event method that defines how the operator behaves when an event occurs, for example when it receives an input.


#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        // Handle `event` here; the complete body is listed under src/lib.rs below.
        Ok(DoraStatus::Continue)
    }
}
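
To make the control flow concrete, here is a minimal sketch of how a runtime could drive an operator: it delivers events one by one and stops as soon as on_event asks it to. The Status, Event, Operator and drive names are hypothetical stand-ins written for this illustration; they are not the dora types and say nothing about how dora schedules operators internally.

```rust
// Hypothetical stand-ins for illustration only; these are NOT the dora types.
#[derive(Debug, PartialEq)]
enum Status {
    Continue,
    Stop,
}

enum Event<'a> {
    Input(&'a str),
    InputClosed(&'a str),
}

trait Operator {
    fn on_event(&mut self, event: &Event<'_>) -> Result<Status, String>;
}

#[derive(Debug, Default)]
struct TickCounter {
    ticks: usize,
}

impl Operator for TickCounter {
    fn on_event(&mut self, event: &Event<'_>) -> Result<Status, String> {
        match event {
            Event::Input("tick") => self.ticks += 1,
            Event::Input(other) => return Err(format!("unexpected input {other}")),
            // Ask the caller to stop once the `tick` input is closed.
            Event::InputClosed("tick") => return Ok(Status::Stop),
            Event::InputClosed(_) => {}
        }
        Ok(Status::Continue)
    }
}

/// Feeds events to the operator until it asks to stop or the events run out.
/// Returns how many events were delivered.
fn drive(op: &mut dyn Operator, events: &[Event<'_>]) -> Result<usize, String> {
    let mut delivered = 0;
    for event in events {
        delivered += 1;
        if op.on_event(event)? == Status::Stop {
            break;
        }
    }
    Ok(delivered)
}

fn main() {
    let events = [
        Event::Input("tick"),
        Event::Input("tick"),
        Event::InputClosed("tick"),
        Event::Input("tick"), // never delivered: the operator stopped before this
    ];
    let mut op = TickCounter::default();
    match drive(&mut op, &events) {
        Ok(delivered) => println!("delivered {delivered} events, counted {} ticks", op.ticks),
        Err(err) => eprintln!("operator failed: {err}"),
    }
}
```

The operator keeps its state in ordinary struct fields, and the only thing it tells the caller is whether to continue or stop.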

Try it out!

  • Generate a new Rust library:
cargo new rust-dataflow-example-operator --lib

Cargo.toml

[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { workspace = true }

src/lib.rs


#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

/// Counts `tick` inputs and reports the count whenever a `random` input arrives.
#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    // The payload is expected to be a u64 encoded as 8 little-endian bytes.
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                // Stop the operator once the `random` input is closed.
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }

        Ok(DoraStatus::Continue)
    }
}
  • Build it:
cargo build --release
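  • Link it in your graph as an operator of a runtime node. The shared-library path below points at target/debug, which is where a build without --release places the library; use target/release if you built with the command above:
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status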

This example can be found in examples.
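
The random branch of the operator above relies on a small contract: the payload is a u64 encoded as exactly 8 little-endian bytes. The following sketch isolates that decoding step so it can be tried without dora. The parse_random and status_message helpers are names made up for this illustration; they are not part of dora-operator-api.

```rust
// In the prelude since edition 2021; imported explicitly so the sketch
// also builds on older editions.
use std::convert::TryInto;

/// Decodes an 8-byte little-endian payload into a u64.
/// Hypothetical helper for illustration, not part of the dora API.
fn parse_random(data: &[u8]) -> Result<u64, String> {
    let bytes: [u8; 8] = data
        .try_into()
        .map_err(|_| format!("expected 8 bytes, got {}", data.len()))?;
    Ok(u64::from_le_bytes(bytes))
}

/// Builds a status message in the same shape as the example operator.
fn status_message(value: u64, ticks: usize) -> String {
    format!("operator received random value {value:#x} after {ticks} ticks")
}

fn main() {
    // The sending side encodes with `to_le_bytes`, so decoding must use
    // `from_le_bytes` to recover the same value.
    let payload = 0xdead_beef_u64.to_le_bytes();
    match parse_random(&payload) {
        Ok(value) => println!("{}", status_message(value, 3)),
        Err(err) => eprintln!("{err}"),
    }
}
```

A payload of any other length is rejected instead of being silently truncated or padded, which mirrors the "unexpected random data" error in the operator.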

Custom Node

The custom node API allows you to integrate dora into your application. It lets you retrieve inputs and send outputs in any fashion you want.

DoraNode::init_from_env()

DoraNode::init_from_env() initialises a node from the environment variables set by dora-coordinator. It returns the node handle together with its event stream.


let (mut node, mut events) = DoraNode::init_from_env()?;

.recv()

.recv() waits for the next event on the events stream. It returns an Option; the full example further down treats None as the end of the stream.


let event = events.recv();
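
The usual way to consume such a stream is a loop that runs until recv() yields None. The sketch below shows that loop shape with a standard-library channel. Event, EventStream and collect_inputs are hypothetical stand-ins for this illustration only, and the channel is an assumption chosen to make the example runnable; it is not how dora delivers events.

```rust
use std::sync::mpsc;

// Hypothetical stand-ins for illustration only; these are NOT the dora types.
enum Event {
    Input(String),
    Stop,
}

struct EventStream {
    rx: mpsc::Receiver<Event>,
}

impl EventStream {
    /// Blocks until the next event arrives.
    /// Returns `None` once every sender has been dropped and the queue is empty.
    fn recv(&mut self) -> Option<Event> {
        self.rx.recv().ok()
    }
}

/// Collects the ids of all inputs seen before a stop event or the end of the stream.
fn collect_inputs(events: &mut EventStream) -> Vec<String> {
    let mut seen = Vec::new();
    while let Some(event) = events.recv() {
        match event {
            Event::Input(id) => seen.push(id),
            Event::Stop => break,
        }
    }
    seen
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // A producer thread plays the role of the rest of the dataflow.
    let producer = std::thread::spawn(move || {
        for _ in 0..2 {
            tx.send(Event::Input("tick".to_owned())).unwrap();
        }
        tx.send(Event::Stop).unwrap();
    });

    let mut events = EventStream { rx };
    let seen = collect_inputs(&mut events);
    producer.join().unwrap();
    println!("received {} inputs before stopping", seen.len());
}
```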

.send_output(...)

send_output sends data from the node to the other nodes. It takes a closure as an argument; the closure fills the output buffer in place, which enables zero-copy sends.


node.send_output(
    &data_id,
    metadata.parameters,
    data.len(),
    |out| {
        out.copy_from_slice(data);
    },
)?;
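
The closure-based signature can look unusual at first. The sketch below shows the general pattern under simplified assumptions: the sender allocates a buffer of the requested length and the caller writes the payload straight into it. Sender and send_with are hypothetical names for this illustration, and the buffer is a plain Vec; this is not dora's implementation.

```rust
/// Hypothetical sender that owns its output buffers (illustration only).
struct Sender {
    sent: Vec<Vec<u8>>,
}

impl Sender {
    /// Allocates `len` bytes, lets `fill` write the payload in place,
    /// then records the buffer as sent.
    fn send_with<F>(&mut self, len: usize, fill: F)
    where
        F: FnOnce(&mut [u8]),
    {
        let mut buffer = vec![0u8; len];
        fill(&mut buffer);
        self.sent.push(buffer);
    }
}

fn main() {
    let mut sender = Sender { sent: Vec::new() };
    let value: u64 = 0x1122_3344_5566_7788;
    let data = value.to_le_bytes();

    // The requested length must match the payload: `copy_from_slice`
    // panics if the two slices differ in length.
    sender.send_with(data.len(), |out| out.copy_from_slice(&data));

    println!("sent {:02x?}", sender.sent[0]);
}
```

Because the caller writes into memory owned by the sender, no separately allocated payload has to be handed over and copied again afterwards.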

Try it out!

  • Generate a new Rust binary (application):
cargo new rust-dataflow-example-node

Cargo.toml

[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }

src/main.rs

use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
    println!("hello");

    let output = DataId::from("random".to_owned());

    let (mut node, mut events) = DoraNode::init_from_env()?;

    for i in 0..100 {
        let event = match events.recv() {
            Some(input) => input,
            None => break,
        };

        match event {
            Event::Input {
                id,
                metadata,
                data: _,
            } => match id.as_str() {
                "tick" => {
                    let random: u64 = rand::random();
                    println!("tick {i}, sending {random:#x}");
                    let data: &[u8] = &random.to_le_bytes();
                    node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
                        out.copy_from_slice(data);
                    })?;
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => println!("Received manual stop"),
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
  • Link it in your graph. The node's entry declares the following inputs and outputs:
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random