diff options
author | Vincent Ambo <tazjin@gmail.com> | 2018-05-27T21·57+0200 |
---|---|---|
committer | Vincent Ambo <tazjin@gmail.com> | 2018-05-27T21·57+0200 |
commit | 6793b25a67d79f4b0022e5c253dca56a3c96630f (patch) | |
tree | d79227e4b9f0a8a007fef315b0f1c32e68760f2b | |
parent | c5cd12b81f3a2908c3f8202dbc94dc535cf092c4 (diff) |
feat(main): Implement receiver & flushing logic
The only thing missing for a 0.1 test run is the actual gRPC call to Stackdriver.
-rw-r--r-- | Cargo.lock | 226 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | src/journald.rs | 71 | ||||
-rw-r--r-- | src/main.rs | 115 |
4 files changed, 329 insertions, 86 deletions
diff --git a/Cargo.lock b/Cargo.lock index 1a17c08226c9..fb27771d2089 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,4 +1,22 @@ [[package]] +name = "aho-corasick" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "atty" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", + "termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "backtrace" version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -30,6 +48,27 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "cstr-argument" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "env_logger" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "atty 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "humantime 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "termcolor 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "failure" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -49,30 +88,117 @@ dependencies = [ ] [[package]] +name = "humantime" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "journaldriver" version = "0.1.0" dependencies = [ + "env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", + "systemd 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] +name = "lazy_static" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "libc" version = "0.2.41" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "libsystemd-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", + "pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "log" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "memchr" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "memchr" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "pkg-config" version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "quick-error" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "quote" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "redox_syscall" +version = "0.1.38" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "redox_termios" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "redox_syscall 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "regex-syntax 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex-syntax" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "rustc-demangle" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -105,11 +231,78 @@ dependencies = [ ] [[package]] +name = "systemd" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cstr-argument 0.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", + "libsystemd-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "utf8-cstr 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "termcolor" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "wincolor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "termion" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thread_local" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "ucd-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "unicode-xid" version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "unreachable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "utf8-cstr" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "utf8-ranges" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "winapi" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -128,21 +321,54 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "wincolor" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [metadata] +"checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" +"checksum atty 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2fc4a1aa4c24c0718a250f0681885c1af91419d242f29eb8f2ab28502d80dbd1" "checksum backtrace 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "dbdd17cd962b570302f5297aea8648d5923e22e555c2ed2d8b2e34eca646bf6d" "checksum backtrace-sys 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "b46a4e68c24954dfc8a0e515b069f695481d2997b840356db013ff9e52cdb8fe" "checksum cc 1.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0ebb87d1116151416c0cf66a0e3fb6430cccd120fd6300794b4dfaa050ac40ba" "checksum cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "405216fd8fe65f718daa7102ea808a946b6ce40c742998fbfd3463645552de18" +"checksum cstr-argument 0.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "514570a4b719329df37f93448a70df2baac553020d0eb43a8dfa9c1f5ba7b658" +"checksum env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0e6e40ebb0e66918a37b38c7acab4e10d299e0463fe2af5d29b9cc86710cfd2a" "checksum failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "934799b6c1de475a012a02dab0ace1ace43789ee4b99bcfbf1a2e3e8ced5de82" "checksum failure_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c7cdda555bb90c9bb67a3b670a0f42de8e73f5981524123ad8578aafec8ddb8b" +"checksum humantime 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0484fda3e7007f2a4a0d9c3a703ca38c71c54c55602ce4660c419fd32e188c9e" +"checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d" "checksum libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)" = "ac8ebf8343a981e2fa97042b14768f02ed3e1d602eac06cae6166df3c8ced206" +"checksum libsystemd-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e751b723417158e0949ba470bee4affd6f1dd6b67622b5240d79186631b6a0d9" +"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" +"checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" +"checksum memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "796fba70e76612589ed2ce7f45282f5af869e0fdd7cc6199fa1aa1f1d591ba9d" "checksum pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "110d5ee3593dbb73f56294327fe5668bcc997897097cbc76b51e7aed3f52452f" +"checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" +"checksum redox_syscall 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "0a12d51a5b5fd700e6c757f15877685bfa04fd7eb60c108f01d045cafa0073c2" +"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" +"checksum regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75ecf88252dce580404a22444fc7d626c01815debba56a7f4f536772a5ff19d3" +"checksum regex-syntax 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8f1ac0f60d675cc6cf13a20ec076568254472551051ad5dd050364d70671bf6b" "checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum synstructure 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3a761d12e6d8dcb4dcf952a7a89b475e3a9d69e4a69307e01a470977642914bd" +"checksum systemd 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1b62a732355787f960c25536210ae0a981aca2e5dae9dab8491bdae39613ce48" +"checksum termcolor 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "adc4587ead41bf016f11af03e55a624c06568b5a19db4e90fde573d805074f83" +"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" +"checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" +"checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d" "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" +"checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" +"checksum utf8-cstr 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "55bcbb425141152b10d5693095950b51c3745d019363fc2929ffd8f61449b628" +"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122" +"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +"checksum wincolor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eeb06499a3a4d44302791052df005d5232b927ed1a9658146d842165c4de7767" diff --git a/Cargo.toml b/Cargo.toml index b32bd023d51c..f521d0c39dd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,9 @@ authors = ["Vincent Ambo <mail@tazj.in>"] [dependencies] libc = "0.2" failure = "0.1" +systemd = "0.3" +log = "0.4" +env_logger = "0.5" [build-dependencies] pkg-config = "0.3" diff --git a/src/journald.rs b/src/journald.rs deleted file mode 100644 index 4892553ea57e..000000000000 --- a/src/journald.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! This module contains FFI-bindings to the journald APi. See -//! sd-journal(3) for detailed information about the API. -//! -//! Only calls required by journaldriver are implemented. - -/// This type represents an opaque pointer to an `sd_journal` struct. -/// It should be changed to an `extern type` once RF1861 is -/// stabilized. -enum SdJournal {} - -use failure::Error; -use std::mem; - -extern { - fn sd_journal_open(sd_journal: *mut SdJournal, flags: usize) -> usize; - fn sd_journal_close(sd_journal: *mut SdJournal); - fn sd_journal_next(sd_journal: *mut SdJournal) -> usize; -} - -// Safe interface: - -/// This struct contains the opaque data used by libsystemd to -/// reference the journal. -pub struct Journal { - sd_journal: *mut SdJournal, -} - -impl Drop for Journal { - fn drop(&mut self) { - unsafe { - sd_journal_close(self.sd_journal); - } - } -} - -/// Open the journal for reading. No flags are supplied to libsystemd, -/// which means that all journal entries will become available. -pub fn open_journal() -> Result<Journal, Error> { - let (mut sd_journal, result) = unsafe { - let mut journal: SdJournal = mem::uninitialized(); - let result = sd_journal_open(&mut journal, 0); - (journal, result) - }; - - ensure!(result == 0, "Could not open journal (errno: {})", result); - Ok(Journal { sd_journal: &mut sd_journal }) -} - -#[derive(Debug)] -pub enum NextEntry { - /// If no new entries are available in the journal this variant is - /// returned. - NoEntry, - - Entry, -} - -impl Journal { - pub fn read_next(&self) -> Result<NextEntry, Error> { - let result = unsafe { - sd_journal_next(self.sd_journal) - }; - - match result { - 0 => Ok(NextEntry::NoEntry), - 1 => Ok(NextEntry::Entry), - n if n > 1 => bail!("Journal unexpectedly advanced by {} entries!", n), - _ => bail!("An error occured while advancing the journal (errno: {})", result), - } - } -} diff --git a/src/main.rs b/src/main.rs index aca6aba900ad..9cc9dcee0d63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,107 @@ -#[macro_use] extern crate failure; -extern crate libc; +// #[macro_use] extern crate failure; +#[macro_use] extern crate log; -mod journald; +extern crate env_logger; +extern crate systemd; +use systemd::journal::*; use std::process; +use std::thread; +use std::sync::mpsc::{channel, Receiver}; +use std::time::{Duration, Instant}; +use std::collections::vec_deque::{VecDeque, Drain}; -fn main() { - let mut journal = match journald::open_journal() { - Ok(journal) => journal, - Err(e) => { - println!("{}", e); - process::exit(1); - }, - }; +#[derive(Debug)] +struct Record { + message: Option<String>, + hostname: Option<String>, + unit: Option<String>, + timestamp: Option<String>, +} + +impl From<JournalRecord> for Record { + fn from(mut record: JournalRecord) -> Record { + Record { + // The message field is technically just a convention, but + // journald seems to default to it when ingesting unit + // output. + message: record.remove("MESSAGE"), + + // Presumably this is always set, but who can be sure + // about anything in this world. + hostname: record.remove("_HOSTNAME"), + + // The unit is seemingly missing on kernel entries, but + // present on all others. + unit: record.remove("_SYSTEMD_UNIT"), + + // This timestamp is present on most log entries + // (seemingly all that are ingested from the output + // systemd units). + timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"), + } + } +} + +/// This function spawns a double-looped, blocking receiver. It will +/// buffer messages for a second before flushing them to Stackdriver. +fn receiver_loop(rx: Receiver<Record>) { + let mut buf = VecDeque::new(); + let iteration = Duration::from_millis(500); + + loop { + trace!("Beginning outer iteration"); + let now = Instant::now(); + + loop { + if now.elapsed() > iteration { + break; + } + + if let Ok(record) = rx.recv_timeout(iteration) { + buf.push_back(record); + } + } + + if !buf.is_empty() { + flush(buf.drain(..)); + } + + trace!("Done outer iteration"); + } +} + +/// Flushes all drained records to Stackdriver. +fn flush(drain: Drain<Record>) { + let record_count = drain.count(); + debug!("Flushed {} records", record_count); +} + +fn main () { + env_logger::init(); + + let mut journal = Journal::open(JournalFiles::All, false, true) + .expect("Failed to open systemd journal"); + + match journal.seek(JournalSeek::Tail) { + Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), + Err(err) => { + error!("Failed to set initial journal position: {}", err); + process::exit(1) + } + } + + let (tx, rx) = channel(); + + let _receiver = thread::spawn(move || receiver_loop(rx)); + + journal.watch_all_elements(move |record| { + let record: Record = record.into(); - println!("foo"); - - let entry = journal.read_next(); + if record.message.is_some() { + tx.send(record).ok(); + } - println!("Entry: {:?}", entry) + Ok(()) + }).expect("Failed to read new entries from journal"); } |