diff options
author | Vincent Ambo <tazjin@gmail.com> | 2018-06-17T01·29+0200 |
---|---|---|
committer | Vincent Ambo <github@tazj.in> | 2018-06-17T13·46+0200 |
commit | 87ab3c806c40d565b75c18237494d591282ccbc2 (patch) | |
tree | c2064a0a45ed78417590ae33d2f7643fbe70e8aa /src/main.rs | |
parent | f626d8843865975e0804a1e04676255d787687a0 (diff) |
feat(main): Parse timestamps out of journald entries
Instead of relying on Stackdriver's ingestion timestamps, parse timestamps out of journal entries and provide those to Stackdriver. If a timestamp could not be parsed out of a log entry, the ingestion time is used as the fallback.
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 49 |
1 files changed, 40 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs index 898f9943da44..f6c4ea0d9060 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,12 +40,15 @@ #[macro_use] extern crate serde_json; #[macro_use] extern crate lazy_static; -extern crate failure; +extern crate chrono; extern crate env_logger; -extern crate systemd; -extern crate serde; +extern crate failure; extern crate reqwest; +extern crate serde; +extern crate systemd; +use chrono::offset::LocalResult; +use chrono::prelude::*; use failure::ResultExt; use reqwest::{header, Client}; use serde_json::Value; @@ -174,12 +177,36 @@ fn message_to_payload(message: Option<String>) -> Payload { } } +/// Attempt to parse journald's microsecond timestamps into a UTC +/// timestamp. +/// +/// Parse errors are dismissed and returned as empty options: There +/// simply aren't any useful fallback mechanisms other than defaulting +/// to ingestion time for journaldriver's use-case. +fn parse_microseconds(input: String) -> Option<DateTime<Utc>> { + if input.len() != 16 { + return None; + } + + let seconds: i64 = (&input[..10]).parse().ok()?; + let micros: u32 = (&input[10..]).parse().ok()?; + + match Utc.timestamp_opt(seconds, micros * 1000) { + LocalResult::Single(time) => Some(time), + _ => None, + } +} + /// This structure represents a log entry in the format expected by /// the Stackdriver API. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct LogEntry { labels: Value, + + #[serde(skip_serializing_if = "Option::is_none")] + timestamp: Option<DateTime<Utc>>, + #[serde(flatten)] payload: Payload, } @@ -203,15 +230,19 @@ impl From<JournalRecord> for LogEntry { // present on all others. let unit = record.remove("_SYSTEMD_UNIT"); - // TODO: This timestamp (in microseconds) should be parsed - // into a DateTime<Utc> and used instead of the ingestion - // time. - // let timestamp = record - // .remove("_SOURCE_REALTIME_TIMESTAMP") - // .map(); + // The source timestamp (if present) is specified in + // microseconds since epoch. + // + // If it is not present or can not be parsed, journaldriver + // will not send a timestamp for the log entry and it will + // default to the ingestion time. + let timestamp = record + .remove("_SOURCE_REALTIME_TIMESTAMP") + .and_then(parse_microseconds); LogEntry { payload, + timestamp, labels: json!({ "host": hostname, "unit": unit.unwrap_or_else(|| "syslog".into()), |