about summary refs log tree commit diff
path: root/src/main.rs
diff options
context:
space:
mode:
authorVincent Ambo <tazjin@gmail.com>2018-06-16T15·57+0200
committerVincent Ambo <tazjin@gmail.com>2018-06-16T15·57+0200
commit10f23a9dfb28f8f4ca838bad9ee402b0611ef6da (patch)
tree848dd6939ad35642550ba56a2f0828630974b2e7 /src/main.rs
parent71f0afe4b54f449978e60732f492f19694902609 (diff)
feat(main): Implement parsing of JSON payloads
Stackdriver supports structured JSON payloads in addition to simple
plain-text payloads.

This commit introduces a new feature in which journaldriver will
attempt to parse incoming log messages into JSON vaues and forward
them as structured payloads if they are JSON objects.

Messages that can not be parsed into JSON objects will continue to be
forwarded as plain text messages.
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs53
1 files changed, 50 insertions, 3 deletions
diff --git a/src/main.rs b/src/main.rs
index 449be2f8185d..894f72676ba5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -57,6 +57,9 @@ use std::process;
 use std::time::{Duration, Instant};
 use systemd::journal::*;
 
+#[cfg(test)]
+mod tests;
+
 const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
 const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
 const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
@@ -121,13 +124,57 @@ fn get_metadata(url: &str) -> Result<String> {
     Ok(output.trim().into())
 }
 
+
+/// This structure represents the different types of payloads
+/// supported by journaldriver.
+///
+/// Currently log entries can either contain plain text messages or
+/// structured payloads in JSON-format.
+#[derive(Debug, Serialize, PartialEq)]
+#[serde(untagged)]
+enum Payload {
+    TextPayload {
+        #[serde(rename = "textPayload")]
+        text_payload: String,
+    },
+    JsonPayload {
+        #[serde(rename = "jsonPaylaod")]
+        json_payload: Value,
+    },
+}
+
+/// Attempt to parse a log message as JSON and return it as a
+/// structured payload. If parsing fails, return the entry in plain
+/// text format.
+fn message_to_payload(message: Option<String>) -> Payload {
+    match message {
+        None => Payload::TextPayload { text_payload: "empty log entry".into() },
+        Some(text_payload) => {
+            // Attempt to deserialize the text payload as a generic
+            // JSON value.
+            if let Ok(json_payload) = serde_json::from_str::<Value>(&text_payload) {
+                // If JSON-parsing succeeded on the payload, check
+                // whether we parsed an object (Stackdriver does not
+                // expect other types of JSON payload) and return it
+                // in that case.
+                if json_payload.is_object() {
+                    return Payload::JsonPayload { json_payload }
+                }
+            }
+
+            Payload::TextPayload { text_payload }
+        }
+    }
+}
+
 /// This structure represents a log entry in the format expected by
 /// the Stackdriver API.
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 struct LogEntry {
     labels: Value,
-    text_payload: String, // TODO: attempt to parse jsonPayloads
+    #[serde(flatten)]
+    payload: Payload,
 }
 
 impl From<JournalRecord> for LogEntry {
@@ -139,7 +186,7 @@ impl From<JournalRecord> for LogEntry {
         // The message field is technically just a convention, but
         // journald seems to default to it when ingesting unit
         // output.
-        let message = record.remove("MESSAGE");
+        let payload = message_to_payload(record.remove("MESSAGE"));
 
         // Presumably this is always set, but who can be sure
         // about anything in this world.
@@ -157,7 +204,7 @@ impl From<JournalRecord> for LogEntry {
         //     .map();
 
         LogEntry {
-            text_payload: message.unwrap_or_else(|| "empty log entry".into()),
+            payload,
             labels: json!({
                 "host": hostname,
                 "unit": unit.unwrap_or_else(|| "syslog".into()),