diff --git a/src/Dafda.Tests/Producing/TestOutgoingMessageFactory.cs b/src/Dafda.Tests/Producing/TestOutgoingMessageFactory.cs index 2d8b1ba..8a91261 100644 --- a/src/Dafda.Tests/Producing/TestOutgoingMessageFactory.cs +++ b/src/Dafda.Tests/Producing/TestOutgoingMessageFactory.cs @@ -7,11 +7,18 @@ namespace Dafda.Tests.Producing { [Collection("Serializing")] - public class TestOutgoingMessageFactory + public class TestOutgoingMessageFactory : IClassFixture { + private readonly DafdaActivitySourceFixture _fixture; private const string DummyTopic = "dummy_topic"; private const string DummyType = "dummy_type"; + public TestOutgoingMessageFactory(DafdaActivitySourceFixture fixture) + { + _fixture = fixture; + _fixture.ResetDafdaActivitySource(); + } + [Fact] public async Task Can_create_outgoing_message_from_registry_with_expected_raw_message() { diff --git a/src/Dafda.Tests/Producing/TestProducer.cs b/src/Dafda.Tests/Producing/TestProducer.cs index 14f4af6..963b3bb 100644 --- a/src/Dafda.Tests/Producing/TestProducer.cs +++ b/src/Dafda.Tests/Producing/TestProducer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading.Tasks; using Dafda.Consuming; using Dafda.Diagnostics; @@ -17,8 +18,16 @@ namespace Dafda.Tests.Producing { [Collection("Serializing")] - public class TestProducer + public class TestProducer : IClassFixture { + private readonly DafdaActivitySourceFixture _fixture; + + public TestProducer(DafdaActivitySourceFixture fixture) + { + _fixture = fixture; + _fixture.ResetDafdaActivitySource(); + } + [Fact] public async Task Can_produce_message() { @@ -311,5 +320,451 @@ await sut.Produce( AssertJson.Equal(expected, spy.Value); } + + [Fact] + public async Task Can_produce_multiple_messages() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var 
messages = new[] + { + new Message { Id = "id1" }, + new Message { Id = "id2" }, + new Message { Id = "id3" } + }; + + await sut.Produce(messages); + + Assert.Equal(3, spy.ProduceCallCount); + Assert.Equal("foo", spy.Topic); + } + + [Fact] + public async Task produces_multiple_messages_to_expected_topic() + { + var spy = new KafkaProducerSpy(); + + var expectedTopic = "foo"; + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register(expectedTopic, "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "id1" }, + new Message { Id = "id2" } + }; + + await sut.Produce( + messages: messages, + headers: new Dictionary + { + { "foo-key", "foo-value" } + } + ); + + Assert.Equal(expectedTopic, spy.Topic); + Assert.Equal(2, spy.ProduceCallCount); + } + + [Fact] + public async Task produces_multiple_messages_with_expected_partition_keys() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "key1" }, + new Message { Id = "key2" }, + new Message { Id = "key3" } + }; + + await sut.Produce( + messages: messages, + headers: new Dictionary + { + { "foo-key", "foo-value" } + } + ); + + Assert.Equal(3, spy.ProduceCallCount); + Assert.Contains("key1", spy.AllKeys); + Assert.Contains("key2", spy.AllKeys); + Assert.Contains("key3", spy.AllKeys); + } + + [Fact] + public async Task produces_multiple_messages_with_expected_serialized_values() + { + var expectedValue = "foo-value"; + + var spy = new KafkaProducerSpy(new PayloadSerializerStub(expectedValue)); + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "id1" }, + new Message { Id = "id2" } + }; + + await sut.Produce( + messages: 
messages, + headers: new Dictionary + { + { "foo-key", "foo-value" } + } + ); + + Assert.Equal(expectedValue, spy.Value); + Assert.Equal(2, spy.ProduceCallCount); + } + + [Fact] + public async Task produces_expected_multiple_messages_without_headers_using_default_serializer() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(new MessageIdGeneratorStub(() => "1")) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "dummyId1" }, + new Message { Id = "dummyId2" } + }; + + await sut.Produce( + messages: messages, + headers: new Dictionary + { + } + ); + + Assert.Equal(2, spy.ProduceCallCount); + var expected = @"{ + ""messageId"":""1"", + ""type"":""bar"", + ""causationId"":""1"", + ""correlationId"":""1"", + ""data"": + { + ""id"":""dummyId2"" + } + }"; + + AssertJson.Equal(expected, spy.Value); + } + + [Fact] + public async Task produces_expected_multiple_messages_with_headers_using_default_serializer() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(new MessageIdGeneratorStub(() => "1")) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "dummyId1" }, + new Message { Id = "dummyId2" } + }; + + await sut.Produce( + messages: messages, + headers: new Dictionary + { + { "foo-key", "foo-value" } + } + ); + + Assert.Equal(2, spy.ProduceCallCount); + var expected = @"{ + ""messageId"":""1"", + ""type"":""bar"", + ""foo-key"":""foo-value"", + ""causationId"":""1"", + ""correlationId"":""1"", + ""data"": + { + ""id"":""dummyId2"" + } + }"; + + AssertJson.Equal(expected, spy.Value); + } + + [Fact] + public async Task produces_multiple_messages_using_metadata() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register("foo", 
"bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "key1" }, + new Message { Id = "key2" } + }; + + await sut.Produce( + messages: messages, + headers: new Metadata + { + CausationId = "my-causation", + CorrelationId = "my-correlation" + } + ); + + Assert.Equal(2, spy.ProduceCallCount); + Assert.Contains("key1", spy.AllKeys); + Assert.Contains("key2", spy.AllKeys); + } + + [Fact] + public async Task produces_multiple_messages_using_message_handler_context() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(new MessageIdGeneratorStub(() => "1")) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "0" }, + new Message { Id = "1" } + }; + + await sut.Produce( + messages: messages, + context: new MessageHandlerContext(new Metadata + { + CausationId = "my-causation", + CorrelationId = "my-correlation" + }) + ); + + Assert.Equal(2, spy.ProduceCallCount); + var expected = @" + { + ""messageId"":""1"", + ""type"":""bar"", + ""correlationId"":""my-correlation"", + ""causationId"":""1"", + ""data"": + { + ""id"":""1"" + } + }"; + + AssertJson.Equal(expected, spy.Value); + } + + [Fact] + public async Task produces_multiple_messages_using_message_handler_context_with_additional_headers() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(new MessageIdGeneratorStub(() => "1")) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "0" }, + new Message { Id = "1" } + }; + + await sut.Produce( + messages: messages, + context: new MessageHandlerContext(new Metadata + { + CausationId = "my-causation", + CorrelationId = "my-correlation" + }), + headers: new Dictionary + { + { "additional-key", "additional-value" } + } + ); + + Assert.Equal(2, 
spy.ProduceCallCount); + var expected = @" + { + ""messageId"":""1"", + ""type"":""bar"", + ""additional-key"":""additional-value"", + ""correlationId"":""my-correlation"", + ""causationId"":""1"", + ""data"": + { + ""id"":""1"" + } + }"; + + AssertJson.Equal(expected, spy.Value); + } + + [Fact] + public async Task produces_multiple_messages_preserves_order() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "first" }, + new Message { Id = "second" }, + new Message { Id = "third" }, + new Message { Id = "fourth" }, + new Message { Id = "fifth" } + }; + + await sut.Produce(messages); + + Assert.Equal(5, spy.ProduceCallCount); + Assert.Equal("first", spy.ProducedMessages[0].Key); + Assert.Equal("second", spy.ProducedMessages[1].Key); + Assert.Equal("third", spy.ProducedMessages[2].Key); + Assert.Equal("fourth", spy.ProducedMessages[3].Key); + Assert.Equal("fifth", spy.ProducedMessages[4].Key); + } + + [Fact] + public async Task produces_multiple_messages_with_headers_preserves_order() + { + var spy = new KafkaProducerSpy(); + + var sut = A.Producer + .With(spy) + .With(A.OutgoingMessageRegistry + .Register("foo", "bar", @event => @event.Id) + .Build() + ) + .Build(); + + var messages = new[] + { + new Message { Id = "alpha" }, + new Message { Id = "beta" }, + new Message { Id = "gamma" } + }; + + await sut.Produce( + messages: messages, + headers: new Dictionary + { + { "foo-key", "foo-value" } + } + ); + + Assert.Equal(3, spy.ProduceCallCount); + Assert.Equal("alpha", spy.ProducedMessages[0].Key); + Assert.Equal("beta", spy.ProducedMessages[1].Key); + Assert.Equal("gamma", spy.ProducedMessages[2].Key); + Assert.Collection(spy.ProducedMessages, + msg => Assert.Equal(1, msg.Order), + msg => Assert.Equal(2, msg.Order), + msg => Assert.Equal(3, msg.Order) + ); + } + + [Fact] + public 
async Task produces_multiple_messages_with_context_preserves_order()
+        {
+            var spy = new KafkaProducerSpy();
+
+            var sut = A.Producer
+                .With(spy)
+                .With(A.OutgoingMessageRegistry
+                    .Register("foo", "bar", @event => @event.Id)
+                    .Build()
+                )
+                .Build();
+
+            var messages = new[]
+            {
+                new Message { Id = "msg1" },
+                new Message { Id = "msg2" },
+                new Message { Id = "msg3" },
+                new Message { Id = "msg4" }
+            };
+
+            await sut.Produce(
+                messages: messages,
+                context: new MessageHandlerContext(new Metadata
+                {
+                    CausationId = "my-causation",
+                    CorrelationId = "my-correlation"
+                })
+            );
+
+            Assert.Equal(4, spy.ProduceCallCount);
+            var orderedKeys = spy.ProducedMessages.OrderBy(m => m.Order).Select(m => m.Key).ToArray();
+            Assert.Equal(new[] { "msg1", "msg2", "msg3", "msg4" }, orderedKeys);
+        }
     }
 }
