From 0db4512df4001cdb3d9dfaa8162b3a095757c18d Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 28 May 2018 09:45:07 +0200 Subject: refactor(main): Simplify receiver Departing from the initial approach. There's no reason to multithread this for now. --- src/main.rs | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 9cc9dcee0d63..ddf9154419a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,8 +6,6 @@ extern crate systemd; use systemd::journal::*; use std::process; -use std::thread; -use std::sync::mpsc::{channel, Receiver}; use std::time::{Duration, Instant}; use std::collections::vec_deque::{VecDeque, Drain}; @@ -43,10 +41,11 @@ impl From for Record { } } -/// This function spawns a double-looped, blocking receiver. It will -/// buffer messages for a second before flushing them to Stackdriver. -fn receiver_loop(rx: Receiver) { - let mut buf = VecDeque::new(); +/// This function starts a double-looped, blocking receiver. It will +/// buffer messages for half a second before flushing them to +/// Stackdriver. +fn receiver_loop(mut journal: Journal) { + let mut buf: VecDeque = VecDeque::new(); let iteration = Duration::from_millis(500); loop { @@ -58,8 +57,8 @@ fn receiver_loop(rx: Receiver) { break; } - if let Ok(record) = rx.recv_timeout(iteration) { - buf.push_back(record); + if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) { + buf.push_back(record.into()); } } @@ -91,17 +90,5 @@ fn main () { } } - let (tx, rx) = channel(); - - let _receiver = thread::spawn(move || receiver_loop(rx)); - - journal.watch_all_elements(move |record| { - let record: Record = record.into(); - - if record.message.is_some() { - tx.send(record).ok(); - } - - Ok(()) - }).expect("Failed to read new entries from journal"); + receiver_loop(journal) } -- cgit 1.4.1