1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use std::sync::mpsc::*;
use super::super::{Event, Signal, SignalExt, SignalType, Push, Config};
pub struct Channel<A> where
A: 'static + Send + Clone,
{
config: Config,
source_rx: Receiver<Event<A>>,
initial: A,
}
impl<A> Channel<A> where
A: 'static + Send + Clone,
{
pub fn new(config: Config, source_rx: Receiver<Event<A>>, initial: A) -> Channel<A> {
Channel {
config: config,
source_rx: source_rx,
initial: initial,
}
}
}
impl<A> Signal<A> for Channel<A> where
A: 'static + Send + Clone,
{
fn config(&self) -> Config {
self.config.clone()
}
fn initial(&self) -> SignalType<A> {
SignalType::Dynamic(self.initial.clone())
}
fn push_to(self: Box<Self>, target: Option<Box<Push<A>>>) {
match target {
Some(mut t) => {
debug!("SETUP: Sending to Some");
loop {
match self.source_rx.recv() {
Err(e) => {
info!("RUN: Channel source_rx received Err {}, exiting", e);
t.push(Event::Exit);
return
},
Ok(a) => {
info!("RUN: Channel source_rx received data, pushing");
t.push(a);
},
}
}
}
None => {
debug!("SETUP: Sending to None");
loop {
match self.source_rx.recv() {
Err(e) => {
info!("RUN: Channel source_rx received Err {} with no target, exiting", e);
return
},
_ => {
info!("RUN: source_rx received data, but no target");
},
}
}
},
}
}
}
impl<A> SignalExt<A> for Channel<A> where A: 'static + Send + Clone {}