about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--finito-postgres/src/lib.rs234
1 files changed, 111 insertions, 123 deletions
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs
index ffb2532e55cc..46309a04c00a 100644
--- a/finito-postgres/src/lib.rs
+++ b/finito-postgres/src/lib.rs
@@ -20,35 +20,15 @@ mod error;
 pub use error::{Result, Error};
 
 use chrono::prelude::{DateTime, Utc};
-use finito::FSM;
-use postgres::GenericConnection;
+use finito::{FSM, FSMBackend};
+use postgres::{Connection, GenericConnection};
 use postgres::transaction::Transaction;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
 use serde_json::Value;
-use std::fmt;
 use std::marker::PhantomData;
 use uuid::Uuid;
 
-/// This struct represents rows in the database table in which
-/// machines (i.e. the current state of a Finito state machine) are
-/// persisted.
-#[derive(Debug, ToSql, FromSql)]
-struct MachineT {
-    /// ID of the persisted state machine.
-    id: Uuid,
-
-    /// Time at which the FSM was first created.
-    created: DateTime<Utc>,
-
-    /// Name of the type of FSM represented by this state.
-    fsm: String,
-
-    /// Current state of the FSM (TODO: Can the serialised FSM type be
-    /// used?)
-    state: Value,
-}
-
 /// This struct represents rows in the database table in which events
 /// are persisted.
 #[derive(Debug, ToSql, FromSql)]
