Struct cfrp::Builder

pub struct Builder {
    pub inputs: RefCell<Vec<Box<RunInput>>>,
    pub runners: RefCell<Vec<Box<Run>>>,
    // some fields omitted
}

Builder provides helpers for building topologies

Fields

inputs
runners

Methods

impl Builder

fn new(config: Config) -> Self

Create a new Builder

fn listen<A>(&self, initial: A, input: Receiver<A>) -> Branch<A> where A: 'static + Clone + Send

Listen to input and push received data into the topology

All data must enter the topology via a call to listen; this function ensures data synchronization across the topology. Each listener runs in its own thread.

Example

use std::default::*;
use std::sync::mpsc::*;
use cfrp::*;

let b = Builder::new(Default::default());
 
let (tx, rx): (Sender<usize>, Receiver<usize>) = channel();

// Receive data on `rx` and expose it as a signal with initial value
// `initial`. This is necessary because the topology must maintain
// consistency between threads, so any message sent to any input is 
// propagated to all other inputs as "no-change" messages.
let signal = b.listen(0, rx);

fn value<T>(&self, v: T) -> Value<T> where T: 'static + Clone + Send

Creates a channel with constant value v

Nodes downstream of values will be executed once on initialization and never again. Constant values can be combined with dynamic values via lift2, in which case nodes will be executed when their dynamic parents change values.

Example

use std::default::*;
use std::sync::mpsc::*;
use cfrp::*;

let b = Builder::new(Default::default());
let (tx, rx): (Sender<usize>, Receiver<usize>) = channel();

let v = b.value(0);
let ch = b.listen(0, rx);

// Only ever computed once
let l1 = v.lift(|i| { i + 1 });

// Computed any time `ch` receives data; `st` will always be `1` (0 + 1 from `l1`)
let l2 = l1.lift2(ch, |st, dy| { *st + *dy });

fn every(&self, interval: Duration) -> Branch<Tm>

Returns a signal which emits the "current" time at every interval

The actual time between events may be longer than interval if writing to the topology blocks or the scheduling thread is pre-empted by other threads. However, a signal carrying the time value for every interval will eventually be sent.
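The scheduling behavior described above can be sketched with the standard library alone. This is a minimal illustration, not cfrp's implementation: the helpers ticker and collect_ticks are hypothetical names, and the sketch assumes ticks are computed from a fixed schedule (start + n * interval), so a late tick is sent immediately rather than dropped.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of cfrp): spawns a thread that sends the
/// *scheduled* time of each of `count` ticks over a channel.
fn ticker(interval: Duration, count: u32) -> Receiver<Instant> {
    let (tx, rx) = channel();
    let start = Instant::now();

    thread::spawn(move || {
        for n in 1..=count {
            // Each tick is derived from the fixed schedule, not from "now".
            let scheduled = start + interval * n;

            // Sleep only when ahead of schedule; when behind (for example
            // after a slow send or pre-emption), send the tick immediately.
            if let Some(wait) = scheduled.checked_duration_since(Instant::now()) {
                thread::sleep(wait);
            }

            if tx.send(scheduled).is_err() {
                break; // the receiver was dropped; stop ticking
            }
        }
    });

    rx
}

/// Hypothetical helper: blocks until all ticks have arrived.
fn collect_ticks(interval: Duration, count: u32) -> Vec<Instant> {
    ticker(interval, count).iter().collect()
}
```

Because every tick carries its scheduled time, a consumer that falls behind still observes one value per interval, only later than intended.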

fn ack_value<A>(&self, initial: A) -> Branch<A> where A: 'static + Clone + Send

Creates a channel which pushes Event::Changed(initial) when any other channel receives changes

Signals created with listen only cause nodes directly downstream of themselves to be recomputed. By contrast, signals created by ack_* will emit a value when any input signal's value changes.

Example

use std::sync::mpsc::*;
use cfrp::*;

let (tx, rx) = channel();
let (out_tx, out_rx) = channel();

spawn_topology(Default::default(), move |t| {
    t.add(t.ack_value(1).lift(move |i| { out_tx.send(i).unwrap(); }));
    t.add(t.listen(0, rx));
});

// Initial
assert_eq!(out_rx.recv().unwrap(), 1);

tx.send(1).unwrap();
assert_eq!(out_rx.recv().unwrap(), 1);

fn ack_counter<A, B>(&self, initial: A, by: A) -> Branch<A> where A: 'static + Clone + Send + Add<Output=A>

Return a signal that increments each time the topology receives data

Signals created with listen only cause nodes directly downstream of themselves to be recomputed. By contrast, signals created by ack_* will emit a value when any input signal's value changes.
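As a rough model of the values such a counter might take, the following standard-library sketch steps a value by `by` once per received event. The function counter_values is hypothetical and does not use cfrp. It also assumes the signal starts at `initial` and adds `by` on each event, which the description above does not spell out.

```rust
use std::ops::Add;

/// Hypothetical model (not cfrp's implementation): the sequence of values
/// a counter starting at `initial` would hold after each of `events`
/// topology-wide events, assuming it adds `by` once per event.
fn counter_values<A>(initial: A, by: A, events: usize) -> Vec<A>
where
    A: Clone + Add<Output = A>,
{
    let mut values = Vec::with_capacity(events + 1);
    let mut current = initial;

    // Assumption: the first observable value is `initial` itself.
    values.push(current.clone());

    for _ in 0..events {
        current = current + by.clone();
        values.push(current.clone());
    }

    values
}
```

The bounds mirror those in the signature above: Clone lets the step be reused on every event, and Add<Output = A> keeps the counter's type stable.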

fn ack_timestamp(&self) -> Branch<Tm>

Return a signal with the 'current' time each time the topology receives data

Signals created with listen only cause nodes directly downstream of themselves to be recomputed. By contrast, signals created by ack_* will emit a value when any input signal's value changes.

fn ack_random<R, A>(&self, rng: R) -> Branch<A> where R: 'static + Rng + Clone + Send, A: 'static + Send + Clone + Rand

Return a signal which generates a random value each time the topology receives data

If randomness is needed, ack_random is probably a better way of generating it than creating an Rng inside a handler, because it exposes the generated value as a 'fact' about the system's history rather than embedding it in a non-deterministic function.

Signals created with listen only cause nodes directly downstream of themselves to be recomputed. By contrast, signals created by ack_* will emit a value when any input signal's value changes.

fn add<SA, A>(&self, root: SA) -> Branch<A> where SA: 'static + Signal<A>, A: 'static + Clone + Send

Add a signal to the topology

Returns a Branch<A>, allowing root to be used as input more than once. SignalExt<A> also provides add_to(&Builder) so Builder::add can be used with method-chaining syntax.

Example

use std::default::*;
use cfrp::*;

let b = Builder::new(Default::default());
 
// Topologies only execute transformations which have been added to a builder.
let fork = b.add(b.value(1).lift(|i| { i + 1} ));

// `add` returns a signal that can be used more than once
fork
    .clone()
    .lift(|i| { i - 1 } )
    .add_to(&b);

fork
    .lift(|i| { -i })
    .add_to(&b);

fn async<SA, A>(&self, root: SA) -> Branch<A> where SA: 'static + Signal<A>, A: 'static + Clone + Send

Combination of adding a signal and a channel

async allows signals to be processed downstream out of order. Internally, the output of root is sent to a new input channel. As a result, long-running processes can run outside of the synchronized topology process, and their results are handled when they become available.

use std::thread;
use std::sync::mpsc::*;
use cfrp::*;

let (slow_tx, slow_rx) = channel();
let (fast_tx, fast_rx) = channel();
let (out_tx, out_rx) = channel();

spawn_topology(Default::default(), move |t| {
    let slow = t.listen(1 << 0, slow_rx)
        .lift(|i| -> usize { 
            if i > 1 { // allow the initial value to be computed quickly
                thread::sleep_ms(100);
            }

            i 
        }).async(t);

    let fast = t.listen(1 << 1, fast_rx);

    slow.lift2(fast, move |i,j| { out_tx.send(*i | *j).unwrap() })
    .add_to(t);
});

// Initial value
assert_eq!(out_rx.recv().unwrap(), (1 << 0) | (1 << 1));

slow_tx.send(1 << 2).unwrap();
fast_tx.send(1 << 3).unwrap();

// Will receive the 'fast' value first...
assert_eq!(out_rx.recv().unwrap(), (1 << 0) | (1 << 3));
// ...then the slow one
assert_eq!(out_rx.recv().unwrap(), (1 << 2) | (1 << 3));