about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock29
-rw-r--r--Cargo.toml4
-rw-r--r--src/main.rs245
-rw-r--r--src/stackdriver.rs92
4 files changed, 195 insertions, 175 deletions
diff --git a/Cargo.lock b/Cargo.lock
index b522ee53b3cf..95fcd8e6896e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -99,17 +99,6 @@ version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
-name = "chrono"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
- "num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "serde 1.0.62 (registry+https://github.com/rust-lang/crates.io-index)",
- "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
 name = "core-foundation"
 version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -339,10 +328,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 name = "journaldriver"
 version = "0.1.0"
 dependencies = [
- "chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -526,19 +515,6 @@ version = "0.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
-name = "num-integer"
-version = "0.1.38"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
-]
-
-[[package]]
-name = "num-traits"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-
-[[package]]
 name = "num_cpus"
 version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1269,7 +1245,6 @@ dependencies = [
 "checksum bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7dd32989a66957d3f0cba6588f15d4281a733f4e9ffc43fcd2385f57d3bf99ff"
 "checksum cc 1.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0ebb87d1116151416c0cf66a0e3fb6430cccd120fd6300794b4dfaa050ac40ba"
 "checksum cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "405216fd8fe65f718daa7102ea808a946b6ce40c742998fbfd3463645552de18"
-"checksum chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1cce36c92cb605414e9b824f866f5babe0a0368e39ea07393b9b63cf3844c0e6"
 "checksum core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "25bfd746d203017f7d5cbd31ee5d8e17f94b6521c7af77ece6c9e4b2d4b16c67"
 "checksum core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "065a5d7ffdcbc8fa145d6f0746f3555025b9097a9e9cda59f7467abae670c78d"
 "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb"
@@ -1316,8 +1291,6 @@ dependencies = [
 "checksum native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f74dbadc8b43df7864539cedb7bc91345e532fdd913cfdc23ad94f4d2d40fbc0"
 "checksum net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)" = "9044faf1413a1057267be51b5afba8eb1090bd2231c693664aa1db716fe1eae0"
 "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2"
-"checksum num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac0ea58d64a89d9d6b7688031b3be9358d6c919badcf7fbb0527ccfd891ee45"
-"checksum num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "775393e285254d2f5004596d69bb8bc1149754570dcc08cf30cabeba67955e28"
 "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30"
 "checksum openssl 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "a3605c298474a3aa69de92d21139fb5e2a81688d308262359d85cdd0d12a7985"
 "checksum openssl-sys 0.9.31 (registry+https://github.com/rust-lang/crates.io-index)" = "a4d6a27d108b29befe1822d40e2e22f85518dac59acbf7f30fdc532f48fd0a77"
diff --git a/Cargo.toml b/Cargo.toml
index e314c0a51ef0..66e83d25300a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,8 +13,8 @@ serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
 reqwest = "0.8"
-chrono = { version = "0.4", features = ["serde"] }
-hyper = "*" # whatever reqwest drags in
+hyper = "0.11" # whatever reqwest drags in
+lazy_static = "1.0"
 
 [build-dependencies]
 pkg-config = "0.3"
diff --git a/src/main.rs b/src/main.rs
index c6cfa8df1e76..0e14827b6d12 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,58 +1,152 @@
-#[macro_use] extern crate failure;
+//! This file implements journaldriver, a small application that
+//! forwards logs from journald (systemd's log facility) to
+//! Stackdriver Logging.
+//!
+//! Log entries are read continously from journald and are forwarded
+//! to Stackdriver in batches.
+//!
+//! Stackdriver Logging has a concept of monitored resources. In the
+//! simplest (and currently only supported) case this monitored
+//! resource will be the GCE instance on which journaldriver is
+//! running.
+//!
+//! Information about the instance, the project and required security
+//! credentials are retrieved from Google's metadata instance on GCP.
+//!
+//! Things left to do:
+//! * TODO 2018-06-15: Support non-GCP instances (see comment on
+//!   monitored resource descriptor)
+//! * TODO 2018-06-15: Extract timestamps from journald instead of
+//!   relying on ingestion timestamps.
+//! * TODO 2018-06-15: Persist last known cursor position after
+//!   flushing to allow journaldriver to resume from the same position
+//!   after a restart.
+
 #[macro_use] extern crate hyper;
 #[macro_use] extern crate log;
 #[macro_use] extern crate serde_derive;
+#[macro_use] extern crate serde_json;
+#[macro_use] extern crate lazy_static;
 
-extern crate chrono;
+extern crate failure;
 extern crate env_logger;
 extern crate systemd;
 extern crate serde;
-extern crate serde_json;
 extern crate reqwest;
 
-mod stackdriver;
-
-use std::env;
+use reqwest::{header, Client};
+use serde_json::Value;
+use std::io::Read;
 use std::mem;
-use std::ops::Add;
 use std::process;
 use std::time::{Duration, Instant};
 use systemd::journal::*;
 
+const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
 const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
+const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
+const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
+const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id";
 
+// Google's metadata service requires this header to be present on all
+// calls:
+//g
+// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying
 header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
 
+/// Convenience type alias for results using failure's `Error` type.
 type Result<T> = std::result::Result<T, failure::Error>;
 
-#[derive(Debug)]
-struct Record {
-    message: Option<String>,
-    hostname: Option<String>,
-    unit: Option<String>,
-    timestamp: Option<String>,
+lazy_static! {
+    /// HTTP client instance preconfigured with the metadata header
+    /// required by Google.
+    static ref METADATA_CLIENT: Client = {
+        let mut headers = header::Headers::new();
+        headers.set(MetadataFlavor("Google".into()));
+
+        Client::builder().default_headers(headers)
+            .build().expect("Could not create metadata client")
+    };
+
+    /// ID of the GCP project in which this instance is running.
+    static ref PROJECT_ID: String = get_metadata(METADATA_PROJECT_URL)
+        .expect("Could not determine project ID");
+
+    /// ID of the current GCP instance.
+    static ref INSTANCE_ID: String = get_metadata(METADATA_ID_URL)
+        .expect("Could not determine instance ID");
+
+    /// GCP zone in which this instance is running.
+    static ref ZONE: String = get_metadata(METADATA_ZONE_URL)
+        .expect("Could not determine instance zone");
+
+    /// Descriptor of the currently monitored instance.
+    ///
+    /// For GCE instances, this will be the GCE instance ID. For
+    /// non-GCE machines a sensible solution may be using the machine
+    /// hostname as a Cloud Logging log name, but this is not yet
+    /// implemented.
+    static ref MONITORED_RESOURCE: Value = json!({
+        "type": "gce_instance",
+        "labels": {
+            "project_id": PROJECT_ID.as_str(),
+            "instance_id": INSTANCE_ID.as_str(),
+            "zone": ZONE.as_str(),
+        }
+    });
+}
+
+/// Convenience helper for retrieving values from the metadata server.
+fn get_metadata(url: &str) -> Result<String> {
+    let mut output = String::new();
+    METADATA_CLIENT.get(url).send()?
+        .error_for_status()?
+        .read_to_string(&mut output)?;
+
+    Ok(output.trim().into())
+}
+
+/// This structure represents a log entry in the format expected by
+/// the Stackdriver API.
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct LogEntry {
+    labels: Value,
+    text_payload: String, // TODO: attempt to parse jsonPayloads
 }
 
-impl From<JournalRecord> for Record {
-    fn from(mut record: JournalRecord) -> Record {
-        Record {
-            // The message field is technically just a convention, but
-            // journald seems to default to it when ingesting unit
-            // output.
-            message: record.remove("MESSAGE"),
-
-            // Presumably this is always set, but who can be sure
-            // about anything in this world.
-            hostname: record.remove("_HOSTNAME"),
-
-            // The unit is seemingly missing on kernel entries, but
-            // present on all others.
-            unit: record.remove("_SYSTEMD_UNIT"),
-
-            // This timestamp is present on most log entries
-            // (seemingly all that are ingested from the output
-            // systemd units).
-            timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"),
+impl From<JournalRecord> for LogEntry {
+    // Converts from the fields contained in a journald record to the
+    // representation required by Stackdriver Logging.
+    //
+    // The fields are documented in systemd.journal-fields(7).
+    fn from(mut record: JournalRecord) -> LogEntry {
+        // The message field is technically just a convention, but
+        // journald seems to default to it when ingesting unit
+        // output.
+        let message = record.remove("MESSAGE");
+
+        // Presumably this is always set, but who can be sure
+        // about anything in this world.
+        let hostname = record.remove("_HOSTNAME");
+
+        // The unit is seemingly missing on kernel entries, but
+        // present on all others.
+        let unit = record.remove("_SYSTEMD_UNIT");
+
+        // TODO: This timestamp (in microseconds) should be parsed
+        // into a DateTime<Utc> and used instead of the ingestion
+        // time.
+        // let timestamp = record
+        //     .remove("_SOURCE_REALTIME_TIMESTAMP")
+        //     .map();
+
+        LogEntry {
+            text_payload: message.unwrap_or_else(|| "empty log entry".into()),
+            labels: json!({
+                "host": hostname,
+                "unit": unit.unwrap_or_else(|| "syslog".into()),
+            }),
         }
     }
 }
@@ -60,8 +154,11 @@ impl From<JournalRecord> for Record {
 /// This function starts a double-looped, blocking receiver. It will
 /// buffer messages for half a second before flushing them to
 /// Stackdriver.
-fn receiver_loop(mut journal: Journal) {
-    let mut buf: Vec<Record> = Vec::new();
+fn receiver_loop(mut journal: Journal) -> Result<()> {
+    let mut token = get_metadata_token()?;
+    let client = reqwest::Client::new();
+
+    let mut buf: Vec<LogEntry> = Vec::new();
     let iteration = Duration::from_millis(500);
 
     loop {
@@ -73,15 +170,15 @@ fn receiver_loop(mut journal: Journal) {
                 break;
             }
 
-            if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) {
-                trace!("Received a new record");
-                buf.push(record.into());
+            if let Ok(Some(entry)) = journal.await_next_record(Some(iteration)) {
+                trace!("Received a new entry");
+                buf.push(entry.into());
             }
         }
 
         if !buf.is_empty() {
             let to_flush = mem::replace(&mut buf, Vec::new());
-            flush(to_flush);
+            flush(&client, &mut token, to_flush)?;
         }
 
         trace!("Done outer iteration");
@@ -91,16 +188,29 @@ fn receiver_loop(mut journal: Journal) {
 /// Flushes all drained records to Stackdriver. Any Stackdriver
 /// message can at most contain 1000 log entries which means they are
 /// chunked up here.
-fn flush(records: Vec<Record>) {
-    for chunk in records.chunks(1000) {
-        debug!("Flushed {} records", chunk.len())
+fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<()> {
+    if token.is_expired() {
+        debug!("Refreshing Google metadata access token");
+        let new_token = get_metadata_token()?;
+        mem::replace(token, new_token);
+    }
+
+    for chunk in entries.chunks(1000) {
+        let request = prepare_request(chunk);
+        if let Err(write_error) = write_entries(client, token, request) {
+            error!("Failed to write {} entries: {}", chunk.len(), write_error)
+        } else {
+            debug!("Wrote {} entries to Stackdriver", chunk.len())
+        }
     }
+
+    Ok(())
 }
 
-/// Retrieves an access token from the GCP metadata service.
+/// Represents the response returned by the metadata server's token
+/// endpoint. The token is normally valid for an hour.
 #[derive(Deserialize)]
 struct TokenResponse {
-    #[serde(rename = "type")]
     expires_in: u64,
     access_token: String,
 }
@@ -109,26 +219,55 @@ struct TokenResponse {
 /// representation of when it expires.
 struct Token {
     token: String,
-    renew_at: Instant,
+    fetched_at: Instant,
+    expires: Duration,
 }
 
-fn get_metadata_token(client: &reqwest::Client) -> Result<Token> {
-    let now = Instant::now();
+impl Token {
+    /// Does this token need to be renewed?
+    fn is_expired(&self) -> bool {
+        self.fetched_at.elapsed() > self.expires
+    }
+}
 
-    let token: TokenResponse  = client.get(METADATA_TOKEN_URL)
-        .header(MetadataFlavor("Google".into()))
+fn get_metadata_token() -> Result<Token> {
+    let token: TokenResponse  = METADATA_CLIENT
+        .get(METADATA_TOKEN_URL)
         .send()?.json()?;
 
     debug!("Fetched new token from metadata service");
 
-    let renew_at = now.add(Duration::from_secs(token.expires_in / 2));
-
     Ok(Token {
-        renew_at,
+        fetched_at: Instant::now(),
+        expires: Duration::from_secs(token.expires_in / 2),
         token: token.access_token,
     })
 }
 
+/// Convert a slice of log entries into the format expected by
+/// Stackdriver. This format is documented here:
+///
+/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
+fn prepare_request(entries: &[LogEntry]) -> Value {
+    json!({
+        "logName": format!("projects/{}/logs/journaldriver", PROJECT_ID.as_str()),
+        "resource": &*MONITORED_RESOURCE,
+        "entries": entries,
+        "partialSuccess": true
+    })
+}
+
+/// Perform the log entry insertion in Stackdriver Logging.
+fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> {
+    client.post(ENTRIES_WRITE_URL)
+        .header(header::Authorization(format!("Bearer {}", token.token)))
+        .json(&request)
+        .send()?
+        .error_for_status()?;
+
+    Ok(())
+}
+
 fn main () {
     env_logger::init();
 
@@ -143,5 +282,5 @@ fn main () {
         }
     }
 
-    receiver_loop(journal)
+    receiver_loop(journal).expect("log receiver encountered an unexpected error");
 }
diff --git a/src/stackdriver.rs b/src/stackdriver.rs
deleted file mode 100644
index 436b2642449f..000000000000
--- a/src/stackdriver.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-//! This module defines types and functions for submitting log entries
-//! to the Stackdriver Logging API.
-//!
-//! Initially this will use the HTTP & JSON API instead of gRPC.
-//!
-//! Documentation for the relevant endpoint is available at:
-//!    https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
-
-use chrono::{DateTime, Utc};
-use failure::{Error, ResultExt};
-use std::collections::HashMap;
-use reqwest::Client;
-
-const WRITE_ENTRY_URL: &str = "https://logging.googleapis.com/v2/entries:write";
-const TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
-
-/// Represents an OAuth 2 access token as returned by Google's APIs.
-#[derive(Deserialize)]
-struct Token {
-    access_token: String,
-    expires_in: usize,
-}
-
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct MonitoredResource {
-    /// The monitored resource type. This field must match the type
-    /// field of a MonitoredResourceDescriptor object.
-    #[serde(rename = "type")]
-    resource_type: String,
-
-    /// Values for all of the labels listed in the associated
-    /// monitored resource descriptor.
-    labels: HashMap<String, String>,
-}
-
-/// This type represents a single Stackdriver log entry.
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct LogEntry<'a> {
-    /// The resource name of the log to which this log entry belongs.
-    log_name: String,
-
-    /// The primary monitored resource associated with this log entry.
-    resource: &'a MonitoredResource,
-
-    /// The time the event described by the log entry occurred.
-    timestamp: DateTime<Utc>,
-
-    /// A set of user-defined (key, value) data that provides
-    /// additional information about the log entry.
-    labels: HashMap<String, String>,
-}
-
-/// This type represents the request sent to the Stackdriver API to
-/// insert a batch of log records.
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct WriteEntriesRequest<'a> {
-    /// The log entries to send to Stackdriver Logging.
-    entries: Vec<LogEntry<'a>>,
-
-    /// Whether valid entries should be written even if some other
-    /// entries fail due to `INVALID_ARGUMENT` or `PERMISSION_DENIED`
-    /// errors.
-    partial_success: bool,
-
-    /// Default labels that are added to the labels field of all log
-    /// entries in entries. If a log entry already has a label with
-    /// the same key as a label in this parameter, then the log
-    /// entry's label is not changed.
-    labels: HashMap<String, String>,
-
-    /// The log entry payload, represented as a Unicode string.
-    text_payload: String,
-}
-
-// Define the metadata header required by Google's service:
-header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
-
-/// This function is used to fetch a new authentication token from
-/// Google. Currently only tokens retrieved via instance metadata are
-/// supported.
-fn fetch_token() -> Result<Token, Error> {
-    let token: Token = Client::new()
-        .get(TOKEN_URL)
-        .header(MetadataFlavor("Google".to_string()))
-        .send().context("Requesting token failed")?
-        .json().context("Deserializing token failed")?;
-
-    Ok(token)
-}