diff options
Diffstat (limited to 'ops/journaldriver/src/main.rs')
-rw-r--r-- | ops/journaldriver/src/main.rs | 175 |
1 files changed, 74 insertions, 101 deletions
diff --git a/ops/journaldriver/src/main.rs b/ops/journaldriver/src/main.rs index a57bb3505d..4c404e607e 100644 --- a/ops/journaldriver/src/main.rs +++ b/ops/journaldriver/src/main.rs @@ -31,44 +31,30 @@ //! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and //! `LOG_NAME` environment variables. -#[macro_use] extern crate failure; -#[macro_use] extern crate log; -#[macro_use] extern crate serde_derive; -#[macro_use] extern crate serde_json; -#[macro_use] extern crate lazy_static; - -extern crate chrono; -extern crate env_logger; -extern crate medallion; -extern crate serde; -extern crate systemd; -extern crate ureq; - -use chrono::offset::LocalResult; -use chrono::prelude::*; -use failure::ResultExt; -use serde_json::{from_str, Value}; -use std::env; -use std::fs::{self, File, rename}; -use std::io::{self, Read, ErrorKind, Write}; -use std::mem; +use anyhow::{bail, Context, Result}; +use lazy_static::lazy_static; +use log::{debug, error, info, trace}; +use serde::{Deserialize, Serialize}; +use serde_json::{from_str, json, Value}; +use std::convert::TryInto; +use std::fs::{self, rename, File}; +use std::io::{self, ErrorKind, Read, Write}; use std::path::PathBuf; -use std::process; use std::time::{Duration, Instant}; -use systemd::journal::*; +use std::{env, mem, process}; +use systemd::journal::{Journal, JournalFiles, JournalRecord, JournalSeek}; #[cfg(test)] mod tests; const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2"; const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write"; -const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; +const METADATA_TOKEN_URL: &str = + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id"; const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; -const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id"; - -/// Convenience type alias for results using failure's `Error` type. -type Result<T> = std::result::Result<T, failure::Error>; +const METADATA_PROJECT_URL: &str = + "http://metadata.google.internal/computeMetadata/v1/project/project-id"; /// Representation of static service account credentials for GCP. #[derive(Debug, Deserialize)] @@ -126,32 +112,27 @@ lazy_static! { /// Convenience helper for retrieving values from the metadata server. fn get_metadata(url: &str) -> Result<String> { - let response = ureq::get(url) - .set("Metadata-Flavor", "Google") - .timeout_connect(5000) - .timeout_read(5000) - .call(); - - if response.ok() { - // Whitespace is trimmed to remove newlines from responses. - let body = response.into_string() - .context("Failed to decode metadata response")? - .trim().to_string(); - - Ok(body) - } else { - let status = response.status_line().to_string(); - let body = response.into_string() - .unwrap_or_else(|e| format!("Metadata body error: {}", e)); - bail!("Metadata failure: {} ({})", body, status) + let response = crimp::Request::get(url) + .header("Metadata-Flavor", "Google")? + .timeout(std::time::Duration::from_secs(5))? + .send()? + .as_string()?; + + if !response.is_success() { + bail!( + "Error response ({}) from metadata server: {}", + response.status, + response.body + ); } + + Ok(response.body.trim().to_owned()) } /// Convenience helper for determining the project ID. fn get_project_id() -> String { env::var("GOOGLE_CLOUD_PROJECT") - .map_err(Into::into) - .or_else(|_: failure::Error| get_metadata(METADATA_PROJECT_URL)) + .or_else(|_| get_metadata(METADATA_PROJECT_URL)) .expect("Could not determine project ID") } @@ -186,11 +167,9 @@ fn determine_monitored_resource() -> Value { } }) } else { - let instance_id = get_metadata(METADATA_ID_URL) - .expect("Could not determine instance ID"); + let instance_id = get_metadata(METADATA_ID_URL).expect("Could not determine instance ID"); - let zone = get_metadata(METADATA_ZONE_URL) - .expect("Could not determine instance zone"); + let zone = get_metadata(METADATA_ZONE_URL).expect("Could not determine instance zone"); json!({ "type": "gce_instance", @@ -252,9 +231,8 @@ fn get_metadata_token() -> Result<Token> { fn sign_service_account_token(credentials: &Credentials) -> Result<Token> { use medallion::{Algorithm, Header, Payload}; - let iat = Utc::now(); - let exp = iat.checked_add_signed(chrono::Duration::seconds(3600)) - .ok_or_else(|| format_err!("Failed to calculate token expiry"))?; + let iat = time::OffsetDateTime::now_utc(); + let exp = iat + time::Duration::seconds(3600); let header = Header { alg: Algorithm::RS256, @@ -267,8 +245,8 @@ fn sign_service_account_token(credentials: &Credentials) -> Result<Token> { iss: Some(credentials.client_email.clone()), sub: Some(credentials.client_email.clone()), aud: Some(LOGGING_SERVICE.to_string()), - iat: Some(iat.timestamp() as u64), - exp: Some(exp.timestamp() as u64), + iat: Some(iat.unix_timestamp().try_into().unwrap()), + exp: Some(exp.unix_timestamp().try_into().unwrap()), ..Default::default() }; @@ -323,7 +301,9 @@ enum Payload { /// text format. fn message_to_payload(message: Option<String>) -> Payload { match message { - None => Payload::TextPayload { text_payload: "empty log entry".into() }, + None => Payload::TextPayload { + text_payload: "empty log entry".into(), + }, Some(text_payload) => { // Attempt to deserialize the text payload as a generic // JSON value. @@ -333,7 +313,7 @@ fn message_to_payload(message: Option<String>) -> Payload { // expect other types of JSON payload) and return it // in that case. if json_payload.is_object() { - return Payload::JsonPayload { json_payload } + return Payload::JsonPayload { json_payload }; } } @@ -348,18 +328,15 @@ fn message_to_payload(message: Option<String>) -> Payload { /// Parse errors are dismissed and returned as empty options: There /// simply aren't any useful fallback mechanisms other than defaulting /// to ingestion time for journaldriver's use-case. -fn parse_microseconds(input: String) -> Option<DateTime<Utc>> { +fn parse_microseconds(input: String) -> Option<time::OffsetDateTime> { if input.len() != 16 { return None; } - let seconds: i64 = (&input[..10]).parse().ok()?; - let micros: u32 = (&input[10..]).parse().ok()?; + let micros: i128 = input.parse().ok()?; + let nanos: i128 = micros * 1000; - match Utc.timestamp_opt(seconds, micros * 1000) { - LocalResult::Single(time) => Some(time), - _ => None, - } + time::OffsetDateTime::from_unix_timestamp_nanos(nanos).ok() } /// Converts a journald log message priority to a @@ -408,7 +385,8 @@ struct LogEntry { labels: Value, #[serde(skip_serializing_if = "Option::is_none")] - timestamp: Option<DateTime<Utc>>, + #[serde(with = "time::serde::rfc3339::option")] + timestamp: Option<time::OffsetDateTime>, #[serde(flatten)] payload: Payload, @@ -450,9 +428,7 @@ impl From<JournalRecord> for LogEntry { // Journald uses syslogd's concept of priority. No idea if this is // always present, but it's optional in the Stackdriver API, so we just // omit it if we can't find or parse it. - let severity = record - .remove("PRIORITY") - .and_then(priority_to_severity); + let severity = record.remove("PRIORITY").and_then(priority_to_severity); LogEntry { payload, @@ -468,8 +444,7 @@ impl From<JournalRecord> for LogEntry { /// Attempt to read from the journal. If no new entry is present, /// await the next one up to the specified timeout. -fn receive_next_record(timeout: Duration, journal: &mut Journal) - -> Result<Option<JournalRecord>> { +fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result<Option<JournalRecord>> { let next_record = journal.next_record()?; if next_record.is_some() { return Ok(next_record); @@ -525,11 +500,10 @@ fn persist_cursor(cursor: String) -> Result<()> { if cursor.is_empty() { error!("Received empty journald cursor position, refusing to persist!"); error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2"); - return Ok(()) + return Ok(()); } - let mut file = File::create(&*CURSOR_TMP_FILE) - .context("Failed to create cursor file")?; + let mut file = File::create(&*CURSOR_TMP_FILE).context("Failed to create cursor file")?; write!(file, "{}", cursor).context("Failed to write cursor file")?; @@ -547,13 +521,11 @@ fn persist_cursor(cursor: String) -> Result<()> { /// /// If flushing is successful the last cursor position will be /// persisted to disk. -fn flush(token: &mut Token, - entries: Vec<LogEntry>, - cursor: String) -> Result<()> { +fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> { if token.is_expired() { debug!("Refreshing Google metadata access token"); let new_token = get_token()?; - mem::replace(token, new_token); + *token = new_token; } for chunk in entries.chunks(750) { @@ -583,25 +555,28 @@ fn prepare_request(entries: &[LogEntry]) -> Value { /// Perform the log entry insertion in Stackdriver Logging. fn write_entries(token: &Token, request: Value) -> Result<()> { - let response = ureq::post(ENTRIES_WRITE_URL) - .set("Authorization", format!("Bearer {}", token.token).as_str()) + let response = crimp::Request::post(ENTRIES_WRITE_URL) + .json(&request)? + .header("Authorization", format!("Bearer {}", token.token).as_str())? // The timeout values are set relatively high, not because of // an expectation of Stackdriver being slow but just to - // eventually hit an error case in case of network troubles. + // eventually force an error in case of network troubles. // Presumably no request in a functioning environment will // ever hit these limits. - .timeout_connect(2000) - .timeout_read(5000) - .send_json(request); + .timeout(std::time::Duration::from_secs(5))? + .send()?; - if response.ok() { - Ok(()) - } else { - let status = response.status_line().to_string(); - let body = response.into_string() - .unwrap_or_else(|_| "no response body".into()); - bail!("Write failure: {} ({})", body, status) + if !response.is_success() { + let status = response.status; + let body = response + .as_string() + .map(|r| r.body) + .unwrap_or_else(|_| "no valid response body".to_owned()); + + bail!("Writing to Stackdriver failed({}): {}", status, body); } + + Ok(()) } /// Attempt to read the initial cursor position from the configured @@ -624,14 +599,12 @@ fn initial_cursor() -> Result<JournalSeek> { Err(ref err) if err.kind() == ErrorKind::NotFound => { info!("No previous cursor position, reading from journal tail"); Ok(JournalSeek::Tail) - }, - Err(err) => { - (Err(err).context("Could not read cursor position"))? } + Err(err) => (Err(err).context("Could not read cursor position"))?, } } -fn main () { +fn main() { env_logger::init(); // The directory in which cursor positions are persisted should @@ -641,17 +614,17 @@ fn main () { process::exit(1); } - let cursor_position_dir = CURSOR_FILE.parent() + let cursor_position_dir = CURSOR_FILE + .parent() .expect("Invalid cursor position file path"); fs::create_dir_all(cursor_position_dir) .expect("Could not create directory to store cursor position in"); - let mut journal = Journal::open(JournalFiles::All, false, true) - .expect("Failed to open systemd journal"); + let mut journal = + Journal::open(JournalFiles::All, false, true).expect("Failed to open systemd journal"); - let seek_position = initial_cursor() - .expect("Failed to determine initial cursor position"); + let seek_position = initial_cursor().expect("Failed to determine initial cursor position"); match journal.seek(seek_position) { Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), |