diff options
Diffstat (limited to 'users/tazjin/finito/finito-postgres/src')
-rw-r--r-- | users/tazjin/finito/finito-postgres/src/error.rs | 26 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/src/lib.rs | 191 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/src/tests.rs | 11 |
3 files changed, 127 insertions, 101 deletions
diff --git a/users/tazjin/finito/finito-postgres/src/error.rs b/users/tazjin/finito/finito-postgres/src/error.rs index e130d18361f1..ed33775cd70e 100644 --- a/users/tazjin/finito/finito-postgres/src/error.rs +++ b/users/tazjin/finito/finito-postgres/src/error.rs @@ -1,10 +1,9 @@ //! This module defines error types and conversions for issue that can //! occur while dealing with persisted state machines. -use std::result; -use std::fmt; -use uuid::Uuid; use std::error::Error as StdError; +use std::{fmt, result}; +use uuid::Uuid; // errors to chain: use postgres::Error as PgError; @@ -41,20 +40,15 @@ impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use ErrorKind::*; let msg = match &self.kind { - Serialization(err) => - format!("JSON serialization error: {}", err), + Serialization(err) => format!("JSON serialization error: {}", err), - Database(err) => - format!("PostgreSQL error: {}", err), + Database(err) => format!("PostgreSQL error: {}", err), - DBPool(err) => - format!("Database connection pool error: {}", err), + DBPool(err) => format!("Database connection pool error: {}", err), - FSMNotFound(id) => - format!("FSM with ID {} not found", id), + FSMNotFound(id) => format!("FSM with ID {} not found", id), - ActionNotFound(id) => - format!("Action with ID {} not found", id), + ActionNotFound(id) => format!("Action with ID {} not found", id), }; match &self.context { @@ -66,7 +60,7 @@ impl fmt::Display for Error { impl StdError for Error {} -impl <E: Into<ErrorKind>> From<E> for Error { +impl<E: Into<ErrorKind>> From<E> for Error { fn from(err: E) -> Error { Error { kind: err.into(), @@ -99,11 +93,11 @@ pub trait ResultExt<T> { fn context<C: fmt::Display>(self, ctx: C) -> Result<T>; } -impl <T, E: Into<Error>> ResultExt<T> for result::Result<T, E> { +impl<T, E: Into<Error>> ResultExt<T> for result::Result<T, E> { fn context<C: fmt::Display>(self, ctx: C) -> Result<T> { self.map_err(|err| Error { context: Some(format!("{}", ctx)), - .. err.into() + ..err.into() }) } } diff --git a/users/tazjin/finito/finito-postgres/src/lib.rs b/users/tazjin/finito/finito-postgres/src/lib.rs index ae147f751f88..ea63cc9dfdfd 100644 --- a/users/tazjin/finito/finito-postgres/src/lib.rs +++ b/users/tazjin/finito/finito-postgres/src/lib.rs @@ -4,8 +4,10 @@ //! //! TODO: events & actions should have `SERIAL` keys -#[macro_use] extern crate postgres; -#[macro_use] extern crate postgres_derive; +#[macro_use] +extern crate postgres; +#[macro_use] +extern crate postgres_derive; extern crate chrono; extern crate finito; @@ -14,23 +16,25 @@ extern crate serde; extern crate serde_json; extern crate uuid; -#[cfg(test)] mod tests; -#[cfg(test)] extern crate finito_door; +#[cfg(test)] +mod tests; +#[cfg(test)] +extern crate finito_door; mod error; -pub use error::{Result, Error, ErrorKind}; +pub use error::{Error, ErrorKind, Result}; use chrono::prelude::{DateTime, Utc}; use error::ResultExt; -use finito::{FSM, FSMBackend}; +use finito::{FSMBackend, FSM}; use postgres::transaction::Transaction; use postgres::GenericConnection; -use serde::Serialize; +use r2d2_postgres::{r2d2, PostgresConnectionManager}; use serde::de::DeserializeOwned; +use serde::Serialize; use serde_json::Value; use std::marker::PhantomData; use uuid::Uuid; -use r2d2_postgres::{r2d2, PostgresConnectionManager}; type DBPool = r2d2::Pool<PostgresConnectionManager>; type DBConn = r2d2::PooledConnection<PostgresConnectionManager>; @@ -112,15 +116,13 @@ pub struct FinitoPostgres<S> { db_pool: DBPool, } -impl <S> FinitoPostgres<S> { +impl<S> FinitoPostgres<S> { pub fn new(state: S, db_pool: DBPool, _pool_size: usize) -> Self { - FinitoPostgres { - state, db_pool, - } + FinitoPostgres { state, db_pool } } } -impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> { +impl<State: 'static> FSMBackend<State> for FinitoPostgres<State> { type Key = Uuid; type Error = Error; @@ -134,10 +136,11 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> { let fsm = S::FSM_NAME.to_string(); let state = serde_json::to_value(initial).context("failed to serialise FSM")?; - self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?; + self.conn()? + .execute(query, &[&id, &fsm, &state]) + .context("failed to insert FSM")?; return Ok(id); - } fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> { @@ -156,10 +159,12 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> { /// processing is finished as running actions may result in additional /// transitions. fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S> - where S: FSM + Serialize + DeserializeOwned, - S::State: From<&'a State>, - S::Event: Serialize + DeserializeOwned, - S::Action: Serialize + DeserializeOwned { + where + S: FSM + Serialize + DeserializeOwned, + S::State: From<&'a State>, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned, + { let conn = self.conn()?; let tx = conn.transaction().context("could not begin transaction")?; let state = get_machine_internal(&tx, key, true)?; @@ -187,16 +192,18 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> { } } -impl <State: 'static> FinitoPostgres<State> { +impl<State: 'static> FinitoPostgres<State> { /// Execute several actions at the same time, each in a separate /// thread. Note that actions returning further events, causing /// further transitions, returning further actions and so on will /// potentially cause multiple threads to get created. - fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where + fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) + where S: FSM + Serialize + DeserializeOwned, S::Event: Serialize + DeserializeOwned, S::Action: Serialize + DeserializeOwned, - S::State: From<&'a State> { + S::State: From<&'a State>, + { let state: S::State = (&self.state).into(); let conn = self.conn().expect("TODO"); @@ -214,17 +221,19 @@ impl <State: 'static> FinitoPostgres<State> { /// Retrieve a single connection from the database connection pool. fn conn(&self) -> Result<DBConn> { - self.db_pool.get().context("failed to retrieve connection from pool") + self.db_pool + .get() + .context("failed to retrieve connection from pool") } } - - /// Insert a single state-machine into the database and return its /// newly allocated, random UUID. -pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where +pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> +where C: GenericConnection, - S: FSM + Serialize { + S: FSM + Serialize, +{ let query = r#" INSERT INTO machines (id, fsm, state) VALUES ($1, $2, $3) @@ -240,13 +249,12 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where } /// Insert a single event into the database and return its UUID. -fn insert_event<C, S>(conn: &C, - fsm_id: Uuid, - event: &S::Event) -> Result<Uuid> +fn insert_event<C, S>(conn: &C, fsm_id: Uuid, event: &S::Event) -> Result<Uuid> where C: GenericConnection, S: FSM, - S::Event: Serialize { + S::Event: Serialize, +{ let query = r#" INSERT INTO events (id, fsm, fsm_id, event) VALUES ($1, $2, $3, $4) @@ -254,21 +262,19 @@ where let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let event_value = serde_json::to_value(event) - .context("failed to serialize event")?; + let event_value = serde_json::to_value(event).context("failed to serialize event")?; conn.execute(query, &[&id, &fsm, &fsm_id, &event_value])?; - return Ok(id) + return Ok(id); } /// Insert a single action into the database and return its UUID. -fn insert_action<C, S>(conn: &C, - fsm_id: Uuid, - event_id: Uuid, - action: &S::Action) -> Result<Uuid> where +fn insert_action<C, S>(conn: &C, fsm_id: Uuid, event_id: Uuid, action: &S::Action) -> Result<Uuid> +where C: GenericConnection, S: FSM, - S::Action: Serialize { + S::Action: Serialize, +{ let query = r#" INSERT INTO actions (id, fsm, fsm_id, event_id, content, status) VALUES ($1, $2, $3, $4, $5, $6) @@ -276,23 +282,26 @@ fn insert_action<C, S>(conn: &C, let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let action_value = serde_json::to_value(action) - .context("failed to serialize action")?; + let action_value = serde_json::to_value(action).context("failed to serialize action")?; - conn.execute( - query, - &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending] - )?; + conn.execute(query, &[ + &id, + &fsm, + &fsm_id, + &event_id, + &action_value, + &ActionStatus::Pending, + ])?; - return Ok(id) + return Ok(id); } /// Update the state of a specified machine. -fn update_state<C, S>(conn: &C, - fsm_id: Uuid, - state: &S) -> Result<()> where +fn update_state<C, S>(conn: &C, fsm_id: Uuid, state: &S) -> Result<()> +where C: GenericConnection, - S: FSM + Serialize { + S: FSM + Serialize, +{ let query = r#" UPDATE machines SET state = $1 WHERE id = $2 "#; @@ -312,23 +321,28 @@ fn update_state<C, S>(conn: &C, fn alter_for_update(alter: bool, query: &str) -> String { match alter { false => query.to_string(), - true => format!("{} FOR UPDATE", query), + true => format!("{} FOR UPDATE", query), } } /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. -fn get_machine_internal<C, S>(conn: &C, - id: Uuid, - for_update: bool) -> Result<S> where +fn get_machine_internal<C, S>(conn: &C, id: Uuid, for_update: bool) -> Result<S> +where C: GenericConnection, - S: FSM + DeserializeOwned { - let query = alter_for_update(for_update, r#" + S: FSM + DeserializeOwned, +{ + let query = alter_for_update( + for_update, + r#" SELECT state FROM machines WHERE id = $1 - "#); + "#, + ); - let rows = conn.query(&query, &[&id]).context("failed to retrieve FSM")?; + let rows = conn + .query(&query, &[&id]) + .context("failed to retrieve FSM")?; if let Some(row) = rows.into_iter().next() { Ok(serde_json::from_value(row.get(0)).context("failed to deserialize FSM")?) @@ -339,20 +353,25 @@ fn get_machine_internal<C, S>(conn: &C, /// Retrieve an action from the database, optionally locking it for /// the duration of some enclosing transaction. -fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where +fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> +where C: GenericConnection, S: FSM, - S::Action: DeserializeOwned { - let query = alter_for_update(true, r#" + S::Action: DeserializeOwned, +{ + let query = alter_for_update( + true, + r#" SELECT status, content FROM actions WHERE id = $1 AND fsm = $2 - "#); + "#, + ); let rows = conn.query(&query, &[&id, &S::FSM_NAME])?; if let Some(row) = rows.into_iter().next() { - let action = serde_json::from_value(row.get(1)) - .context("failed to deserialize FSM action")?; + let action = + serde_json::from_value(row.get(1)).context("failed to deserialize FSM action")?; Ok((row.get(0), action)) } else { Err(ErrorKind::ActionNotFound(id).into()) @@ -360,13 +379,17 @@ fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> whe } /// Update the status of an action after an attempt to run it. -fn update_action_status<C, S>(conn: &C, - id: Uuid, - status: ActionStatus, - error: Option<String>, - _fsm: PhantomData<S>) -> Result<()> where +fn update_action_status<C, S>( + conn: &C, + id: Uuid, + status: ActionStatus, + error: Option<String>, + _fsm: PhantomData<S>, +) -> Result<()> +where C: GenericConnection, - S: FSM { + S: FSM, +{ let query = r#" UPDATE actions SET status = $1, error = $2 WHERE id = $3 AND fsm = $4 @@ -389,10 +412,16 @@ fn update_action_status<C, S>(conn: &C, /// panic), the error will be persisted. Should it fail by panicking /// (which developers should never do explicitly in action /// interpreters) its status will not be changed. -fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>) - -> Result<Vec<S::Event>> where +fn run_action<S>( + tx: Transaction, + id: Uuid, + state: &S::State, + _fsm: PhantomData<S>, +) -> Result<Vec<S::Event>> +where S: FSM, - S::Action: DeserializeOwned { + S::Action: DeserializeOwned, +{ let (status, action) = get_action::<Transaction, S>(&tx, id)?; let result = match status { @@ -401,29 +430,25 @@ fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData< // If the action succeeded, update its status to // completed and return the created events. Ok(events) => { - update_action_status( - &tx, id, ActionStatus::Completed, None, PhantomData::<S> - )?; + update_action_status(&tx, id, ActionStatus::Completed, None, PhantomData::<S>)?; events - }, + } // If the action failed, persist the debug message and // return nothing. Err(err) => { let msg = Some(format!("{:?}", err)); - update_action_status( - &tx, id, ActionStatus::Failed, msg, PhantomData::<S> - )?; + update_action_status(&tx, id, ActionStatus::Failed, msg, PhantomData::<S>)?; vec![] - }, + } } - }, + } _ => { // TODO: Currently only pending actions are run because // retryable actions are not yet implemented. vec![] - }, + } }; tx.commit().context("failed to commit transaction")?; diff --git a/users/tazjin/finito/finito-postgres/src/tests.rs b/users/tazjin/finito/finito-postgres/src/tests.rs index b1b5821be3c4..dd270c38759f 100644 --- a/users/tazjin/finito/finito-postgres/src/tests.rs +++ b/users/tazjin/finito/finito-postgres/src/tests.rs @@ -16,7 +16,11 @@ fn test_insert_machine() { let door = insert_machine(&conn, initial).expect("Failed to insert door"); let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); - assert_eq!(result, DoorState::Opened, "Inserted door state should match"); + assert_eq!( + result, + DoorState::Opened, + "Inserted door state should match" + ); } #[test] @@ -41,7 +45,10 @@ fn test_advance() { } let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); - let expected = DoorState::Locked { code: 4567, attempts: 2 }; + let expected = DoorState::Locked { + code: 4567, + attempts: 2, + }; assert_eq!(result, expected, "Advanced door state should match"); } |