about summary refs log tree commit diff
path: root/users/tazjin/finito/finito-postgres/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'users/tazjin/finito/finito-postgres/src/lib.rs')
-rw-r--r--users/tazjin/finito/finito-postgres/src/lib.rs191
1 files changed, 108 insertions, 83 deletions
diff --git a/users/tazjin/finito/finito-postgres/src/lib.rs b/users/tazjin/finito/finito-postgres/src/lib.rs
index ae147f751f88..ea63cc9dfdfd 100644
--- a/users/tazjin/finito/finito-postgres/src/lib.rs
+++ b/users/tazjin/finito/finito-postgres/src/lib.rs
@@ -4,8 +4,10 @@
 //!
 //! TODO: events & actions should have `SERIAL` keys
 
-#[macro_use] extern crate postgres;
-#[macro_use] extern crate postgres_derive;
+#[macro_use]
+extern crate postgres;
+#[macro_use]
+extern crate postgres_derive;
 
 extern crate chrono;
 extern crate finito;
@@ -14,23 +16,25 @@ extern crate serde;
 extern crate serde_json;
 extern crate uuid;
 
-#[cfg(test)] mod tests;
-#[cfg(test)] extern crate finito_door;
+#[cfg(test)]
+mod tests;
+#[cfg(test)]
+extern crate finito_door;
 
 mod error;
-pub use error::{Result, Error, ErrorKind};
+pub use error::{Error, ErrorKind, Result};
 
 use chrono::prelude::{DateTime, Utc};
 use error::ResultExt;
-use finito::{FSM, FSMBackend};
+use finito::{FSMBackend, FSM};
 use postgres::transaction::Transaction;
 use postgres::GenericConnection;
-use serde::Serialize;
+use r2d2_postgres::{r2d2, PostgresConnectionManager};
 use serde::de::DeserializeOwned;
+use serde::Serialize;
 use serde_json::Value;
 use std::marker::PhantomData;
 use uuid::Uuid;
-use r2d2_postgres::{r2d2, PostgresConnectionManager};
 
 type DBPool = r2d2::Pool<PostgresConnectionManager>;
 type DBConn = r2d2::PooledConnection<PostgresConnectionManager>;
@@ -112,15 +116,13 @@ pub struct FinitoPostgres<S> {
     db_pool: DBPool,
 }
 
-impl <S> FinitoPostgres<S> {
+impl<S> FinitoPostgres<S> {
     pub fn new(state: S, db_pool: DBPool, _pool_size: usize) -> Self {
-        FinitoPostgres {
-            state, db_pool,
-        }
+        FinitoPostgres { state, db_pool }
     }
 }
 
-impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
+impl<State: 'static> FSMBackend<State> for FinitoPostgres<State> {
     type Key = Uuid;
     type Error = Error;
 
@@ -134,10 +136,11 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
         let fsm = S::FSM_NAME.to_string();
         let state = serde_json::to_value(initial).context("failed to serialise FSM")?;
 
-        self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?;
+        self.conn()?
+            .execute(query, &[&id, &fsm, &state])
+            .context("failed to insert FSM")?;
 
         return Ok(id);
-
     }
 
     fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> {
@@ -156,10 +159,12 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
     /// processing is finished as running actions may result in additional
     /// transitions.
     fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S>
-    where S: FSM + Serialize + DeserializeOwned,
-          S::State: From<&'a State>,
-          S::Event: Serialize + DeserializeOwned,
-          S::Action: Serialize + DeserializeOwned {
+    where
+        S: FSM + Serialize + DeserializeOwned,
+        S::State: From<&'a State>,
+        S::Event: Serialize + DeserializeOwned,
+        S::Action: Serialize + DeserializeOwned,
+    {
         let conn = self.conn()?;
         let tx = conn.transaction().context("could not begin transaction")?;
         let state = get_machine_internal(&tx, key, true)?;
@@ -187,16 +192,18 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
     }
 }
 
-impl <State: 'static> FinitoPostgres<State> {
+impl<State: 'static> FinitoPostgres<State> {
     /// Execute several actions at the same time, each in a separate
     /// thread. Note that actions returning further events, causing
     /// further transitions, returning further actions and so on will
     /// potentially cause multiple threads to get created.
-    fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where
+    fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>)
+    where
         S: FSM + Serialize + DeserializeOwned,
         S::Event: Serialize + DeserializeOwned,
         S::Action: Serialize + DeserializeOwned,
