diff options
author | Vincent Ambo <mail@tazj.in> | 2018-09-26T20·31+0200 |
---|---|---|
committer | Vincent Ambo <mail@tazj.in> | 2018-09-26T20·31+0200 |
commit | 406a90e8d696835f4b68e35f60f585af710c49c8 (patch) | |
tree | dbbdc092200d61038245d2f23d390ba0a2afecd7 /finito-postgres/src/lib.rs | |
parent | 3891ba84d5dbf335442c1c9e263823820f2a3327 (diff) |
feat(postgres): Implement initial (synchronous) actoin execution
Implements a simple model for executing actions which will run them in sequence, synchronously, after advancing an FSM and committing the initial transaction. Note that multiple things are not yet taken into account: * Error handling of actions (they can not currently fail) * Retrying of actions * Concurrency model I started out by implementing the concurrency model similarly to the green-threading method used in Hamingja (but using OS threads), but slowly noticed that it may not be the best way to do that. It needs a little bit of discussion. Either way for most actions this method is fast enough to work for implementing things on top of Finito's model.
Diffstat (limited to 'finito-postgres/src/lib.rs')
-rw-r--r-- | finito-postgres/src/lib.rs | 152 |
1 files changed, 127 insertions, 25 deletions
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 5ed9693bc9b3..af801314298f 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -22,6 +22,7 @@ pub use error::{Result, Error}; use chrono::prelude::{DateTime, Utc}; use finito::FSM; use postgres::GenericConnection; +use postgres::transaction::Transaction; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; @@ -69,7 +70,7 @@ struct EventT { } /// This enum represents the possible statuses an action can be in. -#[derive(Debug, ToSql, FromSql)] +#[derive(Debug, PartialEq, ToSql, FromSql)] #[postgres(name = "actionstatus")] enum ActionStatus { /// The action was requested but has not run yet. @@ -234,6 +235,15 @@ fn update_state<C, S>(conn: &C, } } +/// Conditionally alter SQL statement to append locking clause inside +/// of a transaction. +fn alter_for_update(alter: bool, query: &str) -> String { + match alter { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + } +} + /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. @@ -242,34 +252,66 @@ pub fn get_machine<C, S>(conn: &C, for_update: bool) -> Result<S> where C: GenericConnection, S: FSM + DeserializeOwned { - let query = r#" - SELECT id, created, fsm, state FROM machines WHERE id = $1 - "#; - - // If the machine is being fetched in the context of a - // transaction, with the intention to update it, the relevant - // clause needs to be appended: - let query = match for_update { - false => query.to_string(), - true => format!("{} FOR UPDATE", query), - }; + let query = alter_for_update(for_update, r#" + SELECT state FROM machines WHERE id = $1 + "#); let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); - let mut machines = rows.into_iter().map(|row| MachineT { - id: id.to_uuid(), - created: row.get(1), - fsm: row.get(2), - state: row.get(3), - }); - - if let Some(machine) = machines.next() { - Ok(serde_json::from_value(machine.state).expect("TODO")) + + if let Some(row) = rows.into_iter().next() { + Ok(serde_json::from_value(row.get(0)).expect("TODO")) } else { // TODO: return appropriate not found error Err(Error::SomeError) } } +/// Retrieve an action from the database, optionally locking it for +/// the duration of some enclosing transaction. +fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where + C: GenericConnection, + S: FSM, + S::Action: DeserializeOwned { + let query = alter_for_update(true, r#" + SELECT status, content FROM actions + WHERE id = $1 AND fsm = $2 + "#); + + let rows = conn.query(&query, &[&id, &S::FSM_NAME]).expect("TODO"); + + if let Some(row) = rows.into_iter().next() { + let action = serde_json::from_value(row.get(1)).expect("TODO"); + Ok((row.get(0), action)) + } else { + // TODO: return appropriate not found error + Err(Error::SomeError) + } +} + +/// Update the status of an action after an attempt to run it. +fn update_action_status<C, S>(conn: &C, + id: Uuid, + status: ActionStatus, + error: Option<Value>, + _fsm: PhantomData<S>) -> Result<()> where + C: GenericConnection, + S: FSM { + let query = r#" + UPDATE actions SET status = $1, error = $2 + WHERE id = $3 AND fsm = $4 + "#; + + let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME]) + .expect("TODO"); + + if result != 1 { + // TODO: Fail in the most gruesome way! + unimplemented!() + } + + Ok(()) +} + /// Advance a persisted state machine by applying an event, and /// storing the event as well as all resulting actions. /// @@ -287,24 +329,84 @@ pub fn advance<C, S>(conn: &C, C: GenericConnection, S: FSM + Serialize + DeserializeOwned, S::Event: Serialize, - S::Action: Serialize { + S::Action: Serialize + DeserializeOwned { let tx = conn.transaction().expect("TODO"); - let state = get_machine(&tx, id, true).expect("TODO"); + let state = get_machine(&tx, id, true)?; // Advancing the FSM consumes the event, so it is persisted first: - let event_id = insert_event(&tx, id, &event).expect("TODO"); + let event_id = insert_event(&tx, id, &event)?; // Core advancing logic is run: let (new_state, actions) = finito::advance(state, event); // Resulting actions are persisted (TODO: and interpreted) + let mut action_ids = vec![]; for action in actions { - insert_action(&tx, id, event_id, &action).expect("TODO"); + let action_id = insert_action(&tx, id, event_id, &action)?; + action_ids.push(action_id); } // And finally the state is updated: update_state(&tx, id, &new_state).expect("TODO"); tx.commit().expect("TODO"); + run_actions(conn, id, action_ids); + Ok(new_state) } + +/// Execute a single action in case it is pending or retryable. Holds +/// a lock on the action's database row while performing the action +/// and writes back the status afterwards. +/// +/// Should the execution of an action fail cleanly (i.e. without a +/// panic), the error will be persisted. Should it fail by panicking +/// (which developers should never do explicitly in action +/// interpreters) its status will not be changed. +fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>) + -> Result<Vec<S::Event>> where + S: FSM, + S::Action: DeserializeOwned { + let (status, action) = get_action::<Transaction, S>(&tx, id)?; + + let result = match status { + ActionStatus::Pending => { + let events = <S as FSM>::act(action); + update_action_status( + &tx, id, ActionStatus::Completed, None, PhantomData::<S> + )?; + + events + }, + + _ => { + // TODO: Currently only pending actions are run because + // retryable actions are not yet implemented. + vec![] + }, + }; + + tx.commit().expect("TODO"); + Ok(result) +} + +/// Execute several actions at the same time, each in a separate +/// thread. Note that actions returning further events, causing +/// further transitions, returning further actions and so on will +/// potentially cause multiple threads to get created. +fn run_actions<C, S>(conn: &C, fsm_id: &MachineId<S>, action_ids: Vec<Uuid>) where + C: GenericConnection, + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize, + S::Action: Serialize + DeserializeOwned { + for action_id in action_ids { + let tx = conn.transaction().expect("TODO"); + + // TODO: Determine which concurrency setup we actually want. + if let Ok(events) = run_action(tx, action_id, PhantomData::<S>) { + for event in events { + advance(conn, fsm_id, event).expect("TODO"); + } + } + } +} |