diff options
author | Vincent Ambo <mail@tazj.in> | 2018-09-26T15·28+0200 |
---|---|---|
committer | Vincent Ambo <mail@tazj.in> | 2018-09-26T15·28+0200 |
commit | 40caa5ffa23cdd482b7c97e74891fb41269e8076 (patch) | |
tree | cbf60593008b7a696edc4184949d61ed932336fe | |
parent | b1e00ff0264fec4f3a5b87470980aebd94db81cf (diff) |
feat(postgres): Implement Postgres-backed 'advance' function
Transactionally updates a state machine with an incoming event. Note that this does not yet interpret actions.
-rw-r--r-- | finito-postgres/src/error.rs | 1 | ||||
-rw-r--r-- | finito-postgres/src/lib.rs | 86 |
2 files changed, 73 insertions, 14 deletions
diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs index ccbf0c107e1b..0fb30a99dcd7 100644 --- a/finito-postgres/src/error.rs +++ b/finito-postgres/src/error.rs @@ -5,4 +5,5 @@ use std::result; pub type Result<T> = result::Result<T, Error>; +#[derive(Debug)] pub enum Error { SomeError } diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index aee8cf9adf96..152592405534 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -91,6 +91,9 @@ struct ActionT { /// ID of the state machine belonging to this event. fsm_id: Uuid, + /// ID of the event that resulted in this action. + event_id: Uuid, + /// Serialised content of the action. action: Value, @@ -158,7 +161,9 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where } /// Insert a single event into the database and return its UUID. -pub fn insert_event<C, S>(conn: &C, fsm_id: MachineId<S>, event: S::Event) -> Result<Uuid> +fn insert_event<C, S>(conn: &C, + fsm_id: &MachineId<S>, + event: &S::Event) -> Result<Uuid> where C: GenericConnection, S: FSM, @@ -177,35 +182,70 @@ where } /// Insert a single action into the database and return its UUID. -pub fn insert_action<C, S>(conn: &C, - fsm_id: MachineId<S>, - action: S::Action) -> Result<Uuid> where +fn insert_action<C, S>(conn: &C, + fsm_id: &MachineId<S>, + event_id: Uuid, + action: &S::Action) -> Result<Uuid> where C: GenericConnection, S: FSM, S::Action: Serialize { let query = r#" - INSERT INTO actions (id, created, fsm, fsm_id, action, status) - VALUES ($1, NOW(), $2, $3, $4, $5) + INSERT INTO actions (id, created, fsm, fsm_id, event_id, action, status) + VALUES ($1, NOW(), $2, $3, $4, $5, $6) "#; let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); let action_value = serde_json::to_value(action).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value, - &ActionStatus::Pending]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id, + &action_value, &ActionStatus::Pending]).expect("TODO"); return Ok(id) } -/// Retrieve the current state of a state machine from the database. -pub fn get_machine<C, S>(conn: &C, id: MachineId<S>) -> Result<S> where +/// Update the state of a specified machine. +fn update_state<C, S>(conn: &C, + fsm_id: &MachineId<S>, + state: &S) -> Result<()> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + UPDATE machines SET state = $1 WHERE id = $2 + "#; + + let state_value = serde_json::to_value(state).expect("TODO"); + let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()]) + .expect("TODO"); + + if res_count != 1 { + // TODO: not found error! + unimplemented!() + } else { + Ok(()) + } +} + +/// Retrieve the current state of a state machine from the database, +/// optionally locking the machine state for the duration of some +/// enclosing transaction. +pub fn get_machine<C, S>(conn: &C, + id: &MachineId<S>, + for_update: bool) -> Result<S> where C: GenericConnection, S: FSM + DeserializeOwned { let query = r#" SELECT (id, created, fsm, state) FROM machines WHERE id = $1 "#; - let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO"); + // If the machine is being fetched in the context of a + // transaction, with the intention to update it, the relevant + // clause needs to be appended: + let query = match for_update { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + }; + + let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); let mut machines = rows.into_iter().map(|row| MachineT { id: row.get(0), created: row.get(1), @@ -233,9 +273,27 @@ pub fn get_machine<C, S>(conn: &C, id: MachineId<S>) -> Result<S> where /// processing is finished as running actions may result in additional /// transitions. pub fn advance<C, S>(conn: &C, - id: MachineId<S>, + id: &MachineId<S>, event: S::Event) -> Result<S> where C: GenericConnection, - S: FSM + DeserializeOwned { - unimplemented!() + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize, + S::Action: Serialize { + let tx = conn.transaction().expect("TODO"); + let state = get_machine(&tx, id, true).expect("TODO"); + + // Advancing the FSM consumes the event, so it is persisted first: + let event_id = insert_event(&tx, id, &event).expect("TODO"); + + // Core advancing logic is run: + let (new_state, actions) = finito::advance(state, event); + + // Resulting actions are persisted (TODO: and interpreted) + for action in actions { + insert_action(&tx, id, event_id, &action).expect("TODO"); + } + + // And finally the state is updated: + update_state(&tx, id, &new_state).expect("TODO"); + Ok(new_state) } |