about summary refs log tree commit diff
path: root/src/main.rs
diff options
context:
space:
mode:
authorVincent Ambo <tazjin@gmail.com>2018-06-17T01·29+0200
committerVincent Ambo <github@tazj.in>2018-06-17T13·46+0200
commit87ab3c806c40d565b75c18237494d591282ccbc2 (patch)
treec2064a0a45ed78417590ae33d2f7643fbe70e8aa /src/main.rs
parentf626d8843865975e0804a1e04676255d787687a0 (diff)
feat(main): Parse timestamps out of journald entries
Instead of relying on Stackdriver's ingestion timestamps, parse
timestamps out of journal entries and provide those to Stackdriver.

If a timestamp could not be parsed out of a log entry, the ingestion
time is used as the fallback.
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs49
1 files changed, 40 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs
index 898f9943da..f6c4ea0d90 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -40,12 +40,15 @@
 #[macro_use] extern crate serde_json;
 #[macro_use] extern crate lazy_static;
 
-extern crate failure;
+extern crate chrono;
 extern crate env_logger;
-extern crate systemd;
-extern crate serde;
+extern crate failure;
 extern crate reqwest;
+extern crate serde;
+extern crate systemd;
 
+use chrono::offset::LocalResult;
+use chrono::prelude::*;
 use failure::ResultExt;
 use reqwest::{header, Client};
 use serde_json::Value;
@@ -174,12 +177,36 @@ fn message_to_payload(message: Option<String>) -> Payload {
     }
 }
 
+/// Attempt to parse journald's microsecond timestamps into a UTC
+/// timestamp.
+///
+/// Parse errors are dismissed and returned as empty options: There
+/// simply aren't any useful fallback mechanisms other than defaulting
+/// to ingestion time for journaldriver's use-case.
+fn parse_microseconds(input: String) -> Option<DateTime<Utc>> {
+    if input.len() != 16 {
+        return None;
+    }
+
+    let seconds: i64 = (&input[..10]).parse().ok()?;
+    let micros: u32 = (&input[10..]).parse().ok()?;
+
+    match Utc.timestamp_opt(seconds, micros * 1000) {
+        LocalResult::Single(time) => Some(time),
+        _ => None,
+    }
+}
+
 /// This structure represents a log entry in the format expected by
 /// the Stackdriver API.
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 struct LogEntry {
     labels: Value,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    timestamp: Option<DateTime<Utc>>,
+
     #[serde(flatten)]
     payload: Payload,
 }
@@ -203,15 +230,19 @@ impl From<JournalRecord> for LogEntry {
         // present on all others.
         let unit = record.remove("_SYSTEMD_UNIT");
 
-        // TODO: This timestamp (in microseconds) should be parsed
-        // into a DateTime<Utc> and used instead of the ingestion
-        // time.
-        // let timestamp = record
-        //     .remove("_SOURCE_REALTIME_TIMESTAMP")
-        //     .map();
+        // The source timestamp (if present) is specified in
+        // microseconds since epoch.
+        //
+        // If it is not present or can not be parsed, journaldriver
+        // will not send a timestamp for the log entry and it will
+        // default to the ingestion time.
+        let timestamp = record
+            .remove("_SOURCE_REALTIME_TIMESTAMP")
+            .and_then(parse_microseconds);
 
         LogEntry {
             payload,
+            timestamp,
             labels: json!({
                 "host": hostname,
                 "unit": unit.unwrap_or_else(|| "syslog".into()),