about summary refs log tree commit diff
path: root/src/main.rs
diff options
context:
space:
mode:
authorVincent Ambo <mail@tazj.in>2018-05-28T07·45+0200
committerVincent Ambo <mail@tazj.in>2018-05-28T07·45+0200
commit0db4512df4001cdb3d9dfaa8162b3a095757c18d (patch)
tree39ca17a312209aacb4b834ba3ee5296e9643e686 /src/main.rs
parent6793b25a67d79f4b0022e5c253dca56a3c96630f (diff)
refactor(main): Simplify receiver
Departing from the initial approach. There's no reason to multithread
this for now.
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs29
1 files changed, 8 insertions, 21 deletions
diff --git a/src/main.rs b/src/main.rs
index 9cc9dcee0d..ddf9154419 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,8 +6,6 @@ extern crate systemd;
 
 use systemd::journal::*;
 use std::process;
-use std::thread;
-use std::sync::mpsc::{channel, Receiver};
 use std::time::{Duration, Instant};
 use std::collections::vec_deque::{VecDeque, Drain};
 
@@ -43,10 +41,11 @@ impl From<JournalRecord> for Record {
     }
 }
 
-/// This function spawns a double-looped, blocking receiver. It will
-/// buffer messages for a second before flushing them to Stackdriver.
-fn receiver_loop(rx: Receiver<Record>) {
-    let mut buf = VecDeque::new();
+/// This function starts a double-looped, blocking receiver. It will
+/// buffer messages for half a second before flushing them to
+/// Stackdriver.
+fn receiver_loop(mut journal: Journal) {
+    let mut buf: VecDeque<Record> = VecDeque::new();
     let iteration = Duration::from_millis(500);
 
     loop {
@@ -58,8 +57,8 @@ fn receiver_loop(rx: Receiver<Record>) {
                 break;
             }
 
-            if let Ok(record) = rx.recv_timeout(iteration) {
-                buf.push_back(record);
+            if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) {
+                buf.push_back(record.into());
             }
         }
 
@@ -91,17 +90,5 @@ fn main () {
         }
     }
 
-    let (tx, rx) = channel();
-
-    let _receiver = thread::spawn(move || receiver_loop(rx));
-
-    journal.watch_all_elements(move |record| {
-        let record: Record = record.into();
-
-        if record.message.is_some() {
-            tx.send(record).ok();
-        }
-
-        Ok(())
-    }).expect("Failed to read new entries from journal");
+    receiver_loop(journal)
 }