diff options
author | Vincent Ambo <tazjin@gmail.com> | 2018-05-27T21·57+0200 |
---|---|---|
committer | Vincent Ambo <tazjin@gmail.com> | 2018-05-27T21·57+0200 |
commit | 6793b25a67d79f4b0022e5c253dca56a3c96630f (patch) | |
tree | d79227e4b9f0a8a007fef315b0f1c32e68760f2b /src | |
parent | c5cd12b81f3a2908c3f8202dbc94dc535cf092c4 (diff) |
feat(main): Implement receiver & flushing logic
The only thing missing for a 0.1 test run is the actual gRPC call to Stackdriver.
Diffstat (limited to 'src')
-rw-r--r-- | src/journald.rs | 71 | ||||
-rw-r--r-- | src/main.rs | 115 |
2 files changed, 100 insertions, 86 deletions
diff --git a/src/journald.rs b/src/journald.rs deleted file mode 100644 index 4892553ea57e..000000000000 --- a/src/journald.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! This module contains FFI-bindings to the journald APi. See -//! sd-journal(3) for detailed information about the API. -//! -//! Only calls required by journaldriver are implemented. - -/// This type represents an opaque pointer to an `sd_journal` struct. -/// It should be changed to an `extern type` once RF1861 is -/// stabilized. -enum SdJournal {} - -use failure::Error; -use std::mem; - -extern { - fn sd_journal_open(sd_journal: *mut SdJournal, flags: usize) -> usize; - fn sd_journal_close(sd_journal: *mut SdJournal); - fn sd_journal_next(sd_journal: *mut SdJournal) -> usize; -} - -// Safe interface: - -/// This struct contains the opaque data used by libsystemd to -/// reference the journal. -pub struct Journal { - sd_journal: *mut SdJournal, -} - -impl Drop for Journal { - fn drop(&mut self) { - unsafe { - sd_journal_close(self.sd_journal); - } - } -} - -/// Open the journal for reading. No flags are supplied to libsystemd, -/// which means that all journal entries will become available. -pub fn open_journal() -> Result<Journal, Error> { - let (mut sd_journal, result) = unsafe { - let mut journal: SdJournal = mem::uninitialized(); - let result = sd_journal_open(&mut journal, 0); - (journal, result) - }; - - ensure!(result == 0, "Could not open journal (errno: {})", result); - Ok(Journal { sd_journal: &mut sd_journal }) -} - -#[derive(Debug)] -pub enum NextEntry { - /// If no new entries are available in the journal this variant is - /// returned. - NoEntry, - - Entry, -} - -impl Journal { - pub fn read_next(&self) -> Result<NextEntry, Error> { - let result = unsafe { - sd_journal_next(self.sd_journal) - }; - - match result { - 0 => Ok(NextEntry::NoEntry), - 1 => Ok(NextEntry::Entry), - n if n > 1 => bail!("Journal unexpectedly advanced by {} entries!", n), - _ => bail!("An error occured while advancing the journal (errno: {})", result), - } - } -} diff --git a/src/main.rs b/src/main.rs index aca6aba900ad..9cc9dcee0d63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,107 @@ -#[macro_use] extern crate failure; -extern crate libc; +// #[macro_use] extern crate failure; +#[macro_use] extern crate log; -mod journald; +extern crate env_logger; +extern crate systemd; +use systemd::journal::*; use std::process; +use std::thread; +use std::sync::mpsc::{channel, Receiver}; +use std::time::{Duration, Instant}; +use std::collections::vec_deque::{VecDeque, Drain}; -fn main() { - let mut journal = match journald::open_journal() { - Ok(journal) => journal, - Err(e) => { - println!("{}", e); - process::exit(1); - }, - }; +#[derive(Debug)] +struct Record { + message: Option<String>, + hostname: Option<String>, + unit: Option<String>, + timestamp: Option<String>, +} + +impl From<JournalRecord> for Record { + fn from(mut record: JournalRecord) -> Record { + Record { + // The message field is technically just a convention, but + // journald seems to default to it when ingesting unit + // output. + message: record.remove("MESSAGE"), + + // Presumably this is always set, but who can be sure + // about anything in this world. + hostname: record.remove("_HOSTNAME"), + + // The unit is seemingly missing on kernel entries, but + // present on all others. + unit: record.remove("_SYSTEMD_UNIT"), + + // This timestamp is present on most log entries + // (seemingly all that are ingested from the output + // systemd units). + timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"), + } + } +} + +/// This function spawns a double-looped, blocking receiver. It will +/// buffer messages for a second before flushing them to Stackdriver. +fn receiver_loop(rx: Receiver<Record>) { + let mut buf = VecDeque::new(); + let iteration = Duration::from_millis(500); + + loop { + trace!("Beginning outer iteration"); + let now = Instant::now(); + + loop { + if now.elapsed() > iteration { + break; + } + + if let Ok(record) = rx.recv_timeout(iteration) { + buf.push_back(record); + } + } + + if !buf.is_empty() { + flush(buf.drain(..)); + } + + trace!("Done outer iteration"); + } +} + +/// Flushes all drained records to Stackdriver. +fn flush(drain: Drain<Record>) { + let record_count = drain.count(); + debug!("Flushed {} records", record_count); +} + +fn main () { + env_logger::init(); + + let mut journal = Journal::open(JournalFiles::All, false, true) + .expect("Failed to open systemd journal"); + + match journal.seek(JournalSeek::Tail) { + Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), + Err(err) => { + error!("Failed to set initial journal position: {}", err); + process::exit(1) + } + } + + let (tx, rx) = channel(); + + let _receiver = thread::spawn(move || receiver_loop(rx)); + + journal.watch_all_elements(move |record| { + let record: Record = record.into(); - println!("foo"); - - let entry = journal.read_next(); + if record.message.is_some() { + tx.send(record).ok(); + } - println!("Entry: {:?}", entry) + Ok(()) + }).expect("Failed to read new entries from journal"); } |