@@ -118,41 +98,110 @@ struct ActionT {
 // The following functions implement the public interface of
 // `finito-postgres`.
 
-/// This type is used as a type-safe wrapper around the ID of a state
-/// machine. It carries information about the FSM type and is intended
-/// to add a layer of checking to prevent IDs from being mixed up.
-#[derive(Clone)]
-pub struct MachineId<S: FSM> {
-    uuid: Uuid,
-    phantom: PhantomData<S>,
+/// TODO: Write docs for this type, brain does not want to do it right
+/// now.
+pub struct FinitoPostgres<S> {
+    state: S,
+    // TODO: Use connection pool?
+    conn: Connection,
 }
 
-/// Custom debug implementation to format machine IDs using the name
-/// of the FSM and their UUID.
-impl <S: FSM> fmt::Debug for MachineId<S> {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}:{}", S::FSM_NAME, self.uuid.hyphenated())
+impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
+    type Key = Uuid;
+    type Error = Error;
+
+    fn insert_machine<S: FSM + Serialize>(&self, initial: S) -> Result<Uuid> {
+        let query = r#"
+          INSERT INTO machines (id, fsm, state)
+          VALUES ($1, $2, $3)
+        "#;
+
+        let id = Uuid::new_v4();
+        let fsm = S::FSM_NAME.to_string();
+        let state = serde_json::to_value(initial).expect("TODO");
+
+        self.conn.execute(query, &[&id, &fsm, &state]).expect("TODO");
+
+        return Ok(id);
+
+    }
+
+    fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> {
+        get_machine_internal(&self.conn, key, false)
     }
-}
 
-impl <S: FSM> MachineId<S> {
-    /// Convert a UUID into a strongly typed machine ID.
-    pub fn from_uuid(uuid: Uuid) -> Self {
-        MachineId {
-            uuid,
-            phantom: PhantomData,
+    /// Advance a persisted state machine by applying an event, and
+    /// storing the event as well as all resulting actions.
+    ///
+    /// This function holds a database-lock on the state's row while
+    /// advancing the machine.
+    ///
+    /// **Note**: This function returns the new state of the machine
+    /// immediately after applying the event, however this does not
+    /// necessarily equate to the state of the machine after all related
+    /// processing is finished as running actions may result in additional
+    /// transitions.
+    fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S>
+    where S: FSM + Serialize + DeserializeOwned,
+          S::State: From<&'a State>,
+          S::Event: Serialize + DeserializeOwned,
+          S::Action: Serialize + DeserializeOwned {
+        let tx = self.conn.transaction().expect("TODO");
+        let state = get_machine_internal(&tx, key, true)?;
+
+        // Advancing the FSM consumes the event, so it is persisted first:
+        let event_id = insert_event::<_, S>(&tx, key, &event)?;
+
+        // Core advancing logic is run:
+        let (new_state, actions) = finito::advance(state, event);
+
+        // Resulting actions are persisted (TODO: and interpreted)
+        let mut action_ids = vec![];
+        for action in actions {
+            let action_id = insert_action::<_, S>(&tx, key, event_id, &action)?;
+            action_ids.push(action_id);
         }
+
+        // And finally the state is updated:
+        update_state(&tx, key, &new_state).expect("TODO");
+        tx.commit().expect("TODO");
+
+        self.run_actions::<S>(key, action_ids);
+
+        Ok(new_state)
     }
+}
 
-    /// Return the UUID contained in a machine ID.
-    pub fn to_uuid(&self) -> Uuid {
-        self.uuid
+impl <State: 'static> FinitoPostgres<State> {
+    /// Execute several actions at the same time, each in a separate
+    /// thread. Note that actions returning further events, causing
+    /// further transitions, returning further actions and so on will
+    /// potentially cause multiple threads to get created.
+    fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where
+        S: FSM + Serialize + DeserializeOwned,
+        S::Event: Serialize + DeserializeOwned,
+        S::Action: Serialize + DeserializeOwned,
+        S::State: From<&'a State> {
+        let state: S::State = (&self.state).into();
+
+        for action_id in action_ids {
+            let tx = self.conn.transaction().expect("TODO");
+
+            // TODO: Determine which concurrency setup we actually want.
+            if let Ok(events) = run_action(tx, action_id, &state, PhantomData::<S>) {
+                for event in events {
+                    self.advance::<S>(fsm_id, event).expect("TODO");
+                }
+            }
+        }
     }
 }
 
+
+
 /// Insert a single state-machine into the database and return its
 /// newly allocated, random UUID.
-pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where
+pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where
     C: GenericConnection,
     S: FSM + Serialize {
     let query = r#"
@@ -166,12 +215,12 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where
 
     conn.execute(query, &[&id, &fsm, &state]).expect("TODO");
 
-    return Ok(MachineId::from_uuid(id));
+    return Ok(id);
 }
 
 /// Insert a single event into the database and return its UUID.
 fn insert_event<C, S>(conn: &C,
-                      fsm_id: &MachineId<S>,
+                      fsm_id: Uuid,
                       event: &S::Event) -> Result<Uuid>
 where
     C: GenericConnection,
@@ -186,13 +235,13 @@ where
     let fsm = S::FSM_NAME.to_string();
     let event_value = serde_json::to_value(event).expect("TODO");
 
-    conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_value]).expect("TODO");
+    conn.execute(query, &[&id, &fsm, &fsm_id, &event_value]).expect("TODO");
     return Ok(id)
 }
 
 /// Insert a single action into the database and return its UUID.
 fn insert_action<C, S>(conn: &C,
-                       fsm_id: &MachineId<S>,
+                       fsm_id: Uuid,
                        event_id: Uuid,
                        action: &S::Action) -> Result<Uuid> where
     C: GenericConnection,
