diff options
Diffstat (limited to 'ops/journaldriver')
-rw-r--r-- | ops/journaldriver/.gitignore | 3 | ||||
-rw-r--r-- | ops/journaldriver/Cargo.lock | 511 | ||||
-rw-r--r-- | ops/journaldriver/Cargo.toml | 22 | ||||
-rw-r--r-- | ops/journaldriver/README.md | 152 | ||||
-rw-r--r-- | ops/journaldriver/build.rs | 5 | ||||
-rw-r--r-- | ops/journaldriver/default.nix | 11 | ||||
-rw-r--r-- | ops/journaldriver/src/main.rs | 638 | ||||
-rw-r--r-- | ops/journaldriver/src/tests.rs | 131 |
8 files changed, 1473 insertions, 0 deletions
diff --git a/ops/journaldriver/.gitignore b/ops/journaldriver/.gitignore new file mode 100644 index 000000000000..29e65519ba35 --- /dev/null +++ b/ops/journaldriver/.gitignore @@ -0,0 +1,3 @@ +result +/target +**/*.rs.bk diff --git a/ops/journaldriver/Cargo.lock b/ops/journaldriver/Cargo.lock new file mode 100644 index 000000000000..0b7afd993291 --- /dev/null +++ b/ops/journaldriver/Cargo.lock @@ -0,0 +1,511 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr 2.4.1", +] + +[[package]] +name = "anyhow" +version = "1.0.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crimp" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbe8f9a320ad9c1a2e3bacedaa281587bd297fb10a10179fd39f777049d04794" +dependencies = [ + "curl", + "serde", + "serde_json", +] + +[[package]] +name = "cstr-argument" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "514570a4b719329df37f93448a70df2baac553020d0eb43a8dfa9c1f5ba7b658" +dependencies = [ + "cfg-if 0.1.10", + "memchr 1.0.2", +] + +[[package]] +name = "curl" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d855aeef205b43f65a5001e0997d81f8efca7badad4fad7d897aa7f0d0651f" +dependencies = [ + "curl-sys", + "libc", + "openssl-probe", + "openssl-sys", + "schannel", + "socket2", + "winapi", +] + +[[package]] +name = "curl-sys" +version = "0.4.53+curl-7.82.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8092905a5a9502c312f223b2775f57ec5c5b715f9a15ee9d2a8591d1364a0352" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", + "winapi", +] + +[[package]] +name = "env_logger" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + +[[package]] +name = "itoa" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" + +[[package]] +name = "journaldriver" +version = "1.1.0" +dependencies = [ + "anyhow", + "crimp", + "env_logger", + "lazy_static", + "log", + "medallion", + "pkg-config", + "serde", + "serde_derive", + "serde_json", + "systemd", + "time", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.123" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd" + +[[package]] +name = "libsystemd-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7b98458cd04a5c3aacba6f1a3a3c4b9abcb0ae4d66a055eee502e0d52dc226b" +dependencies = [ + "libc", + "pkg-config", +] + +[[package]] +name = "libz-sys" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f35facd4a5673cb5a48822be2be1d4236c1c99cb4113cab7061ac720d5bf859" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "log" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "medallion" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35b83c0c3277d722b53a6eb24e3c1321172f85b715cc7405add8ffd1f2f06288" +dependencies = [ + "anyhow", + "base64", + "openssl", + "serde", + "serde_json", + "time", +] + +[[package]] +name = "memchr" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" +dependencies = [ + "libc", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "num_threads" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0" +dependencies = [ + "libc", +] + +[[package]] +name = "once_cell" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" + +[[package]] +name = "openssl" +version = "0.10.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "pkg-config" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + +[[package]] +name = "proc-macro2" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "regex" +version = "1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" +dependencies = [ + "aho-corasick", + "memchr 2.4.1", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + +[[package]] +name = "ryu" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" + +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + +[[package]] +name = "serde" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.136" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "1.0.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "systemd" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b62a732355787f960c25536210ae0a981aca2e5dae9dab8491bdae39613ce48" +dependencies = [ + "cstr-argument", + "libc", + "libsystemd-sys", + "log", + "utf8-cstr", +] + +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "time" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" +dependencies = [ + "itoa", + "libc", + "num_threads", + "serde", + "time-macros", +] + +[[package]] +name = "time-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" + +[[package]] +name = "unicode-xid" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" + +[[package]] +name = "utf8-cstr" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55bcbb425141152b10d5693095950b51c3745d019363fc2929ffd8f61449b628" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/ops/journaldriver/Cargo.toml b/ops/journaldriver/Cargo.toml new file mode 100644 index 000000000000..4c32b893f77d --- /dev/null +++ b/ops/journaldriver/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "journaldriver" +version = "1.1.0" +authors = ["Vincent Ambo <mail@tazj.in>"] +license = "GPL-3.0-or-later" +edition = "2021" + +[dependencies] +anyhow = "1.0" +crimp = "0.2" +env_logger = "0.5" +lazy_static = "1.0" +log = "0.4" +medallion = "2.5" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +systemd = "0.3" +time = { version = "0.3", features = [ "serde-well-known", "macros" ]} + +[build-dependencies] +pkg-config = "0.3" diff --git a/ops/journaldriver/README.md b/ops/journaldriver/README.md new file mode 100644 index 000000000000..4dc9de0f617b --- /dev/null +++ b/ops/journaldriver/README.md @@ -0,0 +1,152 @@ +journaldriver +============= + +This is a small daemon used to forward logs from `journald` (systemd's +logging service) to [Stackdriver Logging][]. + +Many existing log services are written in inefficient dynamic +languages with error-prone "cover every possible use-case" +configuration. `journaldriver` instead aims to fit a specific use-case +very well, instead of covering every possible logging setup. + +`journaldriver` can be run on GCP-instances with no additional +configuration as authentication tokens are retrieved from the +[metadata server][]. + +<!-- markdown-toc start - Don't edit this section. Run M-x markdown-toc-refresh-toc --> +**Table of Contents** + +- [Features](#features) +- [Usage on Google Cloud Platform](#usage-on-google-cloud-platform) +- [Usage outside of Google Cloud Platform](#usage-outside-of-google-cloud-platform) +- [Log levels / severities / priorities](#log-levels--severities--priorities) +- [NixOS module](#nixos-module) +- [Stackdriver Error Reporting](#stackdriver-error-reporting) + +<!-- markdown-toc end --> + +# Features + +* `journaldriver` persists the last forwarded position in the journal + and will resume forwarding at the same position after a restart +* `journaldriver` will recognise log entries in JSON format and + forward them appropriately to make structured log entries available + in Stackdriver +* `journaldriver` can be used outside of GCP by configuring static + credentials +* `journaldriver` will recognise journald's log priority levels and + convert them into equivalent Stackdriver log severity levels + +# Usage on Google Cloud Platform + +`journaldriver` does not require any configuration when running on GCP +instances. + +1. Install `journaldriver` on the instance from which you wish to + forward logs. + +2. Ensure that the instance has the appropriate permissions to write + to Stackdriver. Google continously changes how IAM is implemented + on GCP, so you will have to refer to [Google's documentation][]. + + By default instances have the required permissions if Stackdriver + Logging support is enabled in the project. + +3. Start `journaldriver`, for example via `systemd`. + +# Usage outside of Google Cloud Platform + +When running outside of GCP, the following extra steps need to be +performed: + +1. Create a Google Cloud Platform service account with the "Log + Writer" role and download its private key in JSON-format. +2. When starting `journaldriver`, configure the following environment + variables: + + * `GOOGLE_CLOUD_PROJECT`: Name of the GCP project to which logs + should be written. + * `GOOGLE_APPLICATION_CREDENTIALS`: Filesystem path to the + JSON-file containing the service account's private key. + * `LOG_STREAM`: Name of the target log stream in Stackdriver Logging. + This will be automatically created if it does not yet exist. + * `LOG_NAME`: Name of the target log to write to. This defaults to + `journaldriver` if unset, but it is recommended to - for + example - set it to the machine hostname. + +# Log levels / severities / priorities + +`journaldriver` recognises [journald's priorities][] and converts them +into [equivalent severities][] in Stackdriver. Both sets of values +correspond to standard `syslog` priorities. + +The easiest way to emit log messages with priorites from an +application is to use [priority prefixes][], which are compatible with +structured log messages. + +For example, to emit a simple warning message (structured and +unstructured): + +``` +$ echo '<4>{"fnord":true, "msg":"structured log (warning)"}' | systemd-cat +$ echo '<4>unstructured log (warning)' | systemd-cat +``` + +# NixOS module + +The NixOS package repository [contains a module][] for setting up +`journaldriver` on NixOS machines. NixOS by default uses `systemd` for +service management and `journald` for logging, which means that log +output from most services will be captured automatically. + +On a GCP instance the only required option is this: + +```nix +services.journaldriver.enable = true; +``` + +When running outside of GCP, the configuration looks as follows: + +```nix +services.journaldriver = { + enable = true; + logStream = "prod-environment"; + logName = "hostname"; + googleCloudProject = "gcp-project-name"; + applicationCredentials = keyFile; +}; +``` + +**Note**: The `journaldriver`-module is included in stable releases of +NixOS since NixOS 18.09. + +# Stackdriver Error Reporting + +The [Stackdriver Error Reporting][] service of Google's monitoring +toolbox supports automatically detecting and correlating errors from +log entries. + +To use this functionality log messages must be logged in the expected +[log format][]. + +*Note*: Reporting errors from non-GCP instances requires that the +`LOG_STREAM` environment variable is set to the special value +`global`. + +This value changes the monitored resource descriptor from a log stream +to the project-global stream. Due to a limitation in Stackdriver Error +Reporting, this is the only way to correctly ingest errors from +non-GCP machines. Please see [issue #4][] for more information about +this. + +[Stackdriver Logging]: https://cloud.google.com/logging/ +[metadata server]: https://cloud.google.com/compute/docs/storing-retrieving-metadata +[Google's documentation]: https://cloud.google.com/logging/docs/access-control +[NixOS]: https://nixos.org/ +[contains a module]: https://github.com/NixOS/nixpkgs/pull/42134 +[journald's priorities]: http://0pointer.de/public/systemd-man/sd-daemon.html +[equivalent severities]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity +[priority prefixes]: http://0pointer.de/public/systemd-man/sd-daemon.html +[Stackdriver Error Reporting]: https://cloud.google.com/error-reporting/ +[log format]: https://cloud.google.com/error-reporting/docs/formatting-error-messages +[issue #4]: https://github.com/tazjin/journaldriver/issues/4 diff --git a/ops/journaldriver/build.rs b/ops/journaldriver/build.rs new file mode 100644 index 000000000000..79eb1001bf09 --- /dev/null +++ b/ops/journaldriver/build.rs @@ -0,0 +1,5 @@ +extern crate pkg_config; + +fn main() { + pkg_config::probe_library("libsystemd").expect("Could not probe libsystemd"); +} diff --git a/ops/journaldriver/default.nix b/ops/journaldriver/default.nix new file mode 100644 index 000000000000..a06a858fa12a --- /dev/null +++ b/ops/journaldriver/default.nix @@ -0,0 +1,11 @@ +{ depot, pkgs, ... }: + +depot.third_party.naersk.buildPackage { + src = ./.; + + buildInputs = with pkgs; [ + pkgconfig + openssl + systemd.dev + ]; +} diff --git a/ops/journaldriver/src/main.rs b/ops/journaldriver/src/main.rs new file mode 100644 index 000000000000..4c404e607e5b --- /dev/null +++ b/ops/journaldriver/src/main.rs @@ -0,0 +1,638 @@ +// Copyright (C) 2018 Vincent Ambo <mail@tazj.in> +// +// journaldriver is free software: you can redistribute it and/or +// modify it under the terms of the GNU General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +//! This file implements journaldriver, a small application that +//! forwards logs from journald (systemd's log facility) to +//! Stackdriver Logging. +//! +//! Log entries are read continously from journald and are forwarded +//! to Stackdriver in batches. +//! +//! Stackdriver Logging has a concept of monitored resources. In the +//! simplest case this monitored resource will be the GCE instance on +//! which journaldriver is running. +//! +//! Information about the instance, the project and required security +//! credentials are retrieved from Google's metadata instance on GCP. +//! +//! To run journaldriver on non-GCP machines, users must specify the +//! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and +//! `LOG_NAME` environment variables. + +use anyhow::{bail, Context, Result}; +use lazy_static::lazy_static; +use log::{debug, error, info, trace}; +use serde::{Deserialize, Serialize}; +use serde_json::{from_str, json, Value}; +use std::convert::TryInto; +use std::fs::{self, rename, File}; +use std::io::{self, ErrorKind, Read, Write}; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use std::{env, mem, process}; +use systemd::journal::{Journal, JournalFiles, JournalRecord, JournalSeek}; + +#[cfg(test)] +mod tests; + +const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2"; +const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write"; +const METADATA_TOKEN_URL: &str = + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"; +const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id"; +const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; +const METADATA_PROJECT_URL: &str = + "http://metadata.google.internal/computeMetadata/v1/project/project-id"; + +/// Representation of static service account credentials for GCP. +#[derive(Debug, Deserialize)] +struct Credentials { + /// PEM encoded private key + private_key: String, + + /// `kid` of this private key + private_key_id: String, + + /// "email" address of the service account + client_email: String, +} + +lazy_static! { + /// ID of the GCP project to which to send logs. + static ref PROJECT_ID: String = get_project_id(); + + /// Name of the log to write to (this should only be manually + /// configured if not running on GCP): + static ref LOG_NAME: String = env::var("LOG_NAME") + .unwrap_or("journaldriver".into()); + + /// Service account credentials (if configured) + static ref SERVICE_ACCOUNT_CREDENTIALS: Option<Credentials> = + env::var("GOOGLE_APPLICATION_CREDENTIALS").ok() + .and_then(|path| File::open(path).ok()) + .and_then(|file| serde_json::from_reader(file).ok()); + + /// Descriptor of the currently monitored instance. Refer to the + /// documentation of `determine_monitored_resource` for more + /// information. + static ref MONITORED_RESOURCE: Value = determine_monitored_resource(); + + /// Path to the directory in which journaldriver should persist + /// its cursor state. + static ref CURSOR_DIR: PathBuf = env::var("CURSOR_POSITION_DIR") + .unwrap_or("/var/lib/journaldriver".into()) + .into(); + + /// Path to the cursor position file itself. + static ref CURSOR_FILE: PathBuf = { + let mut path = CURSOR_DIR.clone(); + path.push("cursor.pos"); + path + }; + + /// Path to the temporary file used for cursor position writes. + static ref CURSOR_TMP_FILE: PathBuf = { + let mut path = CURSOR_DIR.clone(); + path.push("cursor.tmp"); + path + }; +} + +/// Convenience helper for retrieving values from the metadata server. +fn get_metadata(url: &str) -> Result<String> { + let response = crimp::Request::get(url) + .header("Metadata-Flavor", "Google")? + .timeout(std::time::Duration::from_secs(5))? + .send()? + .as_string()?; + + if !response.is_success() { + bail!( + "Error response ({}) from metadata server: {}", + response.status, + response.body + ); + } + + Ok(response.body.trim().to_owned()) +} + +/// Convenience helper for determining the project ID. +fn get_project_id() -> String { + env::var("GOOGLE_CLOUD_PROJECT") + .or_else(|_| get_metadata(METADATA_PROJECT_URL)) + .expect("Could not determine project ID") +} + +/// Determines the monitored resource descriptor used in Stackdriver +/// logs. On GCP this will be set to the instance ID as returned by +/// the metadata server. +/// +/// On non-GCP machines the value is determined by using the +/// `GOOGLE_CLOUD_PROJECT` and `LOG_STREAM` environment variables. +/// +/// [issue #4]: https://github.com/tazjin/journaldriver/issues/4 +fn determine_monitored_resource() -> Value { + if let Ok(log) = env::var("LOG_STREAM") { + // The special value `global` is recognised as a log stream name that + // results in a `global`-type resource descriptor. This is useful in + // cases where Stackdriver Error Reporting is intended to be used on + // a non-GCE instance. See [issue #4][] for details. + if log == "global" { + return json!({ + "type": "global", + "labels": { + "project_id": PROJECT_ID.as_str(), + } + }); + } + + json!({ + "type": "logging_log", + "labels": { + "project_id": PROJECT_ID.as_str(), + "name": log, + } + }) + } else { + let instance_id = get_metadata(METADATA_ID_URL).expect("Could not determine instance ID"); + + let zone = get_metadata(METADATA_ZONE_URL).expect("Could not determine instance zone"); + + json!({ + "type": "gce_instance", + "labels": { + "project_id": PROJECT_ID.as_str(), + "instance_id": instance_id, + "zone": zone, + } + }) + } +} + +/// Represents the response returned by the metadata server's token +/// endpoint. The token is normally valid for an hour. +#[derive(Deserialize)] +struct TokenResponse { + expires_in: u64, + access_token: String, +} + +/// Struct used to store a token together with a sensible +/// representation of when it expires. +struct Token { + token: String, + fetched_at: Instant, + expires: Duration, +} + +impl Token { + /// Does this token need to be renewed? + fn is_expired(&self) -> bool { + self.fetched_at.elapsed() > self.expires + } +} + +/// Retrieves a token from the GCP metadata service. Retrieving these +/// tokens requires no additional authentication. +fn get_metadata_token() -> Result<Token> { + let body = get_metadata(METADATA_TOKEN_URL)?; + let token: TokenResponse = from_str(&body)?; + + debug!("Fetched new token from metadata service"); + + Ok(Token { + fetched_at: Instant::now(), + expires: Duration::from_secs(token.expires_in / 2), + token: token.access_token, + }) +} + +/// Signs a token using static client credentials configured for a +/// service account. This service account must have been given the +/// `Log Writer` role in Google Cloud IAM. +/// +/// The process for creating and signing these tokens is described +/// here: +/// +/// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#jwt-auth +fn sign_service_account_token(credentials: &Credentials) -> Result<Token> { + use medallion::{Algorithm, Header, Payload}; + + let iat = time::OffsetDateTime::now_utc(); + let exp = iat + time::Duration::seconds(3600); + + let header = Header { + alg: Algorithm::RS256, + headers: Some(json!({ + "kid": credentials.private_key_id, + })), + }; + + let payload: Payload<()> = Payload { + iss: Some(credentials.client_email.clone()), + sub: Some(credentials.client_email.clone()), + aud: Some(LOGGING_SERVICE.to_string()), + iat: Some(iat.unix_timestamp().try_into().unwrap()), + exp: Some(exp.unix_timestamp().try_into().unwrap()), + ..Default::default() + }; + + let token = medallion::Token::new(header, payload) + .sign(credentials.private_key.as_bytes()) + .context("Signing service account token failed")?; + + debug!("Signed new service account token"); + + Ok(Token { + token, + fetched_at: Instant::now(), + expires: Duration::from_secs(3000), + }) +} + +/// Retrieve the authentication token either by using static client +/// credentials, or by talking to the metadata server. +/// +/// Which behaviour is used is controlled by the environment variable +/// `GOOGLE_APPLICATION_CREDENTIALS`, which should be configured to +/// point at a JSON private key file if service account authentication +/// is to be used. +fn get_token() -> Result<Token> { + if let Some(credentials) = SERVICE_ACCOUNT_CREDENTIALS.as_ref() { + sign_service_account_token(credentials) + } else { + get_metadata_token() + } +} + +/// This structure represents the different types of payloads +/// supported by journaldriver. +/// +/// Currently log entries can either contain plain text messages or +/// structured payloads in JSON-format. +#[derive(Debug, Serialize, PartialEq)] +#[serde(untagged)] +enum Payload { + TextPayload { + #[serde(rename = "textPayload")] + text_payload: String, + }, + JsonPayload { + #[serde(rename = "jsonPayload")] + json_payload: Value, + }, +} + +/// Attempt to parse a log message as JSON and return it as a +/// structured payload. If parsing fails, return the entry in plain +/// text format. +fn message_to_payload(message: Option<String>) -> Payload { + match message { + None => Payload::TextPayload { + text_payload: "empty log entry".into(), + }, + Some(text_payload) => { + // Attempt to deserialize the text payload as a generic + // JSON value. + if let Ok(json_payload) = serde_json::from_str::<Value>(&text_payload) { + // If JSON-parsing succeeded on the payload, check + // whether we parsed an object (Stackdriver does not + // expect other types of JSON payload) and return it + // in that case. + if json_payload.is_object() { + return Payload::JsonPayload { json_payload }; + } + } + + Payload::TextPayload { text_payload } + } + } +} + +/// Attempt to parse journald's microsecond timestamps into a UTC +/// timestamp. +/// +/// Parse errors are dismissed and returned as empty options: There +/// simply aren't any useful fallback mechanisms other than defaulting +/// to ingestion time for journaldriver's use-case. +fn parse_microseconds(input: String) -> Option<time::OffsetDateTime> { + if input.len() != 16 { + return None; + } + + let micros: i128 = input.parse().ok()?; + let nanos: i128 = micros * 1000; + + time::OffsetDateTime::from_unix_timestamp_nanos(nanos).ok() +} + +/// Converts a journald log message priority to a +/// Stackdriver-compatible severity number. +/// +/// Both Stackdriver and journald specify equivalent +/// severities/priorities. Conveniently, the names are the same. +/// Inconveniently, the numbers are not. +/// +/// For more information on the journald priorities, consult these +/// man-pages: +/// +/// * systemd.journal-fields(7) (section 'PRIORITY') +/// * sd-daemon(3) +/// * systemd.exec(5) (section 'SyslogLevelPrefix') +/// +/// Note that priorities can be logged by applications via the prefix +/// concept described in these man pages, without interfering with +/// structured JSON-payloads. +/// +/// For more information on the Stackdriver severity levels, please +/// consult Google's documentation: +/// +/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity +/// +/// Any unknown priority values result in no severity being set. +fn priority_to_severity(priority: String) -> Option<u32> { + match priority.as_ref() { + "0" => Some(800), // emerg + "1" => Some(700), // alert + "2" => Some(600), // crit + "3" => Some(500), // err + "4" => Some(400), // warning + "5" => Some(300), // notice + "6" => Some(200), // info + "7" => Some(100), // debug + _ => None, + } +} + +/// This structure represents a log entry in the format expected by +/// the Stackdriver API. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct LogEntry { + labels: Value, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + timestamp: Option<time::OffsetDateTime>, + + #[serde(flatten)] + payload: Payload, + + // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity + #[serde(skip_serializing_if = "Option::is_none")] + severity: Option<u32>, +} + +impl From<JournalRecord> for LogEntry { + // Converts from the fields contained in a journald record to the + // representation required by Stackdriver Logging. + // + // The fields are documented in systemd.journal-fields(7). + fn from(mut record: JournalRecord) -> LogEntry { + // The message field is technically just a convention, but + // journald seems to default to it when ingesting unit + // output. + let payload = message_to_payload(record.remove("MESSAGE")); + + // Presumably this is always set, but who can be sure + // about anything in this world. + let hostname = record.remove("_HOSTNAME"); + + // The unit is seemingly missing on kernel entries, but + // present on all others. + let unit = record.remove("_SYSTEMD_UNIT"); + + // The source timestamp (if present) is specified in + // microseconds since epoch. + // + // If it is not present or can not be parsed, journaldriver + // will not send a timestamp for the log entry and it will + // default to the ingestion time. + let timestamp = record + .remove("_SOURCE_REALTIME_TIMESTAMP") + .and_then(parse_microseconds); + + // Journald uses syslogd's concept of priority. No idea if this is + // always present, but it's optional in the Stackdriver API, so we just + // omit it if we can't find or parse it. + let severity = record.remove("PRIORITY").and_then(priority_to_severity); + + LogEntry { + payload, + timestamp, + labels: json!({ + "host": hostname, + "unit": unit.unwrap_or_else(|| "syslog".into()), + }), + severity, + } + } +} + +/// Attempt to read from the journal. If no new entry is present, +/// await the next one up to the specified timeout. +fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result<Option<JournalRecord>> { + let next_record = journal.next_record()?; + if next_record.is_some() { + return Ok(next_record); + } + + Ok(journal.await_next_record(Some(timeout))?) +} + +/// This function starts a double-looped, blocking receiver. It will +/// buffer messages for half a second before flushing them to +/// Stackdriver. +fn receiver_loop(mut journal: Journal) -> Result<()> { + let mut token = get_token()?; + + let mut buf: Vec<LogEntry> = Vec::new(); + let iteration = Duration::from_millis(500); + + loop { + trace!("Beginning outer iteration"); + let now = Instant::now(); + + loop { + if now.elapsed() > iteration { + break; + } + + if let Ok(Some(entry)) = receive_next_record(iteration, &mut journal) { + trace!("Received a new entry"); + buf.push(entry.into()); + } + } + + if !buf.is_empty() { + let to_flush = mem::replace(&mut buf, Vec::new()); + flush(&mut token, to_flush, journal.cursor()?)?; + } + + trace!("Done outer iteration"); + } +} + +/// Writes the current cursor into `/var/journaldriver/cursor.pos`. To +/// avoid issues with journaldriver being terminated while the cursor +/// is still being written, this will first write the cursor into a +/// temporary file and then move it. +fn persist_cursor(cursor: String) -> Result<()> { + // This code exists to aid in tracking down if there are other + // causes of issue #2 than what has already been taken care of. + // + // One theory is that journald (or the Rust library to interface + // with it) may occasionally return empty cursor strings. If this + // is ever the case, we would like to know about it. + if cursor.is_empty() { + error!("Received empty journald cursor position, refusing to persist!"); + error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2"); + return Ok(()); + } + + let mut file = File::create(&*CURSOR_TMP_FILE).context("Failed to create cursor file")?; + + write!(file, "{}", cursor).context("Failed to write cursor file")?; + + rename(&*CURSOR_TMP_FILE, &*CURSOR_FILE) + .context("Failed to move cursor file") + .map_err(Into::into) +} + +/// Flushes all drained records to Stackdriver. Any Stackdriver +/// message can at most contain 1000 log entries which means they are +/// chunked up here. +/// +/// In some cases large payloads seem to cause errors in Stackdriver - +/// the chunks are therefore made smaller here. +/// +/// If flushing is successful the last cursor position will be +/// persisted to disk. +fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> { + if token.is_expired() { + debug!("Refreshing Google metadata access token"); + let new_token = get_token()?; + *token = new_token; + } + + for chunk in entries.chunks(750) { + let request = prepare_request(chunk); + if let Err(write_error) = write_entries(token, request) { + error!("Failed to write {} entries: {}", chunk.len(), write_error) + } else { + debug!("Wrote {} entries to Stackdriver", chunk.len()) + } + } + + persist_cursor(cursor) +} + +/// Convert a slice of log entries into the format expected by +/// Stackdriver. This format is documented here: +/// +/// https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/write +fn prepare_request(entries: &[LogEntry]) -> Value { + json!({ + "logName": format!("projects/{}/logs/{}", PROJECT_ID.as_str(), LOG_NAME.as_str()), + "resource": &*MONITORED_RESOURCE, + "entries": entries, + "partialSuccess": true + }) +} + +/// Perform the log entry insertion in Stackdriver Logging. +fn write_entries(token: &Token, request: Value) -> Result<()> { + let response = crimp::Request::post(ENTRIES_WRITE_URL) + .json(&request)? + .header("Authorization", format!("Bearer {}", token.token).as_str())? + // The timeout values are set relatively high, not because of + // an expectation of Stackdriver being slow but just to + // eventually force an error in case of network troubles. + // Presumably no request in a functioning environment will + // ever hit these limits. + .timeout(std::time::Duration::from_secs(5))? + .send()?; + + if !response.is_success() { + let status = response.status; + let body = response + .as_string() + .map(|r| r.body) + .unwrap_or_else(|_| "no valid response body".to_owned()); + + bail!("Writing to Stackdriver failed({}): {}", status, body); + } + + Ok(()) +} + +/// Attempt to read the initial cursor position from the configured +/// file. If there is no initial cursor position set, read from the +/// tail of the log. +/// +/// The only "acceptable" error when reading the cursor position is +/// the cursor position file not existing, other errors are fatal +/// because they indicate a misconfiguration of journaldriver. +fn initial_cursor() -> Result<JournalSeek> { + let read_result: io::Result<String> = (|| { + let mut contents = String::new(); + let mut file = File::open(&*CURSOR_FILE)?; + file.read_to_string(&mut contents)?; + Ok(contents.trim().into()) + })(); + + match read_result { + Ok(cursor) => Ok(JournalSeek::Cursor { cursor }), + Err(ref err) if err.kind() == ErrorKind::NotFound => { + info!("No previous cursor position, reading from journal tail"); + Ok(JournalSeek::Tail) + } + Err(err) => (Err(err).context("Could not read cursor position"))?, + } +} + +fn main() { + env_logger::init(); + + // The directory in which cursor positions are persisted should + // have been created: + if !CURSOR_DIR.exists() { + error!("Cursor directory at '{:?}' does not exist", *CURSOR_DIR); + process::exit(1); + } + + let cursor_position_dir = CURSOR_FILE + .parent() + .expect("Invalid cursor position file path"); + + fs::create_dir_all(cursor_position_dir) + .expect("Could not create directory to store cursor position in"); + + let mut journal = + Journal::open(JournalFiles::All, false, true).expect("Failed to open systemd journal"); + + let seek_position = initial_cursor().expect("Failed to determine initial cursor position"); + + match journal.seek(seek_position) { + Ok(cursor) => info!("Opened journal at cursor '{}'", cursor), + Err(err) => { + error!("Failed to set initial journal position: {}", err); + process::exit(1) + } + } + + receiver_loop(journal).expect("log receiver encountered an unexpected error"); +} diff --git a/ops/journaldriver/src/tests.rs b/ops/journaldriver/src/tests.rs new file mode 100644 index 000000000000..6f5045d6a5cd --- /dev/null +++ b/ops/journaldriver/src/tests.rs @@ -0,0 +1,131 @@ +use super::*; +use serde_json::to_string; +use time::macros::datetime; + +#[test] +fn test_text_entry_serialization() { + let entry = LogEntry { + labels: Value::Null, + timestamp: None, + payload: Payload::TextPayload { + text_payload: "test entry".into(), + }, + severity: None, + }; + + let expected = "{\"labels\":null,\"textPayload\":\"test entry\"}"; + let result = to_string(&entry).expect("serialization failed"); + + assert_eq!( + expected, result, + "Plain text payload should serialize correctly" + ) +} + +#[test] +fn test_timestamped_entry_serialization() { + let entry = LogEntry { + labels: Value::Null, + timestamp: Some(datetime!(1952-10-07 12:00:00 UTC)), + payload: Payload::TextPayload { + text_payload: "test entry".into(), + }, + severity: None, + }; + + let expected = + "{\"labels\":null,\"timestamp\":\"1952-10-07T12:00:00Z\",\"textPayload\":\"test entry\"}"; + let result = to_string(&entry).expect("serialization failed"); + + assert_eq!( + expected, result, + "Plain text payload should serialize correctly" + ) +} + +#[test] +fn test_json_entry_serialization() { + let entry = LogEntry { + labels: Value::Null, + timestamp: None, + payload: Payload::JsonPayload { + json_payload: json!({ + "message": "JSON test" + }), + }, + severity: None, + }; + + let expected = "{\"labels\":null,\"jsonPayload\":{\"message\":\"JSON test\"}}"; + let result = to_string(&entry).expect("serialization failed"); + + assert_eq!(expected, result, "JSON payload should serialize correctly") +} + +#[test] +fn test_plain_text_payload() { + let message = "plain text payload".into(); + let payload = message_to_payload(Some(message)); + let expected = Payload::TextPayload { + text_payload: "plain text payload".into(), + }; + + assert_eq!( + expected, payload, + "Plain text payload should be detected correctly" + ); +} + +#[test] +fn test_empty_payload() { + let payload = message_to_payload(None); + let expected = Payload::TextPayload { + text_payload: "empty log entry".into(), + }; + + assert_eq!( + expected, payload, + "Empty payload should be handled correctly" + ); +} + +#[test] +fn test_json_payload() { + let message = "{\"someKey\":\"someValue\", \"otherKey\": 42}".into(); + let payload = message_to_payload(Some(message)); + let expected = Payload::JsonPayload { + json_payload: json!({ + "someKey": "someValue", + "otherKey": 42 + }), + }; + + assert_eq!( + expected, payload, + "JSON payload should be detected correctly" + ); +} + +#[test] +fn test_json_no_object() { + // This message can be parsed as valid JSON, but it is not an + // object - it should be returned as a plain-text payload. + let message = "42".into(); + let payload = message_to_payload(Some(message)); + let expected = Payload::TextPayload { + text_payload: "42".into(), + }; + + assert_eq!( + expected, payload, + "Non-object JSON payload should be plain text" + ); +} + +#[test] +fn test_parse_microseconds() { + let input: String = "1529175149291187".into(); + let expected: time::OffsetDateTime = datetime!(2018-06-16 18:52:29.291187 UTC); + + assert_eq!(Some(expected), parse_microseconds(input)); +} |