about summary refs log tree commit diff
diff options
context:
space:
mode:
authorVincent Ambo <mail@tazj.in>2018-06-05T13·24+0200
committerVincent Ambo <mail@tazj.in>2018-06-05T13·24+0200
commit2541d25fbaa28b81045f9809c4b71565c6de3f51 (patch)
tree6ef787ce3755e5a1ba70ba14a728be9540d52ea4
parentef638bfa20d48c12960787865ec780b4e9773095 (diff)
feat(main): Emit output in chunks of max. 1000 records
Required by the Stackdriver API.
-rw-r--r--src/main.rs24
1 files changed, 15 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs
index 7c2a53f8a161..007dd6e629b2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,10 +12,11 @@ extern crate reqwest;
 
 mod stackdriver;
 
-use systemd::journal::*;
+use std::env;
+use std::mem;
 use std::process;
 use std::time::{Duration, Instant};
-use std::collections::vec_deque::{VecDeque, Drain};
+use systemd::journal::*;
 
 #[derive(Debug)]
 struct Record {
@@ -53,7 +54,7 @@ impl From<JournalRecord> for Record {
 /// buffer messages for half a second before flushing them to
 /// Stackdriver.
 fn receiver_loop(mut journal: Journal) {
-    let mut buf: VecDeque<Record> = VecDeque::new();
+    let mut buf: Vec<Record> = Vec::new();
     let iteration = Duration::from_millis(500);
 
     loop {
@@ -66,22 +67,27 @@ fn receiver_loop(mut journal: Journal) {
             }
 
             if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) {
-                buf.push_back(record.into());
+                trace!("Received a new record");
+                buf.push(record.into());
             }
         }
 
         if !buf.is_empty() {
-            flush(buf.drain(..));
+            let to_flush = mem::replace(&mut buf, Vec::new());
+            flush(to_flush);
         }
 
         trace!("Done outer iteration");
     }
 }
 
-/// Flushes all drained records to Stackdriver.
-fn flush(drain: Drain<Record>) {
-    let record_count = drain.count();
-    debug!("Flushed {} records", record_count);
+/// Flushes all drained records to Stackdriver. Any Stackdriver
+/// message can at most contain 1000 log entries which means they are
+/// chunked up here.
+fn flush(records: Vec<Record>) {
+    for chunk in records.chunks(1000) {
+        debug!("Flushed {} records", chunk.len())
+    }
 }
 
 fn main () {