Welcome to dora!

dora's goal is to be a low-latency, composable, and distributed dataflow framework.

By using dora, you can define robotic applications as a graph of nodes that can be easily swapped and replaced. Those nodes can be shared and implemented in different languages such as Rust, Python or C. dora will then connect those nodes and try to provide as many features as possible to facilitate the dataflow.

✨ Features

Composability as:

  • YAML declarative programming
  • polyglot:
    • Rust
    • C
    • C++
    • Python
  • Isolated operators and custom nodes that can be reused.

Low latency as:

  • written in ...Cough...blazingly fast ...Cough... Rust.
  • PubSub communication with shared memory!
  • Zero-copy on read!

Distributed as:

  • PubSub communication between machines with zenoh
  • Distributed telemetry with opentelemetry

⚖️ LICENSE

This project is licensed under Apache-2.0. Check out NOTICE.md for more information.

Installation

This project is in early development; many features are still missing and breaking changes are expected. Please don't take the current design for granted. The installation process will be streamlined in the future.

Download the binaries from GitHub

Install dora binaries from GitHub releases:

wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-Linux.zip
unzip dora-<version>-x86_64-Linux.zip
python3 -m pip install dora-rs==<version> ## For Python API
PATH=$PATH:$(pwd)
dora --help

Or compile from Source

Build it using:

git clone https://github.com/dora-rs/dora.git
cd dora
cargo build --all --release
PATH=$PATH:$(pwd)/target/release

Create a Rust workspace

  • Initialize the workspace with:
mkdir my_first_dataflow
cd my_first_dataflow
  • Create the Cargo.toml file that will configure the entire workspace:

Cargo.toml

[workspace]

members = [
    "rust-dataflow-example-node",
]

# Required because the member crates use `version.workspace = true`,
# `license.workspace = true`, and `{ workspace = true }` dependencies.
[workspace.package]
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

[workspace.dependencies]
# Use the dora version that matches the release you installed.
dora-node-api = "0.2"
dora-operator-api = "0.2"

Write your first node

Let's write a node that sends a random number each time it receives a tick input, and exits after 100 iterations. The other nodes/operators will then exit as well, because all their sources are closed.

  • Generate a new Rust binary (application):
cargo new rust-dataflow-example-node

with Cargo.toml:

[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }

with src/main.rs:

use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
    println!("hello");

    let output = DataId::from("random".to_owned());

    let (mut node, mut events) = DoraNode::init_from_env()?;

    for i in 0..100 {
        let event = match events.recv() {
            Some(input) => input,
            None => break,
        };

        match event {
            Event::Input {
                id,
                metadata,
                data: _,
            } => match id.as_str() {
                "tick" => {
                    let random: u64 = rand::random();
                    println!("tick {i}, sending {random:#x}");
                    let data: &[u8] = &random.to_le_bytes();
                    node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
                        out.copy_from_slice(data);
                    })?;
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => println!("Received manual stop"),
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}

Write your first operator

  • Generate a new Rust library:
cargo new rust-dataflow-example-operator --lib

with Cargo.toml:

[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { workspace = true }

with src/lib.rs:


#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }

        Ok(DoraStatus::Continue)
    }
}
  • And add the new crate to the members list in the root Cargo.toml:
[workspace]

members = [
    "rust-dataflow-example-node",
    "rust-dataflow-example-operator",
]

Write your sink node

Let's write a logger which will print incoming data.

  • Generate a new Rust binary (application):
cargo new rust-dataflow-example-sink

with Cargo.toml:

[package]
name = "rust-dataflow-example-sink"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"

with src/main.rs:

use dora_node_api::{self, DoraNode, Event};
use eyre::{bail, Context, ContextCompat};

fn main() -> eyre::Result<()> {
    let (_node, mut events) = DoraNode::init_from_env()?;

    while let Some(event) = events.recv() {
        match event {
            Event::Input {
                id,
                metadata: _,
                data,
            } => match id.as_str() {
                "message" => {
                    let data = data.wrap_err("no data")?;
                    let received_string = std::str::from_utf8(&data)
                        .wrap_err("received message was not utf8-encoded")?;
                    println!("sink received message: {}", received_string);
                    if !received_string.starts_with("operator received random value ") {
                        bail!("unexpected message format (should start with 'operator received random value')")
                    }
                    if !received_string.ends_with(" ticks") {
                        bail!("unexpected message format (should end with 'ticks')")
                    }
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => {
                println!("Received manual stop");
            }
            Event::InputClosed { id } => {
                println!("Input `{id}` was closed");
            }
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
  • And add the new crate to the members list in the root Cargo.toml:
[workspace]

members = [
    "rust-dataflow-example-node",
    "rust-dataflow-example-operator",
    "rust-dataflow-example-sink"
]

Compile everything

cargo build --all

(The graph definition below references the debug builds in ../../target/debug, which is what the per-node build commands produce.)

Write a graph definition

Let's write the graph definition so that the nodes know who to communicate with.

dataflow.yml

nodes:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node
      source: ../../target/debug/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
      build: cargo build -p rust-dataflow-example-sink
      source: ../../target/debug/rust-dataflow-example-sink
      inputs:
        message: runtime-node/rust-operator/status

Run it!

  • Run the dataflow:
dora-daemon --run-dataflow dataflow.yml

Design Overview

The dora framework is structured into different components:

[design diagram]

The following main components exist:

  • Nodes: Dora nodes are separate, isolated processes that communicate with other nodes through the dora library. Nodes can be either a custom, user-specified program, or a dora runtime node, which allows running dora operators. Nodes implement their own main function and thus have full control over their execution.

    Nodes use the dora library to communicate with other nodes, which is available for multiple languages (Rust, C; maybe Python, WASM). Communication happens through a communication layer, which will be zenoh in our first version. We plan to add more options in the future. All communication layer implementations should be robust against disconnections, so operators should be able to keep running even if they lose the connection to the coordinator.

  • Operators: Operators are light-weight, cooperative, library-based components that are executed by a dora runtime node. They must implement a specific interface, depending on the used language. Operators can use a wide range of advanced features provided by the dora runtime, for example priority scheduling or native deadline support.

  • Coordinator: The coordinator is responsible for reading the dataflow from a YAML file, verifying it, and deploying the nodes and operators to the specified or automatically determined machines. It monitors the operators' health and implements high-level cluster management functionality. For example, we could implement automatic scaling for cloud nodes or operator replication and restarts. The coordinator can be controlled through a command line program (CLI).

Operators vs Custom Nodes

There are two ways to implement an operation in dora: either as a dora operator, or as a custom node. Both approaches have their advantages and drawbacks, as explained below. In general, it is recommended to create dora operators and only use custom nodes when necessary.

Operators have the following advantages:

  • They can use a wide range of advanced functionality provided by the dora runtime nodes. This includes special scheduling strategies and features such as deadlines.
  • They are light-weight, so they only occupy minimal amounts of memory. This makes it possible to run thousands of operators on the same machine.
  • They can use runtime-managed state storage, for robustness or for sharing state with other operators.
  • They share the address space with other operators on the same node, which makes communication much faster.

Custom nodes provide a different set of advantages:

  • Each node is a separate, isolated process, which can be important for security-critical operations.
  • They support pinned resources. For example, a CPU core can be pinned to a custom node through the dataflow configuration file.
  • They have full control over their execution, which allows to create complex input and wake-up rules.

Dataflow Specification

Dataflows are specified through a YAML file. This section presents our current draft for the file format. It only includes basic functionality for now; we will extend it later when we introduce more advanced features.

Dataflow

Dataflows are specified through the following format:

nodes:
    - id: foo
      # ... (see below)
    - id: bar
      # ... (see below)

Inputs and Outputs

Each operator or custom node has a separate namespace for its outputs. To refer to an output, the <node>/<output> syntax is used. This way, there are no name conflicts between operators.

Inputs are specified using the <input>: <node>/<output> syntax, where <input> is the internal name under which the operand will be available. The main advantage of this name mapping is that the same operator executable can be reused multiple times on different inputs.
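
For example, the following snippet (using hypothetical node and output names) makes the pose output of a tracker node available to a controller node under the local input name position:

nodes:
  - id: tracker
    custom:
      source: path/to/tracker
      outputs:
        - pose
  - id: controller
    custom:
      source: path/to/controller
      inputs:
        position: tracker/pose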

Nodes

Nodes are defined using the following format:

nodes:
  - id: some-unique-id
    # For nodes with multiple operators
    operators:
      - id: operator-1
        # ... (see below)
      - id: operator-2
        # ... (see below)



  - id: some-unique-id-2
    custom:
      source: path/to/timestamp
      env:
        - ENVIRONMENT_VARIABLE_1: true
      working-directory: some/path

      inputs:
        input_1: operator_2/output_4
        input_2: custom_node_2/output_4
      outputs:
        - output_1
 
  # Unique operator
  - id: some-unique-id-3
    operator:
        # ... (see below)

Nodes must provide either an operators field or a custom field, but not both. Nodes with an operators field run a dora runtime process, which runs and manages the specified operators. Nodes with a custom field run a custom executable.

Custom Nodes

Custom nodes specify the executable name and arguments, like a normal shell operation, through the source field. Through the optional env field, it is possible to set environment variables for the process. The optional working-directory field allows overriding the directory in which the program is started.

To integrate with the rest of the dora dataflow, custom nodes must specify their inputs and outputs, similar to operators. They can reference outputs of both operators, and other custom nodes.

Operators

Operators are defined through the following format:

- id: unique-operator-id
  name: Human-Readable Operator Name
  description: An optional description of the operator's purpose.

  inputs:
    input_1: source_operator_2/output_1
    input_2: custom_node_1/output_1
  outputs:
    - output_1

  ## ONE OF:
  shared_library: "path/to/shared_lib" # file extension and `lib` prefix are added automatically
  python: "path/to/python_file.py"
  wasm: "path/to/wasm_file.wasm"

Operators must list all their inputs and outputs. Inputs can be linked to arbitrary outputs of other operators or custom nodes.

There are multiple ways to implement an operator:

  • as a C-compatible shared library
  • as a Python object
  • as a WebAssembly (WASM) module

Each operator must specify exactly one implementation. The implementation must follow a specific format that is specified by dora.

Example

nodes:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node
      source: ../../target/debug/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
      build: cargo build -p rust-dataflow-example-sink
      source: ../../target/debug/rust-dataflow-example-sink
      inputs:
        message: runtime-node/rust-operator/status

TODO: Integration with ROS 1/2

To integrate dora-rs operators with ROS 1 or ROS 2 operators, we plan to provide special bridge operators. These operators act as a sink in one dataflow framework and push all messages to a different dataflow framework, where they act as a source.

For example, we plan to provide a to_ros_2 operator that takes a single data input and publishes it to a specified ROS 2 dataflow.
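
Purely as an illustrative sketch (neither the operator nor its configuration exists yet), such a bridge might be wired into a graph like this:

nodes:
  - id: ros-bridge
    operators:
      - id: to_ros_2                      # planned bridge operator
        inputs:
          data: some-node/some-output     # hypothetical source of the data to publish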

Rust API

Operator

The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to apply optimisations and provide advanced features. It is the recommended way of using dora.

An operator must be registered and implement the DoraOperator trait. The trait consists of an on_event method that defines the operator's behaviour when an event occurs, for example when an input arrives.


#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        // ... (see the full example below)
    }
}

Try it out!

  • Generate a new Rust library
cargo new rust-dataflow-example-operator --lib

Cargo.toml

[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { workspace = true }

src/lib.rs


#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }

        Ok(DoraStatus::Continue)
    }
}
  • Build it:
cargo build --release
  • Link it in your graph as:
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status

This example can be found in the examples folder of the dora repository.

Custom Node

The custom node API allows you to integrate dora into your application. It allows you to retrieve inputs and send outputs in any fashion you want.

DoraNode::init_from_env()

DoraNode::init_from_env() initialises a node from environment variables set by dora-coordinator


let (mut node, mut events) = DoraNode::init_from_env()?;

.recv()

.recv() waits for the next event on the events stream.


let event = events.recv();

.send_output(...)

send_output sends data from the node to the other nodes. It takes a closure as input to enable zero-copy sends.


node.send_output(
    &data_id,
    metadata.parameters,
    data.len(),
    |out| {
        out.copy_from_slice(data);
    })?;

Try it out!

  • Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
with Cargo.toml:

[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }

src/main.rs

use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
    println!("hello");

    let output = DataId::from("random".to_owned());

    let (mut node, mut events) = DoraNode::init_from_env()?;

    for i in 0..100 {
        let event = match events.recv() {
            Some(input) => input,
            None => break,
        };

        match event {
            Event::Input {
                id,
                metadata,
                data: _,
            } => match id.as_str() {
                "tick" => {
                    let random: u64 = rand::random();
                    println!("tick {i}, sending {random:#x}");
                    let data: &[u8] = &random.to_le_bytes();
                    node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
                        out.copy_from_slice(data);
                    })?;
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => println!("Received manual stop"),
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
  • Link it in your graph as:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node
      source: ../../target/debug/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random

C API

Operator

The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to apply optimisations and provide advanced features.

The operator definition is composed of three functions: dora_init_operator, which initialises the operator and its context; dora_drop_operator, which frees the memory; and dora_on_event, which runs the operator's logic when an event such as an input arrives.

#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
    void *context = malloc(1);
    char *context_char = (char *)context;
    *context_char = 0;

    DoraInitResult_t result = {.operator_context = context};
    return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
    free(operator_context);

    DoraResult_t result = {};
    return result;
}

OnEventResult_t dora_on_event(
    const RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context)
{
    // ... (see the full example below)
}

Try it out!

  • Create an operator.c file:
#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
    void *context = malloc(1);
    char *context_char = (char *)context;
    *context_char = 0;

    DoraInitResult_t result = {.operator_context = context};
    return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
    free(operator_context);

    DoraResult_t result = {};
    return result;
}

OnEventResult_t dora_on_event(
    const RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context)
{
    char *counter = (char *)operator_context;

    if (event->input != NULL)
    {
        // input event
        Input_t *input = event->input;

        char id[input->id.len + 1];
        memcpy(id, input->id.ptr, input->id.len);
        id[input->id.len] = 0;

        if (strcmp(id, "message") == 0)
        {
            char data[input->data.len + 1];
            memcpy(data, input->data.ptr, input->data.len);
            data[input->data.len] = 0;

            *counter += 1;
            printf("C operator received message `%s`, counter: %i\n", data, *counter);

            char *out_id = "counter";
            char *out_id_heap = strdup(out_id);

            int data_alloc_size = 100;
            char *out_data = (char *)malloc(data_alloc_size);
            int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter);
            assert(count >= 0 && count < 100);

            Output_t output = {.id = {
                                   .ptr = (uint8_t *)out_id_heap,
                                   .len = strlen(out_id_heap),
                                   .cap = strlen(out_id_heap) + 1,
                               },
                               .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
            DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

            OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
            return result;
        }
        else
        {
            printf("C operator received unexpected input %s, context: %i\n", id, *counter);
        }
    }
    if (event->stop)
    {
        printf("C operator received stop event\n");
    }

    OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
    return result;
}
  • Compile the operator.c file into a shared library (e.g. using clang) so that the graph can reference it as shared-library: build/operator. See the Custom Node section below for the general build setup and the OS-specific linker flags.

  • Link it in your graph as:

  - id: runtime-node
    operators:
      - id: c_operator
        shared-library: build/operator
        inputs:
          message: c_node/message
        outputs:
          - counter
  - id: c_sink
    custom:
      source: build/c_sink
      inputs:
        counter: runtime-node/c_operator/counter

Custom Node

The custom node API allows you to integrate dora into your application. It allows you to retrieve inputs and send outputs in any fashion you want.

init_dora_context_from_env

init_dora_context_from_env initialises a node from environment variables set by dora-coordinator

void *dora_context = init_dora_context_from_env();

dora_next_event

dora_next_event waits for the next event (e.g. an input). Use read_dora_event_type to read the event's type. Inputs are of type DoraEventType_Input. To extract the ID and data of an input event, use read_dora_input_id and read_dora_input_data on the returned pointer. It is safe to ignore any events and handle only the events that are relevant to the node.

void *event = dora_next_event(dora_context);

// read out the ID as a UTF8-encoded string
char *id;
size_t id_len;
read_dora_input_id(event, &id, &id_len);

// read out the data as a byte array
char *data;
size_t data_len;
read_dora_input_data(event, &data, &data_len);

dora_send_output

dora_send_output sends data from the node.

char out_id[] = "tick";
char out_data[] = "tick data";
dora_send_output(dora_context, out_id, strlen(out_id), out_data, strlen(out_data));

Try it out!

  • Create a node.c file:
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "../../apis/c/node/node_api.h"

// sleep
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif

int main()
{
    printf("[c node] Hello World\n");

    void *dora_context = init_dora_context_from_env();
    if (dora_context == NULL)
    {
        fprintf(stderr, "failed to init dora context\n");
        return -1;
    }

    printf("[c node] dora context initialized\n");

    for (char i = 0; i < 100; i++)
    {
        void *event = dora_next_event(dora_context);
        if (event == NULL)
        {
            printf("[c node] ERROR: unexpected end of event\n");
            return -1;
        }

        enum DoraEventType ty = read_dora_event_type(event);

        if (ty == DoraEventType_Input)
        {
            char *data;
            size_t data_len;
            read_dora_input_data(event, &data, &data_len);

            assert(data_len == 0);

            char out_id[] = "message";
            char out_data[50];
            int out_data_len = sprintf(out_data, "loop iteration %d", i);

            dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);
        }
        else if (ty == DoraEventType_Stop)
        {
            printf("[c node] received stop event\n");
        }
        else
        {
            printf("[c node] received unexpected event: %d\n", ty);
        }

        free_dora_event(event);
    }

    printf("[c node] received 10 events\n");

    free_dora_context(dora_context);

    printf("[c node] finished successfully\n");

    return 0;
}
  • Create a build folder in this directory (i.e., next to the node.c file)

  • Compile the dora-node-api-c crate into a static library.

    • Run cargo build -p dora-node-api-c --release
    • The resulting staticlib is then available under ../../target/release/libdora_node_api_c.a.
  • Compile the node.c (e.g. using clang) and link the staticlib

    • For example, use the following command:
      clang node.c <FLAGS> -ldora_node_api_c -L ../../target/release --output build/c_node
      
    • The <FLAGS> depend on the operating system and the libraries that the C node uses. On Windows, for example, the following flags are required:

  -ladvapi32 -luserenv -lkernel32 -lws2_32 -lbcrypt -lncrypt -lschannel -lntdll -liphlpapi
  -lcfgmgr32 -lcredui -lcrypt32 -lcryptnet -lfwpuclnt -lgdi32 -lmsimg32 -lmswsock -lole32
  -lopengl32 -lsecur32 -lshell32 -lsynchronization -luser32 -lwinspool
  -Wl,-nodefaultlib:libcmt -D_DLL -lmsvcrt

      Also: On Windows, the output file should have an `.exe` extension: `--output build/c_node.exe`
  • Repeat the previous step for the sink.c executable
  • Link it in your graph as:

  - id: c_node
    custom:
      source: build/c_node
      inputs:
        timer: dora/timer/millis/50
      outputs:
        - message

Python API

Operator

The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to apply optimisations and provide advanced features. It is the recommended way of using dora.

An operator requires an on_event method and must return a DoraStatus, depending on whether it needs to continue or stop.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Callable

import cv2
import numpy as np
import pyarrow as pa
import torch

from dora import DoraStatus

pa.array([])

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480


class Operator:
    """
    Inferring objects from images
    """

    def __init__(self):
        self.model = torch.hub.load("ultralytics/yolov5", "yolov5n")

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[..., None],
    ) -> DoraStatus:
        # The original listing was truncated here; this on_event mirrors the
        # node-style script below: run inference on each image input and send
        # the bounding boxes as output.
        if dora_event["type"] == "INPUT" and dora_event["id"] == "image":
            frame = np.frombuffer(dora_event["data"], dtype="uint8")
            frame = cv2.imdecode(frame, -1)
            frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
            results = self.model(frame)  # includes NMS
            arrays = np.array(results.xyxy[0].cpu()).tobytes()
            send_output("bbox", arrays, dora_event["metadata"])
        return DoraStatus.CONTINUE

For Python, we recommend allocating the operators on a single runtime. A runtime will share the same GIL across its operators, making those operators run almost sequentially. See: https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks

Try it out!

  • Create an operator Python file called object_detection.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Callable
from dora import Node

import cv2
import numpy as np
import torch

model = torch.hub.load("ultralytics/yolov5", "yolov5n")

node = Node()

for event in node:
    match event["type"]:
        case "INPUT":
            match event["id"]:
                case "image":
                    print("[object detection] received image input")
                    frame = np.frombuffer(event["data"], dtype="uint8")
                    frame = cv2.imdecode(frame, -1)
                    frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
                    results = model(frame)  # includes NMS
                    arrays = np.array(results.xyxy[0].cpu()).tobytes()

                    node.send_output("bbox", arrays, event["metadata"])
                case other:
                    print("[object detection] ignoring unexpected input:", other)
        case "STOP":
            print("[object detection] received stop")
        case "ERROR":
            print("[object detection] error: ", event["error"])
        case other:
            print("[object detection] received unexpected event:", other)
  • Link it in your graph as:
  - id: object_detection
    custom:
      source: ./object_detection.py
      inputs:
        image: webcam/image
      outputs:
        - bbox

Custom Node

The custom node API allows you to integrate dora into your application. It allows you to retrieve inputs and send outputs in any fashion you want.

Node()

Node() initialises a node from environment variables set by dora-coordinator

from dora import Node

node = Node()

.next() or __next__() as an iterator

.next() gives you the next input that the node has received. It blocks until the next input becomes available. It returns None when all senders have been dropped.

input_id, value, metadata = node.next()

# or

for input_id, value, metadata in node:

.send_output(output_id, data)

send_output sends data from the node.

node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})

Try it out!

  • Install the Python node API:
pip install dora-rs
  • Create a Python file called webcam.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time

import cv2

from dora import Node

node = Node()

video_capture = cv2.VideoCapture(0)

start = time.time()

# Run for 10 seconds
while time.time() - start < 10:
    # Wait for the next dora input
    event = node.next()
    match event["type"]:
        case "INPUT":
            ret, frame = video_capture.read()
            if ret:
                node.send_output(
                    "image",
                    cv2.imencode(".jpg", frame)[1].tobytes(),
                    event["metadata"],
                )
        case "STOP":
            print("received stop")
            break
        case other:
            print("received unexpected event:", other)
            break

video_capture.release()
  • Link it in your graph as:
  - id: webcam
    custom:
      source: ./webcam.py
      inputs:
        tick:
          source: dora/timer/millis/100
          queue_size: 1000
      outputs:
        - image

State Management

Most operators need to keep some sort of state between calls. This document describes the different ways to handle state in dora.

Internal State

Operators are struct or object instances, so they can keep internal state between invocations (the ExampleOperator shown earlier does this with its ticks counter). This state is private to the operator. When an operator exits or crashes, its internal state is lost.

Saving State

To make themselves resilient against crashes, operators can use dora's state management. The dora runtime provides each operator with a private key-value store (KVS). Operators can save serialized state into the KVS by using the save_state function of the runtime:


fn save_state(key: &str, value: Vec<u8>)

The runtime only stores the latest value for each key, so subsequent writes to the same key replace the earlier values. Serialization is required because the state must be self-contained (i.e. no pointers to other memory) and consistent (i.e. no half-updated state). Otherwise, state recovery might not be possible after an operator crash.

To keep the performance overhead of this function low, it is recommended to use a suitable serialization format that stores the data with minimal memory and compute overhead. Text-based formats such as JSON are not recommended. Also, fast-changing state should be stored under a separate key to minimize the amount of state that needs to be written.
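
As a minimal sketch (assuming the save_state function declared above, stubbed out here so the snippet compiles on its own), an operator might persist a fast-changing counter under its own key using a compact binary encoding:

// Stub standing in for the runtime-provided function declared above.
fn save_state(_key: &str, _value: Vec<u8>) {}

struct Counter {
    ticks: u64,
}

impl Counter {
    fn on_tick(&mut self) {
        self.ticks += 1;
        // Fast-changing state gets its own key; `to_le_bytes` is a compact,
        // self-contained binary encoding (no JSON overhead).
        save_state("ticks", self.ticks.to_le_bytes().to_vec());
    }
}

fn main() {
    let mut counter = Counter { ticks: 0 };
    counter.on_tick();
}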

State Recovery

When an operator crashes, the dora runtime restarts it and supplies it with the last version of the saved state. It does this by calling the operator's restore_state method:


fn restore_state(&mut self, state: HashMap<String, Vec<u8>>)

In this method, the operator should deserialize and apply all state entries, and perform all custom consistency checks that are necessary.
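
A matching restore_state sketch (reusing the hypothetical Counter from above) would deserialize each entry and validate it before applying it:

use std::collections::HashMap;

struct Counter {
    ticks: u64,
}

impl Counter {
    fn restore_state(&mut self, state: HashMap<String, Vec<u8>>) {
        if let Some(raw) = state.get("ticks") {
            // Consistency check: a valid value is exactly 8 little-endian bytes.
            match <[u8; 8]>::try_from(raw.as_slice()) {
                Ok(bytes) => self.ticks = u64::from_le_bytes(bytes),
                Err(_) => eprintln!("ignoring corrupted `ticks` state"),
            }
        }
    }
}

fn main() {
    let mut counter = Counter { ticks: 0 };
    let saved = HashMap::from([("ticks".to_string(), 42u64.to_le_bytes().to_vec())]);
    counter.restore_state(saved);
    assert_eq!(counter.ticks, 42);
}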

Sharing State

To share state between operators, dora provides access to a node-local key-value store:


fn kvs_write(key: &str, value: Vec<u8>)

fn kvs_read(key: &str) -> Vec<u8>
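
For illustration (with the runtime-provided functions stubbed out as above, and a hypothetical latest_pose key), one operator could publish its latest estimate and another could read it:

// Stubs standing in for the runtime-provided functions declared above.
fn kvs_write(_key: &str, _value: Vec<u8>) {}
fn kvs_read(_key: &str) -> Vec<u8> {
    Vec::new()
}

fn main() {
    // One operator publishes its latest estimate...
    kvs_write("latest_pose", 7.5f64.to_le_bytes().to_vec());
    // ...and another operator reads the most recent value under the same key.
    let raw = kvs_read("latest_pose");
    if let Ok(bytes) = <[u8; 8]>::try_from(raw.as_slice()) {
        println!("latest pose: {}", f64::from_le_bytes(bytes));
    }
}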

Todo:

  • Consistency?
  • Anna?

Custom Nodes

Custom nodes have full control over the execution, so they can implement their own state management. Shared state can be accessed through the kvs_read and kvs_write functions of the dora library, which are equivalent to the respective functions provided by the dora runtime.

Since custom nodes cannot use the recovery feature of the dora runtime, the save_state/restore_state functions are not available for them.

Framework

  • Runtime process
    • Talks with other runtime processes
    • Across machines
    • loop
      • listen for inputs
      • invoke corresponding operator(s)
      • collect and forward outputs
  • Operators
    • Connected to runtime
      • Via TCP socket (can be a separate process)
        • Single connection with high level message format, or
        • Separate connection per input/output
      • Dynamically linked as shared library
        • Runtime invokes the specific handler function directly with the input(s)
        • Outputs either:
          • Return a collection as result
          • Call runtime function to send out result
    • Input aggregation (i.e. waiting until multiple inputs are available)
      • by runtime -> aggregation specified in config file
      • by operator -> custom handling possible

Library

  • All sources/operators/sinks are separate processes that link a runtime library
  • "Orchestrator" process
    • reads config file
    • launches processes accordingly
    • passes node config
      • as argument
      • via env variable
      • including input and output names
  • Runtime library provides (async) functions to
    • wait for one or multiple inputs
    • with timeouts
    • send out outputs

Middleware (communication) layer abstraction (MLA)

dora needs to implement the MLA as a separate crate that provides a middleware abstraction layer enabling scalable, high-performance communication between async tasks, between OS threads within a process, between processes on a single computer, and between different nodes in a computer network. The MLA needs to support different communication patterns:

  • publish-subscribe push pattern - the published message is pushed to subscribers
  • publish-subscribe push/pull pattern - the published message is written to storage and later pulled by subscribers
  • Request / reply pattern
  • Point-to-point pattern
  • Client / Server pattern

The MLA needs to abstract the following details:

  • inter-async tasks (e.g., tokio channels), intraprocess (OS threads, e.g., shared memory), interprocess and inter-host / inter-network communication
  • different transport layer implementations (shared memory, UDP, TCP)
  • built-in support for multiple serialization/deserialization protocols, e.g., capnproto, protobuf, flatbuffers, etc.
  • different language bindings to Rust, Python, C, C++ etc
  • telemetry tools for logs, metrics, distributed tracing, live data monitoring (e.g., tap a live data), recording and replay

The Rust ecosystem has abundant crates providing the underlying communication primitives, e.g.:

  • tokio / crossbeam provide different types of channels serving different purposes: mpsc, oneshot, broadcast, watch, etc.
  • Tonic provides gRPC services
  • Tower provides request/reply service
  • Zenoh middleware provides many different pub/sub capabilities

The MLA also needs to provide high-level APIs:

  • publish(topic, value, optional fields) - the optional fields may contain the sender's identity to help the MLA logic satisfy the above requirements
  • subscribe(topic, optional fields) -> future stream
  • put(key, value, optional fields)
  • get(key, optional fields) -> value
  • send(key, msg, optional fields)
  • recv(key, optional fields) -> value
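
A hypothetical Rust trait capturing these APIs could look as follows; the names and signatures are illustrative only and do not exist in dora today:

use std::collections::HashMap;

type Value = Vec<u8>;
// Optional fields, e.g. the sender's identity.
type Options = HashMap<String, String>;

trait Mla {
    // publish-subscribe (push and push/pull)
    fn publish(&self, topic: &str, value: Value, opts: Options);
    fn subscribe(&self, topic: &str, opts: Options) -> Box<dyn Iterator<Item = Value>>;
    // key-value storage
    fn put(&self, key: &str, value: Value, opts: Options);
    fn get(&self, key: &str, opts: Options) -> Option<Value>;
    // point-to-point messaging
    fn send(&self, key: &str, msg: Value, opts: Options);
    fn recv(&self, key: &str, opts: Options) -> Option<Value>;
}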

More info here: #53