1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
use std::thread; use std::iter; use std::cell::*; use std::sync::*; use std::sync::mpsc::*; use std::marker::*; use std::ops::Add; use rand; use time; use super::{Signal, SignalExt, Run, Config}; use primitives::input::{RunInput, ReceiverInput, AckInput, RngInput}; use primitives::fork::{Fork, Branch}; use primitives::channel::Channel; use primitives::async::Async; use primitives::value::Value; /// `Builder` provides helpers for building topologies /// pub struct Builder { config: Config, pub inputs: RefCell<Vec<Box<RunInput>>>, pub runners: RefCell<Vec<Box<Run>>>, } impl Builder { /// Create a new Builder /// pub fn new(config: Config) -> Self { Builder { config: config, runners: RefCell::new(Vec::new()), inputs: RefCell::new(Vec::new()), } } /// Listen to `input` and push received data into the topology /// /// All data must enter the topology via a call to `listen`; this function /// ensures data syncronization across the topology. Each listener runs in /// its own thread /// /// # Example /// /// ``` /// use std::default::*; /// use std::sync::mpsc::*; /// use cfrp::*; /// /// let b = Builder::new(Default::default()); /// /// let (tx, rx): (Sender<usize>, Receiver<usize>) = channel(); /// /// // Receive data on `rx` and expose it as a signal with initial value /// //`initial`. This is necessary because the topology must maintain /// // consistency between threads, so any message sent to any input is /// // propagated to all other inputs as "no-change" messages. /// let signal = b.listen(0, rx); /// ``` /// pub fn listen<A>(&self, initial: A, input: Receiver<A>) -> Branch<A> where A: 'static + Clone + Send, { let (tx, rx) = sync_channel(self.config.buffer_size.clone()); let runner = ReceiverInput::new(input, tx); self.inputs.borrow_mut().push(Box::new(runner)); self.add(Channel::new(self.config.clone(), rx, initial)) } /// Creats a channel with constant value `v` /// /// Nodes downstream of values will be executed once on initialization and /// never again. Constant values can be combined with dynamic values via /// `lift2`, in which case nodes will be executed when their dynamic parents /// change values. /// /// # Example /// /// ``` /// use std::default::*; /// use std::sync::mpsc::*; /// use cfrp::*; /// /// let b = Builder::new(Default::default()); /// let (tx, rx): (Sender<usize>, Receiver<usize>) = channel(); /// /// let v = b.value(0); /// let ch = b.listen(0, rx); /// /// // Only ever computed once /// let l1 = v.lift(|i| { i + 1 }); /// /// // Computed any time `ch` receives data, `static` will always be `0` /// let l2 = l1.lift2(ch, |st, dy| { *st + *dy }); /// ``` /// pub fn value<T>(&self, v: T) -> Value<T> where T: 'static + Clone + Send, { Value::new(self.config.clone(), v) } /// Returns a signal which emits the "current" time every at every `interval` /// /// The actual time between events may be longer than `interval` if /// writing to the topology blocks or the scheduling thread is pre-empted /// by other threads, however a signal with every interval's time value will /// eventually be sent. /// pub fn every(&self, interval: time::Duration) -> Branch<time::Tm> { let (tx, rx) = sync_channel(0); let initial = time::now(); let mut last_tick = initial.clone(); thread::spawn(move || { loop { let tm = iter::repeat(interval) .scan(last_tick.clone(), |tm, d| { *tm = *tm + d; Some(*tm) }) .inspect(|tm| { match tx.send(tm.clone()) { Ok(_) => { last_tick = *tm; }, Err(_) => return, } }) .take_while(|tm| { *tm < time::now() }) .last(); let sleep_duration = time::now() - tm.unwrap(); thread::sleep_ms(sleep_duration.num_milliseconds() as u32); } }); self.listen(initial, rx) } /// Creates a channel which pushes `Event::Changed(initial)` when any /// other channel receives changes /// /// Signals created with `listen` only cause nodes directly downstream of /// themselves to be recomputed. By contrast, signals created by `ack_*` will /// emit a value when any input signal's value changes. /// /// # Example /// ``` /// use std::sync::mpsc::*; /// use cfrp::*; /// /// let(tx, rx) = channel(); /// let(out_tx, out_rx) = channel(); /// /// spawn_topology(Default::default(), move |t| { /// t.add(t.ack_value(1).lift(move |i| { out_tx.send(i).unwrap(); })); /// t.add(t.listen(0, rx)); /// }); /// /// // Initial /// assert_eq!(out_rx.recv().unwrap(), 1); /// /// tx.send(1).unwrap(); /// assert_eq!(out_rx.recv().unwrap(), 1); /// ``` /// pub fn ack_value<A>(&self, initial: A) -> Branch<A> where A: 'static + Clone + Send, { let (tx, rx) = sync_channel(self.config.buffer_size.clone()); let runner = AckInput::new(initial.clone(), tx); self.inputs.borrow_mut().push(Box::new(runner)); self.add(Channel::new(self.config.clone(), rx, initial)) } /// Return a signal that increments each time the topology receives data /// /// Signals created with `listen` only cause nodes directly downstream of /// themselves to be recomputed. By contrast, signals created by `ack_*` will /// emit a value when any input signal's value changes. /// pub fn ack_counter<A, B>(&self, initial: A, by: A) -> Branch<A> where A: 'static + Clone + Send + Add<Output=A>, { self.add( self.ack_value(by) .fold(initial, |c, incr| { c + incr }) ) } /// Return a signal with the 'current' time each time the topology receives /// data /// /// Signals created with `listen` only cause nodes directly downstream of /// themselves to be recomputed. By contrast, signals created by `ack_*` will /// emit a value when any input signal's value changes. /// pub fn ack_timestamp(&self) -> Branch<time::Tm> { self.add( self.ack_value(()) .lift(|_| -> time::Tm { time::now() }) ) } /// Return a signal which generates a random value each time the topology /// receives data /// /// If randomness is needed, `ack_random` is probably a better way of generating it /// than creating a Rng inside a handler because it exposes the generated /// value as a 'fact' about the system's history rather than embedding it /// in a non-deterministic function. /// /// Signals created with `listen` only cause nodes directly downstream of /// themselves to be recomputed. By contrast, signals created by `ack_*` will /// emit a value when any input signal's value changes. /// pub fn ack_random<R, A>(&self, mut rng: R) -> Branch<A> where R: 'static + rand::Rng + Clone + Send, A: 'static + Send + Clone + rand::Rand, { let (tx, rx) = sync_channel(self.config.buffer_size.clone()); let initial = rng.gen(); let runner = RngInput::new(rng, tx); self.inputs.borrow_mut().push(Box::new(runner)); self.add(Channel::new(self.config.clone(), rx, initial)) } /// Add a signal to the topology /// /// Returns a `Branch<A>`, allowing `root` to be used as input more than once /// `SignalExt<A>` also provides `add_to(&Builder)` so `Builder::add` can be /// used with method-chaining syntax /// /// # Example /// /// ``` /// use std::default::*; /// use cfrp::*; /// /// let b = Builder::new(Default::default()); /// /// // Topologies only execute transformations which have been added to a builder. /// let fork = b.add(b.value(1).lift(|i| { i + 1} )); /// /// // `add` returns a signal that can be used more than once /// fork /// .clone() /// .lift(|i| { i - 1 } ) /// .add_to(&b); /// /// fork /// .lift(|i| { -i }) /// .add_to(&b); /// ``` /// pub fn add<SA, A>(&self, root: SA) -> Branch<A> where // NOTE: This needs to be clone-able! SA: 'static + Signal<A>, A: 'static + Clone + Send, { let v = root.initial(); let fork_txs = Arc::new(Mutex::new(Vec::new())); let fork = Fork::new(Box::new(root), fork_txs.clone()); self.runners.borrow_mut().push(Box::new(fork)); Branch::new(self.config.clone(), fork_txs, None, v) } /// Combination of adding a signal and a channel /// /// Async allows signals to be processed downstream out of order. Internally, /// the output of `root` is sent to new input channel. The result is that /// long-running processes can be handled outside of the synchronized topology /// process, and the result can be handled when it's available. /// /// ``` /// use std::thread; /// use std::sync::mpsc::*; /// use cfrp::*; /// /// let (slow_tx, slow_rx) = channel(); /// let (fast_tx, fast_rx) = channel(); /// let (out_tx, out_rx) = channel(); /// /// spawn_topology(Default::default(), move |t| { /// let slow = t.listen(1 << 0, slow_rx) /// .lift(|i| -> usize { /// if i > 1 { // allow the initial value to be computed quickly /// thread::sleep_ms(100); /// } /// /// i /// }).async(t); /// /// let fast = t.listen(1 << 1, fast_rx); /// /// slow.lift2(fast, move |i,j| { out_tx.send(*i | *j).unwrap() }) /// .add_to(t); /// }); /// /// // Initial value /// assert_eq!(out_rx.recv().unwrap(), (1 << 0) | (1 << 1)); /// /// slow_tx.send(1 << 2).unwrap(); /// fast_tx.send(1 << 3).unwrap(); /// /// // Will receive the 'fast' value first... /// assert_eq!(out_rx.recv().unwrap(), (1 << 0) | (1 << 3)); /// // ...then the slow one /// assert_eq!(out_rx.recv().unwrap(), (1 << 2) | (1 << 3)); /// ``` /// pub fn async<SA, A>(&self, root: SA) -> Branch<A> where // NOTE: Needs to be cloneable SA: 'static + Signal<A>, A: 'static + Clone + Send, { let v = root.initial(); let (tx, rx) = sync_channel(self.config.buffer_size.clone()); let pusher = Async::new(Box::new(root), tx); self.runners.borrow_mut().push(Box::new(pusher)); self.listen(v.unwrap(), rx) } }