A flexible and robust gRPC connection pooling library for Elixir, providing efficient connection management with automatic health monitoring, connection warming, and environment-agnostic configuration.
GrpcConnectionPool was extracted from a production Pub/Sub gRPC client to provide a generic, reusable solution for any Elixir application that needs reliable gRPC connection pooling. It's built on top of Poolex for modern, efficient worker pool management.
- 🌐 Environment-agnostic: Works seamlessly with production, local development, and test environments
- 🔄 Health monitoring: Automatic connection health checks and recovery
- 🔥 Connection warming: Periodic pings to prevent idle connection timeouts
- 🔁 Retry logic: Configurable exponential backoff for connection failures
- 🏗️ Multiple pools: Support for multiple named pools serving different gRPC services
- ⚙️ Flexible configuration: Configure via code, environment variables, or config files
- 🧪 Production tested: Fully tested with real gRPC services including Google Pub/Sub emulator
We chose this architecture after learning from production experience with gRPC connection pooling:
-
Poolex over NimblePool: While NimblePool is excellent, Poolex offers more modern features and better monitoring capabilities that are essential for production gRPC services.
-
GenServer Workers: Each connection is managed by a dedicated GenServer that handles:
- Connection lifecycle management
- Health monitoring with periodic pings
- Automatic reconnection with exponential backoff
- Connection warming to prevent timeouts
-
Environment-Agnostic Configuration: Real applications need to work across development (local gRPC servers), testing (emulators), and production (secure cloud services) environments seamlessly.
-
Separation of Concerns:
Config: Pure configuration managementWorker: Connection lifecycle and healthPool: Poolex integration and operation executionGrpcConnectionPool: Clean public API
- Reduced latency: Pre-warmed connections eliminate cold start delays
- Improved reliability: Automatic reconnection and health monitoring
- Resource efficiency: Connection reuse and proper cleanup
- Scalability: Multiple pools for different services
- Operational visibility: Built-in monitoring and metrics
Add grpc_connection_pool to your dependencies in mix.exs:
def deps do
[
{:grpc_connection_pool, "~> 0.1.0"},
{:grpc, "~> 0.10.2"} # Required peer dependency
]
end# Create configuration
{:ok, config} = GrpcConnectionPool.Config.production(
host: "api.example.com",
port: 443,
pool_size: 5
)
# Start pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)
# Execute gRPC operations
operation = fn channel ->
request = %MyService.ListRequest{}
MyService.Stub.list(channel, request)
end
{:ok, response} = GrpcConnectionPool.execute(operation){:ok, config} = GrpcConnectionPool.Config.local(
host: "localhost",
port: 9090,
pool_size: 3
)
{:ok, _pid} = GrpcConnectionPool.start_link(config)The library supports flexible configuration through GrpcConnectionPool.Config:
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production, # :production, :local, or custom atom
host: "api.example.com", # Required: gRPC server hostname
port: 443, # Required: gRPC server port
ssl: [], # SSL options ([] for default SSL)
credentials: nil, # Custom GRPC.Credential (overrides ssl)
retry_config: [ # Optional: retry configuration
max_attempts: 3,
base_delay: 1000,
max_delay: 5000
]
],
pool: [
size: 5, # Number of connections in pool
name: MyApp.GrpcPool, # Pool name (must be unique)
checkout_timeout: 15_000 # Timeout for getting connections
],
connection: [
keepalive: 30_000, # HTTP/2 keepalive interval
ping_interval: 25_000, # Ping interval to keep connections warm
health_check: true, # Enable connection health monitoring
suppress_connection_errors: false # Suppress gun_down/gun_error logs (useful for GCP endpoints)
]
])# Single service configuration
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: []
],
pool: [
size: 10,
name: MyApp.GrpcPool
],
connection: [
ping_interval: 30_000
]
# Multiple service configuration
config :my_app, :service_a,
endpoint: [type: :production, host: "service-a.example.com"],
pool: [size: 5, name: MyApp.ServiceA.Pool]
config :my_app, :service_b,
endpoint: [type: :production, host: "service-b.example.com"],
pool: [size: 8, name: MyApp.ServiceB.Pool]# For a production gRPC service with SSL
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: [] # Use default SSL settings
],
pool: [
size: 10,
name: MyApp.GrpcPool
],
connection: [
keepalive: 30_000, # Send keepalive every 30 seconds
ping_interval: 25_000, # Ping every 25 seconds to keep warm
health_check: true
]# For local development with a gRPC server (no SSL)
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090
],
pool: [
size: 3, # Smaller pool for development
name: MyApp.DevPool
]# For testing with Google Pub/Sub emulator or similar
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085,
retry_config: [
max_attempts: 3,
base_delay: 1000,
max_delay: 5000
]
],
pool: [
size: 2, # Small pool for tests
name: MyApp.TestPool
],
connection: [
ping_interval: nil # Disable pinging in tests
]# Configuration for multiple gRPC services
config :my_app, :service_a,
endpoint: [
type: :production,
host: "service-a.example.com",
port: 443
],
pool: [
size: 5,
name: MyApp.ServiceAPool
]
config :my_app, :service_b,
endpoint: [
type: :production,
host: "service-b.example.com",
port: 443
],
pool: [
size: 8,
name: MyApp.ServiceBPool
]# Custom SSL configuration with client certificates
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "secure-api.example.com",
port: 443,
credentials: GRPC.Credential.new(ssl: [
verify: :verify_peer,
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem"
])
]# Different configurations per environment using Mix.env()
case Mix.env() do
:prod ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443
],
pool: [size: 20] # Large pool for production
:dev ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090
],
pool: [size: 3] # Small pool for development
:test ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085
],
pool: [size: 1], # Minimal pool for tests
connection: [
ping_interval: nil # No pinging needed in tests
]
end# Single pool
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
# Multiple pools
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)For configuration determined at runtime (e.g., from environment variables):
import Config
if config_env() == :prod do
# Get configuration from environment variables
grpc_host = System.get_env("GRPC_HOST") || "api.example.com"
grpc_port = System.get_env("GRPC_PORT", "443") |> String.to_integer()
pool_size = System.get_env("GRPC_POOL_SIZE", "10") |> String.to_integer()
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: grpc_host,
port: grpc_port
],
pool: [size: pool_size]
enddefmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configuration
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
children = [
# Your other services...
{GrpcConnectionPool, config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
enddefmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configurations for different services
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
children = [
# Your other services...
{GrpcConnectionPool, service_a_config},
{GrpcConnectionPool, service_b_config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
enddef start(_type, _args) do
# Create configurations programmatically
{:ok, user_service_config} = GrpcConnectionPool.Config.production(
host: "users.example.com",
pool_name: MyApp.UserService.Pool,
pool_size: 5
)
{:ok, payment_service_config} = GrpcConnectionPool.Config.production(
host: "payments.example.com",
pool_name: MyApp.PaymentService.Pool,
pool_size: 3
)
children = [
{GrpcConnectionPool, user_service_config},
{GrpcConnectionPool, payment_service_config}
]
Supervisor.start_link(children, strategy: :one_for_one)
end# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Single pool from config
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
children = [
# Your other processes...
{GrpcConnectionPool, config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configurations for different services
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
children = [
# Your other services...
{GrpcConnectionPool, service_a_config},
{GrpcConnectionPool, service_b_config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end# Simple operation with default pool
operation = fn channel ->
request = %MyService.GetUserRequest{user_id: "123"}
MyService.Stub.get_user(channel, request)
end
case GrpcConnectionPool.execute(operation) do
{:ok, {:ok, user}} -> IO.puts("Found user: #{user.name}")
{:ok, {:error, error}} -> IO.puts("gRPC error: #{inspect(error)}")
{:error, reason} -> IO.puts("Pool error: #{inspect(reason)}")
end# Execute on specific pool
user_operation = fn channel ->
request = %UserService.GetRequest{id: "123"}
UserService.Stub.get(channel, request)
end
payment_operation = fn channel ->
request = %PaymentService.ChargeRequest{amount: 1000}
PaymentService.Stub.charge(channel, request)
end
# Use different pools for different services
{:ok, user} = GrpcConnectionPool.execute(user_operation, pool: MyApp.UserService.Pool)
{:ok, charge} = GrpcConnectionPool.execute(payment_operation, pool: MyApp.PaymentService.Pool)operation = fn channel ->
request = %MyService.CreateRequest{data: "important data"}
MyService.Stub.create(channel, request)
end
case GrpcConnectionPool.execute(operation) do
{:ok, {:ok, result}} ->
# Success case
handle_success(result)
{:ok, {:error, %GRPC.RPCError{status: status, message: message}}} ->
# gRPC-level error (service returned error)
handle_grpc_error(status, message)
{:error, :not_connected} ->
# Pool has no healthy connections
handle_connection_error()
{:error, {:exit, {:noproc, _}}} ->
# Pool doesn't exist
handle_pool_missing_error()
{:error, reason} ->
# Other pool-level errors
handle_pool_error(reason)
end# For services requiring client certificates
ssl_config = [
verify: :verify_peer,
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem"
]
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "secure-api.example.com",
port: 443,
ssl: ssl_config
],
pool: [size: 5]
])# For advanced authentication scenarios
credentials = GRPC.Credential.new(ssl: [
verify: :verify_peer,
cacerts: :public_key.cacerts_get()
])
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
credentials: credentials # Overrides ssl config
]
])# config/dev.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090 # Local gRPC server
],
pool: [size: 2], # Smaller pool for development
connection: [
ping_interval: nil # Disable pinging for local development
]# config/test.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085, # Emulator port
retry_config: [
max_attempts: 3,
base_delay: 500, # Faster retries for tests
max_delay: 2000
]
],
pool: [size: 1], # Minimal pool for tests
connection: [
ping_interval: nil, # No pinging needed in tests
health_check: false # Disable health checks in tests
]# config/prod.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: [] # Use default SSL
],
pool: [
size: 20, # Large pool for production load
checkout_timeout: 10_000
],
connection: [
keepalive: 30_000,
ping_interval: 25_000,
health_check: true
]# Check pool status
status = GrpcConnectionPool.status()
IO.inspect(status) # %{pool_name: GrpcConnectionPool.Pool, status: :running}
# Check specific pool
status = GrpcConnectionPool.status(MyApp.UserService.Pool)# Stop specific pool
:ok = GrpcConnectionPool.stop(MyApp.UserService.Pool)
# The pool will be automatically restarted by the supervisor if neededThe library automatically monitors connection health and replaces dead connections. You can also check worker status:
# This is mainly for debugging - not needed in normal usage
worker_pid = :poolex.checkout(MyApp.UserService.Pool)
status = GrpcConnectionPool.Worker.status(worker_pid) # :connected | :disconnected
:poolex.checkin(MyApp.UserService.Pool, worker_pid)mix test --exclude emulatorStart the Google Pub/Sub emulator (or any gRPC service emulator):
# Using Docker Compose (if available)
docker-compose up -d
# Or manually
docker run --rm -p 8085:8085 google/cloud-sdk:emulators-pubsub \
/google-cloud-sdk/bin/gcloud beta emulators pubsub start \
--host-port=0.0.0.0:8085 --project=test-projectRun integration tests:
mix test --only emulatorRun all tests:
mix test# In your test helper
defmodule MyApp.TestHelper do
def start_test_pool do
{:ok, config} = GrpcConnectionPool.Config.local(
host: "localhost",
port: 8085, # Your test service port
pool_size: 1
)
{:ok, _pid} = GrpcConnectionPool.start_link(config, name: TestPool)
end
def stop_test_pool do
GrpcConnectionPool.stop(TestPool)
end
end
# In your tests
defmodule MyApp.GrpcTest do
use ExUnit.Case
setup do
MyApp.TestHelper.start_test_pool()
on_exit(&MyApp.TestHelper.stop_test_pool/0)
end
test "grpc operation works" do
operation = fn channel ->
# Your gRPC test operation
end
assert {:ok, result} = GrpcConnectionPool.execute(operation, pool: TestPool)
end
end- Small services: 3-5 connections per pool
- Medium load: 5-10 connections per pool
- High load: 10-20+ connections per pool
- Multiple services: Separate pools with appropriate sizing
config :my_app, GrpcConnectionPool,
connection: [
keepalive: 30_000, # Keep connections alive (important for cloud services)
ping_interval: 25_000, # Ping before cloud timeouts (usually 60s)
health_check: true # Enable automatic health monitoring
]Monitor pool performance using:
- Connection establishment logs
- Pool checkout timeouts
- gRPC operation latencies
- Connection health check failures
:noprocerrors: Pool not started or wrong pool name- Connection timeouts: Network issues or service unavailable
- SSL errors: Incorrect SSL configuration or certificates
- Pool checkout timeouts: Pool too small or slow operations
Enable debug logging to troubleshoot issues:
# config/config.exs
config :logger, level: :info
# In your code
require Logger
Logger.info("Pool status: #{inspect(GrpcConnectionPool.status())}")If you see frequent reconnections:
- Check network connectivity
- Verify service availability
- Adjust
ping_intervalandkeepalivesettings - Check for firewall or load balancer timeouts
GCP gRPC endpoints have hardcoded timeouts that periodically close connections, causing gun_down error messages. This is normal behavior and the connection pool will automatically replace the workers. To suppress these expected error messages:
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "pubsub.googleapis.com", # Or other GCP gRPC endpoints
port: 443
],
connection: [
suppress_connection_errors: true # Suppress expected gun_down errors from GCP
]The worker processes will still crash gracefully and be replaced by Poolex as designed.
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Make your changes
- Add tests for your changes
- Run the test suite (
mix test) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
git clone https://github.com/nyo16/grpc_connection_pool.git
cd grpc_connection_pool
mix deps.get
mix testThis project is licensed under the MIT License - see the LICENSE file for details.
- Built on top of Poolex for excellent pool management
- Inspired by production requirements from Google Cloud Pub/Sub integration
- Thanks to the Elixir gRPC community for the solid foundation
Documentation: https://hexdocs.pm/grpc_connection_pool
Source Code: https://github.com/nyo16/grpc_connection_pool
Issues: https://github.com/nyo16/grpc_connection_pool/issues