diff --git a/src/pog.gleam b/src/pog.gleam index 7549db4..781d45b 100644 --- a/src/pog.gleam +++ b/src/pog.gleam @@ -4,6 +4,8 @@ // TODO: add time things with zone once pgo supports them +const default_checkout_timeout = 5000 + import exception import gleam/dynamic.{type Dynamic} import gleam/dynamic/decode.{type Decoder} @@ -25,7 +27,7 @@ import gleam/uri.{Uri} const default_port: Int = 5432 pub opaque type Connection { - Pool(Name(Message)) + Pool(name: Name(Message), checkout_timeout: Int) SingleConnection(SingleConnection) } @@ -39,7 +41,20 @@ pub type Message /// connection will fail. /// pub fn named_connection(name: Name(Message)) -> Connection { - Pool(name) + Pool(name:, checkout_timeout: default_checkout_timeout) +} + +/// Create a reference to a pool using the pool's name and an explicit +/// checkout timeout in milliseconds. +/// +/// Use this when the pool was started with a non-default `checkout_timeout` +/// and you need transactions on this connection to use the same timeout. +/// +pub fn named_connection_with_timeout( + name: Name(Message), + checkout_timeout timeout: Int, +) -> Connection { + Pool(name:, checkout_timeout: timeout) } /// The configuration for a pool of connections. @@ -75,6 +90,11 @@ pub type Config { /// (default: 1000): The database is pinged every idle_interval when the /// connection is idle. idle_interval: Int, + /// (default: 5000): Timeout in milliseconds for checking out a connection + /// from the pool. Used by `transaction` when acquiring a dedicated + /// connection. If the checkout takes longer than this, the transaction + /// fails with `QueryTimeout`. + checkout_timeout: Int, /// trace (default: False): pgo is instrumented with [OpenCensus][1] and /// when this option is true a span will be created (if sampled). /// @@ -189,6 +209,14 @@ pub fn idle_interval(config: Config, idle_interval: Int) -> Config { Config(..config, idle_interval:) } +/// Timeout in milliseconds for checking out a connection from the pool. +/// Used by `transaction` when acquiring a dedicated connection. +/// +/// default: 5000 +pub fn checkout_timeout(config: Config, checkout_timeout: Int) -> Config { + Config(..config, checkout_timeout:) +} + /// Trace pgo is instrumented with [OpenTelemetry][1] and /// when this option is true a span will be created (if sampled). /// @@ -235,6 +263,7 @@ pub fn default_config(pool_name pool_name: Name(Message)) -> Config { queue_target: 50, queue_interval: 1000, idle_interval: 1000, + checkout_timeout: default_checkout_timeout, trace: False, ip_version: Ipv4, rows_as_map: False, @@ -335,7 +364,11 @@ fn extract_ssl_mode(query: option.Option(String)) -> Result(Ssl, Nil) { /// pub fn start(config: Config) -> actor.StartResult(Connection) { case start_tree(config) { - Ok(pid) -> Ok(actor.Started(pid, Pool(config.pool_name))) + Ok(pid) -> + Ok(actor.Started( + pid, + Pool(name: config.pool_name, checkout_timeout: config.checkout_timeout), + )) Error(reason) -> Error(actor.InitExited(process.Abnormal(reason))) } } @@ -431,10 +464,11 @@ pub fn transaction( SingleConnection(conn) -> { transaction_layer(conn, callback) } - Pool(name) -> { + Pool(name:, checkout_timeout:) -> { // Check out a single connection from the pool use #(ref, conn) <- result.try( - checkout(name) |> result.map_error(TransactionQueryError), + checkout(name, checkout_timeout) + |> result.map_error(TransactionQueryError), ) // Make a best attempt to check back in the connection, even if this @@ -481,6 +515,7 @@ fn transaction_layer( @external(erlang, "pog_ffi", "checkout") fn checkout( pool: Name(Message), + timeout: Int, ) -> Result(#(Reference, SingleConnection), QueryError) @external(erlang, "pgo", "checkin") diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 9c486a2..b85501e 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,6 +1,6 @@ -module(pog_ffi). --export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/2]). -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). @@ -86,7 +86,7 @@ query(Pool, Sql, Arguments, Timeout) -> Res = case Pool of {single_connection, Conn} -> pgo_handler:extended_query(Conn, Sql, Arguments, #{}); - {pool, Name} -> + {pool, Name, _CheckoutTimeout} -> Options = #{ pool => Name, pool_options => [{timeout, Timeout}] @@ -110,8 +110,8 @@ query_extended(Conn, Sql) -> {error, convert_error(Error)} end. -checkout(Name) when is_atom(Name) -> - case pgo:checkout(Name) of +checkout(Name, Timeout) when is_atom(Name) -> + case pgo:checkout(Name, [{timeout, Timeout}]) of {ok, Ref, Conn} -> {ok, {Ref, Conn}}; {error, Error} -> {error, convert_error(Error)} end. diff --git a/test/pog_test.gleam b/test/pog_test.gleam index 3a6f7ad..c12e657 100644 --- a/test/pog_test.gleam +++ b/test/pog_test.gleam @@ -615,3 +615,40 @@ pub fn transaction_commit_test() { disconnect(db) } + +pub fn transaction_checkout_timeout_test() { + // Start a pool with pool_size=1 and a very short checkout timeout + let assert Ok(db) = + process.new_name("pog_checkout_test") + |> default_config + |> pog.pool_size(1) + |> pog.checkout_timeout(50) + |> pog.start + + // Hold the only connection by starting a transaction that blocks. + // In a separate process, try another transaction — it should fail + // with QueryTimeout because checkout_timeout is only 50ms and the + // only connection is held. + let parent = process.new_subject() + let _pid = process.spawn_unlinked(fn() { + // Sleep briefly to ensure the outer transaction starts first + process.sleep(20) + let result = pog.transaction(db.data, fn(tx) { + let _ = pog.query("SELECT 1") |> pog.execute(tx) + Ok(Nil) + }) + process.send(parent, result) + }) + + // The outer transaction holds the connection for longer than 50ms + let _ = pog.transaction(db.data, fn(tx) { + let _ = pog.query("SELECT pg_sleep(0.2)") |> pog.execute(tx) + Ok(Nil) + }) + + // The spawned process should have received a timeout error + let assert Ok(inner_result) = process.receive(parent, 5000) + let assert Error(pog.TransactionQueryError(pog.QueryTimeout)) = inner_result + + disconnect(db) +}