about summary refs log tree commit diff
path: root/ops/journaldriver/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ops/journaldriver/src/main.rs')
-rw-r--r--ops/journaldriver/src/main.rs175
1 files changed, 74 insertions, 101 deletions
diff --git a/ops/journaldriver/src/main.rs b/ops/journaldriver/src/main.rs
index a57bb3505d..4c404e607e 100644
--- a/ops/journaldriver/src/main.rs
+++ b/ops/journaldriver/src/main.rs
@@ -31,44 +31,30 @@
 //! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and
 //! `LOG_NAME` environment variables.
 
-#[macro_use] extern crate failure;
-#[macro_use] extern crate log;
-#[macro_use] extern crate serde_derive;
-#[macro_use] extern crate serde_json;
-#[macro_use] extern crate lazy_static;
-
-extern crate chrono;
-extern crate env_logger;
-extern crate medallion;
-extern crate serde;
-extern crate systemd;
-extern crate ureq;
-
-use chrono::offset::LocalResult;
-use chrono::prelude::*;
-use failure::ResultExt;
-use serde_json::{from_str, Value};
-use std::env;
-use std::fs::{self, File, rename};
-use std::io::{self, Read, ErrorKind, Write};
-use std::mem;
+use anyhow::{bail, Context, Result};
+use lazy_static::lazy_static;
+use log::{debug, error, info, trace};
+use serde::{Deserialize, Serialize};
+use serde_json::{from_str, json, Value};
+use std::convert::TryInto;
+use std::fs::{self, rename, File};
+use std::io::{self, ErrorKind, Read, Write};
 use std::path::PathBuf;
-use std::process;
 use std::time::{Duration, Instant};
-use systemd::journal::*;
+use std::{env, mem, process};
+use systemd::journal::{Journal, JournalFiles, JournalRecord, JournalSeek};
 
 #[cfg(test)]
 mod tests;
 
 const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2";
 const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
-const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
+const METADATA_TOKEN_URL: &str =
+    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
 const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
 const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
-const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id";
-
-/// Convenience type alias for results using failure's `Error` type.
-type Result<T> = std::result::Result<T, failure::Error>;
+const METADATA_PROJECT_URL: &str =
+    "http://metadata.google.internal/computeMetadata/v1/project/project-id";
 
 /// Representation of static service account credentials for GCP.
 #[derive(Debug, Deserialize)]
