diff options
author | Vincent Ambo <mail@tazj.in> | 2018-06-15T14·45+0200 |
---|---|---|
committer | Vincent Ambo <mail@tazj.in> | 2018-06-15T14·45+0200 |
commit | 4ef98fc2ba0f3007adcf2a8ab128ccecfcbba212 (patch) | |
tree | 059727e36438a7f621eb4f78180a8f760825b861 /src/main.rs | |
parent | 2d8e057118c527cc1697327db00d4006d8530b91 (diff) |
feat(main): Implement record conversion & flushing to API
This implements the "meat" of the initial version of journaldriver. Records from journald are converted into the representation required by Stackdriver and forwarded to the API. In this initial version journaldriver is only supported on instances running on GCP.
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 245 |
1 files changed, 192 insertions, 53 deletions
diff --git a/src/main.rs b/src/main.rs index c6cfa8df1e76..0e14827b6d12 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,58 +1,152 @@ -#[macro_use] extern crate failure; +//! This file implements journaldriver, a small application that +//! forwards logs from journald (systemd's log facility) to +//! Stackdriver Logging. +//! +//! Log entries are read continously from journald and are forwarded +//! to Stackdriver in batches. +//! +//! Stackdriver Logging has a concept of monitored resources. In the +//! simplest (and currently only supported) case this monitored +//! resource will be the GCE instance on which journaldriver is +//! running. +//! +//! Information about the instance, the project and required security +//! credentials are retrieved from Google's metadata instance on GCP. +//! +//! Things left to do: +//! * TODO 2018-06-15: Support non-GCP instances (see comment on +//! monitored resource descriptor) +//! * TODO 2018-06-15: Extract timestamps from journald instead of +//! relying on ingestion timestamps. +//! * TODO 2018-06-15: Persist last known cursor position after +//! flushing to allow journaldriver to resume from the same position +//! after a restart. + #[macro_use] extern crate hyper; #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; +#[macro_use] extern crate serde_json; +#[macro_use] extern crate lazy_static; -extern crate chrono; +extern crate failure; extern crate env_logger; extern crate systemd; extern crate serde; -extern crate serde_json; extern crate reqwest; -mod stackdriver; - -use std::env; +use reqwest::{header, Client}; +use serde_json::Value; +use std::io::Read; use std::mem; -use std::ops::Add; use std::process; use std::time::{Duration, Instant}; use systemd::journal::*; +const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write"; const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; +const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id"; +const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; +const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id"; +// Google's metadata service requires this header to be present on all +// calls: +//g +// https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying header! { (MetadataFlavor, "Metadata-Flavor") => [String] } +/// Convenience type alias for results using failure's `Error` type. type Result<T> = std::result::Result<T, failure::Error>; -#[derive(Debug)] -struct Record { - message: Option<String>, - hostname: Option<String>, - unit: Option<String>, - timestamp: Option<String>, +lazy_static! { + /// HTTP client instance preconfigured with the metadata header + /// required by Google. + static ref METADATA_CLIENT: Client = { + let mut headers = header::Headers::new(); + headers.set(MetadataFlavor("Google".into())); + + Client::builder().default_headers(headers) + .build().expect("Could not create metadata client") + }; + + /// ID of the GCP project in which this instance is running. + static ref PROJECT_ID: String = get_metadata(METADATA_PROJECT_URL) + .expect("Could not determine project ID"); + + /// ID of the current GCP instance. + static ref INSTANCE_ID: String = get_metadata(METADATA_ID_URL) + .expect("Could not determine instance ID"); + + /// GCP zone in which this instance is running. + static ref ZONE: String = get_metadata(METADATA_ZONE_URL) + .expect("Could not determine instance zone"); + + /// Descriptor of the currently monitored instance. + /// + /// For GCE instances, this will be the GCE instance ID. For + /// non-GCE machines a sensible solution may be using the machine + /// hostname as a Cloud Logging log name, but this is not yet + /// implemented. + static ref MONITORED_RESOURCE: Value = json!({ + "type": "gce_instance", + "labels": { + "project_id": PROJECT_ID.as_str(), + "instance_id": INSTANCE_ID.as_str(), + "zone": ZONE.as_str(), + } + }); +} + +/// Convenience helper for retrieving values from the metadata server. +fn get_metadata(url: &str) -> Result<String> { + let mut output = String::new(); + METADATA_CLIENT.get(url).send()? + .error_for_status()? + .read_to_string(&mut output)?; + + Ok(output.trim().into()) +} + +/// This structure represents a log entry in the format expected by +/// the Stackdriver API. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LogEntry { + labels: Value, + text_payload: String, // TODO: attempt to parse jsonPayloads } -impl From<JournalRecord> for Record { - fn from(mut record: JournalRecord) -> Record { - Record { - // The message field is technically just a convention, but - // journald seems to default to it when ingesting unit - // output. - message: record.remove("MESSAGE"), - - // Presumably this is always set, but who can be sure - // about anything in this world. - hostname: record.remove("_HOSTNAME"), - - // The unit is seemingly missing on kernel entries, but - // present on all others. - unit: record.remove("_SYSTEMD_UNIT"), - - // This timestamp is present on most log entries - // (seemingly all that are ingested from the output - // systemd units). - timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"), +impl From<JournalRecord> for LogEntry { + // Converts from the fields contained in a journald record to the + // representation required by Stackdriver Logging. + // + // The fields are documented in systemd.journal-fields(7). + fn from(mut record: JournalRecord) -> LogEntry { + // The message field is technically just a convention, but + // journald seems to default to it when ingesting unit + // output. + let message = record.remove("MESSAGE"); + + // Presumably this is always set, but who can be sure + // about anything in this world. + let hostname = record.remove("_HOSTNAME"); + + // The unit is seemingly missing on kernel entries, but + // present on all others. + let unit = record.remove("_SYSTEMD_UNIT"); + + // TODO: This timestamp (in microseconds) should be parsed + // into a DateTime<Utc> and used instead of the ingestion + // time. + // let timestamp = record + // .remove("_SOURCE_REALTIME_TIMESTAMP") + // .map(); + + LogEntry { + text_payload: message.unwrap_or_else(|| "empty log entry".into()), + labels: json!({ + "host": hostname, + "unit": unit.unwrap_or_else(|| "syslog".into()), + }), } } } @@ -60,8 +154,11 @@ impl From<JournalRecord> for Record { /// This function starts a double-looped, blocking receiver. It will /// buffer messages for half a second before flushing them to /// Stackdriver. -fn receiver_loop(mut journal: Journal) { - let mut buf: Vec<Record> = Vec::new(); +fn receiver_loop(mut journal: Journal) -> Result<()> { + let mut token = get_metadata_token()?; + let client = reqwest::Client::new(); + + let mut buf: Vec<LogEntry> = Vec::new(); let iteration = Duration::from_millis(500); loop { @@ -73,15 +170,15 @@ fn receiver_loop(mut journal: Journal) { break; } - if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) { - trace!("Received a new record"); - buf.push(record.into()); + if let Ok(Some(entry)) = journal.await_next_record(Some(iteration)) { + trace!("Received a new entry"); + buf.push(entry.into()); } } if !buf.is_empty() { let to_flush = mem::replace(&mut buf, Vec::new()); - flush(to_flush); + flush(&client, &mut token, to_flush)?; } trace!("Done outer iteration"); @@ -91,16 +188,29 @@ fn receiver_loop(mut journal: Journal) { /// Flushes all drained records to Stackdriver. Any Stackdriver /// message can at most contain 1000 log entries which means they are /// chunked up here. -fn flush(records: Vec<Record>) { - for chunk in records.chunks(1000) { - debug!("Flushed {} records", chunk.len()) +fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<()> { + if token.is_expired() { + debug!("Refreshing Google metadata access token"); + let new_token = get_metadata_token()?; + mem::replace(token, new_token); + } + + for chunk in entries.chunks(1000) { + let request = prepare_request(chunk); + if let Err(write_error) = write_entries(client, token, request) { + error!("Failed to write {} entries: {}", chunk.len(), write_error) + } else { + debug!("Wrote {} entries to Stackdriver", chunk.len()) + } } + + Ok(()) } -/// Retrieves an access token from the GCP metadata service. +/// Represents the response returned by the metadata server's token +/// endpoint. The token is normally valid for an hour. #[derive(Deserialize)] struct TokenResponse { - #[serde(rename = "type")] expires_in: u64, access_token: String, } @@ -109,26 +219,55 @@ struct TokenResponse { /// representation of when it expires. struct Token { token: String, - renew_at: Instant, + fetched_at: Instant, + expires: Duration, } -fn get_metadata_token(client: &reqwest::Client) -> Result<Token> { - let now = Instant::now(); +impl Token { + /// Does this token need to be renewed? + fn is_expired(&self) -> bool { + self.fetched_at.elapsed() > self.expires + } +} - let token: TokenResponse = client.get(METADATA_TOKEN_URL) - .header(MetadataFlavor("Google".into())) +fn get_metadata_token() -> Result<Token> { + let token: TokenResponse = METADATA_CLIENT + .get(METADATA_TOKEN_URL) .send()?.json()?; debug!("Fetched new token from metadata service"); - let renew_at = now.add(Duration::from_secs(token.expires_in / 2)); - Ok(Token { - renew_at, + fetched_at: Instant::now(), + expires: Duration::from_secs(token.expires_in / 2), token: token.access_token, }) } +/// Convert a slice of log entries into the format expected by +/// Stackdriver. This format is documented here: +/// +/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write +fn prepare_request(entries: &[LogEntry]) -> Value { + json!({ + "logName": format!("projects/{}/logs/journaldriver", PROJECT_ID.as_str()), + "resource": &*MONITORED_RESOURCE, + "entries": entries, + "partialSuccess": true + }) +} + +/// Perform the log entry insertion in Stackdriver Logging. +fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> { + client.post(ENTRIES_WRITE_URL) + .header(header::Authorization(format!("Bearer {}", token.token))) + .json(&request) + .send()? + .error_for_status()?; + + Ok(()) +} + fn main () { env_logger::init(); @@ -143,5 +282,5 @@ fn main () { } } - receiver_loop(journal) + receiver_loop(journal).expect("log receiver encountered an unexpected error"); } |