From 0d0b43ed8819e66a0888eb6d1d1f47b171ae62e0 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 7 Feb 2022 19:29:52 +0300 Subject: fix(users/tazjin): rustfmt code with non-default settings rustfmt only sometimes detects path-based nested config files (probably some kind of race?), so my users folder uses a separate formatting check for rustfmt to avoid flaky CI. Enough flakes around already ... Change-Id: Ifd862f9974f071b3a256643dd8e56c019116156a Reviewed-on: https://cl.tvl.fyi/c/depot/+/5242 Reviewed-by: tazjin Autosubmit: tazjin Tested-by: BuildkiteCI --- users/tazjin/finito/finito-core/src/lib.rs | 41 ++--- users/tazjin/finito/finito-door/src/lib.rs | 34 ++-- users/tazjin/finito/finito-postgres/src/error.rs | 26 ++- users/tazjin/finito/finito-postgres/src/lib.rs | 191 +++++++++++++---------- users/tazjin/finito/finito-postgres/src/tests.rs | 11 +- 5 files changed, 170 insertions(+), 133 deletions(-) (limited to 'users/tazjin/finito') diff --git a/users/tazjin/finito/finito-core/src/lib.rs b/users/tazjin/finito/finito-core/src/lib.rs index 517bfad2bc..aaec03a77b 100644 --- a/users/tazjin/finito/finito-core/src/lib.rs +++ b/users/tazjin/finito/finito-core/src/lib.rs @@ -38,8 +38,8 @@ //! //! * an event type representing all possible events in the machine //! -//! * an action type representing a description of all possible -//! side-effects of the machine +//! * an action type representing a description of all possible side-effects +//! of the machine //! //! Using the definition above we can now say that a transition in a //! state-machine, involving these three types, takes an initial state @@ -92,14 +92,13 @@ //! //! * `finito`: Core components and classes of Finito //! -//! * `finito-in-mem`: In-memory implementation of state machines -//! that do not need to live longer than an application using -//! standard library concurrency primitives. +//! * `finito-in-mem`: In-memory implementation of state machines that do not +//! need to live longer than an application using standard library +//! concurrency primitives. //! -//! * `finito-postgres`: Postgres-backed, persistent implementation -//! of state machines that, well, do need to live longer. Uses -//! Postgres for concurrency synchronisation, so keep that in -//! mind. +//! * `finito-postgres`: Postgres-backed, persistent implementation of state +//! machines that, well, do need to live longer. Uses Postgres for +//! concurrency synchronisation, so keep that in mind. //! //! Which should cover most use-cases. Okay, enough prose, lets dive //! in. @@ -110,8 +109,8 @@ extern crate serde; -use serde::Serialize; use serde::de::DeserializeOwned; +use serde::Serialize; use std::fmt::Debug; use std::mem; @@ -120,7 +119,10 @@ use std::mem; /// /// This trait is used to implement transition logic and to "tie the /// room together", with the room being our triplet of types. -pub trait FSM where Self: Sized { +pub trait FSM +where + Self: Sized, +{ /// A human-readable string uniquely describing what this FSM /// models. This is used in log messages, database tables and /// various other things throughout Finito. @@ -166,7 +168,7 @@ pub trait FSM where Self: Sized { /// `act` interprets and executes FSM actions. This is the only /// part of an FSM in which side-effects are allowed. - fn act(Self::Action, &Self::State) -> Result, Self::Error>; + fn act(action: Self::Action, state: &Self::State) -> Result, Self::Error>; } /// This function is the primary function used to advance a state @@ -223,11 +225,13 @@ pub trait FSMBackend { /// Insert a new state-machine into the backend's storage and /// return its newly allocated key. fn insert_machine(&self, initial: F) -> Result - where F: FSM + Serialize + DeserializeOwned; + where + F: FSM + Serialize + DeserializeOwned; /// Retrieve the current state of an FSM by its key. fn get_machine(&self, key: Self::Key) -> Result - where F: FSM + Serialize + DeserializeOwned; + where + F: FSM + Serialize + DeserializeOwned; /// Advance a state machine by applying an event and persisting it /// as well as any resulting actions. @@ -236,8 +240,9 @@ pub trait FSMBackend { /// on the backend used. Please consult the backend's /// documentation for details. fn advance<'a, F: FSM>(&'a self, key: Self::Key, event: F::Event) -> Result - where F: FSM + Serialize + DeserializeOwned, - F::State: From<&'a S>, - F::Event: Serialize + DeserializeOwned, - F::Action: Serialize + DeserializeOwned; + where + F: FSM + Serialize + DeserializeOwned, + F::State: From<&'a S>, + F::Event: Serialize + DeserializeOwned, + F::Action: Serialize + DeserializeOwned; } diff --git a/users/tazjin/finito/finito-door/src/lib.rs b/users/tazjin/finito/finito-door/src/lib.rs index 68542c0bc4..441ab0e3d2 100644 --- a/users/tazjin/finito/finito-door/src/lib.rs +++ b/users/tazjin/finito/finito-door/src/lib.rs @@ -27,15 +27,15 @@ //! The door can only be locked if it is closed. Oh, and it has a few //! extra features: //! -//! * whenever the door's state changes, an IRC channel receives a -//! message about that +//! * whenever the door's state changes, an IRC channel receives a message about +//! that //! -//! * the door calls the police if the code is intered incorrectly more -//! than a specified number of times (mhm, lets say, three) +//! * the door calls the police if the code is intered incorrectly more than a +//! specified number of times (mhm, lets say, three) //! -//! * if the police is called the door can not be interacted with -//! anymore (and honestly, for the sake of this example, we don't -//! care how its functionality is restored) +//! * if the police is called the door can not be interacted with anymore (and +//! honestly, for the sake of this example, we don't care how its +//! functionality is restored) //! //! ## The Door - Visualized //! @@ -71,7 +71,8 @@ //! //! Alright, enough foreplay, lets dive in! -#[macro_use] extern crate serde_derive; +#[macro_use] +extern crate serde_derive; extern crate failure; extern crate finito; @@ -292,11 +293,13 @@ mod tests { use finito::advance; fn test_fsm(initial: S, events: Vec) -> (S, Vec) { - events.into_iter().fold((initial, vec![]), |(state, mut actions), event| { - let (new_state, mut new_actions) = advance(state, event); - actions.append(&mut new_actions); - (new_state, actions) - }) + events + .into_iter() + .fold((initial, vec![]), |(state, mut actions), event| { + let (new_state, mut new_actions) = advance(state, event); + actions.append(&mut new_actions); + (new_state, actions) + }) } #[test] @@ -313,7 +316,10 @@ mod tests { ]; let (final_state, actions) = test_fsm(initial, events); - assert_eq!(final_state, DoorState::Locked { code: 4567, attempts: 2 }); + assert_eq!(final_state, DoorState::Locked { + code: 4567, + attempts: 2 + }); assert_eq!(actions, vec![ DoorAction::NotifyIRC("door was closed".into()), DoorAction::NotifyIRC("door was opened".into()), diff --git a/users/tazjin/finito/finito-postgres/src/error.rs b/users/tazjin/finito/finito-postgres/src/error.rs index e130d18361..ed33775cd7 100644 --- a/users/tazjin/finito/finito-postgres/src/error.rs +++ b/users/tazjin/finito/finito-postgres/src/error.rs @@ -1,10 +1,9 @@ //! This module defines error types and conversions for issue that can //! occur while dealing with persisted state machines. -use std::result; -use std::fmt; -use uuid::Uuid; use std::error::Error as StdError; +use std::{fmt, result}; +use uuid::Uuid; // errors to chain: use postgres::Error as PgError; @@ -41,20 +40,15 @@ impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use ErrorKind::*; let msg = match &self.kind { - Serialization(err) => - format!("JSON serialization error: {}", err), + Serialization(err) => format!("JSON serialization error: {}", err), - Database(err) => - format!("PostgreSQL error: {}", err), + Database(err) => format!("PostgreSQL error: {}", err), - DBPool(err) => - format!("Database connection pool error: {}", err), + DBPool(err) => format!("Database connection pool error: {}", err), - FSMNotFound(id) => - format!("FSM with ID {} not found", id), + FSMNotFound(id) => format!("FSM with ID {} not found", id), - ActionNotFound(id) => - format!("Action with ID {} not found", id), + ActionNotFound(id) => format!("Action with ID {} not found", id), }; match &self.context { @@ -66,7 +60,7 @@ impl fmt::Display for Error { impl StdError for Error {} -impl > From for Error { +impl> From for Error { fn from(err: E) -> Error { Error { kind: err.into(), @@ -99,11 +93,11 @@ pub trait ResultExt { fn context(self, ctx: C) -> Result; } -impl > ResultExt for result::Result { +impl> ResultExt for result::Result { fn context(self, ctx: C) -> Result { self.map_err(|err| Error { context: Some(format!("{}", ctx)), - .. err.into() + ..err.into() }) } } diff --git a/users/tazjin/finito/finito-postgres/src/lib.rs b/users/tazjin/finito/finito-postgres/src/lib.rs index ae147f751f..ea63cc9dfd 100644 --- a/users/tazjin/finito/finito-postgres/src/lib.rs +++ b/users/tazjin/finito/finito-postgres/src/lib.rs @@ -4,8 +4,10 @@ //! //! TODO: events & actions should have `SERIAL` keys -#[macro_use] extern crate postgres; -#[macro_use] extern crate postgres_derive; +#[macro_use] +extern crate postgres; +#[macro_use] +extern crate postgres_derive; extern crate chrono; extern crate finito; @@ -14,23 +16,25 @@ extern crate serde; extern crate serde_json; extern crate uuid; -#[cfg(test)] mod tests; -#[cfg(test)] extern crate finito_door; +#[cfg(test)] +mod tests; +#[cfg(test)] +extern crate finito_door; mod error; -pub use error::{Result, Error, ErrorKind}; +pub use error::{Error, ErrorKind, Result}; use chrono::prelude::{DateTime, Utc}; use error::ResultExt; -use finito::{FSM, FSMBackend}; +use finito::{FSMBackend, FSM}; use postgres::transaction::Transaction; use postgres::GenericConnection; -use serde::Serialize; +use r2d2_postgres::{r2d2, PostgresConnectionManager}; use serde::de::DeserializeOwned; +use serde::Serialize; use serde_json::Value; use std::marker::PhantomData; use uuid::Uuid; -use r2d2_postgres::{r2d2, PostgresConnectionManager}; type DBPool = r2d2::Pool; type DBConn = r2d2::PooledConnection; @@ -112,15 +116,13 @@ pub struct FinitoPostgres { db_pool: DBPool, } -impl FinitoPostgres { +impl FinitoPostgres { pub fn new(state: S, db_pool: DBPool, _pool_size: usize) -> Self { - FinitoPostgres { - state, db_pool, - } + FinitoPostgres { state, db_pool } } } -impl FSMBackend for FinitoPostgres { +impl FSMBackend for FinitoPostgres { type Key = Uuid; type Error = Error; @@ -134,10 +136,11 @@ impl FSMBackend for FinitoPostgres { let fsm = S::FSM_NAME.to_string(); let state = serde_json::to_value(initial).context("failed to serialise FSM")?; - self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?; + self.conn()? + .execute(query, &[&id, &fsm, &state]) + .context("failed to insert FSM")?; return Ok(id); - } fn get_machine(&self, key: Uuid) -> Result { @@ -156,10 +159,12 @@ impl FSMBackend for FinitoPostgres { /// processing is finished as running actions may result in additional /// transitions. fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result - where S: FSM + Serialize + DeserializeOwned, - S::State: From<&'a State>, - S::Event: Serialize + DeserializeOwned, - S::Action: Serialize + DeserializeOwned { + where + S: FSM + Serialize + DeserializeOwned, + S::State: From<&'a State>, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned, + { let conn = self.conn()?; let tx = conn.transaction().context("could not begin transaction")?; let state = get_machine_internal(&tx, key, true)?; @@ -187,16 +192,18 @@ impl FSMBackend for FinitoPostgres { } } -impl FinitoPostgres { +impl FinitoPostgres { /// Execute several actions at the same time, each in a separate /// thread. Note that actions returning further events, causing /// further transitions, returning further actions and so on will /// potentially cause multiple threads to get created. - fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec) where + fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec) + where S: FSM + Serialize + DeserializeOwned, S::Event: Serialize + DeserializeOwned, S::Action: Serialize + DeserializeOwned, - S::State: From<&'a State> { + S::State: From<&'a State>, + { let state: S::State = (&self.state).into(); let conn = self.conn().expect("TODO"); @@ -214,17 +221,19 @@ impl FinitoPostgres { /// Retrieve a single connection from the database connection pool. fn conn(&self) -> Result { - self.db_pool.get().context("failed to retrieve connection from pool") + self.db_pool + .get() + .context("failed to retrieve connection from pool") } } - - /// Insert a single state-machine into the database and return its /// newly allocated, random UUID. -pub fn insert_machine(conn: &C, initial: S) -> Result where +pub fn insert_machine(conn: &C, initial: S) -> Result +where C: GenericConnection, - S: FSM + Serialize { + S: FSM + Serialize, +{ let query = r#" INSERT INTO machines (id, fsm, state) VALUES ($1, $2, $3) @@ -240,13 +249,12 @@ pub fn insert_machine(conn: &C, initial: S) -> Result where } /// Insert a single event into the database and return its UUID. -fn insert_event(conn: &C, - fsm_id: Uuid, - event: &S::Event) -> Result +fn insert_event(conn: &C, fsm_id: Uuid, event: &S::Event) -> Result where C: GenericConnection, S: FSM, - S::Event: Serialize { + S::Event: Serialize, +{ let query = r#" INSERT INTO events (id, fsm, fsm_id, event) VALUES ($1, $2, $3, $4) @@ -254,21 +262,19 @@ where let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let event_value = serde_json::to_value(event) - .context("failed to serialize event")?; + let event_value = serde_json::to_value(event).context("failed to serialize event")?; conn.execute(query, &[&id, &fsm, &fsm_id, &event_value])?; - return Ok(id) + return Ok(id); } /// Insert a single action into the database and return its UUID. -fn insert_action(conn: &C, - fsm_id: Uuid, - event_id: Uuid, - action: &S::Action) -> Result where +fn insert_action(conn: &C, fsm_id: Uuid, event_id: Uuid, action: &S::Action) -> Result +where C: GenericConnection, S: FSM, - S::Action: Serialize { + S::Action: Serialize, +{ let query = r#" INSERT INTO actions (id, fsm, fsm_id, event_id, content, status) VALUES ($1, $2, $3, $4, $5, $6) @@ -276,23 +282,26 @@ fn insert_action(conn: &C, let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let action_value = serde_json::to_value(action) - .context("failed to serialize action")?; + let action_value = serde_json::to_value(action).context("failed to serialize action")?; - conn.execute( - query, - &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending] - )?; + conn.execute(query, &[ + &id, + &fsm, + &fsm_id, + &event_id, + &action_value, + &ActionStatus::Pending, + ])?; - return Ok(id) + return Ok(id); } /// Update the state of a specified machine. -fn update_state(conn: &C, - fsm_id: Uuid, - state: &S) -> Result<()> where +fn update_state(conn: &C, fsm_id: Uuid, state: &S) -> Result<()> +where C: GenericConnection, - S: FSM + Serialize { + S: FSM + Serialize, +{ let query = r#" UPDATE machines SET state = $1 WHERE id = $2 "#; @@ -312,23 +321,28 @@ fn update_state(conn: &C, fn alter_for_update(alter: bool, query: &str) -> String { match alter { false => query.to_string(), - true => format!("{} FOR UPDATE", query), + true => format!("{} FOR UPDATE", query), } } /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. -fn get_machine_internal(conn: &C, - id: Uuid, - for_update: bool) -> Result where +fn get_machine_internal(conn: &C, id: Uuid, for_update: bool) -> Result +where C: GenericConnection, - S: FSM + DeserializeOwned { - let query = alter_for_update(for_update, r#" + S: FSM + DeserializeOwned, +{ + let query = alter_for_update( + for_update, + r#" SELECT state FROM machines WHERE id = $1 - "#); + "#, + ); - let rows = conn.query(&query, &[&id]).context("failed to retrieve FSM")?; + let rows = conn + .query(&query, &[&id]) + .context("failed to retrieve FSM")?; if let Some(row) = rows.into_iter().next() { Ok(serde_json::from_value(row.get(0)).context("failed to deserialize FSM")?) @@ -339,20 +353,25 @@ fn get_machine_internal(conn: &C, /// Retrieve an action from the database, optionally locking it for /// the duration of some enclosing transaction. -fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where +fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> +where C: GenericConnection, S: FSM, - S::Action: DeserializeOwned { - let query = alter_for_update(true, r#" + S::Action: DeserializeOwned, +{ + let query = alter_for_update( + true, + r#" SELECT status, content FROM actions WHERE id = $1 AND fsm = $2 - "#); + "#, + ); let rows = conn.query(&query, &[&id, &S::FSM_NAME])?; if let Some(row) = rows.into_iter().next() { - let action = serde_json::from_value(row.get(1)) - .context("failed to deserialize FSM action")?; + let action = + serde_json::from_value(row.get(1)).context("failed to deserialize FSM action")?; Ok((row.get(0), action)) } else { Err(ErrorKind::ActionNotFound(id).into()) @@ -360,13 +379,17 @@ fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> whe } /// Update the status of an action after an attempt to run it. -fn update_action_status(conn: &C, - id: Uuid, - status: ActionStatus, - error: Option, - _fsm: PhantomData) -> Result<()> where +fn update_action_status( + conn: &C, + id: Uuid, + status: ActionStatus, + error: Option, + _fsm: PhantomData, +) -> Result<()> +where C: GenericConnection, - S: FSM { + S: FSM, +{ let query = r#" UPDATE actions SET status = $1, error = $2 WHERE id = $3 AND fsm = $4 @@ -389,10 +412,16 @@ fn update_action_status(conn: &C, /// panic), the error will be persisted. Should it fail by panicking /// (which developers should never do explicitly in action /// interpreters) its status will not be changed. -fn run_action(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData) - -> Result> where +fn run_action( + tx: Transaction, + id: Uuid, + state: &S::State, + _fsm: PhantomData, +) -> Result> +where S: FSM, - S::Action: DeserializeOwned { + S::Action: DeserializeOwned, +{ let (status, action) = get_action::(&tx, id)?; let result = match status { @@ -401,29 +430,25 @@ fn run_action(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData< // If the action succeeded, update its status to // completed and return the created events. Ok(events) => { - update_action_status( - &tx, id, ActionStatus::Completed, None, PhantomData:: - )?; + update_action_status(&tx, id, ActionStatus::Completed, None, PhantomData::)?; events - }, + } // If the action failed, persist the debug message and // return nothing. Err(err) => { let msg = Some(format!("{:?}", err)); - update_action_status( - &tx, id, ActionStatus::Failed, msg, PhantomData:: - )?; + update_action_status(&tx, id, ActionStatus::Failed, msg, PhantomData::)?; vec![] - }, + } } - }, + } _ => { // TODO: Currently only pending actions are run because // retryable actions are not yet implemented. vec![] - }, + } }; tx.commit().context("failed to commit transaction")?; diff --git a/users/tazjin/finito/finito-postgres/src/tests.rs b/users/tazjin/finito/finito-postgres/src/tests.rs index b1b5821be3..dd270c3875 100644 --- a/users/tazjin/finito/finito-postgres/src/tests.rs +++ b/users/tazjin/finito/finito-postgres/src/tests.rs @@ -16,7 +16,11 @@ fn test_insert_machine() { let door = insert_machine(&conn, initial).expect("Failed to insert door"); let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); - assert_eq!(result, DoorState::Opened, "Inserted door state should match"); + assert_eq!( + result, + DoorState::Opened, + "Inserted door state should match" + ); } #[test] @@ -41,7 +45,10 @@ fn test_advance() { } let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); - let expected = DoorState::Locked { code: 4567, attempts: 2 }; + let expected = DoorState::Locked { + code: 4567, + attempts: 2, + }; assert_eq!(result, expected, "Advanced door state should match"); } -- cgit 1.4.1