Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 2.0.0

### Breaking Changes

* Allow keyword configuration for subscribers. Note, keywords require atom keys, so if your current version of `kaffe` is 1.27.0 or higher, adopting to the keyword subscribers is a breaking (and highly encouraged) change.

# 1.28.0

### Enhancements
Expand Down
46 changes: 25 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ There is also legacy support for single message consumers, which process one mes

```elixir
config :kaffe,
consumers: %{
"subscriber_1" => [
consumers: [
subscriber_1: [
endpoints: [kafka: 9092],
topics: ["interesting-topic"],
consumer_group: "your-app-consumer-group",
Expand All @@ -95,7 +95,7 @@ There is also legacy support for single message consumers, which process one mes
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
],
"subscriber_2" => [
subscriber_2: [
endpoints: [kafka: 9092],
topics: ["topic-2"],
consumer_group: "your-app-consumer-group",
Expand All @@ -104,7 +104,7 @@ There is also legacy support for single message consumers, which process one mes
max_bytes: 50_000,
worker_allocation_strategy: :worker_per_topic_partition
]
}
]
```

3. Add `Kaffe.GroupMemberSupervisor` as a supervisor in your supervision tree.
Expand All @@ -117,12 +117,12 @@ There is also legacy support for single message consumers, which process one mes
children = [
%{
id: Kaffe.GroupMemberSupervisor.Subscriber1,
start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_1"]},
start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_1]},
type: :supervisor
},
%{
id: Kaffe.GroupMemberSupervisor.Subscriber2,
start: {Kaffe.GroupMemberSupervisor, :start_link, ["subscriber_2"]},
start: {Kaffe.GroupMemberSupervisor, :start_link, [:subscriber_2]},
type: :supervisor
}
]
Expand Down Expand Up @@ -201,16 +201,18 @@ _For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended i

```elixir
config :kaffe,
consumer: [
endpoints: [kafka: 9092], # that's [hostname: kafka_port]
topics: ["interesting-topic"], # the topic(s) that will be consumed
consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
message_handler: MessageProcessor, # the module from Step 1 that will process messages

# optional
async_message_ack: false, # see "async message acknowledgement" below
start_with_earliest_message: true # default false
],
consumers: [
subscriber_1: [
endpoints: [kafka: 9092], # that's [hostname: kafka_port]
topics: ["interesting-topic"], # the topic(s) that will be consumed
consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
message_handler: MessageProcessor, # the module from Step 1 that will process messages

# optional
async_message_ack: false, # see "async message acknowledgement" below
start_with_earliest_message: true # default false
]
]
```

The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.
Expand All @@ -221,11 +223,13 @@ _For backward compatibility only! `Kaffe.GroupMemberSupervisor` is recommended i

```elixir
config :kaffe,
consumer: [
heroku_kafka_env: true,
topics: ["interesting-topic"],
consumer_group: "your-app-consumer-group",
message_handler: MessageProcessor
consumers: [
subscriber_1: [
heroku_kafka_env: true,
topics: ["interesting-topic"],
consumer_group: "your-app-consumer-group",
message_handler: MessageProcessor
]
]
```

Expand Down
6 changes: 3 additions & 3 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ config :kaffe,
kafka_mod: TestBrod,
group_subscriber_mod: TestBrodGroupSubscriber,
test_partition_count: 32,
consumers: %{
"subscriber_name" => [
consumers: [
subscriber_name: [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
consumer_group: "kaffe-test-group",
Expand All @@ -24,7 +24,7 @@ config :kaffe,
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
]
},
],
producer: [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
Expand Down
22 changes: 14 additions & 8 deletions lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ defmodule Kaffe.Config.Consumer do

def consumer_group(config_key), do: config_get!(config_key, :consumer_group)

def subscriber_name(config_key),
do: config_get(config_key, :subscriber_name, consumer_group(config_key)) |> String.to_atom()
def subscriber_name(config_key) do
config_key
|> config_get!(:subscriber_name)
|> to_atom()
end

def topics(config_key), do: config_get!(config_key, :topics)

Expand Down Expand Up @@ -215,34 +218,37 @@ defmodule Kaffe.Config.Consumer do

def config_get!(config_key, key) do
Application.get_env(:kaffe, :consumers)
|> Map.get(config_key)
|> Access.get(config_key)
|> Keyword.fetch!(key)
end

def config_get(config_key, :subscriber_name, _default), do: config_key

def config_get(config_key, key, default) do
Application.get_env(:kaffe, :consumers)
|> Map.get(config_key)
|> Access.get(config_key)
|> Keyword.get(key, default)
end

def validate_configuration!() do
if Application.get_env(:kaffe, :consumers) == nil do
old_config = Application.get_env(:kaffe, :consumer) || []
subscriber_name = old_config |> Keyword.get(:subscriber_name, "subscriber_name")
subscriber_name = old_config |> Keyword.get(:subscriber_name, :subscriber_name)

raise("""
UPDATE CONSUMERS CONFIG:

Set :kaffe, :consumers to a map with subscriber names as keys and config as values.
Set :kaffe, :consumers to a keyword list with subscriber names as keys and config as values.
For example:

config :kaffe,
consumers: %{
consumers: [
#{inspect(subscriber_name)} => #{inspect(old_config)}
}
]
""")
end
end

defp to_atom(val) when is_atom(val), do: val
defp to_atom(val) when is_binary(val), do: String.to_atom(val)
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Kaffe.Mixfile do
use Mix.Project

@source_url "https://github.com/spreedly/kaffe"
@version "1.28.0"
@version "2.0.0"

def project do
[
Expand Down
40 changes: 20 additions & 20 deletions test/kaffe/config/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ defmodule Kaffe.Config.ConsumerTest do
def change_config(subscriber_name, update_fn) do
config = Application.get_env(:kaffe, :consumers)[subscriber_name]
config = update_fn.(config)
Application.put_env(:kaffe, :consumers, %{subscriber_name => config})
Application.put_env(:kaffe, :consumers, Keyword.new([{subscriber_name, config}]))
end

describe "configuration/1" do
setup do
change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
config
|> Keyword.delete(:offset_reset_policy)
|> Keyword.delete(:ssl)
Expand All @@ -18,9 +18,9 @@ defmodule Kaffe.Config.ConsumerTest do
end

test "correct settings are extracted" do
sasl = Kaffe.Config.Consumer.config_get!("subscriber_name", :sasl)
sasl = Kaffe.Config.Consumer.config_get!(:subscriber_name, :sasl)

change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
config |> Keyword.delete(:sasl)
end)

Expand Down Expand Up @@ -52,18 +52,18 @@ defmodule Kaffe.Config.ConsumerTest do
}

on_exit(fn ->
change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
Keyword.put(config, :sasl, sasl)
end)
end)

assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
end

test "string endpoints parsed correctly" do
endpoints = Kaffe.Config.Consumer.config_get!("subscriber_name", :endpoints)
endpoints = Kaffe.Config.Consumer.config_get!(:subscriber_name, :endpoints)

change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
config |> Keyword.put(:endpoints, "kafka:9092,localhost:9092")
end)

Expand Down Expand Up @@ -95,19 +95,19 @@ defmodule Kaffe.Config.ConsumerTest do
}

on_exit(fn ->
change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
Keyword.put(config, :endpoints, endpoints)
end)
end)

assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
end
end

test "correct settings with sasl plain are extracted" do
sasl = Kaffe.Config.Consumer.config_get!("subscriber_name", :sasl)
sasl = Kaffe.Config.Consumer.config_get!(:subscriber_name, :sasl)

change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
Keyword.put(config, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"})
end)

Expand Down Expand Up @@ -140,18 +140,18 @@ defmodule Kaffe.Config.ConsumerTest do
}

on_exit(fn ->
change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
Keyword.put(config, :sasl, sasl)
end)
end)

assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
end

test "correct settings with ssl are extracted" do
ssl = Kaffe.Config.Consumer.config_get("subscriber_name", :ssl, false)
ssl = Kaffe.Config.Consumer.config_get(:subscriber_name, :ssl, false)

change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
Keyword.put(config, :ssl, true)
end)

Expand Down Expand Up @@ -184,21 +184,21 @@ defmodule Kaffe.Config.ConsumerTest do
}

on_exit(fn ->
change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
Keyword.put(config, :ssl, ssl)
end)
end)

assert Kaffe.Config.Consumer.configuration("subscriber_name") == expected
assert Kaffe.Config.Consumer.configuration(:subscriber_name) == expected
end

describe "offset_reset_policy" do
test "computes correctly from start_with_earliest_message == true" do
change_config("subscriber_name", fn config ->
change_config(:subscriber_name, fn config ->
config |> Keyword.delete(:offset_reset_policy)
end)

assert Kaffe.Config.Consumer.configuration("subscriber_name").offset_reset_policy == :reset_by_subscriber
assert Kaffe.Config.Consumer.configuration(:subscriber_name).offset_reset_policy == :reset_by_subscriber
end
end
end
6 changes: 3 additions & 3 deletions test/kaffe/consumer_group/group_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule Kaffe.GroupManagerTest do

test "subscribe from config" do
Process.register(self(), :test_case)
config = Kaffe.Config.Consumer.configuration("subscriber_name")
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
{:ok, _group_manager_pid} = GroupManager.start_link(config)

:timer.sleep(config.rebalance_delay_ms)
Expand All @@ -52,7 +52,7 @@ defmodule Kaffe.GroupManagerTest do

test "subscribe to topics dynamically" do
Process.register(self(), :test_case)
config = Kaffe.Config.Consumer.configuration("subscriber_name")
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
{:ok, _group_manager_pid} = GroupManager.start_link(config)

:timer.sleep(config.rebalance_delay_ms)
Expand All @@ -74,7 +74,7 @@ defmodule Kaffe.GroupManagerTest do

test "duplicate topic subscription does nothing" do
Process.register(self(), :test_case)
config = Kaffe.Config.Consumer.configuration("subscriber_name")
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
{:ok, _group_manager_pid} = GroupManager.start_link(config)

:timer.sleep(config.rebalance_delay_ms)
Expand Down
6 changes: 3 additions & 3 deletions test/kaffe/consumer_group/subscriber/group_member_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Kaffe.GroupMemberTest do

test "handle assignments_received" do
Process.register(self(), :test_case)
config = Kaffe.Config.Consumer.configuration("subscriber_name")
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
{:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config)

GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}])
Expand All @@ -59,7 +59,7 @@ defmodule Kaffe.GroupMemberTest do

test "handle assignments_revoked" do
Process.register(self(), :test_case)
config = Kaffe.Config.Consumer.configuration("subscriber_name")
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
{:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config)

GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}])
Expand All @@ -77,7 +77,7 @@ defmodule Kaffe.GroupMemberTest do

test "handle assignments_received without assignments_revoked" do
Process.register(self(), :test_case)
config = Kaffe.Config.Consumer.configuration("subscriber_name")
config = Kaffe.Config.Consumer.configuration(:subscriber_name)
{:ok, pid} = GroupMember.start_link("subscriber_name", "consumer_group", self(), "topic", config)

GroupMember.assignments_received(pid, self(), 1, [{:brod_received_assignment, "topic", 0, 1}])
Expand Down
Loading