From 6793b25a67d79f4b0022e5c253dca56a3c96630f Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Sun, 27 May 2018 23:57:24 +0200 Subject: feat(main): Implement receiver & flushing logic The only thing missing for a 0.1 test run is the actual gRPC call to Stackdriver. --- src/main.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 15 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index aca6aba900ad..9cc9dcee0d63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,107 @@ -#[macro_use] extern crate failure; -extern crate libc; +// #[macro_use] extern crate failure; +#[macro_use] extern crate log; -mod journald; +extern crate env_logger; +extern crate systemd; +use systemd::journal::*; use std::process; +use std::thread; +use std::sync::mpsc::{channel, Receiver}; +use std::time::{Duration, Instant}; +use std::collections::vec_deque::{VecDeque, Drain}; -fn main() { - let mut journal = match journald::open_journal() { - Ok(journal) => journal, - Err(e) => { - println!("{}", e); - process::exit(1); - }, - }; +#[derive(Debug)] +struct Record { + message: Option, + hostname: Option, + unit: Option, + timestamp: Option, +} + +impl From for Record { + fn from(mut record: JournalRecord) -> Record { + Record { + // The message field is technically just a convention, but + // journald seems to default to it when ingesting unit + // output. + message: record.remove("MESSAGE"), + + // Presumably this is always set, but who can be sure + // about anything in this world. + hostname: record.remove("_HOSTNAME"), + + // The unit is seemingly missing on kernel entries, but + // present on all others. + unit: record.remove("_SYSTEMD_UNIT"), + + // This timestamp is present on most log entries + // (seemingly all that are ingested from the output + // systemd units). + timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"), + } + } +} + +/// This function spawns a double-looped, blocking receiver. It will +/// buffer messages for a second before flushing them to Stackdriver. +fn receiver_loop(rx: Receiver) { + let mut buf = VecDeque::new(); + let iteration = Duration::from_millis(500); + + loop { + trace!("Beginning outer iteration"); + let now = Instant::now(); + + loop { + if now.elapsed() > iteration { + break; + } + + if let Ok(record) = rx.recv_timeout(iteration) { + buf.push_back(record); + } + } + + if !buf.is_empty() { + flush(buf.drain(..)); + } + + trace!("Done outer iteration"); + } +} + +/// Flushes all drained records to Stackdriver. +fn flush(drain: Drain) { + let record_count = drain.count(); + debug!("Flushed {} records", record_count); +} + +fn main () { + env_logger::init(); + + let mut journal = Journal::open(JournalFiles::All, false, true) + .expect("Failed to open systemd journal"); + + match journal.seek(JournalSeek::Tail) { + Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), + Err(err) => { + error!("Failed to set initial journal position: {}", err); + process::exit(1) + } + } + + let (tx, rx) = channel(); + + let _receiver = thread::spawn(move || receiver_loop(rx)); + + journal.watch_all_elements(move |record| { + let record: Record = record.into(); - println!("foo"); - - let entry = journal.read_next(); + if record.message.is_some() { + tx.send(record).ok(); + } - println!("Entry: {:?}", entry) + Ok(()) + }).expect("Failed to read new entries from journal"); } -- cgit 1.4.1