1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use std::sync::mpsc::*;

use super::super::{Event, Signal, SignalExt, SignalType, Push, Config};

/// A signal whose events arrive over a `std::sync::mpsc` receiver.
///
/// The struct bundles the receiving end of the channel with the
/// configuration and the initial value that the `Signal` implementation
/// hands back to callers.
pub struct Channel<A>
where
    A: 'static + Send + Clone,
{
    /// Configuration handed out (as a clone) by `Signal::config`.
    config: Config,
    /// Receiving end of the channel; `Signal::push_to` reads events from it.
    source_rx: Receiver<Event<A>>,
    /// Value reported (as a clone) by `Signal::initial`.
    initial: A,
}

impl<A> Channel<A>
where
    A: 'static + Send + Clone,
{
    /// Builds a `Channel` from its three parts.
    ///
    /// * `config` - configuration later returned by `Signal::config`.
    /// * `source_rx` - receiver the channel will read `Event<A>` values from.
    /// * `initial` - value later returned by `Signal::initial`.
    ///
    /// The arguments are stored as given; nothing is read from `source_rx`
    /// at construction time.
    pub fn new(config: Config, source_rx: Receiver<Event<A>>, initial: A) -> Channel<A> {
        Channel { config, source_rx, initial }
    }
}

impl<A> Signal<A> for Channel<A>
where
    A: 'static + Send + Clone,
{
    /// Returns a clone of the configuration this channel was created with.
    fn config(&self) -> Config {
        let cfg = self.config.clone();
        cfg
    }

    /// Returns a clone of the initial value, tagged as `SignalType::Dynamic`.
    fn initial(&self) -> SignalType<A> {
        let seed = self.initial.clone();
        SignalType::Dynamic(seed)
    }

    /// Consumes the channel and pumps everything received on `source_rx`
    /// into `target`.
    ///
    /// The call blocks on `recv()` and only returns once `recv()` reports an
    /// error (for a std `mpsc` receiver that means the sending side has hung
    /// up).
    ///
    /// * With a target: every received event is pushed on unchanged, and a
    ///   final `Event::Exit` is pushed when the receiver errors out.
    /// * Without a target: received events are discarded until the receiver
    ///   errors out.
    fn push_to(self: Box<Self>, target: Option<Box<Push<A>>>) {
        // Deal with the "nobody is listening" case first so the forwarding
        // loop below can work with a plain sink.
        let mut sink = match target {
            Some(sink) => sink,
            None => {
                debug!("SETUP: Sending to None");
                // Keep receiving so that queued events are released instead
                // of accumulating in the channel (avoids leaking memory).
                loop {
                    match self.source_rx.recv() {
                        Ok(_) => {
                            info!("RUN: source_rx received data, but no target");
                        }
                        Err(e) => {
                            info!("RUN: Channel source_rx received Err {} with no target, exiting", e);
                            return;
                        }
                    }
                }
            }
        };

        debug!("SETUP: Sending to Some");
        loop {
            let event = match self.source_rx.recv() {
                Ok(event) => event,
                Err(e) => {
                    info!("RUN: Channel source_rx received Err {}, exiting", e);
                    // Tell the downstream consumer that no more events follow.
                    sink.push(Event::Exit);
                    return;
                }
            };

            info!("RUN: Channel source_rx received data, pushing");
            sink.push(event);
        }
    }
}
/// Marks `Channel` as implementing `SignalExt`; the body is empty, so no
/// trait items are overridden here.
impl<A> SignalExt<A> for Channel<A>
where
    A: 'static + Send + Clone,
{
}