about summary refs log tree commit diff
path: root/finito-postgres/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'finito-postgres/src/lib.rs')
-rw-r--r--finito-postgres/src/lib.rs36
1 files changed, 28 insertions, 8 deletions
diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs
index 844e8f79fee3..eea6405c6f45 100644
--- a/finito-postgres/src/lib.rs
+++ b/finito-postgres/src/lib.rs
@@ -9,6 +9,7 @@
 
 extern crate chrono;
 extern crate finito;
+extern crate r2d2_postgres;
 extern crate serde;
 extern crate serde_json;
 extern crate uuid;
@@ -19,16 +20,20 @@ extern crate uuid;
 mod error;
 pub use error::{Result, Error, ErrorKind};
 
-use error::ResultExt;
 use chrono::prelude::{DateTime, Utc};
+use error::ResultExt;
 use finito::{FSM, FSMBackend};
-use postgres::{Connection, GenericConnection};
 use postgres::transaction::Transaction;
+use postgres::GenericConnection;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
 use serde_json::Value;
 use std::marker::PhantomData;
 use uuid::Uuid;
+use r2d2_postgres::{r2d2, PostgresConnectionManager};
+
+type DBPool = r2d2::Pool<PostgresConnectionManager>;
+type DBConn = r2d2::PooledConnection<PostgresConnectionManager>;
 
 /// This struct represents rows in the database table in which events
 /// are persisted.
@@ -103,8 +108,16 @@ struct ActionT {
 /// now.
 pub struct FinitoPostgres<S> {
     state: S,
-    // TODO: Use connection pool?
-    conn: Connection,
+
+    db_pool: DBPool,
+}
+
+impl <S> FinitoPostgres<S> {
+    pub fn new(state: S, db_pool: DBPool, pool_size: usize) -> Self {
+        FinitoPostgres {
+            state, db_pool,
+        }
+    }
 }
 
 impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
@@ -121,14 +134,14 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
         let fsm = S::FSM_NAME.to_string();
         let state = serde_json::to_value(initial).context("failed to serialise FSM")?;
 
-        self.conn.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?;
+        self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?;
 
         return Ok(id);
 
     }
 
     fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> {
-        get_machine_internal(&self.conn, key, false)
+        get_machine_internal(&*self.conn()?, key, false)
     }
 
     /// Advance a persisted state machine by applying an event, and
@@ -147,7 +160,8 @@ impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
           S::State: From<&'a State>,
           S::Event: Serialize + DeserializeOwned,
           S::Action: Serialize + DeserializeOwned {
-        let tx = self.conn.transaction().context("could not begin transaction")?;
+        let conn = self.conn()?;
+        let tx = conn.transaction().context("could not begin transaction")?;
         let state = get_machine_internal(&tx, key, true)?;
 
         // Advancing the FSM consumes the event, so it is persisted first:
@@ -184,9 +198,10 @@ impl <State: 'static> FinitoPostgres<State> {
         S::Action: Serialize + DeserializeOwned,
         S::State: From<&'a State> {
         let state: S::State = (&self.state).into();
+        let conn = self.conn().expect("TODO");
 
         for action_id in action_ids {
-            let tx = self.conn.transaction().expect("TODO");
+            let tx = conn.transaction().expect("TODO");
 
             // TODO: Determine which concurrency setup we actually want.
             if let Ok(events) = run_action(tx, action_id, &state, PhantomData::<S>) {
@@ -196,6 +211,11 @@ impl <State: 'static> FinitoPostgres<State> {
             }
         }
     }
+
+    /// Retrieve a single connection from the database connection pool.
+    fn conn(&self) -> Result<DBConn> {
+        self.db_pool.get().context("failed to retrieve connection from pool")
+    }
 }