about summary refs log tree commit diff
path: root/users/tazjin/finito/finito-postgres
diff options
context:
space:
mode:
Diffstat (limited to 'users/tazjin/finito/finito-postgres')
-rw-r--r--users/tazjin/finito/finito-postgres/src/error.rs26
-rw-r--r--users/tazjin/finito/finito-postgres/src/lib.rs191
-rw-r--r--users/tazjin/finito/finito-postgres/src/tests.rs11
3 files changed, 127 insertions, 101 deletions
diff --git a/users/tazjin/finito/finito-postgres/src/error.rs b/users/tazjin/finito/finito-postgres/src/error.rs
index e130d18361f1..ed33775cd70e 100644
--- a/users/tazjin/finito/finito-postgres/src/error.rs
+++ b/users/tazjin/finito/finito-postgres/src/error.rs
@@ -1,10 +1,9 @@
 //! This module defines error types and conversions for issue that can
 //! occur while dealing with persisted state machines.
 
-use std::result;
-use std::fmt;
-use uuid::Uuid;
 use std::error::Error as StdError;
+use std::{fmt, result};
+use uuid::Uuid;
 
 // errors to chain:
 use postgres::Error as PgError;
@@ -41,20 +40,15 @@ impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         use ErrorKind::*;
         let msg = match &self.kind {
-            Serialization(err) =>
-                format!("JSON serialization error: {}", err),
+            Serialization(err) => format!("JSON serialization error: {}", err),
 
-            Database(err) =>
-                format!("PostgreSQL error: {}", err),
+            Database(err) => format!("PostgreSQL error: {}", err),
 
-            DBPool(err) =>
-                format!("Database connection pool error: {}", err),
+            DBPool(err) => format!("Database connection pool error: {}", err),
 
-            FSMNotFound(id) =>
-                format!("FSM with ID {} not found", id),
+            FSMNotFound(id) => format!("FSM with ID {} not found", id),
 
-            ActionNotFound(id) =>
-                format!("Action with ID {} not found", id),
+            ActionNotFound(id) => format!("Action with ID {} not found", id),
         };
 
         match &self.context {
@@ -66,7 +60,7 @@ impl fmt::Display for Error {
 
 impl StdError for Error {}
 
-impl <E: Into<ErrorKind>> From<E> for Error {
+impl<E: Into<ErrorKind>> From<E> for Error {
     fn from(err: E) -> Error {
         Error {
             kind: err.into(),
@@ -99,11 +93,11 @@ pub trait ResultExt<T> {
     fn context<C: fmt::Display>(self, ctx: C) -> Result<T>;
 }
 
-impl <T, E: Into<Error>> ResultExt<T> for result::Result<T, E> {
+impl<T, E: Into<Error>> ResultExt<T> for result::Result<T, E> {
     fn context<C: fmt::Display>(self, ctx: C) -> Result<T> {
         self.map_err(|err| Error {
             context: Some(format!("{}", ctx)),
-            .. err.into()
+            ..err.into()
         })
     }
 }
diff --git a/users/tazjin/finito/finito-postgres/src/lib.rs b/users/tazjin/finito/finito-postgres/src/lib.rs
index ae147f751f88..ea63cc9dfdfd 100644
--- a/users/tazjin/finito/finito-postgres/src/lib.rs
+++ b/users/tazjin/finito/finito-postgres/src/lib.rs
@@ -4,8 +4,10 @@
 //!
 //! TODO: events & actions should have `SERIAL` keys
 
-#[macro_use] extern crate postgres;
-#[macro_use] extern crate postgres_derive;
+#[macro_use]
+extern crate postgres;
+#[macro_use]
+extern crate postgres_derive;
 
 extern crate chrono;
 extern crate finito;
@@ -14,23 +16,25 @@ extern crate serde;
 extern crate serde_json;
 extern crate uuid;
 
-#[cfg(test)] mod tests;
-#[cfg(test)] extern crate finito_door;
+#[cfg(test)]
+mod tests;
+#[cfg(test)]
+extern crate finito_door;
 
 mod error;
-pub use error::{Result, Error, ErrorKind};
+pub use error::{Error, ErrorKind, Result};
 
 use chrono::prelude::{DateTime, Utc};
 use error::ResultExt;
-use finito::{FSM, FSMBackend};
+use finito::{FSMBackend, FSM};
 use postgres::transaction::Transaction;
 use postgres::GenericConnection;
-use serde::Serialize;
+use r2d2_postgres::{r2d2, PostgresConnectionManager};
 use serde::de::DeserializeOwned;
+use serde::Serialize;
 use serde_json::Value;
 use std::marker::PhantomData;
 use uuid::Uuid;
-use r2d2_postgres::{r2d2, PostgresConnectionManager};
 
 type DBPool = r2d2::Pool<PostgresConnectionManager>;
 type DBConn = r2d2::PooledConnection<PostgresConnectionManager>;
@@ -112,15 +116,13 @@ pub struct FinitoPostgres<S> {
     db_pool: DBPool,
 }
 
-impl <S> FinitoPostgres<S> {
+impl<S> FinitoPostgres<S> {
     pub fn new(state: S, db_pool: DBPool, _pool_size: usize) -> Self {
-        FinitoPostgres {
-            state, db_pool,
-        }
+        FinitoPostgres { state, db_pool }
     }
 }
 
-impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
+impl<State: 'static> FSMBackend<State> for FinitoPostgres<State> {
     type Key = Uuid;
     type Error = Error;
 
@@ -134,10 +136,11 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
         let fsm = S::FSM_NAME.to_string();
         let state = serde_json::to_value(initial).context("failed to serialise FSM")?;
 
-        self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?;
+        self.conn()?
+            .execute(query, &[&id, &fsm, &state])
+            .context("failed to insert FSM")?;
 
         return Ok(id);
-
     }
 
     fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> {
@@ -156,10 +159,12 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
     /// processing is finished as running actions may result in additional
     /// transitions.
     fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S>
-    where S: FSM + Serialize + DeserializeOwned,
-          S::State: From<&'a State>,
-          S::Event: Serialize + DeserializeOwned,
-          S::Action: Serialize + DeserializeOwned {
+    where
+        S: FSM + Serialize + DeserializeOwned,
+        S::State: From<&'a State>,
+        S::Event: Serialize + DeserializeOwned,
+        S::Action: Serialize + DeserializeOwned,
+    {
         let conn = self.conn()?;
         let tx = conn.transaction().context("could not begin transaction")?;
         let state = get_machine_internal(&tx, key, true)?;
@@ -187,16 +192,18 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
     }
 }
 
-impl <State: 'static> FinitoPostgres<State> {
+impl<State: 'static> FinitoPostgres<State> {
     /// Execute several actions at the same time, each in a separate
     /// thread. Note that actions returning further events, causing
     /// further transitions, returning further actions and so on will
     /// potentially cause multiple threads to get created.
-    fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where
+    fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>)
+    where
         S: FSM + Serialize + DeserializeOwned,
         S::Event: Serialize + DeserializeOwned,
         S::Action: Serialize + DeserializeOwned,
-        S::State: From<&'a State> {
+        S::State: From<&'a State>,
+    {
         let state: S::State = (&self.state).into();
         let conn = self.conn().expect("TODO");
 
@@ -214,17 +221,19 @@ impl <State: 'static> FinitoPostgres<State> {
 
     /// Retrieve a single connection from the database connection pool.
     fn conn(&self) -> Result<DBConn> {
-        self.db_pool.get().context("failed to retrieve connection from pool")
+        self.db_pool
+            .get()
+            .context("failed to retrieve connection from pool")
     }
 }
 
-
-
 /// Insert a single state-machine into the database and return its
 /// newly allocated, random UUID.
-pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where
+pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid>
+where
     C: GenericConnection,
-    S: FSM + Serialize {
+    S: FSM + Serialize,
+{
     let query = r#"
       INSERT INTO machines (id, fsm, state)
       VALUES ($1, $2, $3)
@@ -240,13 +249,12 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where
 }
 
 /// Insert a single event into the database and return its UUID.
-fn insert_event<C, S>(conn: &C,
-                      fsm_id: Uuid,
-                      event: &S::Event) -> Result<Uuid>
+fn insert_event<C, S>(conn: &C, fsm_id: Uuid, event: &S::Event) -> Result<Uuid>
 where
     C: GenericConnection,
     S: FSM,
-    S::Event: Serialize {
+    S::Event: Serialize,
+{
     let query = r#"
       INSERT INTO events (id, fsm, fsm_id, event)
       VALUES ($1, $2, $3, $4)
@@ -254,21 +262,19 @@ where
 
     let id = Uuid::new_v4();
     let fsm = S::FSM_NAME.to_string();
-    let event_value = serde_json::to_value(event)
-        .context("failed to serialize event")?;
+    let event_value = serde_json::to_value(event).context("failed to serialize event")?;
 
     conn.execute(query, &[&id, &fsm, &fsm_id, &event_value])?;
-    return Ok(id)
+    return Ok(id);
 }
 
 /// Insert a single action into the database and return its UUID.
-fn insert_action<C, S>(conn: &C,
-                       fsm_id: Uuid,
-                       event_id: Uuid,
-                       action: &S::Action) -> Result<Uuid> where
+fn insert_action<C, S>(conn: &C, fsm_id: Uuid, event_id: Uuid, action: &S::Action) -> Result<Uuid>
+where
     C: GenericConnection,
     S: FSM,
-    S::Action: Serialize {
+    S::Action: Serialize,
+{
     let query = r#"
       INSERT INTO actions (id, fsm, fsm_id, event_id, content, status)
       VALUES ($1, $2, $3, $4, $5, $6)
@@ -276,23 +282,26 @@ fn insert_action<C, S>(conn: &C,
 
     let id = Uuid::new_v4();
     let fsm = S::FSM_NAME.to_string();
-    let action_value = serde_json::to_value(action)
-        .context("failed to serialize action")?;
+    let action_value = serde_json::to_value(action).context("failed to serialize action")?;
 
-    conn.execute(
-        query,
-        &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending]
-    )?;
+    conn.execute(query, &[
+        &id,
+        &fsm,
+        &fsm_id,
+        &event_id,
+        &action_value,
+        &ActionStatus::Pending,
+    ])?;
 
-    return Ok(id)
+    return Ok(id);
 }
 
 /// Update the state of a specified machine.
-fn update_state<C, S>(conn: &C,
-                      fsm_id: Uuid,
-                      state: &S) -> Result<()> where
+fn update_state<C, S>(conn: &C, fsm_id: Uuid, state: &S) -> Result<()>
+where
     C: GenericConnection,
-    S: FSM + Serialize {
+    S: FSM + Serialize,
+{
     let query = r#"
       UPDATE machines SET state = $1 WHERE id = $2
     "#;
@@ -312,23 +321,28 @@ fn update_state<C, S>(conn: &C,
 fn alter_for_update(alter: bool, query: &str) -> String {
     match alter {
         false => query.to_string(),
-        true  => format!("{} FOR UPDATE", query),
+        true => format!("{} FOR UPDATE", query),
     }
 }
 
 /// Retrieve the current state of a state machine from the database,
 /// optionally locking the machine state for the duration of some
 /// enclosing transaction.
-fn get_machine_internal<C, S>(conn: &C,
-                              id: Uuid,
-                              for_update: bool) -> Result<S> where
+fn get_machine_internal<C, S>(conn: &C, id: Uuid, for_update: bool) -> Result<S>
+where
     C: GenericConnection,
-    S: FSM + DeserializeOwned {
-    let query = alter_for_update(for_update, r#"
+    S: FSM + DeserializeOwned,
+{
+    let query = alter_for_update(
+        for_update,
+        r#"
       SELECT state FROM machines WHERE id = $1
-    "#);
+    "#,
+    );
 
-    let rows = conn.query(&query, &[&id]).context("failed to retrieve FSM")?;
+    let rows = conn
+        .query(&query, &[&id])
+        .context("failed to retrieve FSM")?;
 
     if let Some(row) = rows.into_iter().next() {
         Ok(serde_json::from_value(row.get(0)).context("failed to deserialize FSM")?)
@@ -339,20 +353,25 @@ fn get_machine_internal<C, S>(conn: &C,
 
 /// Retrieve an action from the database, optionally locking it for
 /// the duration of some enclosing transaction.
-fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where
+fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)>
+where
     C: GenericConnection,
     S: FSM,
-    S::Action: DeserializeOwned {
-    let query = alter_for_update(true, r#"
+    S::Action: DeserializeOwned,
+{
+    let query = alter_for_update(
+        true,
+        r#"
       SELECT status, content FROM actions
       WHERE id = $1 AND fsm = $2
-    "#);
+    "#,
+    );
 
     let rows = conn.query(&query, &[&id, &S::FSM_NAME])?;
 
     if let Some(row) = rows.into_iter().next() {
-        let action = serde_json::from_value(row.get(1))
-            .context("failed to deserialize FSM action")?;
+        let action =
+            serde_json::from_value(row.get(1)).context("failed to deserialize FSM action")?;
         Ok((row.get(0), action))
     } else {
         Err(ErrorKind::ActionNotFound(id).into())
@@ -360,13 +379,17 @@ fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> whe
 }
 
 /// Update the status of an action after an attempt to run it.
-fn update_action_status<C, S>(conn: &C,
-                              id: Uuid,
-                              status: ActionStatus,
-                              error: Option<String>,
-                              _fsm: PhantomData<S>) -> Result<()> where
+fn update_action_status<C, S>(
+    conn: &C,
+    id: Uuid,
+    status: ActionStatus,
+    error: Option<String>,
+    _fsm: PhantomData<S>,
+) -> Result<()>
+where
     C: GenericConnection,
-    S: FSM {
+    S: FSM,
+{
     let query = r#"
       UPDATE actions SET status = $1, error = $2
       WHERE id = $3 AND fsm = $4
@@ -389,10 +412,16 @@ fn update_action_status<C, S>(conn: &C,
 /// panic), the error will be persisted. Should it fail by panicking
 /// (which developers should never do explicitly in action
 /// interpreters) its status will not be changed.
-fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>)
-                 -> Result<Vec<S::Event>> where
+fn run_action<S>(
+    tx: Transaction,
+    id: Uuid,
+    state: &S::State,
+    _fsm: PhantomData<S>,
+) -> Result<Vec<S::Event>>
+where
     S: FSM,
-    S::Action: DeserializeOwned {
+    S::Action: DeserializeOwned,
+{
     let (status, action) = get_action::<Transaction, S>(&tx, id)?;
 
     let result = match status {
@@ -401,29 +430,25 @@ fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<
                 // If the action succeeded, update its status to
                 // completed and return the created events.
                 Ok(events) => {
-                    update_action_status(
-                        &tx, id, ActionStatus::Completed, None, PhantomData::<S>
-                    )?;
+                    update_action_status(&tx, id, ActionStatus::Completed, None, PhantomData::<S>)?;
                     events
-                },
+                }
 
                 // If the action failed, persist the debug message and
                 // return nothing.
                 Err(err) => {
                     let msg = Some(format!("{:?}", err));
-                    update_action_status(
-                        &tx, id, ActionStatus::Failed, msg, PhantomData::<S>
-                    )?;
+                    update_action_status(&tx, id, ActionStatus::Failed, msg, PhantomData::<S>)?;
                     vec![]
-                },
+                }
             }
-        },
+        }
 
         _ => {
             // TODO: Currently only pending actions are run because
             // retryable actions are not yet implemented.
             vec![]
-        },
+        }
     };
 
     tx.commit().context("failed to commit transaction")?;
diff --git a/users/tazjin/finito/finito-postgres/src/tests.rs b/users/tazjin/finito/finito-postgres/src/tests.rs
index b1b5821be3c4..dd270c38759f 100644
--- a/users/tazjin/finito/finito-postgres/src/tests.rs
+++ b/users/tazjin/finito/finito-postgres/src/tests.rs
@@ -16,7 +16,11 @@ fn test_insert_machine() {
     let door = insert_machine(&conn, initial).expect("Failed to insert door");
     let result = get_machine(&conn, &door, false).expect("Failed to fetch door");
 
-    assert_eq!(result, DoorState::Opened, "Inserted door state should match");
+    assert_eq!(
+        result,
+        DoorState::Opened,
+        "Inserted door state should match"
+    );
 }
 
 #[test]
@@ -41,7 +45,10 @@ fn test_advance() {
     }
 
     let result = get_machine(&conn, &door, false).expect("Failed to fetch door");
-    let expected = DoorState::Locked { code: 4567, attempts: 2 };
+    let expected = DoorState::Locked {
+        code: 4567,
+        attempts: 2,
+    };
 
     assert_eq!(result, expected, "Advanced door state should match");
 }