diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 86 |
1 files changed, 43 insertions, 43 deletions
diff --git a/src/main.rs b/src/main.rs index ea96568a8a59..29a71974b89b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,7 +32,6 @@ //! `LOG_NAME` environment variables. #[macro_use] extern crate failure; -#[macro_use] extern crate hyper; #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_json; @@ -41,15 +40,14 @@ extern crate chrono; extern crate env_logger; extern crate medallion; -extern crate reqwest; extern crate serde; extern crate systemd; +extern crate ureq; use chrono::offset::LocalResult; use chrono::prelude::*; use failure::ResultExt; -use reqwest::{header, Client}; -use serde_json::Value; +use serde_json::{from_str, Value}; use std::env; use std::fs::{self, File}; use std::io::{self, Read, ErrorKind, Write}; @@ -69,12 +67,6 @@ const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id"; -// Google's metadata service requires this header to be present on all -// calls: -// -// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying -header! { (MetadataFlavor, "Metadata-Flavor") => [String] } - /// Convenience type alias for results using failure's `Error` type. type Result<T> = std::result::Result<T, failure::Error>; @@ -92,16 +84,6 @@ struct Credentials { } lazy_static! { - /// HTTP client instance preconfigured with the metadata header - /// required by Google. - static ref METADATA_CLIENT: Client = { - let mut headers = header::Headers::new(); - headers.set(MetadataFlavor("Google".into())); - - Client::builder().default_headers(headers) - .build().expect("Could not create metadata client") - }; - /// ID of the GCP project to which to send logs. static ref PROJECT_ID: String = get_project_id(); @@ -130,12 +112,25 @@ lazy_static! { /// Convenience helper for retrieving values from the metadata server. fn get_metadata(url: &str) -> Result<String> { - let mut output = String::new(); - METADATA_CLIENT.get(url).send()? - .error_for_status()? - .read_to_string(&mut output)?; - - Ok(output.trim().into()) + let response = ureq::get(url) + .set("Metadata-Flavor", "Google") + .timeout_connect(5000) + .timeout_read(5000) + .call(); + + if response.ok() { + // Whitespace is trimmed to remove newlines from responses. + let body = response.into_string() + .context("Failed to decode metadata response")? + .trim().to_string(); + + Ok(body) + } else { + let status = response.status_line().to_string(); + let body = response.into_string() + .unwrap_or_else(|e| format!("Metadata body error: {}", e)); + bail!("Metadata failure: {} ({})", body, status) + } } /// Convenience helper for determining the project ID. @@ -205,9 +200,8 @@ impl Token { /// Retrieves a token from the GCP metadata service. Retrieving these /// tokens requires no additional authentication. fn get_metadata_token() -> Result<Token> { - let token: TokenResponse = METADATA_CLIENT - .get(METADATA_TOKEN_URL) - .send()?.json()?; + let body = get_metadata(METADATA_TOKEN_URL)?; + let token: TokenResponse = from_str(&body)?; debug!("Fetched new token from metadata service"); @@ -460,7 +454,6 @@ fn receive_next_record(timeout: Duration, journal: &mut Journal) /// Stackdriver. fn receiver_loop(mut journal: Journal) -> Result<()> { let mut token = get_token()?; - let client = reqwest::Client::new(); let mut buf: Vec<LogEntry> = Vec::new(); let iteration = Duration::from_millis(500); @@ -482,7 +475,7 @@ fn receiver_loop(mut journal: Journal) -> Result<()> { if !buf.is_empty() { let to_flush = mem::replace(&mut buf, Vec::new()); - flush(&client, &mut token, to_flush, journal.cursor()?)?; + flush(&mut token, to_flush, journal.cursor()?)?; } trace!("Done outer iteration"); @@ -504,8 +497,7 @@ fn persist_cursor(cursor: String) -> Result<()> { /// /// If flushing is successful the last cursor position will be /// persisted to disk. -fn flush(client: &Client, - token: &mut Token, +fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> { if token.is_expired() { @@ -516,7 +508,7 @@ fn flush(client: &Client, for chunk in entries.chunks(750) { let request = prepare_request(chunk); - if let Err(write_error) = write_entries(client, token, request) { + if let Err(write_error) = write_entries(token, request) { error!("Failed to write {} entries: {}", chunk.len(), write_error) } else { debug!("Wrote {} entries to Stackdriver", chunk.len()) @@ -540,17 +532,25 @@ fn prepare_request(entries: &[LogEntry]) -> Value { } /// Perform the log entry insertion in Stackdriver Logging. -fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> { - let mut response = client.post(ENTRIES_WRITE_URL) - .header(header::Authorization(format!("Bearer {}", token.token))) - .json(&request) - .send()?; - - if response.status().is_success() { +fn write_entries(token: &Token, request: Value) -> Result<()> { + let response = ureq::post(ENTRIES_WRITE_URL) + .set("Authorization", format!("Bearer {}", token.token).as_str()) + // The timeout values are set relatively high, not because of + // an expectation of Stackdriver being slow but just to + // eventually hit an error case in case of network troubles. + // Presumably no request in a functioning environment will + // ever hit these limits. + .timeout_connect(2000) + .timeout_read(5000) + .send_json(request); + + if response.ok() { Ok(()) } else { - let body = response.text().unwrap_or_else(|_| "no response body".into()); - bail!("{} ({})", body, response.status()) + let status = response.status_line().to_string(); + let body = response.into_string() + .unwrap_or_else(|_| "no response body".into()); + bail!("Write failure: {} ({})", body, status) } } |