\ No newline at end of file
diff --git a/src/Dafda.Tests/TestDoubles/KafkaProducerSpy.cs b/src/Dafda.Tests/TestDoubles/KafkaProducerSpy.cs
index 633ebe1..88d547a 100644
--- a/src/Dafda.Tests/TestDoubles/KafkaProducerSpy.cs
+++ b/src/Dafda.Tests/TestDoubles/KafkaProducerSpy.cs
@@ -10,6 +10,9 @@ namespace Dafda.Tests.TestDoubles
 {
     internal class KafkaProducerSpy : KafkaProducer
     {
+        private readonly List<string> _keys = new List<string>();
+        private readonly List<ProducedMessage> _producedMessages = new List<ProducedMessage>();
+
         public KafkaProducerSpy()
             : this(Enumerable.Empty<KeyValuePair<string, string>>(), new DefaultPayloadSerializer())
         {
@@ -31,6 +34,15 @@ internal override Task InternalProduce(string topic, string key, string value)
         {
             Key = key;
             Value = value;
             ProducerActivityId = Activity.Current?.Id;
+            ProduceCallCount++;
+            _keys.Add(key);
+            _producedMessages.Add(new ProducedMessage
+            {
+                Topic = topic,
+                Key = key,
+                Value = value,
+                Order = ProduceCallCount
+            });
             return Task.CompletedTask;
         }
@@ -47,5 +59,16 @@ public override void Dispose()
         public string Key { get; private set; }
         public string Value { get; private set; }
         public string ProducerActivityId { get; private set; }
+        public int ProduceCallCount { get; private set; }
+        public IReadOnlyList<string>
AllKeys => _keys;
+        public IReadOnlyList<ProducedMessage> ProducedMessages => _producedMessages;
+
+        public class ProducedMessage
+        {
+            public string Topic { get; set; }
+            public string Key { get; set; }
+            public string Value { get; set; }
+            public int Order { get; set; }
+        }
     }
 }
\ No newline at end of file
diff --git a/src/Dafda/Producing/Producer.cs b/src/Dafda/Producing/Producer.cs
index 4405954..c6666b0 100644
--- a/src/Dafda/Producing/Producer.cs
+++ b/src/Dafda/Producing/Producer.cs
@@ -22,6 +22,61 @@ internal Producer(KafkaProducer kafkaProducer, OutgoingMessageRegistry outgoingM
         internal string Name { get; set; } = "__Default Producer__";

+        /// <summary>
+        /// Produce multiple <paramref name="messages"/> on Kafka. The messages will be produced in the order of the IEnumerable provided.
+        /// </summary>
+        /// <param name="messages">The messages</param>
+        public async Task Produce(IEnumerable<object> messages)
+        {
+            var produceTasks = messages.Select(Produce);
+            await Task.WhenAll(produceTasks);
+        }
+
+        /// <summary>
+        /// Produce multiple <paramref name="messages"/> on Kafka including <paramref name="headers"/>. The messages will be produced in the order of the IEnumerable provided.
+        /// </summary>
+        /// <param name="messages">The messages</param>
+        /// <param name="headers">The message headers</param>
+        public async Task Produce(IEnumerable<object> messages, Metadata headers)
+        {
+            var produceTasks = messages.Select(message => Produce(message, headers));
+            await Task.WhenAll(produceTasks);
+        }
+
+        /// <summary>
+        /// Produce multiple <paramref name="messages"/> on Kafka including <paramref name="headers"/>. The messages will be produced in the order of the IEnumerable provided.
+        /// </summary>
+        /// <param name="messages">The messages</param>
+        /// <param name="headers">The message headers</param>
+        public async Task Produce(IEnumerable<object> messages, Dictionary<string, string> headers)
+        {
+            var produceTasks = messages.Select(message => Produce(message, headers));
+            await Task.WhenAll(produceTasks);
+        }
+
+        /// <summary>
+        /// Produce multiple <paramref name="messages"/> on Kafka. The messages will be produced in the order of the IEnumerable provided.
+        /// </summary>
+        /// <param name="messages">The messages</param>
+        /// Context from the consumer.
Supply this to get correlation and causation id on the new messages
+        public async Task Produce(IEnumerable<object> messages, MessageHandlerContext context)
+        {
+            var produceTasks = messages.Select(message => Produce(message, context));
+            await Task.WhenAll(produceTasks);
+        }
+
+        /// <summary>
+        /// Produce multiple <paramref name="messages"/> on Kafka. The messages will be produced in the order of the IEnumerable provided.
+        /// </summary>
+        /// <param name="messages">The messages</param>
+        /// <param name="context">Context from the consumer. Supply this to get correlation and causation id on the new messages</param>
+        /// <param name="headers">Additional message headers</param>
+        public async Task Produce(IEnumerable<object> messages, MessageHandlerContext context, Dictionary<string, string> headers)
+        {
+            var produceTasks = messages.Select(message => Produce(message, context, headers));
+            await Task.WhenAll(produceTasks);
+        }
+
         /// <summary>
         /// Produce a <paramref name="message"/> on Kafka
         /// </summary>