From edafeeb5f59a59ce6039c0d22aaa3f6f3481872c Mon Sep 17 00:00:00 2001 From: Tyler Witt Date: Mon, 22 Dec 2025 10:32:38 -0800 Subject: [PATCH] Convert config to keyword Map based config doesn't automatically merge config values, which means that each environment, if changing a consumer, must define the _entire_ consumer's config. --- CHANGELOG.md | 6 +++ README.md | 46 ++++++++++--------- config/test.exs | 6 +-- lib/kaffe/config/consumer.ex | 22 +++++---- mix.exs | 2 +- test/kaffe/config/consumer_test.exs | 40 ++++++++-------- .../consumer_group/group_manager_test.exs | 6 +-- .../subscriber/group_member_test.exs | 6 +-- .../subscriber/subscriber_test.exs | 10 ++-- .../worker/worker_manager_test.exs | 2 +- 10 files changed, 81 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39c3209..9e239ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 2.0.0 + +### Breaking Changes + +* Allow keyword configuration for subscribers. Note, keywords require atom keys, so if your current version of `kaffe` is 1.27.0 or higher, adopting to the keyword subscribers is a breaking (and highly encouraged) change. + # 1.28.0 ### Enhancements diff --git a/README.md b/README.md index 664d552..ab6fa5c 100644 --- a/README.md +++ b/README.md @@ -76,8 +76,8 @@ There is also legacy support for single message consumers, which process one mes ```elixir config :kaffe, - consumers: %{ - "subscriber_1" => [ + consumers: [ + subscriber_1: [ endpoints: [kafka: 9092], topics: ["interesting-topic"], consumer_group: "your-app-consumer-group", @@ -95,7 +95,7 @@ There is also legacy support for single message consumers, which process one mes password: System.get_env("KAFFE_PRODUCER_PASSWORD") } ], - "subscriber_2" => [ + subscriber_2: [ endpoints: [kafka: 9092], topics: ["topic-2"], consumer_group: "your-app-consumer-group", @@ -104,7 +104,7 @@ There is also legacy support for single message consumers, which process one mes max_bytes: 50_000, worker_allocation_strategy: :worker_per_topic_partition ] - } + ] ``` 3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree. @@ -117,12 +117,12 @@ There is also legacy support for single message consumers, which process one mes children = [ %{ id: Kaffe.GroupMemberSupervisor.Subscriber1, - start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_1"]}, + start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_1]}, type: :supervisor }, %{ id: Kaffe.GroupMemberSupervisor.Subscriber2, - start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_2"]}, + start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_2]}, type: :supervisor } ] @@ -201,16 +201,18 @@ _For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended i ```elixir config :kaffe, - consumer: [ - endpoints: [kafka: 9092], # that's [hostname: kafka_port] - topics: ["interesting-topic"], # the topic(s) that will be consumed - consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka - message_handler: MessageProcessor, # the module from Step 1 that will process messages - - # optional - async_message_ack: false, # see "async message acknowledgement" below - start_with_earliest_message: true # default false - ], + consumers: [ + subscriber_1: [ + endpoints: [kafka: 9092], # that's [hostname: kafka_port] + topics: ["interesting-topic"], # the topic(s) that will be consumed + consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka + message_handler: MessageProcessor, # the module from Step 1 that will process messages + + # optional + async_message_ack: false, # see "async message acknowledgement" below + start_with_earliest_message: true # default false + ] + ] ``` The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. @@ -221,11 +223,13 @@ _For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended i ```elixir config :kaffe, - consumer: [ - heroku_kafka_env: true, - topics: ["interesting-topic"], - consumer_group: "your-app-consumer-group", - message_handler: MessageProcessor + consumers: [ + subscriber_1: [ + heroku_kafka_env: true, + topics: ["interesting-topic"], + consumer_group: "your-app-consumer-group", + message_handler: MessageProcessor + ] ] ``` diff --git a/config/test.exs b/config/test.exs index 2c9356a..993807c 100644 --- a/config/test.exs +++ b/config/test.exs @@ -4,8 +4,8 @@ config :kaffe, kafka_mod: TestBrod, group_subscriber_mod: TestBrodGroupSubscriber, test_partition_count: 32, - consumers: %{ - "subscriber_name" => [ + consumers: [ + subscriber_name: [ endpoints: [kafka: 9092], topics: ["kaffe-test"], consumer_group: "kaffe-test-group", @@ -24,7 +24,7 @@ config :kaffe, password: System.get_env("KAFFE_PRODUCER_PASSWORD") } ] - }, + ], producer: [ endpoints: [kafka: 9092], topics: ["kaffe-test"], diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index dec402f..ef72748 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -108,8 +108,11 @@ defmodule Kaffe.Config.Consumer do def consumer_group(config_key), do: config_get!(config_key, :consumer_group) - def subscriber_name(config_key), - do: config_get(config_key, :subscriber_name, consumer_group(config_key)) |> String.to_atom() + def subscriber_name(config_key) do + config_key + |> config_get!(:subscriber_name) + |> to_atom() + end def topics(config_key), do: config_get!(config_key, :topics) @@ -215,7 +218,7 @@ defmodule Kaffe.Config.Consumer do def config_get!(config_key, key) do Application.get_env(:kaffe, :consumers) - |> Map.get(config_key) + |> Access.get(config_key) |> Keyword.fetch!(key) end @@ -223,26 +226,29 @@ defmodule Kaffe.Config.Consumer do def config_get(config_key, key, default) do Application.get_env(:kaffe, :consumers) - |> Map.get(config_key) + |> Access.get(config_key) |> Keyword.get(key, default) end def validate_configuration!() do if Application.get_env(:kaffe, :consumers) == nil do old_config = Application.get_env(:kaffe, :consumer) || [] - subscriber_name = old_config |> Keyword.get(:subscriber_name, "subscriber_name") + subscriber_name = old_config |> Keyword.get(:subscriber_name, :subscriber_name) raise(""" UPDATE CONSUMERS CONFIG: - Set :kaffe, :consumers to a map with subscriber names as keys and config as values. + Set :kaffe, :consumers to a keyword list with subscriber names as keys and config as values. For example: config :kaffe, - consumers: %{ + consumers: [ #{inspect(subscriber_name)} => #{inspect(old_config)} - } + ] """) end end + + defp to_atom(val) when is_atom(val), do: val + defp to_atom(val) when is_binary(val), do: String.to_atom(val) end diff --git a/mix.exs b/mix.exs index 6d5f6d2..d5aca90 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Kaffe.Mixfile do use Mix.Project @source_url "https://github.com/spreedly/kaffe" - @version "1.28.0" + @version "2.0.0" def project do [ diff --git a/test/kaffe/config/consumer_test.exs b/test/kaffe/config/consumer_test.exs index 022da2b..7ea7f74 100644 --- a/test/kaffe/config/consumer_test.exs +++ b/test/kaffe/config/consumer_test.exs @@ -4,12 +4,12 @@ defmodule Kaffe.Config.ConsumerTest do def change_config(subscriber_name, update_fn) do config = Application.get_env(:kaffe, :consumers)[subscriber_name] config = update_fn.(config) - Application.put_env(:kaffe, :consumers, %{subscriber_name => config}) + Application.put_env(:kaffe, :consumers, Keyword.new([{subscriber_name, config}])) end describe "configuration/1" do setup do - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> config |> Keyword.delete(:offset_reset_policy) |> Keyword.delete(:ssl) @@ -18,9 +18,9 @@ defmodule Kaffe.Config.ConsumerTest do end test "correct settings are extracted" do - sasl = Kaffe.Config.Consumer.config_get!("subscriber_name", :sasl) + sasl = Kaffe.Config.Consumer.config_get!(:subscriber_name, :sasl) - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> config |> Keyword.delete(:sasl) end) @@ -52,18 +52,18 @@ defmodule Kaffe.Config.ConsumerTest do } on_exit(fn -> - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> Keyword.put(config, :sasl, sasl) end) end) - assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected + assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected end test "string endpoints parsed correctly" do - endpoints = Kaffe.Config.Consumer.config_get!("subscriber_name", :endpoints) + endpoints = Kaffe.Config.Consumer.config_get!(:subscriber_name, :endpoints) - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> config |> Keyword.put(:endpoints, "kafka:9092,localhost:9092") end) @@ -95,19 +95,19 @@ defmodule Kaffe.Config.ConsumerTest do } on_exit(fn -> - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> Keyword.put(config, :endpoints, endpoints) end) end) - assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected + assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected end end test "correct settings with sasl plain are extracted" do - sasl = Kaffe.Config.Consumer.config_get!("subscriber_name", :sasl) + sasl = Kaffe.Config.Consumer.config_get!(:subscriber_name, :sasl) - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> Keyword.put(config, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"}) end) @@ -140,18 +140,18 @@ defmodule Kaffe.Config.ConsumerTest do } on_exit(fn -> - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> Keyword.put(config, :sasl, sasl) end) end) - assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected + assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected end test "correct settings with ssl are extracted" do - ssl = Kaffe.Config.Consumer.config_get("subscriber_name", :ssl, false) + ssl = Kaffe.Config.Consumer.config_get(:subscriber_name, :ssl, false) - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> Keyword.put(config, :ssl, true) end) @@ -184,21 +184,21 @@ defmodule Kaffe.Config.ConsumerTest do } on_exit(fn -> - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> Keyword.put(config, :ssl, ssl) end) end) - assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected + assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected end describe "offset_reset_policy" do test "computes correctly from start_with_earliest_message == true" do - change_config("subscriber_name", fn config -> + change_config(:subscriber_name, fn config -> config |> Keyword.delete(:offset_reset_policy) end) - assert Kaffe.Config.Consumer.configuration("subscriber_name").offset_reset_policy == :reset_by_subscriber + assert Kaffe.Config.Consumer.configuration(:subscriber_name).offset_reset_policy == :reset_by_subscriber end end end diff --git a/test/kaffe/consumer_group/group_manager_test.exs b/test/kaffe/consumer_group/group_manager_test.exs index 7fe6809..2ed5b2b 100644 --- a/test/kaffe/consumer_group/group_manager_test.exs +++ b/test/kaffe/consumer_group/group_manager_test.exs @@ -37,7 +37,7 @@ defmodule Kaffe.GroupManagerTest do test "subscribe from config" do Process.register(self(), :test_case) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, _group_manager_pid} = GroupManager.start_link(config) :timer.sleep(config.rebalance_delay_ms) @@ -52,7 +52,7 @@ defmodule Kaffe.GroupManagerTest do test "subscribe to topics dynamically" do Process.register(self(), :test_case) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, _group_manager_pid} = GroupManager.start_link(config) :timer.sleep(config.rebalance_delay_ms) @@ -74,7 +74,7 @@ defmodule Kaffe.GroupManagerTest do test "duplicate topic subscription does nothing" do Process.register(self(), :test_case) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, _group_manager_pid} = GroupManager.start_link(config) :timer.sleep(config.rebalance_delay_ms) diff --git a/test/kaffe/consumer_group/subscriber/group_member_test.exs b/test/kaffe/consumer_group/subscriber/group_member_test.exs index 8c84025..5d3e3b0 100644 --- a/test/kaffe/consumer_group/subscriber/group_member_test.exs +++ b/test/kaffe/consumer_group/subscriber/group_member_test.exs @@ -44,7 +44,7 @@ defmodule Kaffe.GroupMemberTest do test "handle assignments_received" do Process.register(self(), :test_case) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config) GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}]) @@ -59,7 +59,7 @@ defmodule Kaffe.GroupMemberTest do test "handle assignments_revoked" do Process.register(self(), :test_case) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config) GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}]) @@ -77,7 +77,7 @@ defmodule Kaffe.GroupMemberTest do test "handle assignments_received without assignments_revoked" do Process.register(self(), :test_case) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config) GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}]) diff --git a/test/kaffe/consumer_group/subscriber/subscriber_test.exs b/test/kaffe/consumer_group/subscriber/subscriber_test.exs index dca8b22..d1e3aae 100644 --- a/test/kaffe/consumer_group/subscriber/subscriber_test.exs +++ b/test/kaffe/consumer_group/subscriber/subscriber_test.exs @@ -45,7 +45,7 @@ defmodule Kaffe.SubscriberTest do Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(0) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, pid} = Subscriber.subscribe("subscriber_name", self(), self(), 1, "topic", 0, [], config) send(pid, {self(), build_message_set()}) @@ -57,7 +57,7 @@ defmodule Kaffe.SubscriberTest do Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(0) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, pid} = Subscriber.subscribe("subscriber_name", self(), self(), 1, "topic", 0, [], config) Process.unlink(pid) Process.monitor(pid) @@ -80,7 +80,7 @@ defmodule Kaffe.SubscriberTest do Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(0) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, pid} = Subscriber.subscribe("subscriber_name", self(), self(), 1, "topic", 0, [], config) Process.unlink(pid) Process.monitor(pid) @@ -95,7 +95,7 @@ defmodule Kaffe.SubscriberTest do Process.register(self(), :test_case) {:ok, kafka_pid} = TestKafka.start_link(1) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) Subscriber.subscribe("subscriber_name", self(), self(), 1, "topic", 0, [], config) assert_receive {:subscribe, {:error, :no_available_offsets}} @@ -110,7 +110,7 @@ defmodule Kaffe.SubscriberTest do {:ok, kafka_pid} = TestKafka.start_link(2) Process.monitor(kafka_pid) - config = Kaffe.Config.Consumer.configuration("subscriber_name") + config = Kaffe.Config.Consumer.configuration(:subscriber_name) {:ok, subscriber_pid} = Subscriber.subscribe("subscriber_name", self(), self(), 1, "topic", 0, [], config) assert_receive {:subscribe, {:error, :no_available_offsets}} diff --git a/test/kaffe/consumer_group/worker/worker_manager_test.exs b/test/kaffe/consumer_group/worker/worker_manager_test.exs index 8123579..e912ccb 100644 --- a/test/kaffe/consumer_group/worker/worker_manager_test.exs +++ b/test/kaffe/consumer_group/worker/worker_manager_test.exs @@ -36,7 +36,7 @@ defmodule Kaffe.WorkerManagerTest do end defp consumer_config() do - Kaffe.Config.Consumer.configuration("subscriber_name") + Kaffe.Config.Consumer.configuration(:subscriber_name) end defp configure_strategy(worker_manager_pid, strategy) do