From 9b6a5f7377b43d76167a5e00d6d007de0890047b Mon Sep 17 00:00:00 2001 From: Buck Doyle Date: Tue, 20 Jan 2026 20:43:55 -0600 Subject: [PATCH 1/2] Add rate-limiter mail delivery strategy --- config/prod.exs | 4 + lib/prison_rideshare/application.ex | 1 + lib/prison_rideshare/mailer_rate_limiter.ex | 82 +++++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 lib/prison_rideshare/mailer_rate_limiter.ex diff --git a/config/prod.exs b/config/prod.exs index ca15fad..cca10d4 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -15,4 +15,8 @@ config :prison_rideshare, PrisonRideshare.Repo, pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10"), ssl: true +config :prison_rideshare, PrisonRideshare.Mailer, + deliver_later_strategy: PrisonRideshare.MailerRateLimiter, + rate_limit_ms: String.to_integer(System.get_env("MAILER_RATE_LIMIT_MS") || "30000") + config :prison_rideshare, gas_price_endpoint: System.get_env("GAS_PRICE_ENDPOINT") diff --git a/lib/prison_rideshare/application.ex b/lib/prison_rideshare/application.ex index 2438a96..cbb96b8 100644 --- a/lib/prison_rideshare/application.ex +++ b/lib/prison_rideshare/application.ex @@ -10,6 +10,7 @@ defmodule PrisonRideshare.Application do {Phoenix.PubSub, name: PrisonRideshare.PubSub}, # Start the Ecto repository PrisonRideshare.Repo, + PrisonRideshare.MailerRateLimiter, # Start the endpoint when the application starts PrisonRideshareWeb.Endpoint, # Start your own worker by calling: PrisonRideshare.Worker.start_link(arg1, arg2, arg3) diff --git a/lib/prison_rideshare/mailer_rate_limiter.ex b/lib/prison_rideshare/mailer_rate_limiter.ex new file mode 100644 index 0000000..ca9276c --- /dev/null +++ b/lib/prison_rideshare/mailer_rate_limiter.ex @@ -0,0 +1,82 @@ +defmodule PrisonRideshare.MailerRateLimiter do + @behaviour Bamboo.DeliverLaterStrategy + use GenServer + + require Logger + + @default_interval_ms 30000 + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl Bamboo.DeliverLaterStrategy + def deliver_later(adapter, email, config) do + GenServer.cast(__MODULE__, {:enqueue, adapter, email, config}) + end + + @impl GenServer + def init(_opts) do + {:ok, + %{ + queue: :queue.new(), + interval_ms: delivery_interval_ms(), + in_flight: false + }} + end + + @impl GenServer + def handle_cast({:enqueue, adapter, email, config}, state) do + state = %{state | queue: :queue.in({adapter, email, config}, state.queue)} + {:noreply, maybe_schedule(state)} + end + + @impl GenServer + def handle_info(:deliver_next, state) do + case :queue.out(state.queue) do + {{:value, {adapter, email, config}}, queue} -> + deliver(adapter, email, config) + state = %{state | queue: queue} + + if :queue.is_empty(queue) do + {:noreply, %{state | in_flight: false}} + else + Process.send_after(self(), :deliver_next, state.interval_ms) + {:noreply, state} + end + + {:empty, _queue} -> + {:noreply, %{state | in_flight: false}} + end + end + + defp maybe_schedule(%{in_flight: true} = state), do: state + + defp maybe_schedule(state) do + send(self(), :deliver_next) + %{state | in_flight: true} + end + + defp deliver(adapter, email, config) do + try do + case adapter.deliver(email, config) do + {:error, error} -> + Logger.error("Email delivery failed: #{inspect(error)}") + + _ -> + :ok + end + rescue + exception -> + Logger.error(Exception.format(:error, exception, __STACKTRACE__)) + catch + kind, reason -> + Logger.error(Exception.format(kind, reason, __STACKTRACE__)) + end + end + + defp delivery_interval_ms do + config = Application.get_env(:prison_rideshare, PrisonRideshare.Mailer, []) + config[:rate_limit_ms] || @default_interval_ms + end +end From e0e63473d4a07368cad97780e9561784125877cf Mon Sep 17 00:00:00 2001 From: Buck Doyle Date: Sat, 21 Mar 2026 09:39:31 -0700 Subject: [PATCH 2/2] Add logging about queueing --- lib/prison_rideshare/mailer_rate_limiter.ex | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/prison_rideshare/mailer_rate_limiter.ex b/lib/prison_rideshare/mailer_rate_limiter.ex index ca9276c..a9010f7 100644 --- a/lib/prison_rideshare/mailer_rate_limiter.ex +++ b/lib/prison_rideshare/mailer_rate_limiter.ex @@ -28,6 +28,8 @@ defmodule PrisonRideshare.MailerRateLimiter do @impl GenServer def handle_cast({:enqueue, adapter, email, config}, state) do state = %{state | queue: :queue.in({adapter, email, config}, state.queue)} + queue_size = :queue.len(state.queue) + Logger.info("Mail queued: to=#{inspect(email.to)} subject=#{inspect(email.subject)} queue_size=#{queue_size}") {:noreply, maybe_schedule(state)} end @@ -35,12 +37,16 @@ defmodule PrisonRideshare.MailerRateLimiter do def handle_info(:deliver_next, state) do case :queue.out(state.queue) do {{:value, {adapter, email, config}}, queue} -> + remaining = :queue.len(queue) + Logger.info("Mail sending: to=#{inspect(email.to)} subject=#{inspect(email.subject)} remaining=#{remaining}") deliver(adapter, email, config) state = %{state | queue: queue} if :queue.is_empty(queue) do + Logger.info("Mail queue drained") {:noreply, %{state | in_flight: false}} else + Logger.info("Mail queue scheduling next delivery in #{state.interval_ms}ms remaining=#{remaining}") Process.send_after(self(), :deliver_next, state.interval_ms) {:noreply, state} end @@ -64,7 +70,7 @@ defmodule PrisonRideshare.MailerRateLimiter do Logger.error("Email delivery failed: #{inspect(error)}") _ -> - :ok + Logger.info("Mail sent: to=#{inspect(email.to)} subject=#{inspect(email.subject)}") end rescue exception ->