Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions src/pog.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

// TODO: add time things with zone once pgo supports them

const default_checkout_timeout = 5000

import exception
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode.{type Decoder}
Expand All @@ -25,7 +27,7 @@ import gleam/uri.{Uri}
const default_port: Int = 5432

pub opaque type Connection {
Pool(Name(Message))
Pool(name: Name(Message), checkout_timeout: Int)
SingleConnection(SingleConnection)
}

Expand All @@ -39,7 +41,20 @@ pub type Message
/// connection will fail.
///
pub fn named_connection(name: Name(Message)) -> Connection {
Pool(name)
Pool(name:, checkout_timeout: default_checkout_timeout)
}

/// Create a reference to a pool using the pool's name and an explicit
/// checkout timeout in milliseconds.
///
/// Use this when the pool was started with a non-default `checkout_timeout`
/// and you need transactions on this connection to use the same timeout.
///
pub fn named_connection_with_timeout(
name: Name(Message),
checkout_timeout timeout: Int,
) -> Connection {
Pool(name:, checkout_timeout: timeout)
}
Comment on lines 43 to 58
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default checkout timeout value 5000 is duplicated (e.g. named_connection and default_config). To avoid drift if this default changes, consider introducing a single module-level constant (e.g. default_checkout_timeout) and referencing it in both places.

Copilot uses AI. Check for mistakes.

/// The configuration for a pool of connections.
Expand Down Expand Up @@ -75,6 +90,11 @@ pub type Config {
/// (default: 1000): The database is pinged every idle_interval when the
/// connection is idle.
idle_interval: Int,
/// (default: 5000): Timeout in milliseconds for checking out a connection
/// from the pool. Used by `transaction` when acquiring a dedicated
/// connection. If the checkout takes longer than this, the transaction
/// fails with `QueryTimeout`.
checkout_timeout: Int,
/// trace (default: False): pgo is instrumented with [OpenCensus][1] and
/// when this option is true a span will be created (if sampled).
///
Expand Down Expand Up @@ -189,6 +209,14 @@ pub fn idle_interval(config: Config, idle_interval: Int) -> Config {
Config(..config, idle_interval:)
}

/// Timeout in milliseconds for checking out a connection from the pool.
/// Used by `transaction` when acquiring a dedicated connection.
///
/// default: 5000
pub fn checkout_timeout(config: Config, checkout_timeout: Int) -> Config {
Config(..config, checkout_timeout:)
}

/// Trace pgo is instrumented with [OpenTelemetry][1] and
/// when this option is true a span will be created (if sampled).
///
Expand Down Expand Up @@ -235,6 +263,7 @@ pub fn default_config(pool_name pool_name: Name(Message)) -> Config {
queue_target: 50,
queue_interval: 1000,
idle_interval: 1000,
checkout_timeout: default_checkout_timeout,
trace: False,
ip_version: Ipv4,
rows_as_map: False,
Expand Down Expand Up @@ -335,7 +364,11 @@ fn extract_ssl_mode(query: option.Option(String)) -> Result(Ssl, Nil) {
///
pub fn start(config: Config) -> actor.StartResult(Connection) {
case start_tree(config) {
Ok(pid) -> Ok(actor.Started(pid, Pool(config.pool_name)))
Ok(pid) ->
Ok(actor.Started(
pid,
Pool(name: config.pool_name, checkout_timeout: config.checkout_timeout),
))
Error(reason) -> Error(actor.InitExited(process.Abnormal(reason)))
}
}
Expand Down Expand Up @@ -431,10 +464,11 @@ pub fn transaction(
SingleConnection(conn) -> {
transaction_layer(conn, callback)
}
Pool(name) -> {
Pool(name:, checkout_timeout:) -> {
// Check out a single connection from the pool
use #(ref, conn) <- result.try(
checkout(name) |> result.map_error(TransactionQueryError),
checkout(name, checkout_timeout)
|> result.map_error(TransactionQueryError),
)
Comment on lines +467 to 472
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are transaction tests, but none that exercise the new checkout-timeout path (i.e. that checkout_timeout is actually passed through to pgo:checkout/2 and affects transaction behavior). Adding a test that forces checkout contention (e.g. pool_size=1 and holding a connection) would help prevent regressions.

Copilot uses AI. Check for mistakes.

// Make a best attempt to check back in the connection, even if this
Expand Down Expand Up @@ -481,6 +515,7 @@ fn transaction_layer(
@external(erlang, "pog_ffi", "checkout")
fn checkout(
pool: Name(Message),
timeout: Int,
) -> Result(#(Reference, SingleConnection), QueryError)

@external(erlang, "pgo", "checkin")
Expand Down
8 changes: 4 additions & 4 deletions src/pog_ffi.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(pog_ffi).

-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]).
-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/2]).

-include_lib("pog/include/pog_Config.hrl").
-include_lib("pg_types/include/pg_types.hrl").
Expand Down Expand Up @@ -86,7 +86,7 @@ query(Pool, Sql, Arguments, Timeout) ->
Res = case Pool of
{single_connection, Conn} ->
pgo_handler:extended_query(Conn, Sql, Arguments, #{});
{pool, Name} ->
{pool, Name, _CheckoutTimeout} ->
Options = #{
pool => Name,
pool_options => [{timeout, Timeout}]
Expand All @@ -110,8 +110,8 @@ query_extended(Conn, Sql) ->
{error, convert_error(Error)}
end.

checkout(Name) when is_atom(Name) ->
case pgo:checkout(Name) of
checkout(Name, Timeout) when is_atom(Name) ->
case pgo:checkout(Name, [{timeout, Timeout}]) of
{ok, Ref, Conn} -> {ok, {Ref, Conn}};
{error, Error} -> {error, convert_error(Error)}
end.
Expand Down
37 changes: 37 additions & 0 deletions test/pog_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,40 @@ pub fn transaction_commit_test() {

disconnect(db)
}

pub fn transaction_checkout_timeout_test() {
// Start a pool with pool_size=1 and a very short checkout timeout
let assert Ok(db) =
process.new_name("pog_checkout_test")
|> default_config
|> pog.pool_size(1)
|> pog.checkout_timeout(50)
|> pog.start

// Hold the only connection by starting a transaction that blocks.
// In a separate process, try another transaction — it should fail
// with QueryTimeout because checkout_timeout is only 50ms and the
// only connection is held.
let parent = process.new_subject()
let _pid = process.spawn_unlinked(fn() {
// Sleep briefly to ensure the outer transaction starts first
process.sleep(20)
let result = pog.transaction(db.data, fn(tx) {
let _ = pog.query("SELECT 1") |> pog.execute(tx)
Ok(Nil)
})
process.send(parent, result)
})

// The outer transaction holds the connection for longer than 50ms
let _ = pog.transaction(db.data, fn(tx) {
let _ = pog.query("SELECT pg_sleep(0.2)") |> pog.execute(tx)
Ok(Nil)
})

// The spawned process should have received a timeout error
let assert Ok(inner_result) = process.receive(parent, 5000)
let assert Error(pog.TransactionQueryError(pog.QueryTimeout)) = inner_result

disconnect(db)
}