about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--finito-postgres/src/lib.rs152
1 files changed, 127 insertions, 25 deletions
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs
index 5ed9693bc9b3..af801314298f 100644
--- a/finito-postgres/src/lib.rs
+++ b/finito-postgres/src/lib.rs
@@ -22,6 +22,7 @@ pub use error::{Result, Error};
 use chrono::prelude::{DateTime, Utc};
 use finito::FSM;
 use postgres::GenericConnection;
+use postgres::transaction::Transaction;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
 use serde_json::Value;
@@ -69,7 +70,7 @@ struct EventT {
 }
 
 /// This enum represents the possible statuses an action can be in.
-#[derive(Debug, ToSql, FromSql)]
+#[derive(Debug, PartialEq, ToSql, FromSql)]
 #[postgres(name = "actionstatus")]
 enum ActionStatus {
     /// The action was requested but has not run yet.
@@ -234,6 +235,15 @@ fn update_state<C, S>(conn: &C,
     }
 }
 
+/// Conditionally alter SQL statement to append locking clause inside
+/// of a transaction.
+fn alter_for_update(alter: bool, query: &str) -> String {
+    match alter {
+        false => query.to_string(),
+        true  => format!("{} FOR UPDATE", query),
+    }
+}
+
 /// Retrieve the current state of a state machine from the database,
 /// optionally locking the machine state for the duration of some
 /// enclosing transaction.
@@ -242,34 +252,66 @@ pub fn get_machine<C, S>(conn: &C,
                          for_update: bool) -> Result<S> where
     C: GenericConnection,
     S: FSM + DeserializeOwned {
-    let query = r#"
-      SELECT id, created, fsm, state FROM machines WHERE id = $1
-    "#;
-
-    // If the machine is being fetched in the context of a
-    // transaction, with the intention to update it, the relevant
-    // clause needs to be appended:
-    let query = match for_update {
-        false => query.to_string(),
-        true  => format!("{} FOR UPDATE", query),
-    };
+    let query = alter_for_update(for_update, r#"
+      SELECT state FROM machines WHERE id = $1
+    "#);
 
     let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
-    let mut machines = rows.into_iter().map(|row| MachineT {
-        id: id.to_uuid(),
-        created: row.get(1),
-        fsm: row.get(2),
-        state: row.get(3),
-    });
-
-    if let Some(machine) = machines.next() {
-        Ok(serde_json::from_value(machine.state).expect("TODO"))
+
+    if let Some(row) = rows.into_iter().next() {
+        Ok(serde_json::from_value(row.get(0)).expect("TODO"))
     } else {
         // TODO: return appropriate not found error
         Err(Error::SomeError)
     }
 }
 
+/// Retrieve an action from the database, optionally locking it for
+/// the duration of some enclosing transaction.
+fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where
+    C: GenericConnection,
+    S: FSM,
+    S::Action: DeserializeOwned {
+    let query = alter_for_update(true, r#"
+      SELECT status, content FROM actions
+      WHERE id = $1 AND fsm = $2
+    "#);
+
+    let rows = conn.query(&query, &[&id, &S::FSM_NAME]).expect("TODO");
+
+    if let Some(row) = rows.into_iter().next() {
+        let action = serde_json::from_value(row.get(1)).expect("TODO");
+        Ok((row.get(0), action))
+    } else {
+        // TODO: return appropriate not found error
+        Err(Error::SomeError)
+    }
+}
+
+/// Update the status of an action after an attempt to run it.
+fn update_action_status<C, S>(conn: &C,
+                              id: Uuid,
+                              status: ActionStatus,
+                              error: Option<Value>,
+                              _fsm: PhantomData<S>) -> Result<()> where
+    C: GenericConnection,
+    S: FSM {
+    let query = r#"
+      UPDATE actions SET status = $1, error = $2
+      WHERE id = $3 AND fsm = $4
+    "#;
+
+    let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME])
+        .expect("TODO");
+
+    if result != 1 {
+        // TODO: Fail in the most gruesome way!
+        unimplemented!()
+    }
+
+    Ok(())
+}
+
 /// Advance a persisted state machine by applying an event, and
 /// storing the event as well as all resulting actions.
 ///
@@ -287,24 +329,84 @@ pub fn advance<C, S>(conn: &C,
     C: GenericConnection,
     S: FSM + Serialize + DeserializeOwned,
     S::Event: Serialize,
-    S::Action: Serialize {
+    S::Action: Serialize + DeserializeOwned {
     let tx = conn.transaction().expect("TODO");
-    let state = get_machine(&tx, id, true).expect("TODO");
+    let state = get_machine(&tx, id, true)?;
 
     // Advancing the FSM consumes the event, so it is persisted first:
-    let event_id = insert_event(&tx, id, &event).expect("TODO");
+    let event_id = insert_event(&tx, id, &event)?;
 
     // Core advancing logic is run:
     let (new_state, actions) = finito::advance(state, event);
 
     // Resulting actions are persisted (TODO: and interpreted)
+    let mut action_ids = vec![];
     for action in actions {
-        insert_action(&tx, id, event_id, &action).expect("TODO");
+        let action_id = insert_action(&tx, id, event_id, &action)?;
+        action_ids.push(action_id);
     }
 
     // And finally the state is updated:
     update_state(&tx, id, &new_state).expect("TODO");
     tx.commit().expect("TODO");
 
+    run_actions(conn, id, action_ids);
+
     Ok(new_state)
 }
+
+/// Execute a single action in case it is pending or retryable. Holds
+/// a lock on the action's database row while performing the action
+/// and writes back the status afterwards.
+///
+/// Should the execution of an action fail cleanly (i.e. without a
+/// panic), the error will be persisted. Should it fail by panicking
+/// (which developers should never do explicitly in action
+/// interpreters) its status will not be changed.
+fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
+                 -> Result<Vec<S::Event>> where
+    S: FSM,
+    S::Action: DeserializeOwned {
+    let (status, action) = get_action::<Transaction, S>(&tx, id)?;
+
+    let result = match status {
+        ActionStatus::Pending => {
+            let events = <S as FSM>::act(action);
+            update_action_status(
+                &tx, id, ActionStatus::Completed, None, PhantomData::<S>
+            )?;
+
+            events
+        },
+
+        _ => {
+            // TODO: Currently only pending actions are run because
+            // retryable actions are not yet implemented.
+            vec![]
+        },
+    };
+
+    tx.commit().expect("TODO");
+    Ok(result)
+}
+
+/// Execute several actions at the same time, each in a separate
+/// thread. Note that actions returning further events, causing
+/// further transitions, returning further actions and so on will
+/// potentially cause multiple threads to get created.
+fn run_actions<C, S>(conn: &C, fsm_id: &MachineId<S>, action_ids: Vec<Uuid>) where
+    C: GenericConnection,
+    S: FSM + Serialize + DeserializeOwned,
+    S::Event: Serialize,
+    S::Action: Serialize + DeserializeOwned {
+    for action_id in action_ids {
+        let tx = conn.transaction().expect("TODO");
+
+        // TODO: Determine which concurrency setup we actually want.
+        if let Ok(events) = run_action(tx, action_id, PhantomData::<S>) {
+            for event in events {
+                advance(conn, fsm_id, event).expect("TODO");
+            }
+        }
+    }
+}