about summary refs log tree commit diff
diff options
context:
space:
mode:
authorVincent Ambo <mail@tazj.in>2018-09-26T15·28+0200
committerVincent Ambo <mail@tazj.in>2018-09-26T15·28+0200
commit40caa5ffa23cdd482b7c97e74891fb41269e8076 (patch)
treecbf60593008b7a696edc4184949d61ed932336fe
parentb1e00ff0264fec4f3a5b87470980aebd94db81cf (diff)
feat(postgres): Implement Postgres-backed 'advance' function
Transactionally updates a state machine with an incoming event. Note
that this does not yet interpret actions.
-rw-r--r--finito-postgres/src/error.rs1
-rw-r--r--finito-postgres/src/lib.rs86
2 files changed, 73 insertions, 14 deletions
diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs
index ccbf0c107e1b..0fb30a99dcd7 100644
--- a/finito-postgres/src/error.rs
+++ b/finito-postgres/src/error.rs
@@ -5,4 +5,5 @@ use std::result;
 
 pub type Result<T> = result::Result<T, Error>;
 
+#[derive(Debug)]
 pub enum Error { SomeError }
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs
index aee8cf9adf96..152592405534 100644
--- a/finito-postgres/src/lib.rs
+++ b/finito-postgres/src/lib.rs
@@ -91,6 +91,9 @@ struct ActionT {
     /// ID of the state machine belonging to this event.
     fsm_id: Uuid,
 
+    /// ID of the event that resulted in this action.
+    event_id: Uuid,
+
     /// Serialised content of the action.
     action: Value,
 
@@ -158,7 +161,9 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where
 }
 
 /// Insert a single event into the database and return its UUID.
-pub fn insert_event<C, S>(conn: &C, fsm_id: MachineId<S>, event: S::Event) -> Result<Uuid>
+fn insert_event<C, S>(conn: &C,
+                      fsm_id: &MachineId<S>,
+                      event: &S::Event) -> Result<Uuid>
 where
     C: GenericConnection,
     S: FSM,
@@ -177,35 +182,70 @@ where
 }
 
 /// Insert a single action into the database and return its UUID.
-pub fn insert_action<C, S>(conn: &C,
-                           fsm_id: MachineId<S>,
-                           action: S::Action) -> Result<Uuid> where
+fn insert_action<C, S>(conn: &C,
+                       fsm_id: &MachineId<S>,
+                       event_id: Uuid,
+                       action: &S::Action) -> Result<Uuid> where
     C: GenericConnection,
     S: FSM,
     S::Action: Serialize {
     let query = r#"
-      INSERT INTO actions (id, created, fsm, fsm_id, action, status)
-      VALUES ($1, NOW(), $2, $3, $4, $5)
+      INSERT INTO actions (id, created, fsm, fsm_id, event_id, action, status)
+      VALUES ($1, NOW(), $2, $3, $4, $5, $6)
     "#;
 
     let id = Uuid::new_v4();
     let fsm = S::FSM_NAME.to_string();
     let action_value = serde_json::to_value(action).expect("TODO");
 
-    conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value,
-                          &ActionStatus::Pending]).expect("TODO");
+    conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id,
+                          &action_value, &ActionStatus::Pending]).expect("TODO");
     return Ok(id)
 }
 
-/// Retrieve the current state of a state machine from the database.
-pub fn get_machine<C, S>(conn: &C, id: MachineId<S>) -> Result<S> where
+/// Update the state of a specified machine.
+fn update_state<C, S>(conn: &C,
+                      fsm_id: &MachineId<S>,
+                      state: &S) -> Result<()> where
+    C: GenericConnection,
+    S: FSM + Serialize {
+    let query = r#"
+      UPDATE machines SET state = $1 WHERE id = $2
+    "#;
+
+    let state_value = serde_json::to_value(state).expect("TODO");
+    let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()])
+        .expect("TODO");
+
+    if res_count != 1 {
+        // TODO: not found error!
+        unimplemented!()
+    } else {
+        Ok(())
+    }
+}
+
+/// Retrieve the current state of a state machine from the database,
+/// optionally locking the machine state for the duration of some
+/// enclosing transaction.
+pub fn get_machine<C, S>(conn: &C,
+                         id: &MachineId<S>,
+                         for_update: bool) -> Result<S> where
     C: GenericConnection,
     S: FSM + DeserializeOwned {
     let query = r#"
       SELECT (id, created, fsm, state) FROM machines WHERE id = $1
     "#;
 
-    let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO");
+    // If the machine is being fetched in the context of a
+    // transaction, with the intention to update it, the relevant
+    // clause needs to be appended:
+    let query = match for_update {
+        false => query.to_string(),
+        true  => format!("{} FOR UPDATE", query),
+    };
+
+    let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
     let mut machines = rows.into_iter().map(|row| MachineT {
         id: row.get(0),
         created: row.get(1),
@@ -233,9 +273,27 @@ pub fn get_machine<C, S>(conn: &C, id: MachineId<S>) -> Result<S> where
 /// processing is finished as running actions may result in additional
 /// transitions.
 pub fn advance<C, S>(conn: &C,
-                     id: MachineId<S>,
+                     id: &MachineId<S>,
                      event: S::Event) -> Result<S> where
     C: GenericConnection,
-    S: FSM + DeserializeOwned {
-    unimplemented!()
+    S: FSM + Serialize + DeserializeOwned,
+    S::Event: Serialize,
+    S::Action: Serialize {
+    let tx = conn.transaction().expect("TODO");
+    let state = get_machine(&tx, id, true).expect("TODO");
+
+    // Advancing the FSM consumes the event, so it is persisted first:
+    let event_id = insert_event(&tx, id, &event).expect("TODO");
+
+    // Core advancing logic is run:
+    let (new_state, actions) = finito::advance(state, event);
+
+    // Resulting actions are persisted (TODO: and interpreted)
+    for action in actions {
+        insert_action(&tx, id, event_id, &action).expect("TODO");
+    }
+
+    // And finally the state is updated:
+    update_state(&tx, id, &new_state).expect("TODO");
+    Ok(new_state)
 }