about summary refs log tree commit diff
path: root/users/Profpatsch/my-prelude/src/Postgres
diff options
context:
space:
mode:
Diffstat (limited to 'users/Profpatsch/my-prelude/src/Postgres')
-rw-r--r--users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs33
1 files changed, 29 insertions, 4 deletions
diff --git a/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs b/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs
index 57daeb3aceef..a542f8c7b899 100644
--- a/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs
+++ b/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs
@@ -5,6 +5,7 @@
 
 module Postgres.MonadPostgres where
 
+import Arg
 import AtLeast (AtLeast)
 import Control.Exception
   ( Exception (displayException),
@@ -26,7 +27,6 @@ import Data.List qualified as List
 import Data.Pool (Pool)
 import Data.Pool qualified as Pool
 import Data.Text qualified as Text
-import Data.Time (getCurrentTime)
 import Data.Typeable (Typeable)
 import Database.PostgreSQL.Simple (Connection, FormatError, FromRow, Query, QueryError, ResultError, SqlError, ToRow)
 import Database.PostgreSQL.Simple qualified as PG
@@ -48,7 +48,7 @@ import Pretty (showPretty)
 import Seconds
 import System.Exit (ExitCode (..))
 import Tool
-import UnliftIO (MonadUnliftIO (withRunInIO), bracket, hClose)
+import UnliftIO (MonadUnliftIO (withRunInIO), bracket, hClose, mask_)
 import UnliftIO.Concurrent (forkIO)
 import UnliftIO.Process (ProcessHandle)
 import UnliftIO.Process qualified as Process
@@ -429,7 +429,8 @@ data PgFormatProcess = PgFormatProcess
   { stdinHdl :: Handle,
     stdoutHdl :: Handle,
     stderrHdl :: Handle,
-    procHdl :: ProcessHandle
+    procHdl :: ProcessHandle,
+    startedAt :: Otel.Timestamp
   }
 
 initPgFormatPool :: (HasField "pgFormat" tools Tool) => tools -> IO PgFormatPool
@@ -463,11 +464,34 @@ initPgFormatPool tools = do
 destroyPgFormatPool :: PgFormatPool -> IO ()
 destroyPgFormatPool pool = Pool.destroyAllResources pool.pool
 
+-- | Get the oldest resource from the pool, or stop if you find a resource that’s older than `cutoffPointMs`.
+takeOldestResource :: PgFormatPool -> Arg "cutoffPointMs" Integer -> IO (PgFormatProcess, Pool.LocalPool PgFormatProcess)
+takeOldestResource pool cutoffPointMs = do
+  now <- Otel.getTimestamp
+  mask_ $ do
+    a <- Pool.takeResource pool.pool
+    (putBack, res) <- go now [] a
+    -- make sure we don’t leak any resources we didn’t use in the end
+    for_ putBack $ \(x, xLocal) -> Pool.putResource xLocal x
+    pure res
+  where
+    mkMs ts = (ts & Otel.timestampNanoseconds & toInteger) `div` 1000_000
+    go now putBack a@(a', _) =
+      if abs (mkMs now - mkMs a'.startedAt) > cutoffPointMs.unArg
+        then pure (putBack, a)
+        else
+          Pool.tryTakeResource pool.pool >>= \case
+            Nothing -> pure (putBack, a)
+            Just b@(b', _) -> do
+              if a'.startedAt < b'.startedAt
+                then go now (b : putBack) a
+                else go now (a : putBack) b
+
 -- | Format the given SQL with pg_formatter. Will use the pool of already running formatters to speed up execution.
 runPgFormat :: PgFormatPool -> ByteString -> IO (T3 "exitCode" ExitCode "formatted" ByteString "stderr" ByteString)
 runPgFormat pool sqlStatement = do
   bracket
-    (Pool.takeResource pool.pool)
+    (takeOldestResource pool 200)
     ( \(a, localPool) -> do
         -- we always destroy the resource, because the process exited
         Pool.destroyResource pool.pool localPool a
@@ -772,6 +796,7 @@ pgFormatStartCommandWaitForInput ::
   m PgFormatProcess
 pgFormatStartCommandWaitForInput tools = do
   do
+    startedAt <- Otel.getTimestamp
     (Just stdinHdl, Just stdoutHdl, Just stderrHdl, procHdl) <-
       Process.createProcess
         ( ( Process.proc