// Copyright (C) 2018 Aprila Bank ASA (contact: vincent@aprila.no) // // journaldriver is free software: you can redistribute it and/or // modify it under the terms of the GNU General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . //! This file implements journaldriver, a small application that //! forwards logs from journald (systemd's log facility) to //! Stackdriver Logging. //! //! Log entries are read continously from journald and are forwarded //! to Stackdriver in batches. //! //! Stackdriver Logging has a concept of monitored resources. In the //! simplest (and currently only supported) case this monitored //! resource will be the GCE instance on which journaldriver is //! running. //! //! Information about the instance, the project and required security //! credentials are retrieved from Google's metadata instance on GCP. //! //! Things left to do: //! * TODO 2018-06-15: Support non-GCP instances (see comment on //! monitored resource descriptor) #[macro_use] extern crate failure; #[macro_use] extern crate hyper; #[macro_use] extern crate log; #[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_json; #[macro_use] extern crate lazy_static; extern crate chrono; extern crate env_logger; extern crate medallion; extern crate reqwest; extern crate serde; extern crate systemd; use chrono::offset::LocalResult; use chrono::prelude::*; use failure::ResultExt; use reqwest::{header, Client}; use serde_json::Value; use std::env; use std::fs::{self, File}; use std::io::{self, Read, ErrorKind, Write}; use std::mem; use std::path::PathBuf; use std::process; use std::time::{Duration, Instant}; use systemd::journal::*; #[cfg(test)] mod tests; const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2"; const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write"; const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id"; const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id"; // Google's metadata service requires this header to be present on all // calls: // // https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying header! { (MetadataFlavor, "Metadata-Flavor") => [String] } /// Convenience type alias for results using failure's `Error` type. type Result = std::result::Result; /// Representation of static service account credentials for GCP. #[derive(Debug, Deserialize)] struct Credentials { /// PEM encoded private key private_key: String, /// `kid` of this private key private_key_id: String, /// "email" address of the service account client_email: String, } lazy_static! { /// HTTP client instance preconfigured with the metadata header /// required by Google. static ref METADATA_CLIENT: Client = { let mut headers = header::Headers::new(); headers.set(MetadataFlavor("Google".into())); Client::builder().default_headers(headers) .build().expect("Could not create metadata client") }; /// ID of the GCP project in which this instance is running. static ref PROJECT_ID: String = get_metadata(METADATA_PROJECT_URL) .expect("Could not determine project ID"); /// ID of the current GCP instance. static ref INSTANCE_ID: String = get_metadata(METADATA_ID_URL) .expect("Could not determine instance ID"); /// GCP zone in which this instance is running. static ref ZONE: String = get_metadata(METADATA_ZONE_URL) .expect("Could not determine instance zone"); /// Service account credentials (if configured) static ref SERVICE_ACCOUNT_CREDENTIALS: Option = env::var("GOOGLE_APPLICATION_CREDENTIALS").ok() .and_then(|path| File::open(path).ok()) .and_then(|file| serde_json::from_reader(file).ok()); /// Descriptor of the currently monitored instance. /// /// For GCE instances, this will be the GCE instance ID. For /// non-GCE machines a sensible solution may be using the machine /// hostname as a Cloud Logging log name, but this is not yet /// implemented. static ref MONITORED_RESOURCE: Value = json!({ "type": "gce_instance", "labels": { "project_id": PROJECT_ID.as_str(), "instance_id": INSTANCE_ID.as_str(), "zone": ZONE.as_str(), } }); /// Path to the file in which journaldriver should persist its /// cursor state. static ref POSITION_FILE: PathBuf = env::var("CURSOR_POSITION_FILE") .unwrap_or("/var/journaldriver/cursor.pos".into()) .into(); } /// Convenience helper for retrieving values from the metadata server. fn get_metadata(url: &str) -> Result { let mut output = String::new(); METADATA_CLIENT.get(url).send()? .error_for_status()? .read_to_string(&mut output)?; Ok(output.trim().into()) } /// Represents the response returned by the metadata server's token /// endpoint. The token is normally valid for an hour. #[derive(Deserialize)] struct TokenResponse { expires_in: u64, access_token: String, } /// Struct used to store a token together with a sensible /// representation of when it expires. struct Token { token: String, fetched_at: Instant, expires: Duration, } impl Token { /// Does this token need to be renewed? fn is_expired(&self) -> bool { self.fetched_at.elapsed() > self.expires } } /// Retrieves a token from the GCP metadata service. Retrieving these /// tokens requires no additional authentication. fn get_metadata_token() -> Result { let token: TokenResponse = METADATA_CLIENT .get(METADATA_TOKEN_URL) .send()?.json()?; debug!("Fetched new token from metadata service"); Ok(Token { fetched_at: Instant::now(), expires: Duration::from_secs(token.expires_in / 2), token: token.access_token, }) } /// Signs a token using static client credentials configured for a /// service account. This service account must have been given the /// `Log Writer` role in Google Cloud IAM. /// /// The process for creating and signing these tokens is described /// here: /// /// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#jwt-auth fn sign_service_account_token(credentials: &Credentials) -> Result { use medallion::{Algorithm, Header, Payload}; let iat = Utc::now(); let exp = iat.checked_add_signed(chrono::Duration::seconds(3600)) .ok_or_else(|| format_err!("Failed to calculate token expiry"))?; let header = Header { alg: Algorithm::RS256, headers: Some(json!({ "kid": credentials.private_key_id, })), }; let payload: Payload<()> = Payload { iss: Some(credentials.client_email.clone()), sub: Some(credentials.client_email.clone()), aud: Some(LOGGING_SERVICE.to_string()), iat: Some(iat.timestamp() as u64), exp: Some(exp.timestamp() as u64), ..Default::default() }; let token = medallion::Token::new(header, payload) .sign(credentials.private_key.as_bytes()) .context("Signing service account token failed")?; debug!("Signed new service account token"); Ok(Token { token, fetched_at: Instant::now(), expires: Duration::from_secs(3000), }) } /// Retrieve the authentication token either by using static client /// credentials, or by talking to the metadata server. /// /// Which behaviour is used is controlled by the environment variable /// `GOOGLE_APPLICATION_CREDENTIALS`, which should be configured to /// point at a JSON private key file if service account authentication /// is to be used. fn get_token() -> Result { if let Some(credentials) = SERVICE_ACCOUNT_CREDENTIALS.as_ref() { sign_service_account_token(credentials) } else { get_metadata_token() } } /// This structure represents the different types of payloads /// supported by journaldriver. /// /// Currently log entries can either contain plain text messages or /// structured payloads in JSON-format. #[derive(Debug, Serialize, PartialEq)] #[serde(untagged)] enum Payload { TextPayload { #[serde(rename = "textPayload")] text_payload: String, }, JsonPayload { #[serde(rename = "jsonPayload")] json_payload: Value, }, } /// Attempt to parse a log message as JSON and return it as a /// structured payload. If parsing fails, return the entry in plain /// text format. fn message_to_payload(message: Option) -> Payload { match message { None => Payload::TextPayload { text_payload: "empty log entry".into() }, Some(text_payload) => { // Attempt to deserialize the text payload as a generic // JSON value. if let Ok(json_payload) = serde_json::from_str::(&text_payload) { // If JSON-parsing succeeded on the payload, check // whether we parsed an object (Stackdriver does not // expect other types of JSON payload) and return it // in that case. if json_payload.is_object() { return Payload::JsonPayload { json_payload } } } Payload::TextPayload { text_payload } } } } /// Attempt to parse journald's microsecond timestamps into a UTC /// timestamp. /// /// Parse errors are dismissed and returned as empty options: There /// simply aren't any useful fallback mechanisms other than defaulting /// to ingestion time for journaldriver's use-case. fn parse_microseconds(input: String) -> Option> { if input.len() != 16 { return None; } let seconds: i64 = (&input[..10]).parse().ok()?; let micros: u32 = (&input[10..]).parse().ok()?; match Utc.timestamp_opt(seconds, micros * 1000) { LocalResult::Single(time) => Some(time), _ => None, } } /// This structure represents a log entry in the format expected by /// the Stackdriver API. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct LogEntry { labels: Value, #[serde(skip_serializing_if = "Option::is_none")] timestamp: Option>, #[serde(flatten)] payload: Payload, } impl From for LogEntry { // Converts from the fields contained in a journald record to the // representation required by Stackdriver Logging. // // The fields are documented in systemd.journal-fields(7). fn from(mut record: JournalRecord) -> LogEntry { // The message field is technically just a convention, but // journald seems to default to it when ingesting unit // output. let payload = message_to_payload(record.remove("MESSAGE")); // Presumably this is always set, but who can be sure // about anything in this world. let hostname = record.remove("_HOSTNAME"); // The unit is seemingly missing on kernel entries, but // present on all others. let unit = record.remove("_SYSTEMD_UNIT"); // The source timestamp (if present) is specified in // microseconds since epoch. // // If it is not present or can not be parsed, journaldriver // will not send a timestamp for the log entry and it will // default to the ingestion time. let timestamp = record .remove("_SOURCE_REALTIME_TIMESTAMP") .and_then(parse_microseconds); LogEntry { payload, timestamp, labels: json!({ "host": hostname, "unit": unit.unwrap_or_else(|| "syslog".into()), }), } } } /// Attempt to read from the journal. If no new entry is present, /// await the next one up to the specified timeout. fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result> { let next_record = journal.next_record()?; if next_record.is_some() { return Ok(next_record); } Ok(journal.await_next_record(Some(timeout))?) } /// This function starts a double-looped, blocking receiver. It will /// buffer messages for half a second before flushing them to /// Stackdriver. fn receiver_loop(mut journal: Journal) -> Result<()> { let mut token = get_token()?; let client = reqwest::Client::new(); let mut buf: Vec = Vec::new(); let iteration = Duration::from_millis(500); loop { trace!("Beginning outer iteration"); let now = Instant::now(); loop { if now.elapsed() > iteration { break; } if let Ok(Some(entry)) = receive_next_record(iteration, &mut journal) { trace!("Received a new entry"); buf.push(entry.into()); } } if !buf.is_empty() { let to_flush = mem::replace(&mut buf, Vec::new()); flush(&client, &mut token, to_flush, journal.cursor()?)?; } trace!("Done outer iteration"); } } /// Writes the current cursor into `/var/journaldriver/cursor.pos`. fn persist_cursor(cursor: String) -> Result<()> { let mut file = File::create(&*POSITION_FILE)?; write!(file, "{}", cursor).map_err(Into::into) } /// Flushes all drained records to Stackdriver. Any Stackdriver /// message can at most contain 1000 log entries which means they are /// chunked up here. /// /// In some cases large payloads seem to cause errors in Stackdriver - /// the chunks are therefore made smaller here. /// /// If flushing is successful the last cursor position will be /// persisted to disk. fn flush(client: &Client, token: &mut Token, entries: Vec, cursor: String) -> Result<()> { if token.is_expired() { debug!("Refreshing Google metadata access token"); let new_token = get_token()?; mem::replace(token, new_token); } for chunk in entries.chunks(750) { let request = prepare_request(chunk); if let Err(write_error) = write_entries(client, token, request) { error!("Failed to write {} entries: {}", chunk.len(), write_error) } else { debug!("Wrote {} entries to Stackdriver", chunk.len()) } } persist_cursor(cursor) } /// Convert a slice of log entries into the format expected by /// Stackdriver. This format is documented here: /// /// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write fn prepare_request(entries: &[LogEntry]) -> Value { json!({ "logName": format!("projects/{}/logs/journaldriver", PROJECT_ID.as_str()), "resource": &*MONITORED_RESOURCE, "entries": entries, "partialSuccess": true }) } /// Perform the log entry insertion in Stackdriver Logging. fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> { let mut response = client.post(ENTRIES_WRITE_URL) .header(header::Authorization(format!("Bearer {}", token.token))) .json(&request) .send()?; if response.status().is_success() { Ok(()) } else { let body = response.text().unwrap_or_else(|_| "no response body".into()); bail!("{} ({})", body, response.status()) } } /// Attempt to read the initial cursor position from the configured /// file. If there is no initial cursor position set, read from the /// tail of the log. /// /// The only "acceptable" error when reading the cursor position is /// the cursor position file not existing, other errors are fatal /// because they indicate a misconfiguration of journaldriver. fn initial_cursor() -> Result { let read_result: io::Result = (|| { let mut contents = String::new(); let mut file = File::open(&*POSITION_FILE)?; file.read_to_string(&mut contents)?; Ok(contents.trim().into()) })(); match read_result { Ok(cursor) => Ok(JournalSeek::Cursor { cursor }), Err(ref err) if err.kind() == ErrorKind::NotFound => { info!("No previous cursor position, reading from journal tail"); Ok(JournalSeek::Tail) }, Err(err) => { (Err(err).context("Could not read cursor position"))? } } } fn main () { env_logger::init(); // If the cursor file does not yet exist, the directory structure // leading up to it should be created: let cursor_position_dir = POSITION_FILE.parent() .expect("Invalid cursor position file path"); fs::create_dir_all(cursor_position_dir) .expect("Could not create directory to store cursor position in"); let mut journal = Journal::open(JournalFiles::All, false, true) .expect("Failed to open systemd journal"); let seek_position = initial_cursor() .expect("Failed to determine initial cursor position"); match journal.seek(seek_position) { Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), Err(err) => { error!("Failed to set initial journal position: {}", err); process::exit(1) } } receiver_loop(journal).expect("log receiver encountered an unexpected error"); }