diff options
Diffstat (limited to 'users')
-rw-r--r-- | users/tazjin/finito/.gitignore | 4 | ||||
-rw-r--r-- | users/tazjin/finito/Cargo.toml | 6 | ||||
-rw-r--r-- | users/tazjin/finito/README.md | 27 | ||||
-rw-r--r-- | users/tazjin/finito/finito-core/Cargo.toml | 7 | ||||
-rw-r--r-- | users/tazjin/finito/finito-core/src/lib.rs | 243 | ||||
-rw-r--r-- | users/tazjin/finito/finito-door/Cargo.toml | 12 | ||||
-rw-r--r-- | users/tazjin/finito/finito-door/src/lib.rs | 327 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/Cargo.toml | 25 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql | 4 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql | 37 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/src/error.rs | 109 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/src/lib.rs | 431 | ||||
-rw-r--r-- | users/tazjin/finito/finito-postgres/src/tests.rs | 47 |
13 files changed, 1279 insertions, 0 deletions
diff --git a/users/tazjin/finito/.gitignore b/users/tazjin/finito/.gitignore new file mode 100644 index 000000000000..1aefdbbf6cc3 --- /dev/null +++ b/users/tazjin/finito/.gitignore @@ -0,0 +1,4 @@ +.envrc +/target/ +**/*.rs.bk +Cargo.lock diff --git a/users/tazjin/finito/Cargo.toml b/users/tazjin/finito/Cargo.toml new file mode 100644 index 000000000000..310133abeebb --- /dev/null +++ b/users/tazjin/finito/Cargo.toml @@ -0,0 +1,6 @@ +[workspace] +members = [ + "finito-core", + "finito-door", + "finito-postgres" +] diff --git a/users/tazjin/finito/README.md b/users/tazjin/finito/README.md new file mode 100644 index 000000000000..5acd67d3bea7 --- /dev/null +++ b/users/tazjin/finito/README.md @@ -0,0 +1,27 @@ +Finito +====== + +This is a Rust port of the Haskell state-machine library Finito. It is +slightly less featureful because it loses the ability to ensure that +side-effects are contained and because of a slight reduction in +expressivity, which makes it a bit more restrictive. + +However, it still implements the FSM model well enough. + +# Components + +Finito is split up into multiple independent components (note: not all +of these exist yet), separating functionality related to FSM +persistence from other things. + +* `finito`: Core abstraction implemented by Finito +* `finito-door`: Example implementation of a simple, lockable door +* `finito-postgres`: Persistent state-machines using Postgres + +**Note**: The `finito` core library does not contain any tests. Its +coverage is instead provided by the `finito-door` library, which +actually implements an example FSM. + +These are split out because the documentation for `finito-door` is +interesting regardless and because other Finito packages also need an +example implementation. diff --git a/users/tazjin/finito/finito-core/Cargo.toml b/users/tazjin/finito/finito-core/Cargo.toml new file mode 100644 index 000000000000..1d7bdb8b01fe --- /dev/null +++ b/users/tazjin/finito/finito-core/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "finito" +version = "0.1.0" +authors = ["Vincent Ambo <mail@tazj.in>"] + +[dependencies] +serde = "1.0" diff --git a/users/tazjin/finito/finito-core/src/lib.rs b/users/tazjin/finito/finito-core/src/lib.rs new file mode 100644 index 000000000000..517bfad2bc74 --- /dev/null +++ b/users/tazjin/finito/finito-core/src/lib.rs @@ -0,0 +1,243 @@ +//! Finito's core finite-state machine abstraction. +//! +//! # What & why? +//! +//! Most processes that occur in software applications can be modeled +//! as finite-state machines (FSMs), however the actual states, the +//! transitions between them and the model's interaction with the +//! external world is often implicit. +//! +//! Making the states of a process explicit using a simple language +//! that works for both software developers and other people who may +//! have opinions on processes makes it easier to synchronise thoughts, +//! extend software and keep a good level of control over what is going +//! on. +//! +//! This library aims to provide functionality for implementing +//! finite-state machines in a way that balances expressivity and +//! safety. +//! +//! Finito does not aim to prevent every possible incorrect +//! transition, but aims for somewhere "safe-enough" (please don't +//! lynch me) that is still easily understood. +//! +//! # Conceptual overview +//! +//! The core idea behind Finito can be expressed in a single line and +//! will potentially look familiar if you have used Erlang in a +//! previous life. The syntax used here is the type-signature notation +//! of Haskell. +//! +//! ```text +//! advance :: state -> event -> (state, [action]) +//! ``` +//! +//! In short, every FSM is made up of three distinct types: +//! +//! * a state type representing all possible states of the machine +//! +//! * an event type representing all possible events in the machine +//! +//! * an action type representing a description of all possible +//! side-effects of the machine +//! +//! Using the definition above we can now say that a transition in a +//! state-machine, involving these three types, takes an initial state +//! and an event to apply it to and returns a new state and a list of +//! actions to execute. +//! +//! With this definition most processes can already be modeled quite +//! well. Two additional functions are required to make it all work: +//! +//! ```text +//! -- | The ability to cause additional side-effects after entering +//! -- a new state. +//! > enter :: state -> [action] +//! ``` +//! +//! as well as +//! +//! ```text +//! -- | An interpreter for side-effects +//! act :: action -> m [event] +//! ``` +//! +//! **Note**: This library is based on an original Haskell library. In +//! Haskell, side-effects can be controlled via the type system which +//! is impossible in Rust. +//! +//! Some parts of Finito make assumptions about the programmer not +//! making certain kinds of mistakes, which are pointed out in the +//! documentation. Unfortunately those assumptions are not +//! automatically verifiable in Rust. +//! +//! ## Example +//! +//! Please consult `finito-door` for an example representing a simple, +//! lockable door as a finite-state machine. This gives an overview +//! over Finito's primary features. +//! +//! If you happen to be the kind of person who likes to learn about +//! libraries by reading code, you should familiarise yourself with the +//! door as it shows up as the example in other finito-related +//! libraries, too. +//! +//! # Persistence, side-effects and mud +//! +//! These three things are inescapable in the fateful realm of +//! computers, but Finito separates them out into separate libraries +//! that you can drag in as you need them. +//! +//! Currently, those libraries include: +//! +//! * `finito`: Core components and classes of Finito +//! +//! * `finito-in-mem`: In-memory implementation of state machines +//! that do not need to live longer than an application using +//! standard library concurrency primitives. +//! +//! * `finito-postgres`: Postgres-backed, persistent implementation +//! of state machines that, well, do need to live longer. Uses +//! Postgres for concurrency synchronisation, so keep that in +//! mind. +//! +//! Which should cover most use-cases. Okay, enough prose, lets dive +//! in. +//! +//! # Does Finito make you want to scream? +//! +//! Please reach out! I want to know why! + +extern crate serde; + +use serde::Serialize; +use serde::de::DeserializeOwned; +use std::fmt::Debug; +use std::mem; + +/// Primary trait that needs to be implemented for every state type +/// representing the states of an FSM. +/// +/// This trait is used to implement transition logic and to "tie the +/// room together", with the room being our triplet of types. +pub trait FSM where Self: Sized { + /// A human-readable string uniquely describing what this FSM + /// models. This is used in log messages, database tables and + /// various other things throughout Finito. + const FSM_NAME: &'static str; + + /// The associated event type of an FSM represents all possible + /// events that can occur in the state-machine. + type Event; + + /// The associated action type of an FSM represents all possible + /// actions that can occur in the state-machine. + type Action; + + /// The associated error type of an FSM represents failures that + /// can occur during action processing. + type Error: Debug; + + /// The associated state type of an FSM describes the state that + /// is made available to the implementation of action + /// interpretations. + type State; + + /// `handle` deals with any incoming events to cause state + /// transitions and emit actions. This function is the core logic + /// of any state machine. + /// + /// Implementations of this function **must not** cause any + /// side-effects to avoid breaking the guarantees of Finitos + /// conceptual model. + fn handle(self, event: Self::Event) -> (Self, Vec<Self::Action>); + + /// `enter` is called when a new state is entered, allowing a + /// state to produce additional side-effects. + /// + /// This is useful for side-effects that event handlers do not + /// need to know about and for resting assured that a certain + /// action has been caused when a state is entered. + /// + /// FSM state types are expected to be enum (i.e. sum) types. A + /// state is considered "new" and enter calls are run if is of a + /// different enum variant. + fn enter(&self) -> Vec<Self::Action>; + + /// `act` interprets and executes FSM actions. This is the only + /// part of an FSM in which side-effects are allowed. + fn act(Self::Action, &Self::State) -> Result<Vec<Self::Event>, Self::Error>; +} + +/// This function is the primary function used to advance a state +/// machine. It takes care of both running the event handler as well +/// as possible state-enter calls and returning the result. +/// +/// Users of Finito should basically always use this function when +/// advancing state-machines manually, and never call FSM-trait +/// methods directly. +pub fn advance<S: FSM>(state: S, event: S::Event) -> (S, Vec<S::Action>) { + // Determine the enum variant of the initial state (used to + // trigger enter calls). + let old_discriminant = mem::discriminant(&state); + + let (new_state, mut actions) = state.handle(event); + + // Compare the enum variant of the resulting state to the old one + // and run `enter` if they differ. + let new_discriminant = mem::discriminant(&new_state); + let mut enter_actions = if old_discriminant != new_discriminant { + new_state.enter() + } else { + vec![] + }; + + actions.append(&mut enter_actions); + + (new_state, actions) +} + +/// This trait is implemented by Finito backends. Backends are +/// expected to be able to keep track of the current state of an FSM +/// and retrieve it / apply updates transactionally. +/// +/// See the `finito-postgres` and `finito-in-mem` crates for example +/// implementations of this trait. +/// +/// Backends must be parameterised over an additional (user-supplied) +/// state type which can be used to track application state that must +/// be made available to action handlers, for example to pass along +/// database connections. +pub trait FSMBackend<S: 'static> { + /// Key type used to identify individual state machines in this + /// backend. + /// + /// TODO: Should be parameterised over FSM type after rustc + /// #44265. + type Key; + + /// Error type for all potential failures that can occur when + /// interacting with this backend. + type Error: Debug; + + /// Insert a new state-machine into the backend's storage and + /// return its newly allocated key. + fn insert_machine<F>(&self, initial: F) -> Result<Self::Key, Self::Error> + where F: FSM + Serialize + DeserializeOwned; + + /// Retrieve the current state of an FSM by its key. + fn get_machine<F: FSM>(&self, key: Self::Key) -> Result<F, Self::Error> + where F: FSM + Serialize + DeserializeOwned; + + /// Advance a state machine by applying an event and persisting it + /// as well as any resulting actions. + /// + /// **Note**: Whether actions are automatically executed depends + /// on the backend used. Please consult the backend's + /// documentation for details. + fn advance<'a, F: FSM>(&'a self, key: Self::Key, event: F::Event) -> Result<F, Self::Error> + where F: FSM + Serialize + DeserializeOwned, + F::State: From<&'a S>, + F::Event: Serialize + DeserializeOwned, + F::Action: Serialize + DeserializeOwned; +} diff --git a/users/tazjin/finito/finito-door/Cargo.toml b/users/tazjin/finito/finito-door/Cargo.toml new file mode 100644 index 000000000000..32c0a5a7c4ef --- /dev/null +++ b/users/tazjin/finito/finito-door/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "finito-door" +version = "0.1.0" +authors = ["Vincent Ambo <mail@tazj.in>"] + +[dependencies] +failure = "0.1" +serde = "1.0" +serde_derive = "1.0" + +[dependencies.finito] +path = "../finito-core" diff --git a/users/tazjin/finito/finito-door/src/lib.rs b/users/tazjin/finito/finito-door/src/lib.rs new file mode 100644 index 000000000000..68542c0bc448 --- /dev/null +++ b/users/tazjin/finito/finito-door/src/lib.rs @@ -0,0 +1,327 @@ +//! Example implementation of a lockable door in Finito +//! +//! # What & why? +//! +//! This module serves as a (hopefully simple) example of how to +//! implement finite-state machines using Finito. Note that the +//! concepts of Finito itself won't be explained in detail here, +//! consult its library documentation for that. +//! +//! Reading through this module should give you a rough idea of how to +//! work with Finito and get you up and running modeling things +//! *quickly*. +//! +//! Note: The generated documentation for this module will display the +//! various components of the door, but it will not inform you about +//! the actual transition logic and all that stuff. Read the source, +//! too! +//! +//! # The Door +//! +//! My favourite example when explaining these state-machines +//! conceptually has been to use a simple, lockable door. Our door has +//! a keypad next to it which can be used to lock the door by entering +//! a code, after which the same code must be entered to unlock it +//! again. +//! +//! The door can only be locked if it is closed. Oh, and it has a few +//! extra features: +//! +//! * whenever the door's state changes, an IRC channel receives a +//! message about that +//! +//! * the door calls the police if the code is intered incorrectly more +//! than a specified number of times (mhm, lets say, three) +//! +//! * if the police is called the door can not be interacted with +//! anymore (and honestly, for the sake of this example, we don't +//! care how its functionality is restored) +//! +//! ## The Door - Visualized +//! +//! Here's a rough attempt at drawing a state diagram in ASCII. The +//! bracketed words denote states, the arrows denote events: +//! +//! ```text +//! <--Open--- <--Unlock-- correct code? --Unlock--> +//! [Opened] [Closed] [Locked] [Disabled] +//! --Close--> ----Lock--> +//! ``` +//! +//! I'm so sorry for that drawing. +//! +//! ## The Door - Usage example +//! +//! An interaction session with our final door could look like this: +//! +//! ```rust,ignore +//! use finito_postgres::{insert_machine, advance}; +//! +//! let door = insert_machine(&conn, &DoorState::Opened)?; +//! +//! advance(&conn, &door, DoorEvent::Close)?; +//! advance(&conn, &door, DoorEvent::Lock(1337))?; +//! +//! format!("Door is now: {}", get_machine(&conn, &door)?); +//! ``` +//! +//! Here we have created, closed and then locked a door and inspected +//! its state. We will see that it is locked, has the locking code we +//! gave it and three remaining attempts to open it. +//! +//! Alright, enough foreplay, lets dive in! + +#[macro_use] extern crate serde_derive; + +extern crate failure; +extern crate finito; + +use finito::FSM; + +/// Type synonym to represent the code with which the door is locked. This +/// exists only for clarity in the signatures below and please do not email me +/// about the fact that an integer is not actually a good representation of +/// numerical digits. Thanks! +type Code = usize; + +/// Type synonym to represent the remaining number of unlock attempts. +type Attempts = usize; + +/// This type represents the possible door states and the data that they carry. +/// We can infer this from the "diagram" in the documentation above. +/// +/// This type is the one for which `finito::FSM` will be implemented, making it +/// the wooden (?) heart of our door. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum DoorState { + /// In `Opened` state, the door is wide open and anyone who fits through can + /// go through. + Opened, + + /// In `Closed` state, the door is shut but does not prevent anyone from + /// opening it. + Closed, + + /// In `Locked` state, the door is locked and waiting for someone to enter + /// its locking code on the keypad. + /// + /// This state contains the code that the door is locked with, as well as + /// the remaining number of attempts before the door calls the police and + /// becomes unusable. + Locked { code: Code, attempts: Attempts }, + + /// This state represents a disabled door after the police has been called. + /// The police will need to unlock it manually! + Disabled, +} + +/// This type represents the events that can occur in our door, i.e. the input +/// and interactions it receives. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum DoorEvent { + /// `Open` means someone is opening the door! + Open, + + /// `Close` means, you guessed it, the exact opposite. + Close, + + /// `Lock` means somebody has entered a locking code on the + /// keypad. + Lock(Code), + + /// `Unlock` means someone has attempted to unlock the door. + Unlock(Code), +} + +/// This type represents the possible actions, a.k.a. everything our door "does" +/// that does not just impact itself, a.k.a. side-effects. +/// +/// **Note**: This type by itself *is not* a collection of side-effects, it +/// merely describes the side-effects we want to occur (which are then +/// interpreted by the machinery later). +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum DoorAction { + /// `NotifyIRC` is used to display some kind of message on the + /// aforementioned IRC channel that is, for some reason, very interested in + /// the state of the door. + NotifyIRC(String), + + /// `CallThePolice` does what you think it does. + /// + /// **Note**: For safety reasons, causing this action is not recommended for + /// users inside the US! + CallThePolice, +} + +/// This trait implementation turns our 'DoorState' into a type actually +/// representing a finite-state machine. To implement it, we need to do three +/// main things: +/// +/// * Define what our associated `Event` and `Action` type should be +/// +/// * Define the event-handling and state-entering logic (i.e. the meat of the +/// ... door) +/// +/// * Implement the interpretation of our actions, i.e. implement actual +/// side-effects +impl FSM for DoorState { + const FSM_NAME: &'static str = "door"; + + // As you might expect, our `Event` type is 'DoorEvent' and our `Action` + // type is 'DoorAction'. + type Event = DoorEvent; + type Action = DoorAction; + type State = (); + + // For error handling, the door simply uses `failure` which provides a + // generic, chainable error type. In real-world implementations you may want + // to use a custom error type or similar. + type Error = failure::Error; + + // The implementation of `handle` provides us with the actual transition + // logic of the door. + // + // The door is conceptually not that complicated so it is relatively short. + fn handle(self, event: DoorEvent) -> (Self, Vec<DoorAction>) { + match (self, event) { + // An opened door can be closed: + (DoorState::Opened, DoorEvent::Close) => return (DoorState::Closed, vec![]), + + // A closed door can be opened: + (DoorState::Closed, DoorEvent::Open) => return (DoorState::Opened, vec![]), + + // A closed door can also be locked, in which case the locking code + // is stored with the next state and the unlock attempts default to + // three: + (DoorState::Closed, DoorEvent::Lock(code)) => { + return (DoorState::Locked { code, attempts: 3 }, vec![]) + } + + // A locked door receiving an `Unlock`-event can do several + // different things ... + (DoorState::Locked { code, attempts }, DoorEvent::Unlock(unlock_code)) => { + // In the happy case, entry of a correct code leads to the door + // becoming unlocked (i.e. transitioning back to `Closed`). + if code == unlock_code { + return (DoorState::Closed, vec![]); + } + + // If the code wasn't correct and the fraudulent unlocker ran + // out of attempts (i.e. there was only one attempt remaining), + // it's time for some consequences. + if attempts == 1 { + return (DoorState::Disabled, vec![DoorAction::CallThePolice]); + } + + // If the code wasn't correct, but there are still some + // remaining attempts, the user doesn't have to face the police + // quite yet but IRC gets to laugh about it. + return ( + DoorState::Locked { + code, + attempts: attempts - 1, + }, + vec![DoorAction::NotifyIRC("invalid code entered".into())], + ); + } + + // This actually already concludes our event-handling logic. Our + // uncaring door does absolutely nothing if you attempt to do + // something with it that it doesn't support, so the last handler is + // a simple fallback. + // + // In a real-world state machine, especially one that receives + // events from external sources, you may want fallback handlers to + // actually do something. One example could be creating an action + // that logs information about unexpected events, alerts a + // monitoring service, or whatever else. + (current, _) => (current, vec![]), + } + } + + // The implementation of `enter` lets door states cause additional actions + // they are transitioned to. In the door example we use this only to notify + // IRC about what is going on. + fn enter(&self) -> Vec<DoorAction> { + let msg = match self { + DoorState::Opened => "door was opened", + DoorState::Closed => "door was closed", + DoorState::Locked { .. } => "door was locked", + DoorState::Disabled => "door was disabled", + }; + + vec![DoorAction::NotifyIRC(msg.into())] + } + + // The implementation of `act` lets us perform actual side-effects. + // + // Again, for the sake of educational simplicity, this does not deal with + // all potential (or in fact any) error cases that can occur during this toy + // implementation of actions. + // + // Additionally the `act` function can return new events. This is useful for + // a sort of "callback-like" pattern (cause an action to fetch some data, + // receive it as an event) but is not used in this example. + fn act(action: DoorAction, _state: &()) -> Result<Vec<DoorEvent>, failure::Error> { + match action { + DoorAction::NotifyIRC(msg) => { + use std::fs::OpenOptions; + use std::io::Write; + + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open("/tmp/door-irc.log")?; + + write!(file, "<doorbot> {}\n", msg)?; + Ok(vec![]) + } + + DoorAction::CallThePolice => { + // TODO: call the police + println!("The police was called! For real!"); + Ok(vec![]) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use finito::advance; + + fn test_fsm<S: FSM>(initial: S, events: Vec<S::Event>) -> (S, Vec<S::Action>) { + events.into_iter().fold((initial, vec![]), |(state, mut actions), event| { + let (new_state, mut new_actions) = advance(state, event); + actions.append(&mut new_actions); + (new_state, actions) + }) + } + + #[test] + fn test_door() { + let initial = DoorState::Opened; + let events = vec![ + DoorEvent::Close, + DoorEvent::Open, + DoorEvent::Close, + DoorEvent::Lock(1234), + DoorEvent::Unlock(1234), + DoorEvent::Lock(4567), + DoorEvent::Unlock(1234), + ]; + let (final_state, actions) = test_fsm(initial, events); + + assert_eq!(final_state, DoorState::Locked { code: 4567, attempts: 2 }); + assert_eq!(actions, vec![ + DoorAction::NotifyIRC("door was closed".into()), + DoorAction::NotifyIRC("door was opened".into()), + DoorAction::NotifyIRC("door was closed".into()), + DoorAction::NotifyIRC("door was locked".into()), + DoorAction::NotifyIRC("door was closed".into()), + DoorAction::NotifyIRC("door was locked".into()), + DoorAction::NotifyIRC("invalid code entered".into()), + ]); + } +} diff --git a/users/tazjin/finito/finito-postgres/Cargo.toml b/users/tazjin/finito/finito-postgres/Cargo.toml new file mode 100644 index 000000000000..dd8d1d000304 --- /dev/null +++ b/users/tazjin/finito/finito-postgres/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "finito-postgres" +version = "0.1.0" +authors = ["Vincent Ambo <mail@tazj.in>"] + +[dependencies] +chrono = "0.4" +postgres-derive = "0.3" +serde = "1.0" +serde_json = "1.0" +r2d2_postgres = "0.14" + +[dependencies.postgres] +version = "0.15" +features = [ "with-uuid", "with-chrono", "with-serde_json" ] + +[dependencies.uuid] +version = "0.5" +features = [ "v4" ] + +[dependencies.finito] +path = "../finito-core" + +[dev-dependencies.finito-door] +path = "../finito-door" diff --git a/users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql b/users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql new file mode 100644 index 000000000000..9b56f9d35abe --- /dev/null +++ b/users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql @@ -0,0 +1,4 @@ +DROP TABLE actions; +DROP TYPE ActionStatus; +DROP TABLE events; +DROP TABLE machines; diff --git a/users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql b/users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql new file mode 100644 index 000000000000..18ace393b8d9 --- /dev/null +++ b/users/tazjin/finito/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql @@ -0,0 +1,37 @@ +-- Creates the initial schema required by finito-postgres. + +CREATE TABLE machines ( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + fsm TEXT NOT NULL, + state JSONB NOT NULL +); + +CREATE TABLE events ( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + fsm TEXT NOT NULL, + fsm_id UUID NOT NULL REFERENCES machines(id), + event JSONB NOT NULL +); +CREATE INDEX idx_events_machines ON events(fsm_id); + +CREATE TYPE ActionStatus AS ENUM ( + 'Pending', + 'Completed', + 'Failed' +); + +CREATE TABLE actions ( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + fsm TEXT NOT NULL, + fsm_id UUID NOT NULL REFERENCES machines(id), + event_id UUID NOT NULL REFERENCES events(id), + content JSONB NOT NULL, + status ActionStatus NOT NULL, + error TEXT +); + +CREATE INDEX idx_actions_machines ON actions(fsm_id); +CREATE INDEX idx_actions_events ON actions(event_id); diff --git a/users/tazjin/finito/finito-postgres/src/error.rs b/users/tazjin/finito/finito-postgres/src/error.rs new file mode 100644 index 000000000000..e130d18361f1 --- /dev/null +++ b/users/tazjin/finito/finito-postgres/src/error.rs @@ -0,0 +1,109 @@ +//! This module defines error types and conversions for issue that can +//! occur while dealing with persisted state machines. + +use std::result; +use std::fmt; +use uuid::Uuid; +use std::error::Error as StdError; + +// errors to chain: +use postgres::Error as PgError; +use r2d2_postgres::r2d2::Error as PoolError; +use serde_json::Error as JsonError; + +pub type Result<T> = result::Result<T, Error>; + +#[derive(Debug)] +pub struct Error { + pub kind: ErrorKind, + pub context: Option<String>, +} + +#[derive(Debug)] +pub enum ErrorKind { + /// Errors occuring during JSON serialization of FSM types. + Serialization(String), + + /// Errors occuring during communication with the database. + Database(String), + + /// Errors with the database connection pool. + DBPool(String), + + /// State machine could not be found. + FSMNotFound(Uuid), + + /// Action could not be found. + ActionNotFound(Uuid), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use ErrorKind::*; + let msg = match &self.kind { + Serialization(err) => + format!("JSON serialization error: {}", err), + + Database(err) => + format!("PostgreSQL error: {}", err), + + DBPool(err) => + format!("Database connection pool error: {}", err), + + FSMNotFound(id) => + format!("FSM with ID {} not found", id), + + ActionNotFound(id) => + format!("Action with ID {} not found", id), + }; + + match &self.context { + None => write!(f, "{}", msg), + Some(ctx) => write!(f, "{}: {}", ctx, msg), + } + } +} + +impl StdError for Error {} + +impl <E: Into<ErrorKind>> From<E> for Error { + fn from(err: E) -> Error { + Error { + kind: err.into(), + context: None, + } + } +} + +impl From<JsonError> for ErrorKind { + fn from(err: JsonError) -> ErrorKind { + ErrorKind::Serialization(err.to_string()) + } +} + +impl From<PgError> for ErrorKind { + fn from(err: PgError) -> ErrorKind { + ErrorKind::Database(err.to_string()) + } +} + +impl From<PoolError> for ErrorKind { + fn from(err: PoolError) -> ErrorKind { + ErrorKind::DBPool(err.to_string()) + } +} + +/// Helper trait that makes it possible to supply contextual +/// information with an error. +pub trait ResultExt<T> { + fn context<C: fmt::Display>(self, ctx: C) -> Result<T>; +} + +impl <T, E: Into<Error>> ResultExt<T> for result::Result<T, E> { + fn context<C: fmt::Display>(self, ctx: C) -> Result<T> { + self.map_err(|err| Error { + context: Some(format!("{}", ctx)), + .. err.into() + }) + } +} diff --git a/users/tazjin/finito/finito-postgres/src/lib.rs b/users/tazjin/finito/finito-postgres/src/lib.rs new file mode 100644 index 000000000000..eea6405c6f45 --- /dev/null +++ b/users/tazjin/finito/finito-postgres/src/lib.rs @@ -0,0 +1,431 @@ +//! PostgreSQL-backed persistence for Finito state machines +//! +//! This module implements ... TODO when I can write again. +//! +//! TODO: events & actions should have `SERIAL` keys + +#[macro_use] extern crate postgres; +#[macro_use] extern crate postgres_derive; + +extern crate chrono; +extern crate finito; +extern crate r2d2_postgres; +extern crate serde; +extern crate serde_json; +extern crate uuid; + +#[cfg(test)] mod tests; +#[cfg(test)] extern crate finito_door; + +mod error; +pub use error::{Result, Error, ErrorKind}; + +use chrono::prelude::{DateTime, Utc}; +use error::ResultExt; +use finito::{FSM, FSMBackend}; +use postgres::transaction::Transaction; +use postgres::GenericConnection; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::Value; +use std::marker::PhantomData; +use uuid::Uuid; +use r2d2_postgres::{r2d2, PostgresConnectionManager}; + +type DBPool = r2d2::Pool<PostgresConnectionManager>; +type DBConn = r2d2::PooledConnection<PostgresConnectionManager>; + +/// This struct represents rows in the database table in which events +/// are persisted. +#[derive(Debug, ToSql, FromSql)] +struct EventT { + /// ID of the persisted event. + id: Uuid, + + /// Timestamp at which the event was stored. + created: DateTime<Utc>, + + /// Name of the type of FSM that this state belongs to. + fsm: String, + + /// ID of the state machine belonging to this event. + fsm_id: Uuid, + + /// Serialised content of the event. + event: Value, +} + +/// This enum represents the possible statuses an action can be in. +#[derive(Debug, PartialEq, ToSql, FromSql)] +#[postgres(name = "actionstatus")] +enum ActionStatus { + /// The action was requested but has not run yet. + Pending, + + /// The action completed successfully. + Completed, + + /// The action failed to run. Information about the error will + /// have been persisted in Postgres. + Failed, +} + +/// This struct represents rows in the database table in which actions +/// are persisted. +#[derive(Debug, ToSql, FromSql)] +struct ActionT { + /// ID of the persisted event. + id: Uuid, + + /// Timestamp at which the event was stored. + created: DateTime<Utc>, + + /// Name of the type of FSM that this state belongs to. + fsm: String, + + /// ID of the state machine belonging to this event. + fsm_id: Uuid, + + /// ID of the event that resulted in this action. + event_id: Uuid, + + /// Serialised content of the action. + #[postgres(name = "content")] // renamed because 'action' is a keyword in PG + action: Value, + + /// Current status of the action. + status: ActionStatus, + + /// Detailed (i.e. Debug-trait formatted) error message, if an + /// error occured during action processing. + error: Option<String>, +} + +// The following functions implement the public interface of +// `finito-postgres`. + +/// TODO: Write docs for this type, brain does not want to do it right +/// now. +pub struct FinitoPostgres<S> { + state: S, + + db_pool: DBPool, +} + +impl <S> FinitoPostgres<S> { + pub fn new(state: S, db_pool: DBPool, pool_size: usize) -> Self { + FinitoPostgres { + state, db_pool, + } + } +} + +impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> { + type Key = Uuid; + type Error = Error; + + fn insert_machine<S: FSM + Serialize>(&self, initial: S) -> Result<Uuid> { + let query = r#" + INSERT INTO machines (id, fsm, state) + VALUES ($1, $2, $3) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let state = serde_json::to_value(initial).context("failed to serialise FSM")?; + + self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?; + + return Ok(id); + + } + + fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> { + get_machine_internal(&*self.conn()?, key, false) + } + + /// Advance a persisted state machine by applying an event, and + /// storing the event as well as all resulting actions. + /// + /// This function holds a database-lock on the state's row while + /// advancing the machine. + /// + /// **Note**: This function returns the new state of the machine + /// immediately after applying the event, however this does not + /// necessarily equate to the state of the machine after all related + /// processing is finished as running actions may result in additional + /// transitions. + fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S> + where S: FSM + Serialize + DeserializeOwned, + S::State: From<&'a State>, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned { + let conn = self.conn()?; + let tx = conn.transaction().context("could not begin transaction")?; + let state = get_machine_internal(&tx, key, true)?; + + // Advancing the FSM consumes the event, so it is persisted first: + let event_id = insert_event::<_, S>(&tx, key, &event)?; + + // Core advancing logic is run: + let (new_state, actions) = finito::advance(state, event); + + // Resulting actions are persisted (TODO: and interpreted) + let mut action_ids = vec![]; + for action in actions { + let action_id = insert_action::<_, S>(&tx, key, event_id, &action)?; + action_ids.push(action_id); + } + + // And finally the state is updated: + update_state(&tx, key, &new_state)?; + tx.commit().context("could not commit transaction")?; + + self.run_actions::<S>(key, action_ids); + + Ok(new_state) + } +} + +impl <State: 'static> FinitoPostgres<State> { + /// Execute several actions at the same time, each in a separate + /// thread. Note that actions returning further events, causing + /// further transitions, returning further actions and so on will + /// potentially cause multiple threads to get created. + fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned, + S::State: From<&'a State> { + let state: S::State = (&self.state).into(); + let conn = self.conn().expect("TODO"); + + for action_id in action_ids { + let tx = conn.transaction().expect("TODO"); + + // TODO: Determine which concurrency setup we actually want. + if let Ok(events) = run_action(tx, action_id, &state, PhantomData::<S>) { + for event in events { + self.advance::<S>(fsm_id, event).expect("TODO"); + } + } + } + } + + /// Retrieve a single connection from the database connection pool. + fn conn(&self) -> Result<DBConn> { + self.db_pool.get().context("failed to retrieve connection from pool") + } +} + + + +/// Insert a single state-machine into the database and return its +/// newly allocated, random UUID. +pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + INSERT INTO machines (id, fsm, state) + VALUES ($1, $2, $3) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let state = serde_json::to_value(initial).context("failed to serialize FSM")?; + + conn.execute(query, &[&id, &fsm, &state])?; + + return Ok(id); +} + +/// Insert a single event into the database and return its UUID. +fn insert_event<C, S>(conn: &C, + fsm_id: Uuid, + event: &S::Event) -> Result<Uuid> +where + C: GenericConnection, + S: FSM, + S::Event: Serialize { + let query = r#" + INSERT INTO events (id, fsm, fsm_id, event) + VALUES ($1, $2, $3, $4) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let event_value = serde_json::to_value(event) + .context("failed to serialize event")?; + + conn.execute(query, &[&id, &fsm, &fsm_id, &event_value])?; + return Ok(id) +} + +/// Insert a single action into the database and return its UUID. +fn insert_action<C, S>(conn: &C, + fsm_id: Uuid, + event_id: Uuid, + action: &S::Action) -> Result<Uuid> where + C: GenericConnection, + S: FSM, + S::Action: Serialize { + let query = r#" + INSERT INTO actions (id, fsm, fsm_id, event_id, content, status) + VALUES ($1, $2, $3, $4, $5, $6) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let action_value = serde_json::to_value(action) + .context("failed to serialize action")?; + + conn.execute( + query, + &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending] + )?; + + return Ok(id) +} + +/// Update the state of a specified machine. +fn update_state<C, S>(conn: &C, + fsm_id: Uuid, + state: &S) -> Result<()> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + UPDATE machines SET state = $1 WHERE id = $2 + "#; + + let state_value = serde_json::to_value(state).context("failed to serialize FSM")?; + let res_count = conn.execute(query, &[&state_value, &fsm_id])?; + + if res_count != 1 { + Err(ErrorKind::FSMNotFound(fsm_id).into()) + } else { + Ok(()) + } +} + +/// Conditionally alter SQL statement to append locking clause inside +/// of a transaction. +fn alter_for_update(alter: bool, query: &str) -> String { + match alter { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + } +} + +/// Retrieve the current state of a state machine from the database, +/// optionally locking the machine state for the duration of some +/// enclosing transaction. +fn get_machine_internal<C, S>(conn: &C, + id: Uuid, + for_update: bool) -> Result<S> where + C: GenericConnection, + S: FSM + DeserializeOwned { + let query = alter_for_update(for_update, r#" + SELECT state FROM machines WHERE id = $1 + "#); + + let rows = conn.query(&query, &[&id]).context("failed to retrieve FSM")?; + + if let Some(row) = rows.into_iter().next() { + Ok(serde_json::from_value(row.get(0)).context("failed to deserialize FSM")?) + } else { + Err(ErrorKind::FSMNotFound(id).into()) + } +} + +/// Retrieve an action from the database, optionally locking it for +/// the duration of some enclosing transaction. +fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where + C: GenericConnection, + S: FSM, + S::Action: DeserializeOwned { + let query = alter_for_update(true, r#" + SELECT status, content FROM actions + WHERE id = $1 AND fsm = $2 + "#); + + let rows = conn.query(&query, &[&id, &S::FSM_NAME])?; + + if let Some(row) = rows.into_iter().next() { + let action = serde_json::from_value(row.get(1)) + .context("failed to deserialize FSM action")?; + Ok((row.get(0), action)) + } else { + Err(ErrorKind::ActionNotFound(id).into()) + } +} + +/// Update the status of an action after an attempt to run it. +fn update_action_status<C, S>(conn: &C, + id: Uuid, + status: ActionStatus, + error: Option<String>, + _fsm: PhantomData<S>) -> Result<()> where + C: GenericConnection, + S: FSM { + let query = r#" + UPDATE actions SET status = $1, error = $2 + WHERE id = $3 AND fsm = $4 + "#; + + let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME])?; + + if result != 1 { + Err(ErrorKind::ActionNotFound(id).into()) + } else { + Ok(()) + } +} + +/// Execute a single action in case it is pending or retryable. Holds +/// a lock on the action's database row while performing the action +/// and writes back the status afterwards. +/// +/// Should the execution of an action fail cleanly (i.e. without a +/// panic), the error will be persisted. Should it fail by panicking +/// (which developers should never do explicitly in action +/// interpreters) its status will not be changed. +fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>) + -> Result<Vec<S::Event>> where + S: FSM, + S::Action: DeserializeOwned { + let (status, action) = get_action::<Transaction, S>(&tx, id)?; + + let result = match status { + ActionStatus::Pending => { + match S::act(action, state) { + // If the action succeeded, update its status to + // completed and return the created events. + Ok(events) => { + update_action_status( + &tx, id, ActionStatus::Completed, None, PhantomData::<S> + )?; + events + }, + + // If the action failed, persist the debug message and + // return nothing. + Err(err) => { + let msg = Some(format!("{:?}", err)); + update_action_status( + &tx, id, ActionStatus::Failed, msg, PhantomData::<S> + )?; + vec![] + }, + } + }, + + _ => { + // TODO: Currently only pending actions are run because + // retryable actions are not yet implemented. + vec![] + }, + }; + + tx.commit().context("failed to commit transaction")?; + Ok(result) +} diff --git a/users/tazjin/finito/finito-postgres/src/tests.rs b/users/tazjin/finito/finito-postgres/src/tests.rs new file mode 100644 index 000000000000..b1b5821be3c4 --- /dev/null +++ b/users/tazjin/finito/finito-postgres/src/tests.rs @@ -0,0 +1,47 @@ +use super::*; + +use finito_door::*; +use postgres::{Connection, TlsMode}; + +// TODO: read config from environment +fn open_test_connection() -> Connection { + Connection::connect("postgres://finito:finito@localhost/finito", TlsMode::None) + .expect("Failed to connect to test database") +} + +#[test] +fn test_insert_machine() { + let conn = open_test_connection(); + let initial = DoorState::Opened; + let door = insert_machine(&conn, initial).expect("Failed to insert door"); + let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); + + assert_eq!(result, DoorState::Opened, "Inserted door state should match"); +} + +#[test] +fn test_advance() { + let conn = open_test_connection(); + + let initial = DoorState::Opened; + let events = vec![ + DoorEvent::Close, + DoorEvent::Open, + DoorEvent::Close, + DoorEvent::Lock(1234), + DoorEvent::Unlock(1234), + DoorEvent::Lock(4567), + DoorEvent::Unlock(1234), + ]; + + let door = insert_machine(&conn, initial).expect("Failed to insert door"); + + for event in events { + advance(&conn, &door, event).expect("Failed to advance door FSM"); + } + + let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); + let expected = DoorState::Locked { code: 4567, attempts: 2 }; + + assert_eq!(result, expected, "Advanced door state should match"); +} |