From e4b4830a045b7ee83d1a3edd02dcfce9a88054e2 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Sat, 16 Jun 2018 18:40:09 +0200 Subject: feat(main): Persist & load persisted cursor positions Adds support for persisting the cursor position in a file (by default `/var/journaldriver/cursor.pos`, overridable via the environment variable `CURSOR_POSITION_FILE`). This lets journaldriver resume from a known journal position after service restarts. This closes #3 --- src/main.rs | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 9 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 894f72676ba5..d740cc1785a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,9 +33,6 @@ //! monitored resource descriptor) //! * TODO 2018-06-15: Extract timestamps from journald instead of //! relying on ingestion timestamps. -//! * TODO 2018-06-15: Persist last known cursor position after -//! flushing to allow journaldriver to resume from the same position -//! after a restart. #[macro_use] extern crate hyper; #[macro_use] extern crate log; @@ -49,10 +46,14 @@ extern crate systemd; extern crate serde; extern crate reqwest; +use failure::ResultExt; use reqwest::{header, Client}; use serde_json::Value; -use std::io::Read; +use std::env; +use std::fs::{self, File}; +use std::io::{self, Read, ErrorKind, Write}; use std::mem; +use std::path::PathBuf; use std::process; use std::time::{Duration, Instant}; use systemd::journal::*; @@ -68,7 +69,7 @@ const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetad // Google's metadata service requires this header to be present on all // calls: -//g +// // https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying header! { (MetadataFlavor, "Metadata-Flavor") => [String] } @@ -112,6 +113,12 @@ lazy_static! { "zone": ZONE.as_str(), } }); + + /// Path to the file in which journaldriver should persist its + /// cursor state. + static ref POSITION_FILE: PathBuf = env::var("CURSOR_POSITION_FILE") + .unwrap_or("/var/journaldriver/cursor.pos".into()) + .into(); } /// Convenience helper for retrieving values from the metadata server. @@ -240,17 +247,29 @@ fn receiver_loop(mut journal: Journal) -> Result<()> { if !buf.is_empty() { let to_flush = mem::replace(&mut buf, Vec::new()); - flush(&client, &mut token, to_flush)?; + flush(&client, &mut token, to_flush, journal.cursor()?)?; } trace!("Done outer iteration"); } } +/// Writes the current cursor into `/var/journaldriver/cursor.pos`. +fn persist_cursor(cursor: String) -> Result<()> { + let mut file = File::create(&*POSITION_FILE)?; + write!(file, "{}", cursor).map_err(Into::into) +} + /// Flushes all drained records to Stackdriver. Any Stackdriver /// message can at most contain 1000 log entries which means they are /// chunked up here. -fn flush(client: &Client, token: &mut Token, entries: Vec) -> Result<()> { +/// +/// If flushing is successful the last cursor position will be +/// persisted to disk. +fn flush(client: &Client, + token: &mut Token, + entries: Vec, + cursor: String) -> Result<()> { if token.is_expired() { debug!("Refreshing Google metadata access token"); let new_token = get_metadata_token()?; @@ -266,7 +285,7 @@ fn flush(client: &Client, token: &mut Token, entries: Vec) -> Result<( } } - Ok(()) + persist_cursor(cursor) } /// Represents the response returned by the metadata server's token @@ -330,13 +349,51 @@ fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> { Ok(()) } +/// Attempt to read the initial cursor position from the configured +/// file. If there is no initial cursor position set, read from the +/// tail of the log. +/// +/// The only "acceptable" error when reading the cursor position is +/// the cursor position file not existing, other errors are fatal +/// because they indicate a misconfiguration of journaldriver. +fn initial_cursor() -> Result { + let read_result: io::Result = (|| { + let mut contents = String::new(); + let mut file = File::open(&*POSITION_FILE)?; + file.read_to_string(&mut contents)?; + Ok(contents.trim().into()) + })(); + + match read_result { + Ok(cursor) => Ok(JournalSeek::Cursor { cursor }), + Err(ref err) if err.kind() == ErrorKind::NotFound => { + info!("No previous cursor position, reading from journal tail"); + Ok(JournalSeek::Tail) + }, + Err(err) => { + (Err(err).context("Could not read cursor position"))? + } + } +} + fn main () { env_logger::init(); + // If the cursor file does not yet exist, the directory structure + // leading up to it should be created: + let cursor_position_dir = POSITION_FILE.parent() + .expect("Invalid cursor position file path"); + + fs::create_dir_all(cursor_position_dir) + .expect("Could not create directory to store cursor position in"); + let mut journal = Journal::open(JournalFiles::All, false, true) .expect("Failed to open systemd journal"); - match journal.seek(JournalSeek::Tail) { + let seek_position = initial_cursor() + .expect("Failed to determine initial cursor position"); + + match journal.seek(seek_position) { Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), Err(err) => { error!("Failed to set initial journal position: {}", err); -- cgit 1.4.1