diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/src/main.rs b/src/main.rs index 7c2a53f8a161..007dd6e629b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,10 +12,11 @@ extern crate reqwest; mod stackdriver; -use systemd::journal::*; +use std::env; +use std::mem; use std::process; use std::time::{Duration, Instant}; -use std::collections::vec_deque::{VecDeque, Drain}; +use systemd::journal::*; #[derive(Debug)] struct Record { @@ -53,7 +54,7 @@ impl From<JournalRecord> for Record { /// buffer messages for half a second before flushing them to /// Stackdriver. fn receiver_loop(mut journal: Journal) { - let mut buf: VecDeque<Record> = VecDeque::new(); + let mut buf: Vec<Record> = Vec::new(); let iteration = Duration::from_millis(500); loop { @@ -66,22 +67,27 @@ fn receiver_loop(mut journal: Journal) { } if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) { - buf.push_back(record.into()); + trace!("Received a new record"); + buf.push(record.into()); } } if !buf.is_empty() { - flush(buf.drain(..)); + let to_flush = mem::replace(&mut buf, Vec::new()); + flush(to_flush); } trace!("Done outer iteration"); } } -/// Flushes all drained records to Stackdriver. -fn flush(drain: Drain<Record>) { - let record_count = drain.count(); - debug!("Flushed {} records", record_count); +/// Flushes all drained records to Stackdriver. Any Stackdriver +/// message can at most contain 1000 log entries which means they are +/// chunked up here. +fn flush(records: Vec<Record>) { + for chunk in records.chunks(1000) { + debug!("Flushed {} records", chunk.len()) + } } fn main () { |