about summary refs log tree commit diff
path: root/src/main.rs
blob: 9cc9dcee0d632a9fc028dfe42f208d3f45a26b6d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// #[macro_use] extern crate failure;
#[macro_use] extern crate log;

extern crate env_logger;
extern crate systemd;

use systemd::journal::*;
use std::process;
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::time::{Duration, Instant};
use std::collections::vec_deque::{VecDeque, Drain};

/// A journal entry reduced to the handful of fields this program
/// forwards.
///
/// Every field is an `Option` because the conversion from
/// `JournalRecord` simply removes the corresponding key from the
/// entry and keeps whatever comes back; a key that is absent from the
/// entry yields `None`.
#[derive(Debug)]
struct Record {
    /// Value of the entry's `MESSAGE` field. `main` only forwards
    /// records for which this is `Some`.
    message: Option<String>,
    /// Value of the entry's `_HOSTNAME` field.
    hostname: Option<String>,
    /// Value of the entry's `_SYSTEMD_UNIT` field.
    unit: Option<String>,
    /// Value of the entry's `_SOURCE_REALTIME_TIMESTAMP` field, kept
    /// as the raw string (it is not parsed here).
    timestamp: Option<String>,
}

/// Builds a `Record` by pulling the interesting keys out of a raw
/// journal entry. The entry is consumed; any key that is not present
/// results in `None` for the matching field, and all other keys are
/// dropped together with the entry.
impl From<JournalRecord> for Record {
    fn from(mut entry: JournalRecord) -> Record {
        // Moves the value for `key` out of the entry, if there is one.
        let mut take = |key: &str| entry.remove(key);

        Record {
            // `MESSAGE` is only a convention, but journald appears to
            // use it by default when it ingests unit output.
            message: take("MESSAGE"),

            // Expected to be set on every entry, though nothing here
            // relies on that.
            hostname: take("_HOSTNAME"),

            // Kernel entries seem to lack a unit; other entries
            // carry one.
            unit: take("_SYSTEMD_UNIT"),

            // Found on most entries (seemingly everything ingested
            // from the output of systemd units).
            timestamp: take("_SOURCE_REALTIME_TIMESTAMP"),
        }
    }
}

/// This function spawns a double-looped, blocking receiver. It will
/// buffer messages for a second before flushing them to Stackdriver.
fn receiver_loop(rx: Receiver<Record>) {
    let mut buf = VecDeque::new();
    let iteration = Duration::from_millis(500);

    loop {
        trace!("Beginning outer iteration");
        let now = Instant::now();

        loop {
            if now.elapsed() > iteration {
                break;
            }

            if let Ok(record) = rx.recv_timeout(iteration) {
                buf.push_back(record);
            }
        }

        if !buf.is_empty() {
            flush(buf.drain(..));
        }

        trace!("Done outer iteration");
    }
}

/// Consumes a batch of drained records.
///
/// This is where records are meant to be flushed to Stackdriver. At
/// present the batch is only counted and the count is logged at debug
/// level; the records themselves are dropped.
fn flush(drain: Drain<Record>) {
    debug!("Flushed {} records", drain.count());
}

fn main () {
    env_logger::init();

    let mut journal = Journal::open(JournalFiles::All, false, true)
        .expect("Failed to open systemd journal");

    match journal.seek(JournalSeek::Tail) {
        Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
        Err(err) => {
            error!("Failed to set initial journal position: {}", err);
            process::exit(1)
        }
    }

    let (tx, rx) = channel();

    let _receiver = thread::spawn(move || receiver_loop(rx));

    journal.watch_all_elements(move |record| {
        let record: Record = record.into();

        if record.message.is_some() {
            tx.send(record).ok();
        }

        Ok(())
    }).expect("Failed to read new entries from journal");
}