From b1e00ff0264fec4f3a5b87470980aebd94db81cf Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 16:53:04 +0200 Subject: feat(postgres): Bootstrap Postgres persistence implementation Adds the initial finito-postgres crate with type definitions for the tables and initial functions to interact with persisted FSMs. This is far from feature complete at this commit. --- finito-postgres/Cargo.toml | 21 ++++ finito-postgres/src/error.rs | 8 ++ finito-postgres/src/lib.rs | 241 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 270 insertions(+) create mode 100644 finito-postgres/Cargo.toml create mode 100644 finito-postgres/src/error.rs create mode 100644 finito-postgres/src/lib.rs (limited to 'finito-postgres') diff --git a/finito-postgres/Cargo.toml b/finito-postgres/Cargo.toml new file mode 100644 index 000000000000..c9f8476dbdc6 --- /dev/null +++ b/finito-postgres/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "finito-postgres" +version = "0.1.0" +authors = ["Vincent Ambo "] + +[dependencies] +chrono = "0.4" +serde = "1.0" +serde_json = "1.0" +postgres-derive = "0.3" + +[dependencies.postgres] +version = "0.15" +features = [ "with-uuid", "with-chrono", "with-serde_json" ] + +[dependencies.uuid] +version = "0.5" +features = [ "v4" ] + +[dependencies.finito] +path = "../finito-core" diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs new file mode 100644 index 000000000000..ccbf0c107e1b --- /dev/null +++ b/finito-postgres/src/error.rs @@ -0,0 +1,8 @@ +//! This module defines error types and conversions for issue that can +//! occur while dealing with persisted state machines. + +use std::result; + +pub type Result = result::Result; + +pub enum Error { SomeError } diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs new file mode 100644 index 000000000000..aee8cf9adf96 --- /dev/null +++ b/finito-postgres/src/lib.rs @@ -0,0 +1,241 @@ +//! This module implements ... TODO when I can write again. + +#[macro_use] extern crate postgres; +#[macro_use] extern crate postgres_derive; + +extern crate chrono; +extern crate finito; +extern crate serde; +extern crate serde_json; +extern crate uuid; + +mod error; +pub use error::{Result, Error}; + +use chrono::prelude::{DateTime, Utc}; +use finito::FSM; +use postgres::GenericConnection; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::Value; +use std::fmt; +use std::marker::PhantomData; +use uuid::Uuid; + +/// This struct represents rows in the database table in which +/// machines (i.e. the current state of a Finito state machine) are +/// persisted. +#[derive(Debug, ToSql, FromSql)] +struct MachineT { + /// ID of the persisted state machine. + id: Uuid, + + /// Time at which the FSM was first created. + created: DateTime, + + /// Name of the type of FSM represented by this state. + fsm: String, + + /// Current state of the FSM (TODO: Can the serialised FSM type be + /// used?) + state: Value, +} + +/// This struct represents rows in the database table in which events +/// are persisted. +#[derive(Debug, ToSql, FromSql)] +struct EventT { + /// ID of the persisted event. + id: Uuid, + + /// Timestamp at which the event was stored. + created: DateTime, + + /// Name of the type of FSM that this state belongs to. + fsm: String, + + /// ID of the state machine belonging to this event. + fsm_id: Uuid, + + /// Serialised content of the event. + event: Value, +} + +/// This enum represents the possible statuses an action can be in. +#[derive(Debug, ToSql, FromSql)] +enum ActionStatus { + /// The action was requested but has not run yet. + Pending, + + /// The action completed successfully. + Completed, + + /// The action failed to run. Information about the error will + /// have been persisted in Postgres. + Failed, +} + +/// This struct represents rows in the database table in which actions +/// are persisted. +#[derive(Debug, ToSql, FromSql)] +struct ActionT { + /// ID of the persisted event. + id: Uuid, + + /// Timestamp at which the event was stored. + created: DateTime, + + /// Name of the type of FSM that this state belongs to. + fsm: String, + + /// ID of the state machine belonging to this event. + fsm_id: Uuid, + + /// Serialised content of the action. + action: Value, + + /// Current status of the action. + status: ActionStatus, + + /// Serialised error representation, if an error occured during + /// processing. TODO: Use some actual error type. Maybe failure + /// has serialisation support? + error: Option, +} + +// The following functions implement the public interface of +// `finito-postgres`. + +/// This type is used as a type-safe wrapper around the ID of a state +/// machine. It carries information about the FSM type and is intended +/// to add a layer of checking to prevent IDs from being mixed up. +#[derive(Clone)] +pub struct MachineId { + uuid: Uuid, + phantom: PhantomData, +} + +/// Custom debug implementation to format machine IDs using the name +/// of the FSM and their UUID. +impl fmt::Debug for MachineId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}:{}", S::FSM_NAME, self.uuid.hyphenated()) + } +} + +impl MachineId { + /// Convert a UUID into a strongly typed machine ID. + pub fn from_uuid(uuid: Uuid) -> Self { + MachineId { + uuid, + phantom: PhantomData, + } + } + + /// Return the UUID contained in a machine ID. + pub fn to_uuid(&self) -> Uuid { + self.uuid + } +} + +/// Insert a single state-machine into the database and return its +/// newly allocated, random UUID. +pub fn insert_machine(conn: &C, initial: S) -> Result> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + INSERT INTO machines (id, created, fsm, state) + VALUES ($1, NOW(), $2, $3) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let state = serde_json::to_value(initial).expect("TODO"); + + conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); + + return Ok(MachineId::from_uuid(id)); +} + +/// Insert a single event into the database and return its UUID. +pub fn insert_event(conn: &C, fsm_id: MachineId, event: S::Event) -> Result +where + C: GenericConnection, + S: FSM, + S::Event: Serialize { + let query = r#" + INSERT INTO events (id, created, fsm, fsm_id, event) + VALUES ($1, NOW(), $2, $3, $4) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let event_value = serde_json::to_value(event).expect("TODO"); + + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_value]).expect("TODO"); + return Ok(id) +} + +/// Insert a single action into the database and return its UUID. +pub fn insert_action(conn: &C, + fsm_id: MachineId, + action: S::Action) -> Result where + C: GenericConnection, + S: FSM, + S::Action: Serialize { + let query = r#" + INSERT INTO actions (id, created, fsm, fsm_id, action, status) + VALUES ($1, NOW(), $2, $3, $4, $5) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let action_value = serde_json::to_value(action).expect("TODO"); + + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value, + &ActionStatus::Pending]).expect("TODO"); + return Ok(id) +} + +/// Retrieve the current state of a state machine from the database. +pub fn get_machine(conn: &C, id: MachineId) -> Result where + C: GenericConnection, + S: FSM + DeserializeOwned { + let query = r#" + SELECT (id, created, fsm, state) FROM machines WHERE id = $1 + "#; + + let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO"); + let mut machines = rows.into_iter().map(|row| MachineT { + id: row.get(0), + created: row.get(1), + fsm: row.get(2), + state: row.get(3), + }); + + if let Some(machine) = machines.next() { + Ok(serde_json::from_value(machine.state).expect("TODO")) + } else { + // TODO: return appropriate not found error + Err(Error::SomeError) + } +} + +/// Advance a persisted state machine by applying an event, and +/// storing the event as well as all resulting actions. +/// +/// This function holds a database-lock on the state's row while +/// advancing the machine. +/// +/// **Note**: This function returns the new state of the machine +/// immediately after applying the event, however this does not +/// necessarily equate to the state of the machine after all related +/// processing is finished as running actions may result in additional +/// transitions. +pub fn advance(conn: &C, + id: MachineId, + event: S::Event) -> Result where + C: GenericConnection, + S: FSM + DeserializeOwned { + unimplemented!() +} -- cgit 1.4.1