From 2541d25fbaa28b81045f9809c4b71565c6de3f51 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Tue, 5 Jun 2018 15:24:03 +0200 Subject: feat(main): Emit output in chunks of max. 1000 records Required by the Stackdriver API. --- src/main.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7c2a53f8a161..007dd6e629b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,10 +12,11 @@ extern crate reqwest; mod stackdriver; -use systemd::journal::*; +use std::env; +use std::mem; use std::process; use std::time::{Duration, Instant}; -use std::collections::vec_deque::{VecDeque, Drain}; +use systemd::journal::*; #[derive(Debug)] struct Record { @@ -53,7 +54,7 @@ impl From for Record { /// buffer messages for half a second before flushing them to /// Stackdriver. fn receiver_loop(mut journal: Journal) { - let mut buf: VecDeque = VecDeque::new(); + let mut buf: Vec = Vec::new(); let iteration = Duration::from_millis(500); loop { @@ -66,22 +67,27 @@ fn receiver_loop(mut journal: Journal) { } if let Ok(Some(record)) = journal.await_next_record(Some(iteration)) { - buf.push_back(record.into()); + trace!("Received a new record"); + buf.push(record.into()); } } if !buf.is_empty() { - flush(buf.drain(..)); + let to_flush = mem::replace(&mut buf, Vec::new()); + flush(to_flush); } trace!("Done outer iteration"); } } -/// Flushes all drained records to Stackdriver. -fn flush(drain: Drain) { - let record_count = drain.count(); - debug!("Flushed {} records", record_count); +/// Flushes all drained records to Stackdriver. Any Stackdriver +/// message can at most contain 1000 log entries which means they are +/// chunked up here. +fn flush(records: Vec) { + for chunk in records.chunks(1000) { + debug!("Flushed {} records", chunk.len()) + } } fn main () { -- cgit 1.4.1