@@ -126,32 +112,27 @@ lazy_static! {
 
 /// Convenience helper for retrieving values from the metadata server.
 fn get_metadata(url: &str) -> Result<String> {
-    let response = ureq::get(url)
-        .set("Metadata-Flavor", "Google")
-        .timeout_connect(5000)
-        .timeout_read(5000)
-        .call();
-
-    if response.ok() {
-        // Whitespace is trimmed to remove newlines from responses.
-        let body = response.into_string()
-            .context("Failed to decode metadata response")?
-            .trim().to_string();
-
-        Ok(body)
-    } else {
-        let status = response.status_line().to_string();
-        let body = response.into_string()
-            .unwrap_or_else(|e| format!("Metadata body error: {}", e));
-        bail!("Metadata failure: {} ({})", body, status)
+    let response = crimp::Request::get(url)
+        .header("Metadata-Flavor", "Google")?
+        .timeout(std::time::Duration::from_secs(5))?
+        .send()?
+        .as_string()?;
+
+    if !response.is_success() {
+        bail!(
+            "Error response ({}) from metadata server: {}",
+            response.status,
+            response.body
+        );
     }
+
+    Ok(response.body.trim().to_owned())
 }
 
 /// Convenience helper for determining the project ID.
 fn get_project_id() -> String {
     env::var("GOOGLE_CLOUD_PROJECT")
-        .map_err(Into::into)
-        .or_else(|_: failure::Error| get_metadata(METADATA_PROJECT_URL))
+        .or_else(|_| get_metadata(METADATA_PROJECT_URL))
         .expect("Could not determine project ID")
 }
 
@@ -186,11 +167,9 @@ fn determine_monitored_resource() -> Value {
             }
         })
     } else {
-        let instance_id = get_metadata(METADATA_ID_URL)
-            .expect("Could not determine instance ID");
+        let instance_id = get_metadata(METADATA_ID_URL).expect("Could not determine instance ID");
 
-        let zone = get_metadata(METADATA_ZONE_URL)
-            .expect("Could not determine instance zone");
+        let zone = get_metadata(METADATA_ZONE_URL).expect("Could not determine instance zone");
 
         json!({
             "type": "gce_instance",
@@ -252,9 +231,8 @@ fn get_metadata_token() -> Result<Token> {
 fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
     use medallion::{Algorithm, Header, Payload};
 
-    let iat = Utc::now();
-    let exp = iat.checked_add_signed(chrono::Duration::seconds(3600))
-        .ok_or_else(|| format_err!("Failed to calculate token expiry"))?;
+    let iat = time::OffsetDateTime::now_utc();
+    let exp = iat + time::Duration::seconds(3600);
 
     let header = Header {
         alg: Algorithm::RS256,
@@ -267,8 +245,8 @@ fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
         iss: Some(credentials.client_email.clone()),
         sub: Some(credentials.client_email.clone()),
         aud: Some(LOGGING_SERVICE.to_string()),
-        iat: Some(iat.timestamp() as u64),
-        exp: Some(exp.timestamp() as u64),
+        iat: Some(iat.unix_timestamp().try_into().unwrap()),
+        exp: Some(exp.unix_timestamp().try_into().unwrap()),
         ..Default::default()
     };
 
@@ -323,7 +301,9 @@ enum Payload {
 /// text format.
 fn message_to_payload(message: Option<String>) -> Payload {
     match message {
-        None => Payload::TextPayload { text_payload: "empty log entry".into() },
+        None => Payload::TextPayload {
+            text_payload: "empty log entry".into(),
+        },
         Some(text_payload) => {
             // Attempt to deserialize the text payload as a generic
             // JSON value.
@@ -333,7 +313,7 @@ fn message_to_payload(message: Option<String>) -> Payload {
                 // expect other types of JSON payload) and return it
                 // in that case.
                 if json_payload.is_object() {
-                    return Payload::JsonPayload { json_payload }
+                    return Payload::JsonPayload { json_payload };
                 }
             }
 
@@ -348,18 +328,15 @@ fn message_to_payload(message: Option<String>) -> Payload {
 /// Parse errors are dismissed and returned as empty options: There
 /// simply aren't any useful fallback mechanisms other than defaulting
 /// to ingestion time for journaldriver's use-case.
-fn parse_microseconds(input: String) -> Option<DateTime<Utc>> {
+fn parse_microseconds(input: String) -> Option<time::OffsetDateTime> {
     if input.len() != 16 {
         return None;
     }
 
-    let seconds: i64 = (&input[..10]).parse().ok()?;
-    let micros: u32 = (&input[10..]).parse().ok()?;
+    let micros: i128 = input.parse().ok()?;
+    let nanos: i128 = micros * 1000;
 
-    match Utc.timestamp_opt(seconds, micros * 1000) {
-        LocalResult::Single(time) => Some(time),
-        _ => None,
-    }
+    time::OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()
 }
 
 /// Converts a journald log message priority to a
@@ -408,7 +385,8 @@ struct LogEntry {
     labels: Value,
 
     #[serde(skip_serializing_if = "Option::is_none")]
-    timestamp: Option<DateTime<Utc>>,
+    #[serde(with = "time::serde::rfc3339::option")]
+    timestamp: Option<time::OffsetDateTime>,
 
     #[serde(flatten)]
     payload: Payload,
@@ -450,9 +428,7 @@ impl From<JournalRecord> for LogEntry {
         // Journald uses syslogd's concept of priority. No idea if this is
         // always present, but it's optional in the Stackdriver API, so we just
         // omit it if we can't find or parse it.
-        let severity = record
-            .remove("PRIORITY")
-            .and_then(priority_to_severity);
+        let severity = record.remove("PRIORITY").and_then(priority_to_severity);
 
         LogEntry {
             payload,
@@ -468,8 +444,7 @@ impl From<JournalRecord> for LogEntry {
 
 /// Attempt to read from the journal. If no new entry is present,
 /// await the next one up to the specified timeout.
-fn receive_next_record(timeout: Duration, journal: &mut Journal)
-                       -> Result<Option<JournalRecord>> {
+fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result<Option<JournalRecord>> {
     let next_record = journal.next_record()?;
     if next_record.is_some() {
         return Ok(next_record);
@@ -525,11 +500,10 @@ fn persist_cursor(cursor: String) -> Result<()> {
     if cursor.is_empty() {
         error!("Received empty journald cursor position, refusing to persist!");
         error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2");
-        return Ok(())
+        return Ok(());
     }
 
-    let mut file = File::create(&*CURSOR_TMP_FILE)
-        .context("Failed to create cursor file")?;
+    let mut file = File::create(&*CURSOR_TMP_FILE).context("Failed to create cursor file")?;
 
     write!(file, "{}", cursor).context("Failed to write cursor file")?;
 
@@ -547,13 +521,11 @@ fn persist_cursor(cursor: String) -> Result<()> {
 ///
 /// If flushing is successful the last cursor position will be
 /// persisted to disk.
-fn flush(token: &mut Token,
-         entries: Vec<LogEntry>,
-         cursor: String) -> Result<()> {
+fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> {
     if token.is_expired() {
         debug!("Refreshing Google metadata access token");
         let new_token = get_token()?;
-        mem::replace(token, new_token);
+        *token = new_token;
     }
 
     for chunk in entries.chunks(750) {
@@ -583,25 +555,28 @@ fn prepare_request(entries: &[LogEntry]) -> Value {
 
 /// Perform the log entry insertion in Stackdriver Logging.
 fn write_entries(token: &Token, request: Value) -> Result<()> {
-    let response = ureq::post(ENTRIES_WRITE_URL)
-        .set("Authorization", format!("Bearer {}", token.token).as_str())
+    let response = crimp::Request::post(ENTRIES_WRITE_URL)
+        .json(&request)?
+        .header("Authorization", format!("Bearer {}", token.token).as_str())?
         // The timeout values are set relatively high, not because of
         // an expectation of Stackdriver being slow but just to
-        // eventually hit an error case in case of network troubles.
+        // eventually force an error in case of network troubles.
         // Presumably no request in a functioning environment will
         // ever hit these limits.
-        .timeout_connect(2000)
-        .timeout_read(5000)
-        .send_json(request);
+        .timeout(std::time::Duration::from_secs(5))?
+        .send()?;
 
-    if response.ok() {
-        Ok(())
-    } else {
-        let status = response.status_line().to_string();
-        let body = response.into_string()
-            .unwrap_or_else(|_| "no response body".into());
-        bail!("Write failure: {} ({})", body, status)
+    if !response.is_success() {
+        let status = response.status;
+        let body = response
+            .as_string()
+            .map(|r| r.body)
+            .unwrap_or_else(|_| "no valid response body".to_owned());
+
+        bail!("Writing to Stackdriver failed({}): {}", status, body);
     }
+
+    Ok(())
 }
 
 /// Attempt to read the initial cursor position from the configured
@@ -624,14 +599,12 @@ fn initial_cursor() -> Result<JournalSeek> {
         Err(ref err) if err.kind() == ErrorKind::NotFound => {
             info!("No previous cursor position, reading from journal tail");
             Ok(JournalSeek::Tail)
-        },
-        Err(err) => {
-            (Err(err).context("Could not read cursor position"))?
         }
+        Err(err) => (Err(err).context("Could not read cursor position"))?,
     }
 }
 
-fn main () {
+fn main() {
     env_logger::init();
 
     // The directory in which cursor positions are persisted should
@@ -641,17 +614,17 @@ fn main () {
         process::exit(1);
     }
 
-    let cursor_position_dir = CURSOR_FILE.parent()
+    let cursor_position_dir = CURSOR_FILE
+        .parent()
         .expect("Invalid cursor position file path");
 
     fs::create_dir_all(cursor_position_dir)
         .expect("Could not create directory to store cursor position in");
 
-    let mut journal = Journal::open(JournalFiles::All, false, true)
-        .expect("Failed to open systemd journal");
+    let mut journal =
+        Journal::open(JournalFiles::All, false, true).expect("Failed to open systemd journal");
 
-    let seek_position = initial_cursor()
-        .expect("Failed to determine initial cursor position");
+    let seek_position = initial_cursor().expect("Failed to determine initial cursor position");
 
     match journal.seek(seek_position) {
         Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),