From aa85a187236585488764206239b3c92711e690b3 Mon Sep 17 00:00:00 2001 From: Profpatsch Date: Mon, 13 May 2024 11:22:12 +0200 Subject: feat(users/Profpatsch/MonadPostgres): add PgFormatPool Change-Id: Id65ee6184ef536fe6a46637005bea903b37f6ffd Reviewed-on: https://cl.tvl.fyi/c/depot/+/11653 Autosubmit: Profpatsch Tested-by: BuildkiteCI Reviewed-by: Profpatsch --- .../my-prelude/src/Postgres/MonadPostgres.hs | 146 ++++++++++++++++++--- 1 file changed, 127 insertions(+), 19 deletions(-) (limited to 'users') diff --git a/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs b/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs index 61428205c15f..b0f00de422e0 100644 --- a/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs +++ b/users/Profpatsch/my-prelude/src/Postgres/MonadPostgres.hs @@ -7,11 +7,17 @@ module Postgres.MonadPostgres where import AtLeast (AtLeast) import Control.Exception + ( Exception (displayException), + Handler (Handler), + catches, + try, + ) import Control.Foldl qualified as Fold import Control.Monad.Logger.CallStack (MonadLogger, logDebug, logWarn) import Control.Monad.Reader (MonadReader (ask), ReaderT (..)) import Control.Monad.Trans.Resource import Data.Aeson (FromJSON) +import Data.ByteString qualified as ByteString import Data.Error.Tree import Data.HashMap.Strict qualified as HashMap import Data.Int (Int64) @@ -28,6 +34,7 @@ import Database.PostgreSQL.Simple.FromRow qualified as PG import Database.PostgreSQL.Simple.ToField (ToField) import Database.PostgreSQL.Simple.ToRow (ToRow (toRow)) import Database.PostgreSQL.Simple.Types (Query (..)) +import GHC.IO.Handle (Handle) import GHC.Records (getField) import Label import OpenTelemetry.Trace.Core qualified as Otel hiding (inSpan, inSpan') @@ -39,7 +46,9 @@ import Pretty (showPretty) import Seconds import System.Exit (ExitCode (..)) import Tool -import UnliftIO (MonadUnliftIO (withRunInIO)) +import UnliftIO (MonadUnliftIO (withRunInIO), bracket, hClose) +import UnliftIO.Concurrent (forkIO) +import UnliftIO.Process (ProcessHandle) import UnliftIO.Process qualified as Process import UnliftIO.Resource qualified as Resource import Prelude hiding (init, span) @@ -405,6 +414,74 @@ withPGTransaction connPool f = connPool (\conn -> Postgres.withTransaction conn (f conn)) +-- | `pg_formatter` is a perl script that does not support any kind of streaming. +-- Thus we initialize a pool with a bunch of these scripts running, waiting for input. This way we can have somewhat fast SQL formatting. +-- +-- Call `initPgFormatPool` to initialize, then use `withPgFormat` to format some sql. +data PgFormatPool = PgFormatPool + { pool :: Pool PgFormatProcess, + pgFormat :: Tool + } + +data PgFormatProcess = PgFormatProcess + { stdinHdl :: Handle, + stdoutHdl :: Handle, + procHdl :: ProcessHandle + } + +initPgFormatPool :: (HasField "pgFormat" tools Tool) => tools -> IO PgFormatPool +initPgFormatPool tools = do + pool <- + Pool.newPool + ( Pool.defaultPoolConfig + (pgFormatStartCommandWaitForInput tools) + ( \pgFmt -> do + Process.terminateProcess pgFmt.procHdl + -- make sure we don’t leave any zombies + _ <- forkIO $ do + _ <- Process.waitForProcess pgFmt.procHdl + pure () + pure () + ) + -- unused resource time + 100 + -- number of resources + 3 + ) + + -- fill the pool with resources + let go = + Pool.tryWithResource pool (\_ -> go) >>= \case + Nothing -> pure () + Just () -> pure () + _ <- go + pure (PgFormatPool {pool, pgFormat = tools.pgFormat}) + +destroyPgFormatPool :: PgFormatPool -> IO () +destroyPgFormatPool pool = Pool.destroyAllResources pool.pool + +-- | Format the given SQL with pg_formatter. Will use the pool of already running formatters to speed up execution. +withPgFormat :: PgFormatPool -> ByteString -> IO (ExitCode, ByteString) +withPgFormat pool sqlStatement = do + bracket + (Pool.takeResource pool.pool) + ( \(a, localPool) -> do + -- we always destroy the resource, because the process exited + Pool.destroyResource pool.pool localPool a + -- create a new process to keep the pool “warm” + new <- pgFormatStartCommandWaitForInput pool + Pool.putResource localPool new + ) + ( \(pgFmt, _localPool) -> do + ByteString.hPut pgFmt.stdinHdl sqlStatement + -- close stdin to make pg_formatter format (it exits …) + -- issue: https://github.com/darold/pgFormatter/issues/333 + hClose pgFmt.stdinHdl + formatted <- ByteString.hGetContents pgFmt.stdoutHdl + exitCode <- Process.waitForProcess pgFmt.procHdl + pure (exitCode, formatted) + ) + runPGTransactionImpl :: (MonadUnliftIO m) => m (Pool Postgres.Connection) -> @@ -664,21 +741,50 @@ pgFormatQueryByteString tools queryBytes = do "-" ] (queryBytes & bytesToTextUtf8Lenient & textToString) - case exitCode of - ExitSuccess -> pure (stdout & stringToText) - ExitFailure status -> do - logWarn [fmt|pg_format failed with status {status} while formatting the query, using original query string. Is there a syntax error?|] - logDebug - ( prettyErrorTree - ( nestedMultiError - "pg_format output" - ( nestedError "stdout" (singleError (stdout & stringToText & newError)) - :| [(nestedError "stderr" (singleError (stderr & stringToText & newError)))] - ) - ) + handlePgFormatExitCode exitCode stdout stderr queryBytes + +pgFormatStartCommandWaitForInput :: + ( MonadIO m, + HasField "pgFormat" tools Tool, + MonadFail m + ) => + tools -> + m PgFormatProcess +pgFormatStartCommandWaitForInput tools = do + do + (Just stdinHdl, Just stdoutHdl, Nothing, procHdl) <- + Process.createProcess + ( ( Process.proc + tools.pgFormat.toolPath + [ "--no-rcfile", + "-" + ] ) - logDebug [fmt|pg_format stdout: stderr|] - pure (queryBytes & bytesToTextUtf8Lenient) + { Process.std_in = Process.CreatePipe, + Process.std_out = Process.CreatePipe, + Process.std_err = Process.Inherit + } + ) + + pure PgFormatProcess {..} + +handlePgFormatExitCode :: (MonadLogger m) => ExitCode -> String -> String -> ByteString -> m Text +handlePgFormatExitCode exitCode stdout stderr queryBytes = + case exitCode of + ExitSuccess -> pure (stdout & stringToText) + ExitFailure status -> do + logWarn [fmt|pg_format failed with status {status} while formatting the query, using original query string. Is there a syntax error?|] + logDebug + ( prettyErrorTree + ( nestedMultiError + "pg_format output" + ( nestedError "stdout" (singleError (stdout & stringToText & newError)) + :| [(nestedError "stderr" (singleError (stderr & stringToText & newError)))] + ) + ) + ) + logDebug [fmt|pg_format stdout: stderr|] + pure (queryBytes & bytesToTextUtf8Lenient) data DebugLogDatabaseQueries = -- | Do not log the database queries @@ -710,10 +816,12 @@ traceQueryIfEnabled :: Transaction m () traceQueryIfEnabled tools span logDatabaseQueries qry params = do -- In case we have query logging enabled, we want to do that - let formattedQuery = case params of - HasNoParams -> pgFormatQueryNoParams' tools qry - HasSingleParam p -> pgFormatQuery' tools qry p - HasMultiParams ps -> pgFormatQueryMany' tools qry ps + let formattedQuery = + Otel.inSpan "Postgres Query Formatting" Otel.defaultSpanArguments $ + case params of + HasNoParams -> pgFormatQueryNoParams' tools qry + HasSingleParam p -> pgFormatQuery' tools qry p + HasMultiParams ps -> pgFormatQueryMany' tools qry ps let doLog errs = Otel.addAttributes span -- cgit 1.4.1