@@ -207,14 +256,17 @@ fn insert_action<C, S>(conn: &C,
     let fsm = S::FSM_NAME.to_string();
     let action_value = serde_json::to_value(action).expect("TODO");
 
-    conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id,
-                          &action_value, &ActionStatus::Pending]).expect("TODO");
+    conn.execute(
+        query,
+        &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending]
+    ).expect("TODO");
+
     return Ok(id)
 }
 
 /// Update the state of a specified machine.
 fn update_state<C, S>(conn: &C,
-                      fsm_id: &MachineId<S>,
+                      fsm_id: Uuid,
                       state: &S) -> Result<()> where
     C: GenericConnection,
     S: FSM + Serialize {
@@ -223,7 +275,7 @@ fn update_state<C, S>(conn: &C,
     "#;
 
     let state_value = serde_json::to_value(state).expect("TODO");
-    let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()])
+    let res_count = conn.execute(query, &[&state_value, &fsm_id])
         .expect("TODO");
 
     if res_count != 1 {
@@ -246,16 +298,16 @@ fn alter_for_update(alter: bool, query: &str) -> String {
 /// Retrieve the current state of a state machine from the database,
 /// optionally locking the machine state for the duration of some
 /// enclosing transaction.
-pub fn get_machine<C, S>(conn: &C,
-                         id: &MachineId<S>,
-                         for_update: bool) -> Result<S> where
+fn get_machine_internal<C, S>(conn: &C,
+                              id: Uuid,
+                              for_update: bool) -> Result<S> where
     C: GenericConnection,
     S: FSM + DeserializeOwned {
     let query = alter_for_update(for_update, r#"
       SELECT state FROM machines WHERE id = $1
     "#);
 
-    let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
+    let rows = conn.query(&query, &[&id]).expect("TODO");
 
     if let Some(row) = rows.into_iter().next() {
         Ok(serde_json::from_value(row.get(0)).expect("TODO"))
@@ -311,49 +363,6 @@ fn update_action_status<C, S>(conn: &C,
     Ok(())
 }
 
-/// Advance a persisted state machine by applying an event, and
-/// storing the event as well as all resulting actions.
-///
-/// This function holds a database-lock on the state's row while
-/// advancing the machine.
-///
-/// **Note**: This function returns the new state of the machine
-/// immediately after applying the event, however this does not
-/// necessarily equate to the state of the machine after all related
-/// processing is finished as running actions may result in additional
-/// transitions.
-pub fn advance<C, S>(conn: &C,
-                     id: &MachineId<S>,
-                     event: S::Event) -> Result<S> where
-    C: GenericConnection,
-    S: FSM + Serialize + DeserializeOwned,
-    S::Event: Serialize,
-    S::Action: Serialize + DeserializeOwned {
-    let tx = conn.transaction().expect("TODO");
-    let state = get_machine(&tx, id, true)?;
-
-    // Advancing the FSM consumes the event, so it is persisted first:
-    let event_id = insert_event(&tx, id, &event)?;
-
-    // Core advancing logic is run:
-    let (new_state, actions) = finito::advance(state, event);
-
-    // Resulting actions are persisted (TODO: and interpreted)
-    let mut action_ids = vec![];
-    for action in actions {
-        let action_id = insert_action(&tx, id, event_id, &action)?;
-        action_ids.push(action_id);
-    }
-
-    // And finally the state is updated:
-    update_state(&tx, id, &new_state).expect("TODO");
-    tx.commit().expect("TODO");
-
-    run_actions(conn, id, action_ids);
-
-    Ok(new_state)
-}
-
 /// Execute a single action in case it is pending or retryable. Holds
 /// a lock on the action's database row while performing the action
 /// and writes back the status afterwards.
@@ -362,7 +371,7 @@ pub fn advance<C, S>(conn: &C,
 /// panic), the error will be persisted. Should it fail by panicking
 /// (which developers should never do explicitly in action
 /// interpreters) its status will not be changed.
-fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
+fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>)
                  -> Result<Vec<S::Event>> where
     S: FSM,
     S::Action: DeserializeOwned {
@@ -370,7 +379,7 @@ fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
 
     let result = match status {
         ActionStatus::Pending => {
-            match <S as FSM>::act(action) {
+            match S::act(action, state) {
                 // If the action succeeded, update its status to
                 // completed and return the created events.
                 Ok(events) => {
@@ -402,24 +411,3 @@ fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
     tx.commit().expect("TODO");
     Ok(result)
 }
-
-/// Execute several actions at the same time, each in a separate
-/// thread. Note that actions returning further events, causing
-/// further transitions, returning further actions and so on will
-/// potentially cause multiple threads to get created.
-fn run_actions<C, S>(conn: &C, fsm_id: &MachineId<S>, action_ids: Vec<Uuid>) where
-    C: GenericConnection,
-    S: FSM + Serialize + DeserializeOwned,
-    S::Event: Serialize,
-    S::Action: Serialize + DeserializeOwned {
-    for action_id in action_ids {
-        let tx = conn.transaction().expect("TODO");
-
-        // TODO: Determine which concurrency setup we actually want.
-        if let Ok(events) = run_action(tx, action_id, PhantomData::<S>) {
-            for event in events {
-                advance(conn, fsm_id, event).expect("TODO");
-            }
-        }
-    }
-}