about summary refs log tree commit diff
path: root/tools/journaldriver/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tools/journaldriver/src/main.rs')
-rw-r--r--tools/journaldriver/src/main.rs665
1 files changed, 0 insertions, 665 deletions
diff --git a/tools/journaldriver/src/main.rs b/tools/journaldriver/src/main.rs
deleted file mode 100644
index a57bb3505ddc..000000000000
--- a/tools/journaldriver/src/main.rs
+++ /dev/null
@@ -1,665 +0,0 @@
-// Copyright (C) 2018 Vincent Ambo <mail@tazj.in>
-//
-// journaldriver is free software: you can redistribute it and/or
-// modify it under the terms of the GNU General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-//! This file implements journaldriver, a small application that
-//! forwards logs from journald (systemd's log facility) to
-//! Stackdriver Logging.
-//!
-//! Log entries are read continously from journald and are forwarded
-//! to Stackdriver in batches.
-//!
-//! Stackdriver Logging has a concept of monitored resources. In the
-//! simplest case this monitored resource will be the GCE instance on
-//! which journaldriver is running.
-//!
-//! Information about the instance, the project and required security
-//! credentials are retrieved from Google's metadata instance on GCP.
-//!
-//! To run journaldriver on non-GCP machines, users must specify the
-//! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and
-//! `LOG_NAME` environment variables.
-
-#[macro_use] extern crate failure;
-#[macro_use] extern crate log;
-#[macro_use] extern crate serde_derive;
-#[macro_use] extern crate serde_json;
-#[macro_use] extern crate lazy_static;
-
-extern crate chrono;
-extern crate env_logger;
-extern crate medallion;
-extern crate serde;
-extern crate systemd;
-extern crate ureq;
-
-use chrono::offset::LocalResult;
-use chrono::prelude::*;
-use failure::ResultExt;
-use serde_json::{from_str, Value};
-use std::env;
-use std::fs::{self, File, rename};
-use std::io::{self, Read, ErrorKind, Write};
-use std::mem;
-use std::path::PathBuf;
-use std::process;
-use std::time::{Duration, Instant};
-use systemd::journal::*;
-
-#[cfg(test)]
-mod tests;
-
-const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2";
-const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
-const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
-const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
-const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
-const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id";
-
-/// Convenience type alias for results using failure's `Error` type.
-type Result<T> = std::result::Result<T, failure::Error>;
-
-/// Representation of static service account credentials for GCP.
-#[derive(Debug, Deserialize)]
-struct Credentials {
-    /// PEM encoded private key
-    private_key: String,
-
-    /// `kid` of this private key
-    private_key_id: String,
-
-    /// "email" address of the service account
-    client_email: String,
-}
-
-lazy_static! {
-    /// ID of the GCP project to which to send logs.
-    static ref PROJECT_ID: String = get_project_id();
-
-    /// Name of the log to write to (this should only be manually
-    /// configured if not running on GCP):
-    static ref LOG_NAME: String = env::var("LOG_NAME")
-        .unwrap_or("journaldriver".into());
-
-    /// Service account credentials (if configured)
-    static ref SERVICE_ACCOUNT_CREDENTIALS: Option<Credentials> =
-        env::var("GOOGLE_APPLICATION_CREDENTIALS").ok()
-        .and_then(|path| File::open(path).ok())
-        .and_then(|file| serde_json::from_reader(file).ok());
-
-    /// Descriptor of the currently monitored instance. Refer to the
-    /// documentation of `determine_monitored_resource` for more
-    /// information.
-    static ref MONITORED_RESOURCE: Value = determine_monitored_resource();
-
-    /// Path to the directory in which journaldriver should persist
-    /// its cursor state.
-    static ref CURSOR_DIR: PathBuf = env::var("CURSOR_POSITION_DIR")
-        .unwrap_or("/var/lib/journaldriver".into())
-        .into();
-
-    /// Path to the cursor position file itself.
-    static ref CURSOR_FILE: PathBuf = {
-        let mut path = CURSOR_DIR.clone();
-        path.push("cursor.pos");
-        path
-    };
-
-    /// Path to the temporary file used for cursor position writes.
-    static ref CURSOR_TMP_FILE: PathBuf = {
-        let mut path = CURSOR_DIR.clone();
-        path.push("cursor.tmp");
-        path
-    };
-}
-
-/// Convenience helper for retrieving values from the metadata server.
-fn get_metadata(url: &str) -> Result<String> {
-    let response = ureq::get(url)
-        .set("Metadata-Flavor", "Google")
-        .timeout_connect(5000)
-        .timeout_read(5000)
-        .call();
-
-    if response.ok() {
-        // Whitespace is trimmed to remove newlines from responses.
-        let body = response.into_string()
-            .context("Failed to decode metadata response")?
-            .trim().to_string();
-
-        Ok(body)
-    } else {
-        let status = response.status_line().to_string();
-        let body = response.into_string()
-            .unwrap_or_else(|e| format!("Metadata body error: {}", e));
-        bail!("Metadata failure: {} ({})", body, status)
-    }
-}
-
-/// Convenience helper for determining the project ID.
-fn get_project_id() -> String {
-    env::var("GOOGLE_CLOUD_PROJECT")
-        .map_err(Into::into)
-        .or_else(|_: failure::Error| get_metadata(METADATA_PROJECT_URL))
-        .expect("Could not determine project ID")
-}
-
-/// Determines the monitored resource descriptor used in Stackdriver
-/// logs. On GCP this will be set to the instance ID as returned by
-/// the metadata server.
-///
-/// On non-GCP machines the value is determined by using the
-/// `GOOGLE_CLOUD_PROJECT` and `LOG_STREAM` environment variables.
-///
-/// [issue #4]: https://github.com/tazjin/journaldriver/issues/4
-fn determine_monitored_resource() -> Value {
-    if let Ok(log) = env::var("LOG_STREAM") {
-        // The special value `global` is recognised as a log stream name that
-        // results in a `global`-type resource descriptor. This is useful in
-        // cases where Stackdriver Error Reporting is intended to be used on
-        // a non-GCE instance. See [issue #4][] for details.
-        if log == "global" {
-            return json!({
-                "type": "global",
-                "labels": {
-                    "project_id": PROJECT_ID.as_str(),
-                }
-            });
-        }
-
-        json!({
-            "type": "logging_log",
-            "labels": {
-                "project_id": PROJECT_ID.as_str(),
-                "name": log,
-            }
-        })
-    } else {
-        let instance_id = get_metadata(METADATA_ID_URL)
-            .expect("Could not determine instance ID");
-
-        let zone = get_metadata(METADATA_ZONE_URL)
-            .expect("Could not determine instance zone");
-
-        json!({
-            "type": "gce_instance",
-            "labels": {
-                "project_id": PROJECT_ID.as_str(),
-                "instance_id": instance_id,
-                "zone": zone,
-            }
-        })
-    }
-}
-
-/// Represents the response returned by the metadata server's token
-/// endpoint. The token is normally valid for an hour.
-#[derive(Deserialize)]
-struct TokenResponse {
-    expires_in: u64,
-    access_token: String,
-}
-
-/// Struct used to store a token together with a sensible
-/// representation of when it expires.
-struct Token {
-    token: String,
-    fetched_at: Instant,
-    expires: Duration,
-}
-
-impl Token {
-    /// Does this token need to be renewed?
-    fn is_expired(&self) -> bool {
-        self.fetched_at.elapsed() > self.expires
-    }
-}
-
-/// Retrieves a token from the GCP metadata service. Retrieving these
-/// tokens requires no additional authentication.
-fn get_metadata_token() -> Result<Token> {
-    let body = get_metadata(METADATA_TOKEN_URL)?;
-    let token: TokenResponse = from_str(&body)?;
-
-    debug!("Fetched new token from metadata service");
-
-    Ok(Token {
-        fetched_at: Instant::now(),
-        expires: Duration::from_secs(token.expires_in / 2),
-        token: token.access_token,
-    })
-}
-
-/// Signs a token using static client credentials configured for a
-/// service account. This service account must have been given the
-/// `Log Writer` role in Google Cloud IAM.
-///
-/// The process for creating and signing these tokens is described
-/// here:
-///
-/// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#jwt-auth
-fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
-    use medallion::{Algorithm, Header, Payload};
-
-    let iat = Utc::now();
-    let exp = iat.checked_add_signed(chrono::Duration::seconds(3600))
-        .ok_or_else(|| format_err!("Failed to calculate token expiry"))?;
-
-    let header = Header {
-        alg: Algorithm::RS256,
-        headers: Some(json!({
-            "kid": credentials.private_key_id,
-        })),
-    };
-
-    let payload: Payload<()> = Payload {
-        iss: Some(credentials.client_email.clone()),
-        sub: Some(credentials.client_email.clone()),
-        aud: Some(LOGGING_SERVICE.to_string()),
-        iat: Some(iat.timestamp() as u64),
-        exp: Some(exp.timestamp() as u64),
-        ..Default::default()
-    };
-
-    let token = medallion::Token::new(header, payload)
-        .sign(credentials.private_key.as_bytes())
-        .context("Signing service account token failed")?;
-
-    debug!("Signed new service account token");
-
-    Ok(Token {
-        token,
-        fetched_at: Instant::now(),
-        expires: Duration::from_secs(3000),
-    })
-}
-
-/// Retrieve the authentication token either by using static client
-/// credentials, or by talking to the metadata server.
-///
-/// Which behaviour is used is controlled by the environment variable
-/// `GOOGLE_APPLICATION_CREDENTIALS`, which should be configured to
-/// point at a JSON private key file if service account authentication
-/// is to be used.
-fn get_token() -> Result<Token> {
-    if let Some(credentials) = SERVICE_ACCOUNT_CREDENTIALS.as_ref() {
-        sign_service_account_token(credentials)
-    } else {
-        get_metadata_token()
-    }
-}
-
-/// This structure represents the different types of payloads
-/// supported by journaldriver.
-///
-/// Currently log entries can either contain plain text messages or
-/// structured payloads in JSON-format.
-#[derive(Debug, Serialize, PartialEq)]
-#[serde(untagged)]
-enum Payload {
-    TextPayload {
-        #[serde(rename = "textPayload")]
-        text_payload: String,
-    },
-    JsonPayload {
-        #[serde(rename = "jsonPayload")]
-        json_payload: Value,
-    },
-}
-
-/// Attempt to parse a log message as JSON and return it as a
-/// structured payload. If parsing fails, return the entry in plain
-/// text format.
-fn message_to_payload(message: Option<String>) -> Payload {
-    match message {
-        None => Payload::TextPayload { text_payload: "empty log entry".into() },
-        Some(text_payload) => {
-            // Attempt to deserialize the text payload as a generic
-            // JSON value.
-            if let Ok(json_payload) = serde_json::from_str::<Value>(&text_payload) {
-                // If JSON-parsing succeeded on the payload, check
-                // whether we parsed an object (Stackdriver does not
-                // expect other types of JSON payload) and return it
-                // in that case.
-                if json_payload.is_object() {
-                    return Payload::JsonPayload { json_payload }
-                }
-            }
-
-            Payload::TextPayload { text_payload }
-        }
-    }
-}
-
-/// Attempt to parse journald's microsecond timestamps into a UTC
-/// timestamp.
-///
-/// Parse errors are dismissed and returned as empty options: There
-/// simply aren't any useful fallback mechanisms other than defaulting
-/// to ingestion time for journaldriver's use-case.
-fn parse_microseconds(input: String) -> Option<DateTime<Utc>> {
-    if input.len() != 16 {
-        return None;
-    }
-
-    let seconds: i64 = (&input[..10]).parse().ok()?;
-    let micros: u32 = (&input[10..]).parse().ok()?;
-
-    match Utc.timestamp_opt(seconds, micros * 1000) {
-        LocalResult::Single(time) => Some(time),
-        _ => None,
-    }
-}
-
-/// Converts a journald log message priority to a
-/// Stackdriver-compatible severity number.
-///
-/// Both Stackdriver and journald specify equivalent
-/// severities/priorities. Conveniently, the names are the same.
-/// Inconveniently, the numbers are not.
-///
-/// For more information on the journald priorities, consult these
-/// man-pages:
-///
-/// * systemd.journal-fields(7) (section 'PRIORITY')
-/// * sd-daemon(3)
-/// * systemd.exec(5) (section 'SyslogLevelPrefix')
-///
-/// Note that priorities can be logged by applications via the prefix
-/// concept described in these man pages, without interfering with
-/// structured JSON-payloads.
-///
-/// For more information on the Stackdriver severity levels, please
-/// consult Google's documentation:
-///
-/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
-///
-/// Any unknown priority values result in no severity being set.
-fn priority_to_severity(priority: String) -> Option<u32> {
-    match priority.as_ref() {
-        "0" => Some(800), // emerg
-        "1" => Some(700), // alert
-        "2" => Some(600), // crit
-        "3" => Some(500), // err
-        "4" => Some(400), // warning
-        "5" => Some(300), // notice
-        "6" => Some(200), // info
-        "7" => Some(100), // debug
-        _ => None,
-    }
-}
-
-/// This structure represents a log entry in the format expected by
-/// the Stackdriver API.
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct LogEntry {
-    labels: Value,
-
-    #[serde(skip_serializing_if = "Option::is_none")]
-    timestamp: Option<DateTime<Utc>>,
-
-    #[serde(flatten)]
-    payload: Payload,
-
-    // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
-    #[serde(skip_serializing_if = "Option::is_none")]
-    severity: Option<u32>,
-}
-
-impl From<JournalRecord> for LogEntry {
-    // Converts from the fields contained in a journald record to the
-    // representation required by Stackdriver Logging.
-    //
-    // The fields are documented in systemd.journal-fields(7).
-    fn from(mut record: JournalRecord) -> LogEntry {
-        // The message field is technically just a convention, but
-        // journald seems to default to it when ingesting unit
-        // output.
-        let payload = message_to_payload(record.remove("MESSAGE"));
-
-        // Presumably this is always set, but who can be sure
-        // about anything in this world.
-        let hostname = record.remove("_HOSTNAME");
-
-        // The unit is seemingly missing on kernel entries, but
-        // present on all others.
-        let unit = record.remove("_SYSTEMD_UNIT");
-
-        // The source timestamp (if present) is specified in
-        // microseconds since epoch.
-        //
-        // If it is not present or can not be parsed, journaldriver
-        // will not send a timestamp for the log entry and it will
-        // default to the ingestion time.
-        let timestamp = record
-            .remove("_SOURCE_REALTIME_TIMESTAMP")
-            .and_then(parse_microseconds);
-
-        // Journald uses syslogd's concept of priority. No idea if this is
-        // always present, but it's optional in the Stackdriver API, so we just
-        // omit it if we can't find or parse it.
-        let severity = record
-            .remove("PRIORITY")
-            .and_then(priority_to_severity);
-
-        LogEntry {
-            payload,
-            timestamp,
-            labels: json!({
-                "host": hostname,
-                "unit": unit.unwrap_or_else(|| "syslog".into()),
-            }),
-            severity,
-        }
-    }
-}
-
-/// Attempt to read from the journal. If no new entry is present,
-/// await the next one up to the specified timeout.
-fn receive_next_record(timeout: Duration, journal: &mut Journal)
-                       -> Result<Option<JournalRecord>> {
-    let next_record = journal.next_record()?;
-    if next_record.is_some() {
-        return Ok(next_record);
-    }
-
-    Ok(journal.await_next_record(Some(timeout))?)
-}
-
-/// This function starts a double-looped, blocking receiver. It will
-/// buffer messages for half a second before flushing them to
-/// Stackdriver.
-fn receiver_loop(mut journal: Journal) -> Result<()> {
-    let mut token = get_token()?;
-
-    let mut buf: Vec<LogEntry> = Vec::new();
-    let iteration = Duration::from_millis(500);
-
-    loop {
-        trace!("Beginning outer iteration");
-        let now = Instant::now();
-
-        loop {
-            if now.elapsed() > iteration {
-                break;
-            }
-
-            if let Ok(Some(entry)) = receive_next_record(iteration, &mut journal) {
-                trace!("Received a new entry");
-                buf.push(entry.into());
-            }
-        }
-
-        if !buf.is_empty() {
-            let to_flush = mem::replace(&mut buf, Vec::new());
-            flush(&mut token, to_flush, journal.cursor()?)?;
-        }
-
-        trace!("Done outer iteration");
-    }
-}
-
-/// Writes the current cursor into `/var/journaldriver/cursor.pos`. To
-/// avoid issues with journaldriver being terminated while the cursor
-/// is still being written, this will first write the cursor into a
-/// temporary file and then move it.
-fn persist_cursor(cursor: String) -> Result<()> {
-    // This code exists to aid in tracking down if there are other
-    // causes of issue #2 than what has already been taken care of.
-    //
-    // One theory is that journald (or the Rust library to interface
-    // with it) may occasionally return empty cursor strings. If this
-    // is ever the case, we would like to know about it.
-    if cursor.is_empty() {
-        error!("Received empty journald cursor position, refusing to persist!");
-        error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2");
-        return Ok(())
-    }
-
-    let mut file = File::create(&*CURSOR_TMP_FILE)
-        .context("Failed to create cursor file")?;
-
-    write!(file, "{}", cursor).context("Failed to write cursor file")?;
-
-    rename(&*CURSOR_TMP_FILE, &*CURSOR_FILE)
-        .context("Failed to move cursor file")
-        .map_err(Into::into)
-}
-
-/// Flushes all drained records to Stackdriver. Any Stackdriver
-/// message can at most contain 1000 log entries which means they are
-/// chunked up here.
-///
-/// In some cases large payloads seem to cause errors in Stackdriver -
-/// the chunks are therefore made smaller here.
-///
-/// If flushing is successful the last cursor position will be
-/// persisted to disk.
-fn flush(token: &mut Token,
-         entries: Vec<LogEntry>,
-         cursor: String) -> Result<()> {
-    if token.is_expired() {
-        debug!("Refreshing Google metadata access token");
-        let new_token = get_token()?;
-        mem::replace(token, new_token);
-    }
-
-    for chunk in entries.chunks(750) {
-        let request = prepare_request(chunk);
-        if let Err(write_error) = write_entries(token, request) {
-            error!("Failed to write {} entries: {}", chunk.len(), write_error)
-        } else {
-            debug!("Wrote {} entries to Stackdriver", chunk.len())
-        }
-    }
-
-    persist_cursor(cursor)
-}
-
-/// Convert a slice of log entries into the format expected by
-/// Stackdriver. This format is documented here:
-///
-/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write
-fn prepare_request(entries: &[LogEntry]) -> Value {
-    json!({
-        "logName": format!("projects/{}/logs/{}", PROJECT_ID.as_str(), LOG_NAME.as_str()),
-        "resource": &*MONITORED_RESOURCE,
-        "entries": entries,
-        "partialSuccess": true
-    })
-}
-
-/// Perform the log entry insertion in Stackdriver Logging.
-fn write_entries(token: &Token, request: Value) -> Result<()> {
-    let response = ureq::post(ENTRIES_WRITE_URL)
-        .set("Authorization", format!("Bearer {}", token.token).as_str())
-        // The timeout values are set relatively high, not because of
-        // an expectation of Stackdriver being slow but just to
-        // eventually hit an error case in case of network troubles.
-        // Presumably no request in a functioning environment will
-        // ever hit these limits.
-        .timeout_connect(2000)
-        .timeout_read(5000)
-        .send_json(request);
-
-    if response.ok() {
-        Ok(())
-    } else {
-        let status = response.status_line().to_string();
-        let body = response.into_string()
-            .unwrap_or_else(|_| "no response body".into());
-        bail!("Write failure: {} ({})", body, status)
-    }
-}
-
-/// Attempt to read the initial cursor position from the configured
-/// file. If there is no initial cursor position set, read from the
-/// tail of the log.
-///
-/// The only "acceptable" error when reading the cursor position is
-/// the cursor position file not existing, other errors are fatal
-/// because they indicate a misconfiguration of journaldriver.
-fn initial_cursor() -> Result<JournalSeek> {
-    let read_result: io::Result<String> = (|| {
-        let mut contents = String::new();
-        let mut file = File::open(&*CURSOR_FILE)?;
-        file.read_to_string(&mut contents)?;
-        Ok(contents.trim().into())
-    })();
-
-    match read_result {
-        Ok(cursor) => Ok(JournalSeek::Cursor { cursor }),
-        Err(ref err) if err.kind() == ErrorKind::NotFound => {
-            info!("No previous cursor position, reading from journal tail");
-            Ok(JournalSeek::Tail)
-        },
-        Err(err) => {
-            (Err(err).context("Could not read cursor position"))?
-        }
-    }
-}
-
-fn main () {
-    env_logger::init();
-
-    // The directory in which cursor positions are persisted should
-    // have been created:
-    if !CURSOR_DIR.exists() {
-        error!("Cursor directory at '{:?}' does not exist", *CURSOR_DIR);
-        process::exit(1);
-    }
-
-    let cursor_position_dir = CURSOR_FILE.parent()
-        .expect("Invalid cursor position file path");
-
-    fs::create_dir_all(cursor_position_dir)
-        .expect("Could not create directory to store cursor position in");
-
-    let mut journal = Journal::open(JournalFiles::All, false, true)
-        .expect("Failed to open systemd journal");
-
-    let seek_position = initial_cursor()
-        .expect("Failed to determine initial cursor position");
-
-    match journal.seek(seek_position) {
-        Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
-        Err(err) => {
-            error!("Failed to set initial journal position: {}", err);
-            process::exit(1)
-        }
-    }
-
-    receiver_loop(journal).expect("log receiver encountered an unexpected error");
-}