about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs75
1 files changed, 66 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs
index 894f72676ba5..d740cc1785a2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -33,9 +33,6 @@
 //!   monitored resource descriptor)
 //! * TODO 2018-06-15: Extract timestamps from journald instead of
 //!   relying on ingestion timestamps.
-//! * TODO 2018-06-15: Persist last known cursor position after
-//!   flushing to allow journaldriver to resume from the same position
-//!   after a restart.
 
 #[macro_use] extern crate hyper;
 #[macro_use] extern crate log;
@@ -49,10 +46,14 @@ extern crate systemd;
 extern crate serde;
 extern crate reqwest;
 
+use failure::ResultExt;
 use reqwest::{header, Client};
 use serde_json::Value;
-use std::io::Read;
+use std::env;
+use std::fs::{self, File};
+use std::io::{self, Read, ErrorKind, Write};
 use std::mem;
+use std::path::PathBuf;
 use std::process;
 use std::time::{Duration, Instant};
 use systemd::journal::*;
@@ -68,7 +69,7 @@ const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetad
 
 // Google's metadata service requires this header to be present on all
 // calls:
-//g
+//
 // https://cloud.google.com/compute/docs/storing-retrieving-metadata#querying
 header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
 
@@ -112,6 +113,12 @@ lazy_static! {
             "zone": ZONE.as_str(),
         }
     });
+
+    /// Path to the file in which journaldriver should persist its
+    /// cursor state.
+    static ref POSITION_FILE: PathBuf = env::var("CURSOR_POSITION_FILE")
+        .unwrap_or("/var/journaldriver/cursor.pos".into())
+        .into();
 }
 
 /// Convenience helper for retrieving values from the metadata server.
@@ -240,17 +247,29 @@ fn receiver_loop(mut journal: Journal) -> Result<()> {
 
         if !buf.is_empty() {
             let to_flush = mem::replace(&mut buf, Vec::new());
-            flush(&client, &mut token, to_flush)?;
+            flush(&client, &mut token, to_flush, journal.cursor()?)?;
         }
 
         trace!("Done outer iteration");
     }
 }
 
+/// Writes the current cursor into `/var/journaldriver/cursor.pos`.
+fn persist_cursor(cursor: String) -> Result<()> {
+    let mut file = File::create(&*POSITION_FILE)?;
+    write!(file, "{}", cursor).map_err(Into::into)
+}
+
 /// Flushes all drained records to Stackdriver. Any Stackdriver
 /// message can at most contain 1000 log entries which means they are
 /// chunked up here.
-fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<()> {
+///
+/// If flushing is successful the last cursor position will be
+/// persisted to disk.
+fn flush(client: &Client,
+         token: &mut Token,
+         entries: Vec<LogEntry>,
+         cursor: String) -> Result<()> {
     if token.is_expired() {
         debug!("Refreshing Google metadata access token");
         let new_token = get_metadata_token()?;
@@ -266,7 +285,7 @@ fn flush(client: &Client, token: &mut Token, entries: Vec<LogEntry>) -> Result<(
         }
     }
 
-    Ok(())
+    persist_cursor(cursor)
 }
 
 /// Represents the response returned by the metadata server's token
@@ -330,13 +349,51 @@ fn write_entries(client: &Client, token: &Token, request: Value) -> Result<()> {
     Ok(())
 }
 
+/// Attempt to read the initial cursor position from the configured
+/// file. If there is no initial cursor position set, read from the
+/// tail of the log.
+///
+/// The only "acceptable" error when reading the cursor position is
+/// the cursor position file not existing, other errors are fatal
+/// because they indicate a misconfiguration of journaldriver.
+fn initial_cursor() -> Result<JournalSeek> {
+    let read_result: io::Result<String> = (|| {
+        let mut contents = String::new();
+        let mut file = File::open(&*POSITION_FILE)?;
+        file.read_to_string(&mut contents)?;
+        Ok(contents.trim().into())
+    })();
+
+    match read_result {
+        Ok(cursor) => Ok(JournalSeek::Cursor { cursor }),
+        Err(ref err) if err.kind() == ErrorKind::NotFound => {
+            info!("No previous cursor position, reading from journal tail");
+            Ok(JournalSeek::Tail)
+        },
+        Err(err) => {
+            (Err(err).context("Could not read cursor position"))?
+        }
+    }
+}
+
 fn main () {
     env_logger::init();
 
+    // If the cursor file does not yet exist, the directory structure
+    // leading up to it should be created:
+    let cursor_position_dir = POSITION_FILE.parent()
+        .expect("Invalid cursor position file path");
+
+    fs::create_dir_all(cursor_position_dir)
+        .expect("Could not create directory to store cursor position in");
+
     let mut journal = Journal::open(JournalFiles::All, false, true)
         .expect("Failed to open systemd journal");
 
-    match journal.seek(JournalSeek::Tail) {
+    let seek_position = initial_cursor()
+        .expect("Failed to determine initial cursor position");
+
+    match journal.seek(seek_position) {
         Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
         Err(err) => {
             error!("Failed to set initial journal position: {}", err);