about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock226
-rw-r--r--Cargo.toml3
-rw-r--r--src/journald.rs71
-rw-r--r--src/main.rs115
4 files changed, 329 insertions, 86 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1a17c08226c9..fb27771d2089 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,4 +1,22 @@
 [[package]]
+name = "aho-corasick"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "atty"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+ "termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "backtrace"
 version = "0.3.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -30,6 +48,27 @@ version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "cstr-argument"
+version = "0.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "atty 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "humantime 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "termcolor 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "failure"
 version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -49,30 +88,117 @@ dependencies = [
 ]
 
 [[package]]
+name = "humantime"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "journaldriver"
 version = "0.1.0"
 dependencies = [
+ "env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "systemd 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
+name = "lazy_static"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "libc"
 version = "0.2.41"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "libsystemd-sys"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "log"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "memchr"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "memchr"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "pkg-config"
 version = "0.3.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "quick-error"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "quote"
 version = "0.3.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "redox_syscall"
+version = "0.1.38"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "redox_termios"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "redox_syscall 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "regex"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "regex-syntax 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "rustc-demangle"
 version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -105,11 +231,78 @@ dependencies = [
 ]
 
 [[package]]
+name = "systemd"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cstr-argument 0.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libsystemd-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "utf8-cstr 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "termcolor"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "wincolor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "termion"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
+ "redox_syscall 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
+ "redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "thread_local"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "ucd-util"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "unicode-xid"
 version = "0.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "unreachable"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "utf8-cstr"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "utf8-ranges"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "void"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "winapi"
 version = "0.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -128,21 +321,54 @@ name = "winapi-x86_64-pc-windows-gnu"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "wincolor"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [metadata]
+"checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4"
+"checksum atty 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2fc4a1aa4c24c0718a250f0681885c1af91419d242f29eb8f2ab28502d80dbd1"
 "checksum backtrace 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "dbdd17cd962b570302f5297aea8648d5923e22e555c2ed2d8b2e34eca646bf6d"
 "checksum backtrace-sys 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "b46a4e68c24954dfc8a0e515b069f695481d2997b840356db013ff9e52cdb8fe"
 "checksum cc 1.0.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0ebb87d1116151416c0cf66a0e3fb6430cccd120fd6300794b4dfaa050ac40ba"
 "checksum cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "405216fd8fe65f718daa7102ea808a946b6ce40c742998fbfd3463645552de18"
+"checksum cstr-argument 0.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "514570a4b719329df37f93448a70df2baac553020d0eb43a8dfa9c1f5ba7b658"
+"checksum env_logger 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0e6e40ebb0e66918a37b38c7acab4e10d299e0463fe2af5d29b9cc86710cfd2a"
 "checksum failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "934799b6c1de475a012a02dab0ace1ace43789ee4b99bcfbf1a2e3e8ced5de82"
 "checksum failure_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c7cdda555bb90c9bb67a3b670a0f42de8e73f5981524123ad8578aafec8ddb8b"
+"checksum humantime 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0484fda3e7007f2a4a0d9c3a703ca38c71c54c55602ce4660c419fd32e188c9e"
+"checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d"
 "checksum libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)" = "ac8ebf8343a981e2fa97042b14768f02ed3e1d602eac06cae6166df3c8ced206"
+"checksum libsystemd-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e751b723417158e0949ba470bee4affd6f1dd6b67622b5240d79186631b6a0d9"
+"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2"
+"checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a"
+"checksum memchr 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "796fba70e76612589ed2ce7f45282f5af869e0fdd7cc6199fa1aa1f1d591ba9d"
 "checksum pkg-config 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "110d5ee3593dbb73f56294327fe5668bcc997897097cbc76b51e7aed3f52452f"
+"checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0"
 "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
+"checksum redox_syscall 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "0a12d51a5b5fd700e6c757f15877685bfa04fd7eb60c108f01d045cafa0073c2"
+"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
+"checksum regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75ecf88252dce580404a22444fc7d626c01815debba56a7f4f536772a5ff19d3"
+"checksum regex-syntax 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8f1ac0f60d675cc6cf13a20ec076568254472551051ad5dd050364d70671bf6b"
 "checksum rustc-demangle 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "76d7ba1feafada44f2d38eed812bd2489a03c0f5abb975799251518b68848649"
 "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
 "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"
 "checksum synstructure 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3a761d12e6d8dcb4dcf952a7a89b475e3a9d69e4a69307e01a470977642914bd"
+"checksum systemd 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1b62a732355787f960c25536210ae0a981aca2e5dae9dab8491bdae39613ce48"
+"checksum termcolor 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "adc4587ead41bf016f11af03e55a624c06568b5a19db4e90fde573d805074f83"
+"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096"
+"checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963"
+"checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d"
 "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
+"checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56"
+"checksum utf8-cstr 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "55bcbb425141152b10d5693095950b51c3745d019363fc2929ffd8f61449b628"
+"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
+"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
 "checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3"
 "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
 "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+"checksum wincolor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eeb06499a3a4d44302791052df005d5232b927ed1a9658146d842165c4de7767"
diff --git a/Cargo.toml b/Cargo.toml
index b32bd023d51c..f521d0c39dd6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,9 @@ authors = ["Vincent Ambo <mail@tazj.in>"]
 [dependencies]
 libc = "0.2"
 failure = "0.1"
+systemd = "0.3"
+log = "0.4"
+env_logger = "0.5"
 
 [build-dependencies]
 pkg-config = "0.3"
diff --git a/src/journald.rs b/src/journald.rs
deleted file mode 100644
index 4892553ea57e..000000000000
--- a/src/journald.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-//! This module contains FFI-bindings to the journald APi. See
-//! sd-journal(3) for detailed information about the API.
-//!
-//! Only calls required by journaldriver are implemented.
-
-/// This type represents an opaque pointer to an `sd_journal` struct.
-/// It should be changed to an `extern type` once RF1861 is
-/// stabilized.
-enum SdJournal {}
-
-use failure::Error;
-use std::mem;
-
-extern {
-    fn sd_journal_open(sd_journal: *mut SdJournal, flags: usize) -> usize;
-    fn sd_journal_close(sd_journal: *mut SdJournal);
-    fn sd_journal_next(sd_journal: *mut SdJournal) -> usize;
-}
-
-// Safe interface:
-
-/// This struct contains the opaque data used by libsystemd to
-/// reference the journal.
-pub struct Journal {
-    sd_journal: *mut SdJournal,
-}
-
-impl Drop for Journal {
-    fn drop(&mut self) {
-        unsafe {
-            sd_journal_close(self.sd_journal);
-        }
-    }
-}
-
-/// Open the journal for reading. No flags are supplied to libsystemd,
-/// which means that all journal entries will become available.
-pub fn open_journal() -> Result<Journal, Error> {
-    let (mut sd_journal, result) = unsafe {
-        let mut journal: SdJournal = mem::uninitialized();
-        let result = sd_journal_open(&mut journal, 0);
-        (journal, result)
-    };
-
-    ensure!(result == 0, "Could not open journal (errno: {})", result);
-    Ok(Journal { sd_journal: &mut sd_journal })
-}
-
-#[derive(Debug)]
-pub enum NextEntry {
-    /// If no new entries are available in the journal this variant is
-    /// returned.
-    NoEntry,
-
-    Entry,
-}
-
-impl Journal {
-    pub fn read_next(&self) -> Result<NextEntry, Error> {
-        let result = unsafe {
-            sd_journal_next(self.sd_journal)
-        };
-
-        match result {
-            0 => Ok(NextEntry::NoEntry),
-            1 => Ok(NextEntry::Entry),
-            n if n > 1 => bail!("Journal unexpectedly advanced by {} entries!", n),
-            _ => bail!("An error occured while advancing the journal (errno: {})", result),
-        }
-    }
-}
diff --git a/src/main.rs b/src/main.rs
index aca6aba900ad..9cc9dcee0d63 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,22 +1,107 @@
-#[macro_use] extern crate failure;
-extern crate libc;
+// #[macro_use] extern crate failure;
+#[macro_use] extern crate log;
 
-mod journald;
+extern crate env_logger;
+extern crate systemd;
 
+use systemd::journal::*;
 use std::process;
+use std::thread;
+use std::sync::mpsc::{channel, Receiver};
+use std::time::{Duration, Instant};
+use std::collections::vec_deque::{VecDeque, Drain};
 
-fn main() {
-    let mut journal = match journald::open_journal() {
-        Ok(journal) => journal,
-        Err(e) => {
-            println!("{}", e);
-            process::exit(1);
-        },
-    };
+#[derive(Debug)]
+struct Record {
+    message: Option<String>,
+    hostname: Option<String>,
+    unit: Option<String>,
+    timestamp: Option<String>,
+}
+
+impl From<JournalRecord> for Record {
+    fn from(mut record: JournalRecord) -> Record {
+        Record {
+            // The message field is technically just a convention, but
+            // journald seems to default to it when ingesting unit
+            // output.
+            message: record.remove("MESSAGE"),
+
+            // Presumably this is always set, but who can be sure
+            // about anything in this world.
+            hostname: record.remove("_HOSTNAME"),
+
+            // The unit is seemingly missing on kernel entries, but
+            // present on all others.
+            unit: record.remove("_SYSTEMD_UNIT"),
+
+            // This timestamp is present on most log entries
+            // (seemingly all that are ingested from the output
+            // systemd units).
+            timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"),
+        }
+    }
+}
+
+/// This function spawns a double-looped, blocking receiver. It will
+/// buffer messages for a second before flushing them to Stackdriver.
+fn receiver_loop(rx: Receiver<Record>) {
+    let mut buf = VecDeque::new();
+    let iteration = Duration::from_millis(500);
+
+    loop {
+        trace!("Beginning outer iteration");
+        let now = Instant::now();
+
+        loop {
+            if now.elapsed() > iteration {
+                break;
+            }
+
+            if let Ok(record) = rx.recv_timeout(iteration) {
+                buf.push_back(record);
+            }
+        }
+
+        if !buf.is_empty() {
+            flush(buf.drain(..));
+        }
+
+        trace!("Done outer iteration");
+    }
+}
+
+/// Flushes all drained records to Stackdriver.
+fn flush(drain: Drain<Record>) {
+    let record_count = drain.count();
+    debug!("Flushed {} records", record_count);
+}
+
+fn main () {
+    env_logger::init();
+
+    let mut journal = Journal::open(JournalFiles::All, false, true)
+        .expect("Failed to open systemd journal");
+
+    match journal.seek(JournalSeek::Tail) {
+        Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
+        Err(err) => {
+            error!("Failed to set initial journal position: {}", err);
+            process::exit(1)
+        }
+    }
+
+    let (tx, rx) = channel();
+
+    let _receiver = thread::spawn(move || receiver_loop(rx));
+
+    journal.watch_all_elements(move |record| {
+        let record: Record = record.into();
 
-    println!("foo");
-    
-    let entry = journal.read_next();
+        if record.message.is_some() {
+            tx.send(record).ok();
+        }
 
-    println!("Entry: {:?}", entry)
+        Ok(())
+    }).expect("Failed to read new entries from journal");
 }