about summary refs log tree commit diff
path: root/finito-postgres/src/lib.rs
diff options
context:
space:
mode:
authorVincent Ambo <mail@tazj.in>2018-12-13T15·53+0100
committerVincent Ambo <mail@tazj.in>2018-12-13T15·53+0100
commitb7481172252d6f00546e94534b05d011b4105843 (patch)
tree3d3d3024d7d45d3875ad651f01b4df91acd7967e /finito-postgres/src/lib.rs
parent42d56713bd6bbaf213cac37ff85fb1c64188fec5 (diff)
feat(postgres): Introduce database connection pool
Adds an r2d2 database connection pool to the backend type. This makes
it possible for separate FSMs to run at the same time through the same
backend, by retrieving separate connections.
Diffstat (limited to 'finito-postgres/src/lib.rs')
-rw-r--r--finito-postgres/src/lib.rs36
1 files changed, 28 insertions, 8 deletions
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs
index 844e8f79fee3..eea6405c6f45 100644
--- a/finito-postgres/src/lib.rs
+++ b/finito-postgres/src/lib.rs
@@ -9,6 +9,7 @@
 
 extern crate chrono;
 extern crate finito;
+extern crate r2d2_postgres;
 extern crate serde;
 extern crate serde_json;
 extern crate uuid;
@@ -19,16 +20,20 @@ extern crate uuid;
 mod error;
 pub use error::{Result, Error, ErrorKind};
 
-use error::ResultExt;
 use chrono::prelude::{DateTime, Utc};
+use error::ResultExt;
 use finito::{FSM, FSMBackend};
-use postgres::{Connection, GenericConnection};
 use postgres::transaction::Transaction;
+use postgres::GenericConnection;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
 use serde_json::Value;
 use std::marker::PhantomData;
 use uuid::Uuid;
+use r2d2_postgres::{r2d2, PostgresConnectionManager};
+
+type DBPool = r2d2::Pool<PostgresConnectionManager>;
+type DBConn = r2d2::PooledConnection<PostgresConnectionManager>;
 
 /// This struct represents rows in the database table in which events
 /// are persisted.
@@ -103,8 +108,16 @@ struct ActionT {
 /// now.
 pub struct FinitoPostgres<S> {
     state: S,
-    // TODO: Use connection pool?
-    conn: Connection,
+
+    db_pool: DBPool,
+}
+
+impl <S> FinitoPostgres<S> {
+    pub fn new(state: S, db_pool: DBPool, pool_size: usize) -> Self {
+        FinitoPostgres {
+            state, db_pool,
+        }
+    }
 }
 
 impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
@@ -121,14 +134,14 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
         let fsm = S::FSM_NAME.to_string();
         let state = serde_json::to_value(initial).context("failed to serialise FSM")?;
 
-        self.conn.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?;
+        self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?;
 
         return Ok(id);
 
     }
 
     fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> {
-        get_machine_internal(&self.conn, key, false)
+        get_machine_internal(&*self.conn()?, key, false)
     }
 
     /// Advance a persisted state machine by applying an event, and
@@ -147,7 +160,8 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
           S::State: From<&'a State>,
           S::Event: Serialize + DeserializeOwned,
           S::Action: Serialize + DeserializeOwned {
-        let tx = self.conn.transaction().context("could not begin transaction")?;
+        let conn = self.conn()?;
+        let tx = conn.transaction().context("could not begin transaction")?;
         let state = get_machine_internal(&tx, key, true)?;
 
         // Advancing the FSM consumes the event, so it is persisted first:
@@ -184,9 +198,10 @@ impl <State: 'static> FinitoPostgres<State> {
         S::Action: Serialize + DeserializeOwned,
         S::State: From<&'a State> {
         let state: S::State = (&self.state).into();
+        let conn = self.conn().expect("TODO");
 
         for action_id in action_ids {
-            let tx = self.conn.transaction().expect("TODO");
+            let tx = conn.transaction().expect("TODO");
 
             // TODO: Determine which concurrency setup we actually want.
             if let Ok(events) = run_action(tx, action_id, &state, PhantomData::<S>) {
@@ -196,6 +211,11 @@ impl <State: 'static> FinitoPostgres<State> {
             }
         }
     }
+
+    /// Retrieve a single connection from the database connection pool.
+    fn conn(&self) -> Result<DBConn> {
+        self.db_pool.get().context("failed to retrieve connection from pool")
+    }
 }