diff options
-rw-r--r-- | Cargo.lock | 29 | ||||
-rw-r--r-- | Cargo.toml | 4 | ||||
-rw-r--r-- | src/main.rs | 245 | ||||
-rw-r--r-- | src/stackdriver.rs | 92 |
4 files changed, 195 insertions, 175 deletions
diff --git a/Cargo.lock b/Cargo.lock index b522ee53b3cf..95fcd8e6896e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -99,17 +99,6 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "chrono" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", - "num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.62 (registry+https://github.com/rust-lang/crates.io-index)", - "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] name = "core-foundation" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -339,10 +328,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "journaldriver" version = "0.1.0" dependencies = [ - "chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -526,19 +515,6 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "num-integer" -version = "0.1.38" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "num-traits" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] name = "num_cpus" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1269,7 +1245,6 @@ dependencies = [ "checksum bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7dd32989a66957d3f0cba6588f15d4281a733f4e9ffc43fcd2385f57d3bf99ff" "checksum cc 1.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0ebb87d1116151416c0cf66a0e3fb6430cccd120fd6300794b4dfaa050ac40ba" "checksum cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "405216fd8fe65f718daa7102ea808a946b6ce40c742998fbfd3463645552de18" -"checksum chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1cce36c92cb605414e9b824f866f5babe0a0368e39ea07393b9b63cf3844c0e6" "checksum core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "25bfd746d203017f7d5cbd31ee5d8e17f94b6521c7af77ece6c9e4b2d4b16c67" "checksum core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "065a5d7ffdcbc8fa145d6f0746f3555025b9097a9e9cda59f7467abae670c78d" "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" @@ -1316,8 +1291,6 @@ dependencies = [ "checksum native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f74dbadc8b43df7864539cedb7bc91345e532fdd913cfdc23ad94f4d2d40fbc0" "checksum net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)" = "9044faf1413a1057267be51b5afba8eb1090bd2231c693664aa1db716fe1eae0" "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" -"checksum num-integer 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac0ea58d64a89d9d6b7688031b3be9358d6c919badcf7fbb0527ccfd891ee45" -"checksum num-traits 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "775393e285254d2f5004596d69bb8bc1149754570dcc08cf30cabeba67955e28" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum openssl 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "a3605c298474a3aa69de92d21139fb5e2a81688d308262359d85cdd0d12a7985" "checksum openssl-sys 0.9.31 (registry+https://github.com/rust-lang/crates.io-index)" = "a4d6a27d108b29befe1822d40e2e22f85518dac59acbf7f30fdc532f48fd0a77" diff --git a/Cargo.toml b/Cargo.toml index e314c0a51ef0..66e83d25300a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,8 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" reqwest = "0.8" -chrono = { version = "0.4", features = ["serde"] } -hyper = "*" # whatever reqwest drags in +hyper = "0.11" # whatever reqwest drags in +lazy_static = "1.0" [build-dependencies] pkg-config = "0.3" diff --git a/src/main.rs b/src/main.rs index c6cfa8df1e76..0e14827b6d12 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,58 +1,152 @@ -#[macro_use] extern crate failure; +//! This file implements journaldriver, a small application that +//! forwards logs from journald (systemd's log facility) to +//! Stackdriver Logging. +//! +//! Log entries are read continously from journald and are forwarded +//! to Stackdriver in batches. +//! +//! Stackdriver Logging has a concept of monitored resources. In the +//! simplest (and currently only supported) case this monitored +//! resource will be the GCE instance on which journaldriver is +//! running. +//! +//! Information about the instance, the project and required security +//! credentials are retrieved from Google's metadata instance on GCP. +//! +//! Things left to do: +//! * TODO 2018-06-15: Support non-GCP instances (see comment on +//! monitored resource descriptor) +//! * TODO 2018-06-15: Extract timestamps from journald instead of +//! relying on ingestion timestamps. +//! * TODO 2018-06-15: Persist last known cursor position after +//! flushing to allow journaldriver to resume from the same position +//! after a restart. + #[macro_use] extern crate hyper; #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; +#[macro_use] extern crate serde_json; +#[macro_use] extern crate lazy_static; -extern crate chrono; +extern crate failure; extern crate env_logger; extern crate systemd; extern crate serde; -extern crate serde_json; extern crate reqwest; -mod stackdriver; - -use std::env; +use reqwest::{header, Client}; +use serde_json::Value; +use std::io::Read; use std::mem; -use std::ops::Add; use std::process; use std::time::{Duration, Instant}; use systemd::journal::*; +const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write"; const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; +const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id"; +const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; +const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id"; +// Google's metadata service requires this header to be present on all +// calls: +//g +// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying header! { (MetadataFlavor, "Metadata-Flavor") => [String] } +/// Convenience type alias for results using failure's `Error` type. type Result<T> = std::result::Result<T, failure::Error>; -#[derive(Debug)] -struct Record { - message: Option<String>, - hostname: Option<String>, - unit: Option<String>, - timestamp: Option<String>, +lazy_static! { + /// HTTP client instance preconfigured with the metadata header + /// required by Google. + static ref METADATA_CLIENT: Client = { + let mut headers = header::Headers::new(); + headers.set(MetadataFlavor("Google".into())); + + Client::builder().default_headers(headers) + .build().expect("Could not create metadata client") + }; + + /// ID of the GCP project in which this instance is running. + static ref PROJECT_ID: String = get_metadata(METADATA_PROJECT_URL) + .expect("Could not determine project ID"); + + /// ID of the current GCP instance. + static ref INSTANCE_ID: String = get_metadata(METADATA_ID_URL) + .expect("Could not determine instance ID"); + + /// GCP zone in which this instance is running. + static ref ZONE: String = get_metadata(METADATA_ZONE_URL) + .expect("Could not determine instance zone"); + + /// Descriptor of the currently monitored instance. + /// + /// For GCE instances, this will be the GCE instance ID. For + /// non-GCE machines a sensible solution may be using the machine + /// hostname as a Cloud Logging log name, but this is not yet + /// implemented. + static ref MONITORED_RESOURCE: Value = json!({ + "type": "gce_instance", + "labels": { + "project_id": PROJECT_ID.as_str(), + "instance_id": INSTANCE_ID.as_str(), + "zone": ZONE.as_str(), + } + }); +} + +/// Convenience helper for retrieving values from the metadata server. +fn get_metadata(url: &str) -> Result<String> { + let mut output = String::new(); + METADATA_CLIENT.get(url).send()? + .error_for_status()? + .read_to_string(&mut output)?; + + Ok(output.trim().into()) +} + +/// This structure represents a log entry in the format expected by +/// the Stackdriver API. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LogEntry { + labels: Value, + text_payload: String, // TODO: attempt to parse jsonPayloads } -impl From<JournalRecord> for Record { - fn from(mut record: JournalRecord) -> Record { - Record { - // The message field is technically just a convention, but - // journald seems to default to it when ingesting unit - // output. - message: record.remove("MESSAGE"), - - // Presumably this is always set, but who can be sure - // about anything in this world. - hostname: record.remove("_HOSTNAME"), - - // The unit is seemingly missing on kernel entries, but - // present on all others. - unit: record.remove("_SYSTEMD_UNIT"), - - // This timestamp is present on most log entries - // (seemingly all that are ingested from the output - // systemd units). - timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"), +impl From<JournalRecord> for LogEntry { + // Converts from the fields contained in a journald record to the + // representation required by Stackdriver Logging. + // + // The fields are documented in systemd.journal-fields(7). + fn from(mut record: JournalRecord) -> LogEntry { + // The message field is technically just a convention, but + // journald seems to default to it when ingesting unit + // output. + let message = record.remove("MESSAGE"); + + // Presumably this is always set, but who can be sure + // about anything in this world. + let hostname = record.remove("_HOSTNAME"); + + // The unit is seemingly missing on kernel entries, but + // present on all others. + let unit = record.remove("_SYSTEMD_UNIT"); + + // TODO: This timestamp (in microseconds) should be parsed + // into a DateTime<Utc> and used instead of the ingestion + // time. + // let timestamp = record + // .remove("_SOURCE_REALTIME_TIMESTAMP") + // .map(); + + LogEntry { + text_payload: message.unwrap_or_else(|| "empty log entry".into()), + labels: json!({ + "host": hostname, + "unit": unit.unwrap_or_else(|| "syslog".into()), + }), } } } @@ -60,8 +154,11 @@ impl From<JournalRecord> for Record { /// This function starts a double-looped, blocking receiver. It will /// buffer messages for half a second before flushing them to /// Stackdriver. -fn receiver_loop(mut journal: Journal) { - let mut buf: Vec<Record> = Vec::new(); +fn receiver_loop(mut journal: Journal) -> Result<()> { + let mut token = get_metadata_token()?; + let client = reqwest::Client::new(); + + let mut buf: Vec<LogEntry> = Vec::new(); let iteration = Duration::from_millis(500); loop { @@ -73,15 +170,15 @@ fn receiver_loop(mut journal: Journal) { break; } - if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) { - trace!("Received a new record"); - buf.push(record.into()); + if let Ok(Some(entry)) = journal.await_next_record(Some(iteration)) { + trace!("Received a new entry"); + buf.push(entry.into()); } } if !buf.is_empty() { let to_flush = mem::replace(&mut buf, Vec::new()); - flush(to_flush); + flush(&client, &mut token, to_flush)?; } trace!("Done outer iteration"); @@ -91,16 +188,29 @@ fn receiver_loop(mut journal: Journal) { /// Flushes all drained records to Stackdriver. Any Stackdriver /// message can at most contain 1000 log entries which means they are /// chunked up here. -fn flush(records: Vec<Record>) { - for chunk in records.chunks(1000) { - debug!("Flushed {} records", chunk.len()) +fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<()> { + if token.is_expired() { + debug!("Refreshing Google metadata access token"); + let new_token = get_metadata_token()?; + mem::replace(token, new_token); + } + + for chunk in entries.chunks(1000) { + let request = prepare_request(chunk); + if let Err(write_error) = write_entries(client, token, request) { + error!("Failed to write {} entries: {}", chunk.len(), write_error) + } else { + debug!("Wrote {} entries to Stackdriver", chunk.len()) + } } + + Ok(()) } -/// Retrieves an access token from the GCP metadata service. +/// Represents the response returned by the metadata server's token +/// endpoint. The token is normally valid for an hour. #[derive(Deserialize)] struct TokenResponse { - #[serde(rename = "type")] expires_in: u64, access_token: String, } @@ -109,26 +219,55 @@ struct TokenResponse { /// representation of when it expires. struct Token { token: String, - renew_at: Instant, + fetched_at: Instant, + expires: Duration, } -fn get_metadata_token(client: &reqwest::Client) -> Result<Token> { - let now = Instant::now(); +impl Token { + /// Does this token need to be renewed? + fn is_expired(&self) -> bool { + self.fetched_at.elapsed() > self.expires + } +} - let token: TokenResponse = client.get(METADATA_TOKEN_URL) - .header(MetadataFlavor("Google".into())) +fn get_metadata_token() -> Result<Token> { + let token: TokenResponse = METADATA_CLIENT + .get(METADATA_TOKEN_URL) .send()?.json()?; debug!("Fetched new token from metadata service"); - let renew_at = now.add(Duration::from_secs(token.expires_in / 2)); - Ok(Token { - renew_at, + fetched_at: Instant::now(), + expires: Duration::from_secs(token.expires_in / 2), token: token.access_token, }) } +/// Convert a slice of log entries into the format expected by +/// Stackdriver. This format is documented here: +/// +/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write +fn prepare_request(entries: &[LogEntry]) -> Value { + json!({ + "logName": format!("projects/{}/logs/journaldriver", PROJECT_ID.as_str()), + "resource": &*MONITORED_RESOURCE, + "entries": entries, + "partialSuccess": true + }) +} + +/// Perform the log entry insertion in Stackdriver Logging. +fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> { + client.post(ENTRIES_WRITE_URL) + .header(header::Authorization(format!("Bearer {}", token.token))) + .json(&request) + .send()? + .error_for_status()?; + + Ok(()) +} + fn main () { env_logger::init(); @@ -143,5 +282,5 @@ fn main () { } } - receiver_loop(journal) + receiver_loop(journal).expect("log receiver encountered an unexpected error"); } diff --git a/src/stackdriver.rs b/src/stackdriver.rs deleted file mode 100644 index 436b2642449f..000000000000 --- a/src/stackdriver.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! This module defines types and functions for submitting log entries -//! to the Stackdriver Logging API. -//! -//! Initially this will use the HTTP & JSON API instead of gRPC. -//! -//! Documentation for the relevant endpoint is available at: -//! https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write - -use chrono::{DateTime, Utc}; -use failure::{Error, ResultExt}; -use std::collections::HashMap; -use reqwest::Client; - -const WRITE_ENTRY_URL: &str = "https://logging.googleapis.com/v2/entries:write"; -const TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; - -/// Represents an OAuth 2 access token as returned by Google's APIs. -#[derive(Deserialize)] -struct Token { - access_token: String, - expires_in: usize, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct MonitoredResource { - /// The monitored resource type. This field must match the type - /// field of a MonitoredResourceDescriptor object. - #[serde(rename = "type")] - resource_type: String, - - /// Values for all of the labels listed in the associated - /// monitored resource descriptor. - labels: HashMap<String, String>, -} - -/// This type represents a single Stackdriver log entry. -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct LogEntry<'a> { - /// The resource name of the log to which this log entry belongs. - log_name: String, - - /// The primary monitored resource associated with this log entry. - resource: &'a MonitoredResource, - - /// The time the event described by the log entry occurred. - timestamp: DateTime<Utc>, - - /// A set of user-defined (key, value) data that provides - /// additional information about the log entry. - labels: HashMap<String, String>, -} - -/// This type represents the request sent to the Stackdriver API to -/// insert a batch of log records. -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct WriteEntriesRequest<'a> { - /// The log entries to send to Stackdriver Logging. - entries: Vec<LogEntry<'a>>, - - /// Whether valid entries should be written even if some other - /// entries fail due to `INVALID_ARGUMENT` or `PERMISSION_DENIED` - /// errors. - partial_success: bool, - - /// Default labels that are added to the labels field of all log - /// entries in entries. If a log entry already has a label with - /// the same key as a label in this parameter, then the log - /// entry's label is not changed. - labels: HashMap<String, String>, - - /// The log entry payload, represented as a Unicode string. - text_payload: String, -} - -// Define the metadata header required by Google's service: -header! { (MetadataFlavor, "Metadata-Flavor") => [String] } - -/// This function is used to fetch a new authentication token from -/// Google. Currently only tokens retrieved via instance metadata are -/// supported. -fn fetch_token() -> Result<Token, Error> { - let token: Token = Client::new() - .get(TOKEN_URL) - .header(MetadataFlavor("Google".to_string())) - .send().context("Requesting token failed")? - .json().context("Deserializing token failed")?; - - Ok(token) -} |