-        S::State: From<&'a State> {
+        S::State: From<&'a State>,
+    {
         let state: S::State = (&self.state).into();
         let conn = self.conn().expect("TODO");
 
@@ -214,17 +221,19 @@ impl <State: 'static> FinitoPostgres<State> {
 
     /// Retrieve a single connection from the database connection pool.
     fn conn(&self) -> Result<DBConn> {
-        self.db_pool.get().context("failed to retrieve connection from pool")
+        self.db_pool
+            .get()
+            .context("failed to retrieve connection from pool")
     }
 }
 
-
-
 /// Insert a single state-machine into the database and return its
 /// newly allocated, random UUID.
-pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where
+pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid>
+where
     C: GenericConnection,
-    S: FSM + Serialize {
+    S: FSM + Serialize,
+{
     let query = r#"
       INSERT INTO machines (id, fsm, state)
       VALUES ($1, $2, $3)
@@ -240,13 +249,12 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where
 }
 
 /// Insert a single event into the database and return its UUID.
-fn insert_event<C, S>(conn: &C,
-                      fsm_id: Uuid,
-                      event: &S::Event) -> Result<Uuid>
+fn insert_event<C, S>(conn: &C, fsm_id: Uuid, event: &S::Event) -> Result<Uuid>
 where
     C: GenericConnection,
     S: FSM,
-    S::Event: Serialize {
+    S::Event: Serialize,
+{
     let query = r#"
       INSERT INTO events (id, fsm, fsm_id, event)
       VALUES ($1, $2, $3, $4)
@@ -254,21 +262,19 @@ where
 
     let id = Uuid::new_v4();
     let fsm = S::FSM_NAME.to_string();
-    let event_value = serde_json::to_value(event)
-        .context("failed to serialize event")?;
+    let event_value = serde_json::to_value(event).context("failed to serialize event")?;
 
     conn.execute(query, &[&id, &fsm, &fsm_id, &event_value])?;
-    return Ok(id)
+    return Ok(id);
 }
 
 /// Insert a single action into the database and return its UUID.
-fn insert_action<C, S>(conn: &C,
-                       fsm_id: Uuid,
-                       event_id: Uuid,
-                       action: &S::Action) -> Result<Uuid> where
+fn insert_action<C, S>(conn: &C, fsm_id: Uuid, event_id: Uuid, action: &S::Action) -> Result<Uuid>
+where
     C: GenericConnection,
     S: FSM,
-    S::Action: Serialize {
+    S::Action: Serialize,
+{
     let query = r#"
       INSERT INTO actions (id, fsm, fsm_id, event_id, content, status)
       VALUES ($1, $2, $3, $4, $5, $6)
@@ -276,23 +282,26 @@ fn insert_action<C, S>(conn: &C,
 
     let id = Uuid::new_v4();
     let fsm = S::FSM_NAME.to_string();
-    let action_value = serde_json::to_value(action)
-        .context("failed to serialize action")?;
+    let action_value = serde_json::to_value(action).context("failed to serialize action")?;
 
-    conn.execute(
-        query,
-        &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending]
-    )?;
+    conn.execute(query, &[
+        &id,
+        &fsm,
+        &fsm_id,
+        &event_id,
+        &action_value,
+        &ActionStatus::Pending,
+    ])?;
 
-    return Ok(id)
+    return Ok(id);
 }
 
 /// Update the state of a specified machine.
-fn update_state<C, S>(conn: &C,
-                      fsm_id: Uuid,
-                      state: &S) -> Result<()> where
+fn update_state<C, S>(conn: &C, fsm_id: Uuid, state: &S) -> Result<()>
+where
     C: GenericConnection,
-    S: FSM + Serialize {
+    S: FSM + Serialize,
+{
     let query = r#"
       UPDATE machines SET state = $1 WHERE id = $2
     "#;
@@ -312,23 +321,28 @@ fn update_state<C, S>(conn: &C,
 fn alter_for_update(alter: bool, query: &str) -> String {
     match alter {
         false => query.to_string(),
-        true  => format!("{} FOR UPDATE", query),
+        true => format!("{} FOR UPDATE", query),
     }
 }
 
 /// Retrieve the current state of a state machine from the database,
 /// optionally locking the machine state for the duration of some
 /// enclosing transaction.
-fn get_machine_internal<C, S>(conn: &C,
-                              id: Uuid,
-                              for_update: bool) -> Result<S> where
+fn get_machine_internal<C, S>(conn: &C, id: Uuid, for_update: bool) -> Result<S>
+where
     C: GenericConnection,
-    S: FSM + DeserializeOwned {
-    let query = alter_for_update(for_update, r#"
+    S: FSM + DeserializeOwned,
+{
+    let query = alter_for_update(
+        for_update,
+        r#"
       SELECT state FROM machines WHERE id = $1
-    "#);
+    "#,
+    );
 
-    let rows = conn.query(&query, &[&id]).context("failed to retrieve FSM")?;
+    let rows = conn
+        .query(&query, &[&id])
+        .context("failed to retrieve FSM")?;
 
     if let Some(row) = rows.into_iter().next() {
         Ok(serde_json::from_value(row.get(0)).context("failed to deserialize FSM")?)
@@ -339,20 +353,25 @@ fn get_machine_internal<C, S>(conn: &C,
 
 /// Retrieve an action from the database, optionally locking it for
 /// the duration of some enclosing transaction.
-fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where
+fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)>
+where
     C: GenericConnection,
     S: FSM,
-    S::Action: DeserializeOwned {
-    let query = alter_for_update(true, r#"
+    S::Action: DeserializeOwned,
+{
+    let query = alter_for_update(
+        true,
+        r#"
       SELECT status, content FROM actions
       WHERE id = $1 AND fsm = $2
-    "#);
+    "#,
+    );
 
     let rows = conn.query(&query, &[&id, &S::FSM_NAME])?;
 
     if let Some(row) = rows.into_iter().next() {
-        let action = serde_json::from_value(row.get(1))
-            .context("failed to deserialize FSM action")?;
+        let action =
+            serde_json::from_value(row.get(1)).context("failed to deserialize FSM action")?;
         Ok((row.get(0), action))
     } else {
         Err(ErrorKind::ActionNotFound(id).into())
@@ -360,13 +379,17 @@ fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> whe
 }
 
 /// Update the status of an action after an attempt to run it.
-fn update_action_status<C, S>(conn: &C,
-                              id: Uuid,
-                              status: ActionStatus,
-                              error: Option<String>,
-                              _fsm: PhantomData<S>) -> Result<()> where
+fn update_action_status<C, S>(
+    conn: &C,
+    id: Uuid,
+    status: ActionStatus,
+    error: Option<String>,
+    _fsm: PhantomData<S>,
+) -> Result<()>
+where
     C: GenericConnection,
-    S: FSM {
+    S: FSM,
+{
     let query = r#"
       UPDATE actions SET status = $1, error = $2
       WHERE id = $3 AND fsm = $4
@@ -389,10 +412,16 @@ fn update_action_status<C, S>(conn: &C,
 /// panic), the error will be persisted. Should it fail by panicking
 /// (which developers should never do explicitly in action
 /// interpreters) its status will not be changed.
-fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>)
-                 -> Result<Vec<S::Event>> where
+fn run_action<S>(
+    tx: Transaction,
+    id: Uuid,
+    state: &S::State,
+    _fsm: PhantomData<S>,
+) -> Result<Vec<S::Event>>
+where
     S: FSM,
-    S::Action: DeserializeOwned {
+    S::Action: DeserializeOwned,
+{
     let (status, action) = get_action::<Transaction, S>(&tx, id)?;
 
     let result = match status {
@@ -401,29 +430,25 @@ fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<
                 // If the action succeeded, update its status to
                 // completed and return the created events.
                 Ok(events) => {
-                    update_action_status(
-                        &tx, id, ActionStatus::Completed, None, PhantomData::<S>
-                    )?;
+                    update_action_status(&tx, id, ActionStatus::Completed, None, PhantomData::<S>)?;
                     events
-                },
+                }
 
                 // If the action failed, persist the debug message and
                 // return nothing.
                 Err(err) => {
                     let msg = Some(format!("{:?}", err));
-                    update_action_status(
-                        &tx, id, ActionStatus::Failed, msg, PhantomData::<S>
-                    )?;
+                    update_action_status(&tx, id, ActionStatus::Failed, msg, PhantomData::<S>)?;
                     vec![]
-                },
+                }
             }
-        },
+        }
 
         _ => {
             // TODO: Currently only pending actions are run because
             // retryable actions are not yet implemented.
             vec![]
-        },
+        }
     };
 
     tx.commit().context("failed to commit transaction")?;