about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/main.rs161
1 files changed, 120 insertions, 41 deletions
diff --git a/src/main.rs b/src/main.rs
index 543ebf71485f..b96467f643dd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -31,8 +31,6 @@
 //! Things left to do:
 //! * TODO 2018-06-15: Support non-GCP instances (see comment on
 //!   monitored resource descriptor)
-//! * TODO 2018-06-15: Extract timestamps from journald instead of
-//!   relying on ingestion timestamps.
 
 #[macro_use] extern crate failure;
 #[macro_use] extern crate hyper;
@@ -43,6 +41,7 @@
 
 extern crate chrono;
 extern crate env_logger;
+extern crate medallion;
 extern crate reqwest;
 extern crate serde;
 extern crate systemd;
@@ -64,6 +63,7 @@ use systemd::journal::*;
 #[cfg(test)]
 mod tests;
 
+const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2";
 const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
 const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
 const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
@@ -79,6 +79,19 @@ header! { (MetadataFlavor, "Metadata-Flavor") => [String] }
 /// Convenience type alias for results using failure's `Error` type.
 type Result<T> = std::result::Result<T, failure::Error>;
 
+/// Representation of static service account credentials for GCP.
+#[derive(Debug, Deserialize)]
+struct Credentials {
+    /// PEM encoded private key
+    private_key: String,
+
+    /// `kid` of this private key
+    private_key_id: String,
+
+    /// "email" address of the service account
+    client_email: String,
+}
+
 lazy_static! {
     /// HTTP client instance preconfigured with the metadata header
     /// required by Google.
@@ -102,6 +115,12 @@ lazy_static! {
     static ref ZONE: String = get_metadata(METADATA_ZONE_URL)
         .expect("Could not determine instance zone");
 
+    /// Service account credentials (if configured)
+    static ref SERVICE_ACCOUNT_CREDENTIALS: Option<Credentials> =
+        env::var("GOOGLE_APPLICATION_CREDENTIALS").ok()
+        .and_then(|path| File::open(path).ok())
+        .and_then(|file| serde_json::from_reader(file).ok());
+
     /// Descriptor of the currently monitored instance.
     ///
     /// For GCE instances, this will be the GCE instance ID. For
@@ -134,6 +153,103 @@ fn get_metadata(url: &str) -> Result<String> {
     Ok(output.trim().into())
 }
 
+/// Represents the response returned by the metadata server's token
+/// endpoint. The token is normally valid for an hour.
+#[derive(Deserialize)]
+struct TokenResponse {
+    expires_in: u64,
+    access_token: String,
+}
+
+/// Struct used to store a token together with a sensible
+/// representation of when it expires.
+struct Token {
+    token: String,
+    fetched_at: Instant,
+    expires: Duration,
+}
+
+impl Token {
+    /// Does this token need to be renewed?
+    fn is_expired(&self) -> bool {
+        self.fetched_at.elapsed() > self.expires
+    }
+}
+
+/// Retrieves a token from the GCP metadata service. Retrieving these
+/// tokens requires no additional authentication.
+fn get_metadata_token() -> Result<Token> {
+    let token: TokenResponse  = METADATA_CLIENT
+        .get(METADATA_TOKEN_URL)
+        .send()?.json()?;
+
+    debug!("Fetched new token from metadata service");
+
+    Ok(Token {
+        fetched_at: Instant::now(),
+        expires: Duration::from_secs(token.expires_in / 2),
+        token: token.access_token,
+    })
+}
+
+/// Signs a token using static client credentials configured for a
+/// service account. This service account must have been given the
+/// `Log Writer` role in Google Cloud IAM.
+///
+/// The process for creating and signing these tokens is described
+/// here:
+///
+/// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#jwt-auth
+fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
+    use medallion::{Algorithm, Header, Payload};
+
+    let iat = Utc::now();
+    let exp = iat.checked_add_signed(chrono::Duration::seconds(3600))
+        .ok_or_else(|| format_err!("Failed to calculate token expiry"))?;
+
+    let header = Header {
+        alg: Algorithm::RS256,
+        headers: Some(json!({
+            "kid": credentials.private_key_id,
+        })),
+    };
+
+    let payload: Payload<()> = Payload {
+        iss: Some(credentials.client_email.clone()),
+        sub: Some(credentials.client_email.clone()),
+        aud: Some(LOGGING_SERVICE.to_string()),
+        iat: Some(iat.timestamp() as u64),
+        exp: Some(exp.timestamp() as u64),
+        ..Default::default()
+    };
+
+    let token = medallion::Token::new(header, payload)
+        .sign(credentials.private_key.as_bytes())
+        .context("Signing service account token failed")?;
+
+    debug!("Signed new service account token");
+
+    Ok(Token {
+        token,
+        fetched_at: Instant::now(),
+        expires: Duration::from_secs(3000),
+    })
+}
+
+/// Retrieve the authentication token either by using static client
+/// credentials, or by talking to the metadata server.
+///
+/// Which behaviour is used is controlled by the environment variable
+/// `GOOGLE_APPLICATION_CREDENTIALS`, which should be configured to
+/// point at a JSON private key file if service account authentication
+/// is to be used.
+fn get_token() -> Result<Token> {
+    if let Some(credentials) = SERVICE_ACCOUNT_CREDENTIALS.as_ref() {
+        sign_service_account_token(credentials)
+    } else {
+        get_metadata_token()
+    }
+}
 
 /// This structure represents the different types of payloads
 /// supported by journaldriver.
@@ -267,7 +383,7 @@ fn receive_next_record(timeout: Duration, journal: &mut Journal)
 /// buffer messages for half a second before flushing them to
 /// Stackdriver.
 fn receiver_loop(mut journal: Journal) -> Result<()> {
-    let mut token = get_metadata_token()?;
+    let mut token = get_token()?;
     let client = reqwest::Client::new();
 
     let mut buf: Vec<LogEntry> = Vec::new();
@@ -318,7 +434,7 @@ fn flush(client: &Client,
          cursor: String) -> Result<()> {
     if token.is_expired() {
         debug!("Refreshing Google metadata access token");
-        let new_token = get_metadata_token()?;
+        let new_token = get_token()?;
         mem::replace(token, new_token);
     }
 
@@ -334,43 +450,6 @@ fn flush(client: &Client,
     persist_cursor(cursor)
 }
 
-/// Represents the response returned by the metadata server's token
-/// endpoint. The token is normally valid for an hour.
-#[derive(Deserialize)]
-struct TokenResponse {
-    expires_in: u64,
-    access_token: String,
-}
-
-/// Struct used to store a token together with a sensible
-/// representation of when it expires.
-struct Token {
-    token: String,
-    fetched_at: Instant,
-    expires: Duration,
-}
-
-impl Token {
-    /// Does this token need to be renewed?
-    fn is_expired(&self) -> bool {
-        self.fetched_at.elapsed() > self.expires
-    }
-}
-
-fn get_metadata_token() -> Result<Token> {
-    let token: TokenResponse  = METADATA_CLIENT
-        .get(METADATA_TOKEN_URL)
-        .send()?.json()?;
-
-    debug!("Fetched new token from metadata service");
-
-    Ok(Token {
-        fetched_at: Instant::now(),
-        expires: Duration::from_secs(token.expires_in / 2),
-        token: token.access_token,
-    })
-}
-
 /// Convert a slice of log entries into the format expected by
 /// Stackdriver. This format is documented here:
 ///