about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorVincent Ambo <tazjin@gmail.com>2018-05-27T21·57+0200
committerVincent Ambo <tazjin@gmail.com>2018-05-27T21·57+0200
commit6793b25a67d79f4b0022e5c253dca56a3c96630f (patch)
treed79227e4b9f0a8a007fef315b0f1c32e68760f2b /src
parentc5cd12b81f3a2908c3f8202dbc94dc535cf092c4 (diff)
feat(main): Implement receiver & flushing logic
The only thing missing for a 0.1 test run is the actual gRPC call to
Stackdriver.
Diffstat (limited to 'src')
-rw-r--r--src/journald.rs71
-rw-r--r--src/main.rs115
2 files changed, 100 insertions, 86 deletions
diff --git a/src/journald.rs b/src/journald.rs
deleted file mode 100644
index 4892553ea57e..000000000000
--- a/src/journald.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-//! This module contains FFI-bindings to the journald APi. See
-//! sd-journal(3) for detailed information about the API.
-//!
-//! Only calls required by journaldriver are implemented.
-
-/// This type represents an opaque pointer to an `sd_journal` struct.
-/// It should be changed to an `extern type` once RF1861 is
-/// stabilized.
-enum SdJournal {}
-
-use failure::Error;
-use std::mem;
-
-extern {
-    fn sd_journal_open(sd_journal: *mut SdJournal, flags: usize) -> usize;
-    fn sd_journal_close(sd_journal: *mut SdJournal);
-    fn sd_journal_next(sd_journal: *mut SdJournal) -> usize;
-}
-
-// Safe interface:
-
-/// This struct contains the opaque data used by libsystemd to
-/// reference the journal.
-pub struct Journal {
-    sd_journal: *mut SdJournal,
-}
-
-impl Drop for Journal {
-    fn drop(&mut self) {
-        unsafe {
-            sd_journal_close(self.sd_journal);
-        }
-    }
-}
-
-/// Open the journal for reading. No flags are supplied to libsystemd,
-/// which means that all journal entries will become available.
-pub fn open_journal() -> Result<Journal, Error> {
-    let (mut sd_journal, result) = unsafe {
-        let mut journal: SdJournal = mem::uninitialized();
-        let result = sd_journal_open(&mut journal, 0);
-        (journal, result)
-    };
-
-    ensure!(result == 0, "Could not open journal (errno: {})", result);
-    Ok(Journal { sd_journal: &mut sd_journal })
-}
-
-#[derive(Debug)]
-pub enum NextEntry {
-    /// If no new entries are available in the journal this variant is
-    /// returned.
-    NoEntry,
-
-    Entry,
-}
-
-impl Journal {
-    pub fn read_next(&self) -> Result<NextEntry, Error> {
-        let result = unsafe {
-            sd_journal_next(self.sd_journal)
-        };
-
-        match result {
-            0 => Ok(NextEntry::NoEntry),
-            1 => Ok(NextEntry::Entry),
-            n if n > 1 => bail!("Journal unexpectedly advanced by {} entries!", n),
-            _ => bail!("An error occured while advancing the journal (errno: {})", result),
-        }
-    }
-}
diff --git a/src/main.rs b/src/main.rs
index aca6aba900ad..9cc9dcee0d63 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,22 +1,107 @@
-#[macro_use] extern crate failure;
-extern crate libc;
+// #[macro_use] extern crate failure;
+#[macro_use] extern crate log;
 
-mod journald;
+extern crate env_logger;
+extern crate systemd;
 
+use systemd::journal::*;
 use std::process;
+use std::thread;
+use std::sync::mpsc::{channel, Receiver};
+use std::time::{Duration, Instant};
+use std::collections::vec_deque::{VecDeque, Drain};
 
-fn main() {
-    let mut journal = match journald::open_journal() {
-        Ok(journal) => journal,
-        Err(e) => {
-            println!("{}", e);
-            process::exit(1);
-        },
-    };
+#[derive(Debug)]
+struct Record {
+    message: Option<String>,
+    hostname: Option<String>,
+    unit: Option<String>,
+    timestamp: Option<String>,
+}
+
+impl From<JournalRecord> for Record {
+    fn from(mut record: JournalRecord) -> Record {
+        Record {
+            // The message field is technically just a convention, but
+            // journald seems to default to it when ingesting unit
+            // output.
+            message: record.remove("MESSAGE"),
+
+            // Presumably this is always set, but who can be sure
+            // about anything in this world.
+            hostname: record.remove("_HOSTNAME"),
+
+            // The unit is seemingly missing on kernel entries, but
+            // present on all others.
+            unit: record.remove("_SYSTEMD_UNIT"),
+
+            // This timestamp is present on most log entries
+            // (seemingly all that are ingested from the output
+            // systemd units).
+            timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"),
+        }
+    }
+}
+
+/// This function spawns a double-looped, blocking receiver. It will
+/// buffer messages for a second before flushing them to Stackdriver.
+fn receiver_loop(rx: Receiver<Record>) {
+    let mut buf = VecDeque::new();
+    let iteration = Duration::from_millis(500);
+
+    loop {
+        trace!("Beginning outer iteration");
+        let now = Instant::now();
+
+        loop {
+            if now.elapsed() > iteration {
+                break;
+            }
+
+            if let Ok(record) = rx.recv_timeout(iteration) {
+                buf.push_back(record);
+            }
+        }
+
+        if !buf.is_empty() {
+            flush(buf.drain(..));
+        }
+
+        trace!("Done outer iteration");
+    }
+}
+
+/// Flushes all drained records to Stackdriver.
+fn flush(drain: Drain<Record>) {
+    let record_count = drain.count();
+    debug!("Flushed {} records", record_count);
+}
+
+fn main () {
+    env_logger::init();
+
+    let mut journal = Journal::open(JournalFiles::All, false, true)
+        .expect("Failed to open systemd journal");
+
+    match journal.seek(JournalSeek::Tail) {
+        Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
+        Err(err) => {
+            error!("Failed to set initial journal position: {}", err);
+            process::exit(1)
+        }
+    }
+
+    let (tx, rx) = channel();
+
+    let _receiver = thread::spawn(move || receiver_loop(rx));
+
+    journal.watch_all_elements(move |record| {
+        let record: Record = record.into();
 
-    println!("foo");
-    
-    let entry = journal.read_next();
+        if record.message.is_some() {
+            tx.send(record).ok();
+        }
 
-    println!("Entry: {:?}", entry)
+        Ok(())
+    }).expect("Failed to read new entries from journal");
 }