about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs49
-rw-r--r--src/tests.rs11
2 files changed, 51 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs
index 898f9943da44..f6c4ea0d9060 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -40,12 +40,15 @@
 #[macro_use] extern crate serde_json;
 #[macro_use] extern crate lazy_static;
 
-extern crate failure;
+extern crate chrono;
 extern crate env_logger;
-extern crate systemd;
-extern crate serde;
+extern crate failure;
 extern crate reqwest;
+extern crate serde;
+extern crate systemd;
 
+use chrono::offset::LocalResult;
+use chrono::prelude::*;
 use failure::ResultExt;
 use reqwest::{header, Client};
 use serde_json::Value;
@@ -174,12 +177,36 @@ fn message_to_payload(message: Option<String>) -> Payload {
     }
 }
 
+/// Attempt to parse journald's microsecond timestamps into a UTC
+/// timestamp.
+///
+/// Parse errors are dismissed and returned as empty options: There
+/// simply aren't any useful fallback mechanisms other than defaulting
+/// to ingestion time for journaldriver's use-case.
+fn parse_microseconds(input: String) -> Option<DateTime<Utc>> {
+    if input.len() != 16 {
+        return None;
+    }
+
+    let seconds: i64 = (&input[..10]).parse().ok()?;
+    let micros: u32 = (&input[10..]).parse().ok()?;
+
+    match Utc.timestamp_opt(seconds, micros * 1000) {
+        LocalResult::Single(time) => Some(time),
+        _ => None,
+    }
+}
+
 /// This structure represents a log entry in the format expected by
 /// the Stackdriver API.
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 struct LogEntry {
     labels: Value,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    timestamp: Option<DateTime<Utc>>,
+
     #[serde(flatten)]
     payload: Payload,
 }
@@ -203,15 +230,19 @@ impl From<JournalRecord> for LogEntry {
         // present on all others.
         let unit = record.remove("_SYSTEMD_UNIT");
 
-        // TODO: This timestamp (in microseconds) should be parsed
-        // into a DateTime<Utc> and used instead of the ingestion
-        // time.
-        // let timestamp = record
-        //     .remove("_SOURCE_REALTIME_TIMESTAMP")
-        //     .map();
+        // The source timestamp (if present) is specified in
+        // microseconds since epoch.
+        //
+        // If it is not present or can not be parsed, journaldriver
+        // will not send a timestamp for the log entry and it will
+        // default to the ingestion time.
+        let timestamp = record
+            .remove("_SOURCE_REALTIME_TIMESTAMP")
+            .and_then(parse_microseconds);
 
         LogEntry {
             payload,
+            timestamp,
             labels: json!({
                 "host": hostname,
                 "unit": unit.unwrap_or_else(|| "syslog".into()),
diff --git a/src/tests.rs b/src/tests.rs
index 623cb6b59981..6840602eca55 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -5,6 +5,7 @@ use serde_json::to_string;
 fn test_text_entry_serialization() {
     let entry = LogEntry {
         labels: Value::Null,
+        timestamp: None,
         payload: Payload::TextPayload {
             text_payload: "test entry".into(),
         }
@@ -20,6 +21,7 @@ fn test_text_entry_serialization() {
 fn test_json_entry_serialization() {
     let entry = LogEntry {
         labels: Value::Null,
+        timestamp: None,
         payload: Payload::TextPayload {
             text_payload: "test entry".into(),
         }
@@ -78,3 +80,12 @@ fn test_json_no_object() {
 
     assert_eq!(expected, payload, "Non-object JSON payload should be plain text");
 }
+
+#[test]
+fn test_parse_microseconds() {
+    let input: String = "1529175149291187".into();
+    let expected: DateTime<Utc> = "2018-06-16T18:52:29.291187Z"
+        .to_string().parse().unwrap();
+
+    assert_eq!(Some(expected), parse_microseconds(input));
+}