diff options
Diffstat (limited to 'finito-postgres/src/lib.rs')
-rw-r--r-- | finito-postgres/src/lib.rs | 234 |
1 files changed, 111 insertions, 123 deletions
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index ffb2532e55cc..46309a04c00a 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -20,35 +20,15 @@ mod error; pub use error::{Result, Error}; use chrono::prelude::{DateTime, Utc}; -use finito::FSM; -use postgres::GenericConnection; +use finito::{FSM, FSMBackend}; +use postgres::{Connection, GenericConnection}; use postgres::transaction::Transaction; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; -use std::fmt; use std::marker::PhantomData; use uuid::Uuid; -/// This struct represents rows in the database table in which -/// machines (i.e. the current state of a Finito state machine) are -/// persisted. -#[derive(Debug, ToSql, FromSql)] -struct MachineT { - /// ID of the persisted state machine. - id: Uuid, - - /// Time at which the FSM was first created. - created: DateTime<Utc>, - - /// Name of the type of FSM represented by this state. - fsm: String, - - /// Current state of the FSM (TODO: Can the serialised FSM type be - /// used?) - state: Value, -} - /// This struct represents rows in the database table in which events /// are persisted. #[derive(Debug, ToSql, FromSql)] @@ -118,41 +98,110 @@ struct ActionT { // The following functions implement the public interface of // `finito-postgres`. -/// This type is used as a type-safe wrapper around the ID of a state -/// machine. It carries information about the FSM type and is intended -/// to add a layer of checking to prevent IDs from being mixed up. -#[derive(Clone)] -pub struct MachineId<S: FSM> { - uuid: Uuid, - phantom: PhantomData<S>, +/// TODO: Write docs for this type, brain does not want to do it right +/// now. +pub struct FinitoPostgres<S> { + state: S, + // TODO: Use connection pool? + conn: Connection, } -/// Custom debug implementation to format machine IDs using the name -/// of the FSM and their UUID. -impl <S: FSM> fmt::Debug for MachineId<S> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}:{}", S::FSM_NAME, self.uuid.hyphenated()) +impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> { + type Key = Uuid; + type Error = Error; + + fn insert_machine<S: FSM + Serialize>(&self, initial: S) -> Result<Uuid> { + let query = r#" + INSERT INTO machines (id, fsm, state) + VALUES ($1, $2, $3) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let state = serde_json::to_value(initial).expect("TODO"); + + self.conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); + + return Ok(id); + + } + + fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> { + get_machine_internal(&self.conn, key, false) } -} -impl <S: FSM> MachineId<S> { - /// Convert a UUID into a strongly typed machine ID. - pub fn from_uuid(uuid: Uuid) -> Self { - MachineId { - uuid, - phantom: PhantomData, + /// Advance a persisted state machine by applying an event, and + /// storing the event as well as all resulting actions. + /// + /// This function holds a database-lock on the state's row while + /// advancing the machine. + /// + /// **Note**: This function returns the new state of the machine + /// immediately after applying the event, however this does not + /// necessarily equate to the state of the machine after all related + /// processing is finished as running actions may result in additional + /// transitions. + fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S> + where S: FSM + Serialize + DeserializeOwned, + S::State: From<&'a State>, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned { + let tx = self.conn.transaction().expect("TODO"); + let state = get_machine_internal(&tx, key, true)?; + + // Advancing the FSM consumes the event, so it is persisted first: + let event_id = insert_event::<_, S>(&tx, key, &event)?; + + // Core advancing logic is run: + let (new_state, actions) = finito::advance(state, event); + + // Resulting actions are persisted (TODO: and interpreted) + let mut action_ids = vec![]; + for action in actions { + let action_id = insert_action::<_, S>(&tx, key, event_id, &action)?; + action_ids.push(action_id); } + + // And finally the state is updated: + update_state(&tx, key, &new_state).expect("TODO"); + tx.commit().expect("TODO"); + + self.run_actions::<S>(key, action_ids); + + Ok(new_state) } +} - /// Return the UUID contained in a machine ID. - pub fn to_uuid(&self) -> Uuid { - self.uuid +impl <State: 'static> FinitoPostgres<State> { + /// Execute several actions at the same time, each in a separate + /// thread. Note that actions returning further events, causing + /// further transitions, returning further actions and so on will + /// potentially cause multiple threads to get created. + fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned, + S::State: From<&'a State> { + let state: S::State = (&self.state).into(); + + for action_id in action_ids { + let tx = self.conn.transaction().expect("TODO"); + + // TODO: Determine which concurrency setup we actually want. + if let Ok(events) = run_action(tx, action_id, &state, PhantomData::<S>) { + for event in events { + self.advance::<S>(fsm_id, event).expect("TODO"); + } + } + } } } + + /// Insert a single state-machine into the database and return its /// newly allocated, random UUID. -pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where +pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where C: GenericConnection, S: FSM + Serialize { let query = r#" @@ -166,12 +215,12 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); - return Ok(MachineId::from_uuid(id)); + return Ok(id); } /// Insert a single event into the database and return its UUID. fn insert_event<C, S>(conn: &C, - fsm_id: &MachineId<S>, + fsm_id: Uuid, event: &S::Event) -> Result<Uuid> where C: GenericConnection, @@ -186,13 +235,13 @@ where let fsm = S::FSM_NAME.to_string(); let event_value = serde_json::to_value(event).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_value]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &fsm_id, &event_value]).expect("TODO"); return Ok(id) } /// Insert a single action into the database and return its UUID. fn insert_action<C, S>(conn: &C, - fsm_id: &MachineId<S>, + fsm_id: Uuid, event_id: Uuid, action: &S::Action) -> Result<Uuid> where C: GenericConnection, @@ -207,14 +256,17 @@ fn insert_action<C, S>(conn: &C, let fsm = S::FSM_NAME.to_string(); let action_value = serde_json::to_value(action).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id, - &action_value, &ActionStatus::Pending]).expect("TODO"); + conn.execute( + query, + &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending] + ).expect("TODO"); + return Ok(id) } /// Update the state of a specified machine. fn update_state<C, S>(conn: &C, - fsm_id: &MachineId<S>, + fsm_id: Uuid, state: &S) -> Result<()> where C: GenericConnection, S: FSM + Serialize { @@ -223,7 +275,7 @@ fn update_state<C, S>(conn: &C, "#; let state_value = serde_json::to_value(state).expect("TODO"); - let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()]) + let res_count = conn.execute(query, &[&state_value, &fsm_id]) .expect("TODO"); if res_count != 1 { @@ -246,16 +298,16 @@ fn alter_for_update(alter: bool, query: &str) -> String { /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. -pub fn get_machine<C, S>(conn: &C, - id: &MachineId<S>, - for_update: bool) -> Result<S> where +fn get_machine_internal<C, S>(conn: &C, + id: Uuid, + for_update: bool) -> Result<S> where C: GenericConnection, S: FSM + DeserializeOwned { let query = alter_for_update(for_update, r#" SELECT state FROM machines WHERE id = $1 "#); - let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); + let rows = conn.query(&query, &[&id]).expect("TODO"); if let Some(row) = rows.into_iter().next() { Ok(serde_json::from_value(row.get(0)).expect("TODO")) @@ -311,49 +363,6 @@ fn update_action_status<C, S>(conn: &C, Ok(()) } -/// Advance a persisted state machine by applying an event, and -/// storing the event as well as all resulting actions. -/// -/// This function holds a database-lock on the state's row while -/// advancing the machine. -/// -/// **Note**: This function returns the new state of the machine -/// immediately after applying the event, however this does not -/// necessarily equate to the state of the machine after all related -/// processing is finished as running actions may result in additional -/// transitions. -pub fn advance<C, S>(conn: &C, - id: &MachineId<S>, - event: S::Event) -> Result<S> where - C: GenericConnection, - S: FSM + Serialize + DeserializeOwned, - S::Event: Serialize, - S::Action: Serialize + DeserializeOwned { - let tx = conn.transaction().expect("TODO"); - let state = get_machine(&tx, id, true)?; - - // Advancing the FSM consumes the event, so it is persisted first: - let event_id = insert_event(&tx, id, &event)?; - - // Core advancing logic is run: - let (new_state, actions) = finito::advance(state, event); - - // Resulting actions are persisted (TODO: and interpreted) - let mut action_ids = vec![]; - for action in actions { - let action_id = insert_action(&tx, id, event_id, &action)?; - action_ids.push(action_id); - } - - // And finally the state is updated: - update_state(&tx, id, &new_state).expect("TODO"); - tx.commit().expect("TODO"); - - run_actions(conn, id, action_ids); - - Ok(new_state) -} - /// Execute a single action in case it is pending or retryable. Holds /// a lock on the action's database row while performing the action /// and writes back the status afterwards. @@ -362,7 +371,7 @@ pub fn advance<C, S>(conn: &C, /// panic), the error will be persisted. Should it fail by panicking /// (which developers should never do explicitly in action /// interpreters) its status will not be changed. -fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>) +fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>) -> Result<Vec<S::Event>> where S: FSM, S::Action: DeserializeOwned { @@ -370,7 +379,7 @@ fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>) let result = match status { ActionStatus::Pending => { - match <S as FSM>::act(action) { + match S::act(action, state) { // If the action succeeded, update its status to // completed and return the created events. Ok(events) => { @@ -402,24 +411,3 @@ fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>) tx.commit().expect("TODO"); Ok(result) } - -/// Execute several actions at the same time, each in a separate -/// thread. Note that actions returning further events, causing -/// further transitions, returning further actions and so on will -/// potentially cause multiple threads to get created. -fn run_actions<C, S>(conn: &C, fsm_id: &MachineId<S>, action_ids: Vec<Uuid>) where - C: GenericConnection, - S: FSM + Serialize + DeserializeOwned, - S::Event: Serialize, - S::Action: Serialize + DeserializeOwned { - for action_id in action_ids { - let tx = conn.transaction().expect("TODO"); - - // TODO: Determine which concurrency setup we actually want. - if let Ok(events) = run_action(tx, action_id, PhantomData::<S>) { - for event in events { - advance(conn, fsm_id, event).expect("TODO"); - } - } - } -} |