From 9ad9271371d103f8fd51d325fece92c62d21fe96 Mon Sep 17 00:00:00 2001 From: Matthieu Belmontet Date: Tue, 1 Jun 2021 09:57:02 +0200 Subject: [PATCH 1/6] test projection --- .gitignore | 3 +- .../impl/InMemoryEventStore.java | 4 +- .../impl/JooqKafkaTckImplementation.java | 12 +++- .../datastore/DataStoreVerification.java | 56 +++++++++++++++++-- .../datastore/DataStoreVerificationRules.java | 6 ++ .../datastore/TestProjection.java | 32 +++++++++++ .../datastore/InMemoryDataStoreTest.java | 20 +++++++ 7 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java diff --git a/.gitignore b/.gitignore index 212f41bf..fd36b7c5 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ target .bsp/ .DS_Store -.bloop \ No newline at end of file +.bloop +*.iml \ No newline at end of file diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java index da043799..39ba2202 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java @@ -118,9 +118,7 @@ public Source, NotUsed> loadEventsByQuery(Tuple0 @Override public Source, NotUsed> loadEventsByQuery(Query query) { return Source.from(eventStore) - .filter(e -> { - return Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true); - }); + .filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true)); } @Override diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java index e9061be9..7b46d49f 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -15,6 +15,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import fr.maif.eventsourcing.Projection; +import fr.maif.eventsourcing.datastore.TestProjection; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -58,6 +60,7 @@ public class JooqKafkaTckImplementation extends DataStoreVerification()) - .withNoProjections() + .withProjections(this.testProjection) .build(); @@ -228,6 +233,11 @@ public List> readPublishedEvents(String return envelopes; } + @Override + public Integer readProjection() { + return ((TestProjection)this.testProjection).getCount(); + } + private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index 4bc7c181..46505a86 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -12,6 +12,7 @@ import io.vavr.control.Either; import io.vavr.control.Option; +import io.vavr.control.Try; import org.apache.kafka.clients.consumer.ConsumerConfig; import 
org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -168,11 +169,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { submitValidCommand(eventProcessor, "1"); restartBroker(); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + sleep(); List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); cleanup(eventProcessor); @@ -199,6 +196,55 @@ public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { } } + + @Override + @Test + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){ + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + @Override + @Test + public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){ + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + @Override + @Test + public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){ + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownDatabase(); + try { + submitValidCommand(eventProcessor, "1"); + }catch (Throwable t){} + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(0); + restartDatabase(); + } + + private void sleep() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override public Either> submitValidCommand( EventProcessor eventProcessor, diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java index a6ee48bc..937fc183 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java @@ -19,6 +19,7 @@ public interface DataStoreVerificationRules readState(EventProcessor eventProcessor, String id); void submitDeleteCommand(EventProcessor eventProcessor, String id); List> readPublishedEvents(String kafkaBootstrapUrl, String topic); + Integer readProjection(); void shutdownBroker(); void restartBroker(); void shutdownDatabase(); @@ -38,6 +39,11 @@ public interface DataStoreVerificationRules> readFromDataStore(EventStore eventStore); default void cleanup( diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java new file mode 100644 index 00000000..fd667c44 --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java @@ -0,0 +1,32 @@ +package fr.maif.eventsourcing.datastore; + +import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.Projection; +import io.vavr.Tuple; +import io.vavr.Tuple0; +import io.vavr.collection.List; +import io.vavr.concurrent.Future; + +import java.sql.Connection; + +public class TestProjection implements Projection { + private int counter = 0; + + @Override + public Future storeProjection(Connection 
connection, List> envelopes) { + return Future.of(() -> { + envelopes.forEach(envelope -> { + if (envelope.event instanceof TestEvent.SimpleEvent) { + counter++; + } + }); + return Tuple.empty(); + }); + } + + public int getCount() { + return counter; + } + + +} diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java index d648ae70..097ad5aa 100644 --- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -4,6 +4,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.Function; +import fr.maif.eventsourcing.Projection; import org.mockito.Mockito; import org.testng.annotations.BeforeMethod; @@ -22,6 +23,7 @@ public class InMemoryDataStoreTest extends DataStoreVerification { public InMemoryEventStore eventStore; public EventProcessor eventProcessor; + @BeforeMethod(alwaysRun = true) public void init() { this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem)); @@ -46,6 +48,24 @@ public List> readPublishedEvents(String } } + @Override + public Integer readProjection() { + // Not implemented for in memory + return null; + } + + @Override + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){ + // Not implemented for in memory + } + @Override + public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){ + // Not implemented for in memory + } + @Override + public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){ + // Not implemented for in memory + } @Override public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { // Not implemented for in memory From 313e77b59b6bd3b06296689513692186fbc58efc Mon Sep 17 00:00:00 2001 From: Matthieu Belmontet Date: Tue, 1 Jun 2021 14:49:10 +0200 Subject: [PATCH 2/6] tck consistent projection --- .../impl/JooqKafkaTckImplementation.java | 7 + .../datastore/DataStoreVerification.java | 531 +++++++++--------- .../datastore/DataStoreVerificationRules.java | 4 +- .../datastore/TestConsistentProjection.java | 53 ++ .../datastore/InMemoryDataStoreTest.java | 226 ++++---- 5 files changed, 454 insertions(+), 367 deletions(-) create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java index 7b46d49f..10552c46 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -16,6 +16,7 @@ import java.util.concurrent.Executors; import fr.maif.eventsourcing.Projection; +import fr.maif.eventsourcing.datastore.TestConsistentProjection; import fr.maif.eventsourcing.datastore.TestProjection; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -101,6 +102,7 @@ public void initClass() { postgres.start(); kafka = new KafkaContainer(); kafka.start(); + consistentProjection = new TestConsistentProjection(actorSystem,kafka.getBootstrapServers(),eventFormat,dataSource); } @@ -238,6 +240,11 @@ public List> readPublishedEvents(String @Override public Integer readProjection() { return ((TestProjection)this.testProjection).getCount(); } + @Override + 
public Integer readConsistentProjection() { + return consistentProjection.getCount(); + } + private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index 46505a86..eb8ea14e 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -6,293 +6,304 @@ import fr.maif.eventsourcing.EventProcessor; import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.ProcessingSuccess; -import fr.maif.eventsourcing.format.JacksonSimpleFormat; -import fr.maif.json.EventEnvelopeJson; import io.vavr.Tuple0; import io.vavr.control.Either; import io.vavr.control.Option; - -import io.vavr.control.Try; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; import org.testng.annotations.Test; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Optional; -import java.util.Properties; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; +public abstract class DataStoreVerification implements DataStoreVerificationRules { + public ActorSystem actorSystem = ActorSystem.create(); + protected TestConsistentProjection consistentProjection; + + public abstract EventProcessor eventProcessor(String topic); + + public abstract String kafkaBootstrapUrl(); + + @Override + @Test + public void required_submitValidSingleEventCommandMustWriteEventInDataStore() { + String topic = randomKafkaTopic(); + final EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + + List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); -public abstract class DataStoreVerification implements DataStoreVerificationRules{ - public ActorSystem actorSystem = ActorSystem.create(); - public abstract EventProcessor eventProcessor(String topic); - public abstract String kafkaBootstrapUrl(); + cleanup(eventProcessor); + assertThat(envelopes).hasSize(1); + } - @Override - @Test - public void required_submitValidSingleEventCommandMustWriteEventInDataStore() { - String topic = randomKafkaTopic(); - final EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); + @Override + @Test + public void required_submitInvalidCommandMustNotWriteEventsIntDataStore() { + String topic = randomKafkaTopic(); + final EventProcessor eventProcessor = eventProcessor(topic); + submitInvalidCommand(eventProcessor, "1"); - List> envelopes = 
deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); + List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); - cleanup(eventProcessor); - assertThat(envelopes).hasSize(1); - } + cleanup(eventProcessor); + assertThat(envelopes).isEmpty(); + } - @Override - @Test - public void required_submitInvalidCommandMustNotWriteEventsIntDataStore() { - String topic = randomKafkaTopic(); - final EventProcessor eventProcessor = eventProcessor(topic); - submitInvalidCommand(eventProcessor, "1"); + @Override + @Test + public void required_submitMultiEventCommandMustWriteAllEventsInDataStore() { + String topic = randomKafkaTopic(); + final EventProcessor eventProcessor = eventProcessor(topic); + submitMultiEventsCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); + List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); - cleanup(eventProcessor); - assertThat(envelopes).isEmpty(); - } + cleanup(eventProcessor); - @Override - @Test - public void required_submitMultiEventCommandMustWriteAllEventsInDataStore() { - String topic = randomKafkaTopic(); - final EventProcessor eventProcessor = eventProcessor(topic); - submitMultiEventsCommand(eventProcessor, "1"); + assertThat(envelopes.size()).isGreaterThan(1); + List ids = envelopes.stream().map(envelope -> envelope.id).collect(Collectors.toList()); - List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); + assertThat(ids).doesNotHaveDuplicates(); + } - cleanup(eventProcessor); + @Override + @Test + public void required_aggregateOfSingleEventStateShouldBeCorrect() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); - assertThat(envelopes.size()).isGreaterThan(1); - List ids = envelopes.stream().map(envelope -> envelope.id).collect(Collectors.toList()); + submitValidCommand(eventProcessor, "1"); + Option state = readState(eventProcessor, "1"); - assertThat(ids).doesNotHaveDuplicates(); - } + cleanup(eventProcessor); - @Override - @Test - public void required_aggregateOfSingleEventStateShouldBeCorrect() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); + assertThat(state.isDefined()).isTrue(); + assertThat(state.get().count).isEqualTo(1); + } - submitValidCommand(eventProcessor, "1"); - Option state = readState(eventProcessor, "1"); + @Override + @Test + public void required_aggregateOfDeleteEventStateShouldBeEmpty() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); - cleanup(eventProcessor); + submitValidCommand(eventProcessor, "1"); + submitDeleteCommand(eventProcessor, "1"); + Option state = readState(eventProcessor, "1"); - assertThat(state.isDefined()).isTrue(); - assertThat(state.get().count).isEqualTo(1); - } + cleanup(eventProcessor); - @Override - @Test - public void required_aggregateOfDeleteEventStateShouldBeEmpty() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - - submitValidCommand(eventProcessor, "1"); - submitDeleteCommand(eventProcessor, "1"); - Option state = readState(eventProcessor, "1"); - - cleanup(eventProcessor); - - assertThat(state.isEmpty()).isTrue(); - } - - @Override - @Test - public void required_aggregateOfMultipleEventStateShouldBeCorrect() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - - submitMultiEventsCommand(eventProcessor, "1"); - Option 
state = readState(eventProcessor, "1"); - - cleanup(eventProcessor); - - assertThat(state.isDefined()).isTrue(); - assertThat(state.get().count).isEqualTo(2); - } - - @Override - @Test - public void required_singleEventShouldBePublished() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - - submitValidCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); - - cleanup(eventProcessor); - - assertThat(envelopes).hasSize(1); - } - - @Override - @Test - public void required_multipleEventsShouldBePublished() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - submitMultiEventsCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); - - cleanup(eventProcessor); - - assertThat(envelopes).hasSize(2); - } - - @Override - @Test - public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownBroker(); - submitValidCommand(eventProcessor, "1"); - - restartBroker(); - sleep(); - List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); - - cleanup(eventProcessor); - - assertThat(envelopes).hasSize(1); - } - - @Override - @Test - public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownDatabase(); - try { - Either> result = submitValidCommand(eventProcessor, "1"); - - cleanup(eventProcessor); - - assertThat(result.isLeft()).isTrue(); - } catch(Throwable ex) { - // implementation should either return an embedded error in either, either throw an exception - }finally { - restartDatabase(); - } - } - - - @Override - @Test - public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){ - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); - sleep(); - - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(1); - } - @Override - @Test - public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){ - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownBroker(); - submitValidCommand(eventProcessor, "1"); - sleep(); - restartBroker(); - sleep(); - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(1); - } - @Override - @Test - public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){ - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownDatabase(); - try { - submitValidCommand(eventProcessor, "1"); - }catch (Throwable t){} - sleep(); - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(0); - restartDatabase(); - } - - private void sleep() { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - - @Override - public Either> submitValidCommand( - EventProcessor eventProcessor, - String id) { - return eventProcessor.processCommand(new TestCommand.SimpleCommand(id)).get(); - } - - @Override - public void submitInvalidCommand( - EventProcessor eventProcessor, - String id - ) { - eventProcessor.processCommand(new TestCommand.InvalidCommand(id)).get(); - - } - - @Override - public void 
submitMultiEventsCommand( - EventProcessor eventProcessor, - String id - ) { - eventProcessor.processCommand(new TestCommand.MultiEventCommand(id)).get(); - } - - @Override - public void submitDeleteCommand(EventProcessor eventProcessor, String id) { - eventProcessor.processCommand(new TestCommand.DeleteCommand(id)).get(); - } - - @Override - public Option readState(EventProcessor eventProcessor, String id) { - return eventProcessor.getAggregate(id).get(); - } - - @Override - public List> readFromDataStore(EventStore eventStore) { - try { - return eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - - public String randomKafkaTopic() { - return "test-topic" + UUID.randomUUID(); - } - - private List> deduplicateOnId(List> envelopes) { - return io.vavr.collection.List.ofAll(envelopes).distinctBy(env -> env.id).toJavaList(); - } + assertThat(state.isEmpty()).isTrue(); + } + + @Override + @Test + public void required_aggregateOfMultipleEventStateShouldBeCorrect() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + + submitMultiEventsCommand(eventProcessor, "1"); + Option state = readState(eventProcessor, "1"); + + cleanup(eventProcessor); + + assertThat(state.isDefined()).isTrue(); + assertThat(state.get().count).isEqualTo(2); + } + + @Override + @Test + public void required_singleEventShouldBePublished() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + + submitValidCommand(eventProcessor, "1"); + List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); + + cleanup(eventProcessor); + + assertThat(envelopes).hasSize(1); + } + + @Override + @Test + public void required_multipleEventsShouldBePublished() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + submitMultiEventsCommand(eventProcessor, "1"); + List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); + + cleanup(eventProcessor); + + assertThat(envelopes).hasSize(2); + } + + @Override + @Test + public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + + restartBroker(); + sleep(); + List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); + + cleanup(eventProcessor); + + assertThat(envelopes).hasSize(1); + } + + @Override + @Test + public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownDatabase(); + try { + Either> result = submitValidCommand(eventProcessor, "1"); + + cleanup(eventProcessor); + + assertThat(result.isLeft()).isTrue(); + } catch (Throwable ex) { + // implementation should either return an embedded error in either, either throw an exception + } finally { + restartDatabase(); + } + } + + + @Override + @Test + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + + @Override + @Test + public void 
required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + + @Override + @Test + public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownDatabase(); + try { + submitValidCommand(eventProcessor, "1"); + } catch (Throwable t) { + } + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(0); + restartDatabase(); + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + private void sleep() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + + @Override + public Either> submitValidCommand( + EventProcessor eventProcessor, + String id) { + return eventProcessor.processCommand(new TestCommand.SimpleCommand(id)).get(); + } + + @Override + public void submitInvalidCommand( + EventProcessor eventProcessor, + String id + ) { + eventProcessor.processCommand(new TestCommand.InvalidCommand(id)).get(); + + } + + @Override + public void submitMultiEventsCommand( + EventProcessor eventProcessor, + String id + ) { + eventProcessor.processCommand(new TestCommand.MultiEventCommand(id)).get(); + } + + @Override + public void submitDeleteCommand(EventProcessor eventProcessor, String id) { + eventProcessor.processCommand(new TestCommand.DeleteCommand(id)).get(); + } + + @Override + public Option readState(EventProcessor eventProcessor, String id) { + return eventProcessor.getAggregate(id).get(); + } + + @Override + public List> readFromDataStore(EventStore eventStore) { + try { + return eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public String randomKafkaTopic() { + return "test-topic" + UUID.randomUUID(); + } + + private List> deduplicateOnId(List> envelopes) { + return io.vavr.collection.List.ofAll(envelopes).distinctBy(env -> env.id).toJavaList(); + } } diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java index 937fc183..5a3722e6 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java @@ -20,6 +20,7 @@ public interface DataStoreVerificationRules 
eventProcessor, String id); List> readPublishedEvents(String kafkaBootstrapUrl, String topic); Integer readProjection(); + Integer readConsistentProjection(); void shutdownBroker(); void restartBroker(); void shutdownDatabase(); @@ -43,7 +44,8 @@ public interface DataStoreVerificationRules> readFromDataStore(EventStore eventStore); default void cleanup( diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java new file mode 100644 index 00000000..772189ee --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java @@ -0,0 +1,53 @@ +package fr.maif.eventsourcing.datastore; + +import akka.actor.ActorSystem; +import fr.maif.projections.EventuallyConsistentProjection; +import io.vavr.Tuple; +import io.vavr.concurrent.Future; + +import javax.sql.DataSource; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class TestConsistentProjection { + + private int counter = 0; + private final ActorSystem actorSystem; + private final String bootstrapServer; + private final TestEventFormat eventFormat; + private final DataSource dataSource; + + public TestConsistentProjection( + ActorSystem actorSystem, + String bootstrapServer, + TestEventFormat eventFormat, + DataSource dataSource) { + this.actorSystem = actorSystem; + this.eventFormat = eventFormat; + this.dataSource = dataSource; + this.bootstrapServer =bootstrapServer; + } + + + public void init(String topic) { + this.counter = 0; + EventuallyConsistentProjection.create( + ActorSystem.create(), + "TestConsistentProjection", + EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer), + eventFormat, + envelope -> + Future.of(() -> { + if (envelope.event instanceof TestEvent.SimpleEvent){ + counter++; + } + return Tuple.empty(); + }) + + ).start(); + } + + public int getCount() { + return counter; + } +} diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java index 097ad5aa..771a831b 100644 --- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -1,119 +1,133 @@ package fr.maif.eventsourcing.datastore; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; - -import fr.maif.eventsourcing.Projection; -import org.mockito.Mockito; -import org.testng.annotations.BeforeMethod; - -import akka.actor.ActorSystem; import akka.stream.javadsl.Sink; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; -import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.TransactionManager; import fr.maif.eventsourcing.impl.InMemoryEventStore; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.concurrent.Future; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; public class InMemoryDataStoreTest extends DataStoreVerification { - public InMemoryEventStore eventStore; - public EventProcessor eventProcessor; - - - @BeforeMethod(alwaysRun = true) - public void init() { - this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem)); - 
this.eventProcessor = new EventProcessor<>( - actorSystem, - eventStore, - noOpTransactionManager(), - new TestCommandHandler(), - new TestEventHandler(), - io.vavr.collection.List.empty() - ); - } - - @Override - public List> readPublishedEvents(String kafkaBootStrapUrl, String topic) { - try { - return this.eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - @Override - public Integer readProjection() { - // Not implemented for in memory - return null; - } - - @Override - public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){ - // Not implemented for in memory - } - @Override - public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){ - // Not implemented for in memory - } - @Override - public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){ - // Not implemented for in memory - } - @Override - public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { - // Not implemented for in memory - } - - - @Override - public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { - // Not implemented for in memory - } - - @Override - public void shutdownBroker() { - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public void restartBroker() { - Mockito.reset(eventStore); - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public void shutdownDatabase() { - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public void restartDatabase() { - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public EventProcessor eventProcessor(String topic) { - return this.eventProcessor; - } - - @Override - public String kafkaBootstrapUrl() { - return null; - } - - private TransactionManager noOpTransactionManager() { - return new TransactionManager() { - @Override - public Future withTransaction(Function> function) { - return function.apply(Tuple.empty()); - } - }; - } + public InMemoryEventStore eventStore; + public EventProcessor eventProcessor; + + + @BeforeMethod(alwaysRun = true) + public void init() { + this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem)); + this.eventProcessor = new EventProcessor<>( + actorSystem, + eventStore, + noOpTransactionManager(), + new TestCommandHandler(), + new TestEventHandler(), + io.vavr.collection.List.empty() + ); + } + + @Override + public List> readPublishedEvents(String kafkaBootStrapUrl, String topic) { + try { + return this.eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer readProjection() { + // Not implemented for in memory + return null; + } + + @Override + public Integer readConsistentProjection() { + // Not implemented for in memory + return null; + } + + @Override + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { + // 
Not implemented for in memory + } + + @Override + public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + + @Override + public void shutdownBroker() { + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public void restartBroker() { + Mockito.reset(eventStore); + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public void shutdownDatabase() { + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public void restartDatabase() { + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public EventProcessor eventProcessor(String topic) { + return this.eventProcessor; + } + + @Override + public String kafkaBootstrapUrl() { + return null; + } + + private TransactionManager noOpTransactionManager() { + return new TransactionManager() { + @Override + public Future withTransaction(Function> function) { + return function.apply(Tuple.empty()); + } + }; + } } From 22aef47f49cebe0754db2a25c23447dafd083455 Mon Sep 17 00:00:00 2001 From: Matthieu Belmontet Date: Wed, 2 Jun 2021 09:01:36 +0200 Subject: [PATCH 3/6] format code --- .../impl/JooqKafkaTckImplementation.java | 6 +- .../datastore/DataStoreVerification.java | 534 +++++++++--------- .../datastore/TestConsistentProjection.java | 80 ++- .../TestInstransactionProjection.java | 26 +- .../datastore/TestProjection.java | 32 -- .../datastore/InMemoryDataStoreTest.java | 228 ++++---- 6 files changed, 443 insertions(+), 463 deletions(-) delete mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java index 10552c46..ab1d3e58 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -17,7 +17,7 @@ import fr.maif.eventsourcing.Projection; import fr.maif.eventsourcing.datastore.TestConsistentProjection; -import fr.maif.eventsourcing.datastore.TestProjection; +import fr.maif.eventsourcing.datastore.TestInstransactionProjection; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -108,7 +108,7 @@ public void initClass() { @BeforeMethod(alwaysRun = true) public void init() throws SQLException { - this.testProjection = new TestProjection(); + this.testProjection = new TestInstransactionProjection(); this.dataSource = new PGSimpleDataSource(); dataSource.setUrl(postgres.getJdbcUrl()); dataSource.setUser(postgres.getUsername()); @@ -237,7 +237,7 @@ public List> readPublishedEvents(String @Override public Integer readProjection() { - return ((TestProjection)this.testProjection).getCount(); + return ((TestInstransactionProjection)this.testProjection).getCount(); } @Override diff --git 
a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index eb8ea14e..9f7f39ae 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -19,291 +19,291 @@ import static org.assertj.core.api.Assertions.assertThat; public abstract class DataStoreVerification implements DataStoreVerificationRules { - public ActorSystem actorSystem = ActorSystem.create(); - protected TestConsistentProjection consistentProjection; + public ActorSystem actorSystem = ActorSystem.create(); + protected TestConsistentProjection consistentProjection; - public abstract EventProcessor eventProcessor(String topic); + public abstract EventProcessor eventProcessor(String topic); - public abstract String kafkaBootstrapUrl(); + public abstract String kafkaBootstrapUrl(); - @Override - @Test - public void required_submitValidSingleEventCommandMustWriteEventInDataStore() { - String topic = randomKafkaTopic(); - final EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); + @Override + @Test + public void required_submitValidSingleEventCommandMustWriteEventInDataStore() { + String topic = randomKafkaTopic(); + final EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); + List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); - cleanup(eventProcessor); - assertThat(envelopes).hasSize(1); - } + cleanup(eventProcessor); + assertThat(envelopes).hasSize(1); + } - @Override - @Test - public void required_submitInvalidCommandMustNotWriteEventsIntDataStore() { - String topic = randomKafkaTopic(); - final EventProcessor eventProcessor = eventProcessor(topic); - submitInvalidCommand(eventProcessor, "1"); + @Override + @Test + public void required_submitInvalidCommandMustNotWriteEventsIntDataStore() { + String topic = randomKafkaTopic(); + final EventProcessor eventProcessor = eventProcessor(topic); + submitInvalidCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); + List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); - cleanup(eventProcessor); - assertThat(envelopes).isEmpty(); - } + cleanup(eventProcessor); + assertThat(envelopes).isEmpty(); + } - @Override - @Test - public void required_submitMultiEventCommandMustWriteAllEventsInDataStore() { - String topic = randomKafkaTopic(); - final EventProcessor eventProcessor = eventProcessor(topic); - submitMultiEventsCommand(eventProcessor, "1"); + @Override + @Test + public void required_submitMultiEventCommandMustWriteAllEventsInDataStore() { + String topic = randomKafkaTopic(); + final EventProcessor eventProcessor = eventProcessor(topic); + submitMultiEventsCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); + List> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore())); - cleanup(eventProcessor); + cleanup(eventProcessor); - assertThat(envelopes.size()).isGreaterThan(1); - List ids = envelopes.stream().map(envelope -> envelope.id).collect(Collectors.toList()); + assertThat(envelopes.size()).isGreaterThan(1); + List ids = envelopes.stream().map(envelope -> 
envelope.id).collect(Collectors.toList()); - assertThat(ids).doesNotHaveDuplicates(); - } + assertThat(ids).doesNotHaveDuplicates(); + } - @Override - @Test - public void required_aggregateOfSingleEventStateShouldBeCorrect() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); + @Override + @Test + public void required_aggregateOfSingleEventStateShouldBeCorrect() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); - Option state = readState(eventProcessor, "1"); + submitValidCommand(eventProcessor, "1"); + Option state = readState(eventProcessor, "1"); - cleanup(eventProcessor); + cleanup(eventProcessor); - assertThat(state.isDefined()).isTrue(); - assertThat(state.get().count).isEqualTo(1); - } + assertThat(state.isDefined()).isTrue(); + assertThat(state.get().count).isEqualTo(1); + } - @Override - @Test - public void required_aggregateOfDeleteEventStateShouldBeEmpty() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); + @Override + @Test + public void required_aggregateOfDeleteEventStateShouldBeEmpty() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); - submitDeleteCommand(eventProcessor, "1"); - Option state = readState(eventProcessor, "1"); + submitValidCommand(eventProcessor, "1"); + submitDeleteCommand(eventProcessor, "1"); + Option state = readState(eventProcessor, "1"); - cleanup(eventProcessor); - - assertThat(state.isEmpty()).isTrue(); - } - - @Override - @Test - public void required_aggregateOfMultipleEventStateShouldBeCorrect() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - - submitMultiEventsCommand(eventProcessor, "1"); - Option state = readState(eventProcessor, "1"); - - cleanup(eventProcessor); - - assertThat(state.isDefined()).isTrue(); - assertThat(state.get().count).isEqualTo(2); - } - - @Override - @Test - public void required_singleEventShouldBePublished() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - - submitValidCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); - - cleanup(eventProcessor); - - assertThat(envelopes).hasSize(1); - } - - @Override - @Test - public void required_multipleEventsShouldBePublished() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - submitMultiEventsCommand(eventProcessor, "1"); - List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); - - cleanup(eventProcessor); - - assertThat(envelopes).hasSize(2); - } - - @Override - @Test - public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownBroker(); - submitValidCommand(eventProcessor, "1"); - - restartBroker(); - sleep(); - List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); - - cleanup(eventProcessor); - - assertThat(envelopes).hasSize(1); - } - - @Override - @Test - public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownDatabase(); - try { - Either> result = submitValidCommand(eventProcessor, "1"); - - 
cleanup(eventProcessor); - - assertThat(result.isLeft()).isTrue(); - } catch (Throwable ex) { - // implementation should either return an embedded error in either, either throw an exception - } finally { - restartDatabase(); - } - } - - - @Override - @Test - public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); - sleep(); - - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(1); - } - - @Override - @Test - public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownBroker(); - submitValidCommand(eventProcessor, "1"); - sleep(); - restartBroker(); - sleep(); - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(1); - } - - @Override - @Test - public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownDatabase(); - try { - submitValidCommand(eventProcessor, "1"); - } catch (Throwable t) { - } - sleep(); - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(0); - restartDatabase(); - } - - @Override - public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { - - String topic = randomKafkaTopic(); - consistentProjection.init(topic); - EventProcessor eventProcessor = eventProcessor(topic); - submitValidCommand(eventProcessor, "1"); - sleep(); - - cleanup(eventProcessor); - assertThat(readConsistentProjection()).isEqualTo(1); - } - - @Override - public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { - String topic = randomKafkaTopic(); - consistentProjection.init(topic); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownBroker(); - submitValidCommand(eventProcessor, "1"); - sleep(); - restartBroker(); - sleep(); - cleanup(eventProcessor); - assertThat(readConsistentProjection()).isEqualTo(1); - } - - private void sleep() { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - - @Override - public Either> submitValidCommand( - EventProcessor eventProcessor, - String id) { - return eventProcessor.processCommand(new TestCommand.SimpleCommand(id)).get(); - } - - @Override - public void submitInvalidCommand( - EventProcessor eventProcessor, - String id - ) { - eventProcessor.processCommand(new TestCommand.InvalidCommand(id)).get(); - - } - - @Override - public void submitMultiEventsCommand( - EventProcessor eventProcessor, - String id - ) { - eventProcessor.processCommand(new TestCommand.MultiEventCommand(id)).get(); - } - - @Override - public void submitDeleteCommand(EventProcessor eventProcessor, String id) { - eventProcessor.processCommand(new TestCommand.DeleteCommand(id)).get(); - } - - @Override - public Option readState(EventProcessor eventProcessor, String id) { - return eventProcessor.getAggregate(id).get(); - } - - @Override - public List> readFromDataStore(EventStore eventStore) { - try { - return eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - - public String randomKafkaTopic() { - return "test-topic" + UUID.randomUUID(); - } - - private List> 
deduplicateOnId(List> envelopes) { - return io.vavr.collection.List.ofAll(envelopes).distinctBy(env -> env.id).toJavaList(); - } + cleanup(eventProcessor); + + assertThat(state.isEmpty()).isTrue(); + } + + @Override + @Test + public void required_aggregateOfMultipleEventStateShouldBeCorrect() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + + submitMultiEventsCommand(eventProcessor, "1"); + Option state = readState(eventProcessor, "1"); + + cleanup(eventProcessor); + + assertThat(state.isDefined()).isTrue(); + assertThat(state.get().count).isEqualTo(2); + } + + @Override + @Test + public void required_singleEventShouldBePublished() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + + submitValidCommand(eventProcessor, "1"); + List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); + + cleanup(eventProcessor); + + assertThat(envelopes).hasSize(1); + } + + @Override + @Test + public void required_multipleEventsShouldBePublished() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + submitMultiEventsCommand(eventProcessor, "1"); + List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); + + cleanup(eventProcessor); + + assertThat(envelopes).hasSize(2); + } + + @Override + @Test + public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + + restartBroker(); + sleep(); + List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic)); + + cleanup(eventProcessor); + + assertThat(envelopes).hasSize(1); + } + + @Override + @Test + public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownDatabase(); + try { + Either> result = submitValidCommand(eventProcessor, "1"); + + cleanup(eventProcessor); + + assertThat(result.isLeft()).isTrue(); + } catch (Throwable ex) { + // implementation should either return an embedded error in either, either throw an exception + } finally { + restartDatabase(); + } + } + + + @Override + @Test + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + + @Override + @Test + public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(1); + } + + @Override + @Test + public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { + String topic = randomKafkaTopic(); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownDatabase(); + try { + submitValidCommand(eventProcessor, "1"); + } catch (Throwable t) { + } + sleep(); + cleanup(eventProcessor); + assertThat(readProjection()).isEqualTo(0); + restartDatabase(); + } + + @Override + public void 
required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + submitValidCommand(eventProcessor, "1"); + sleep(); + + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + String topic = randomKafkaTopic(); + consistentProjection.init(topic); + EventProcessor eventProcessor = eventProcessor(topic); + shutdownBroker(); + submitValidCommand(eventProcessor, "1"); + sleep(); + restartBroker(); + sleep(); + cleanup(eventProcessor); + assertThat(readConsistentProjection()).isEqualTo(1); + } + + private void sleep() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + + @Override + public Either> submitValidCommand( + EventProcessor eventProcessor, + String id) { + return eventProcessor.processCommand(new TestCommand.SimpleCommand(id)).get(); + } + + @Override + public void submitInvalidCommand( + EventProcessor eventProcessor, + String id + ) { + eventProcessor.processCommand(new TestCommand.InvalidCommand(id)).get(); + + } + + @Override + public void submitMultiEventsCommand( + EventProcessor eventProcessor, + String id + ) { + eventProcessor.processCommand(new TestCommand.MultiEventCommand(id)).get(); + } + + @Override + public void submitDeleteCommand(EventProcessor eventProcessor, String id) { + eventProcessor.processCommand(new TestCommand.DeleteCommand(id)).get(); + } + + @Override + public Option readState(EventProcessor eventProcessor, String id) { + return eventProcessor.getAggregate(id).get(); + } + + @Override + public List> readFromDataStore(EventStore eventStore) { + try { + return eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public String randomKafkaTopic() { + return "test-topic" + UUID.randomUUID(); + } + + private List> deduplicateOnId(List> envelopes) { + return io.vavr.collection.List.ofAll(envelopes).distinctBy(env -> env.id).toJavaList(); + } } diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java index 772189ee..9f904acb 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestConsistentProjection.java @@ -6,48 +6,46 @@ import io.vavr.concurrent.Future; import javax.sql.DataSource; -import java.sql.PreparedStatement; -import java.sql.SQLException; public class TestConsistentProjection { - private int counter = 0; - private final ActorSystem actorSystem; - private final String bootstrapServer; - private final TestEventFormat eventFormat; - private final DataSource dataSource; - - public TestConsistentProjection( - ActorSystem actorSystem, - String bootstrapServer, - TestEventFormat eventFormat, - DataSource dataSource) { - this.actorSystem = actorSystem; - this.eventFormat = eventFormat; - this.dataSource = dataSource; - this.bootstrapServer =bootstrapServer; - } - - - public void init(String topic) { - this.counter = 0; - EventuallyConsistentProjection.create( - ActorSystem.create(), - "TestConsistentProjection", - 
EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer), - eventFormat, - envelope -> - Future.of(() -> { - if (envelope.event instanceof TestEvent.SimpleEvent){ - counter++; - } - return Tuple.empty(); - }) - - ).start(); - } - - public int getCount() { - return counter; - } + private final ActorSystem actorSystem; + private final String bootstrapServer; + private final TestEventFormat eventFormat; + private final DataSource dataSource; + private int counter = 0; + + public TestConsistentProjection( + ActorSystem actorSystem, + String bootstrapServer, + TestEventFormat eventFormat, + DataSource dataSource) { + this.actorSystem = actorSystem; + this.eventFormat = eventFormat; + this.dataSource = dataSource; + this.bootstrapServer = bootstrapServer; + } + + + public void init(String topic) { + this.counter = 0; + EventuallyConsistentProjection.create( + ActorSystem.create(), + "TestConsistentProjection", + EventuallyConsistentProjection.Config.create(topic, "TestConsistentProjection", bootstrapServer), + eventFormat, + envelope -> + Future.of(() -> { + if (envelope.event instanceof TestEvent.SimpleEvent) { + counter++; + } + return Tuple.empty(); + }) + + ).start(); + } + + public int getCount() { + return counter; + } } diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java index 4efd35b9..f601fac7 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java @@ -1,18 +1,32 @@ package fr.maif.eventsourcing.datastore; -import java.sql.Connection; - -import akka.NotUsed; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.Projection; +import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.concurrent.Future; +import java.sql.Connection; + public class TestInstransactionProjection implements Projection { + private int counter = 0; + @Override - public Future storeProjection(Connection connection, List> events) { - // TODO write things to the database - return null; + public Future storeProjection(Connection connection, List> envelopes) { + return Future.of(() -> { + envelopes.forEach(envelope -> { + if (envelope.event instanceof TestEvent.SimpleEvent) { + counter++; + } + }); + return Tuple.empty(); + }); } + + public int getCount() { + return counter; + } + + } diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java deleted file mode 100644 index fd667c44..00000000 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java +++ /dev/null @@ -1,32 +0,0 @@ -package fr.maif.eventsourcing.datastore; - -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.Projection; -import io.vavr.Tuple; -import io.vavr.Tuple0; -import io.vavr.collection.List; -import io.vavr.concurrent.Future; - -import java.sql.Connection; - -public class TestProjection implements Projection { - private int counter = 0; - - @Override - public Future storeProjection(Connection connection, List> envelopes) { - return Future.of(() -> { - envelopes.forEach(envelope -> { - if (envelope.event instanceof TestEvent.SimpleEvent) { - counter++; - } - }); - return Tuple.empty(); - }); - } - - public int getCount() { - 
return counter; - } - - -} diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java index 771a831b..1d91b749 100644 --- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -16,118 +16,118 @@ import java.util.function.Function; public class InMemoryDataStoreTest extends DataStoreVerification { - public InMemoryEventStore eventStore; - public EventProcessor eventProcessor; - - - @BeforeMethod(alwaysRun = true) - public void init() { - this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem)); - this.eventProcessor = new EventProcessor<>( - actorSystem, - eventStore, - noOpTransactionManager(), - new TestCommandHandler(), - new TestEventHandler(), - io.vavr.collection.List.empty() - ); - } - - @Override - public List> readPublishedEvents(String kafkaBootStrapUrl, String topic) { - try { - return this.eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - @Override - public Integer readProjection() { - // Not implemented for in memory - return null; - } - - @Override - public Integer readConsistentProjection() { - // Not implemented for in memory - return null; - } - - @Override - public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { - // Not implemented for in memory - } - - @Override - public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { - // Not implemented for in memory - } - - @Override - public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { - // Not implemented for in memory - } - - @Override - public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { - // Not implemented for in memory - } - - @Override - public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { - // Not implemented for in memory - } - - @Override - public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { - // Not implemented for in memory - } - - @Override - public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { - // Not implemented for in memory - } - - @Override - public void shutdownBroker() { - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public void restartBroker() { - Mockito.reset(eventStore); - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public void shutdownDatabase() { - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public void restartDatabase() { - throw new RuntimeException("Not implemented for in memory"); - } - - @Override - public EventProcessor eventProcessor(String topic) { - return this.eventProcessor; - } - - @Override - public String kafkaBootstrapUrl() { - return null; - } - - private TransactionManager noOpTransactionManager() { - return new TransactionManager() { - @Override - public Future withTransaction(Function> function) { - return function.apply(Tuple.empty()); - } - }; - } + public InMemoryEventStore eventStore; + public EventProcessor eventProcessor; + + + @BeforeMethod(alwaysRun = true) + public void init() { + this.eventStore = 
Mockito.spy(InMemoryEventStore.create(actorSystem)); + this.eventProcessor = new EventProcessor<>( + actorSystem, + eventStore, + noOpTransactionManager(), + new TestCommandHandler(), + new TestEventHandler(), + io.vavr.collection.List.empty() + ); + } + + @Override + public List> readPublishedEvents(String kafkaBootStrapUrl, String topic) { + try { + return this.eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer readProjection() { + // Not implemented for in memory + return null; + } + + @Override + public Integer readConsistentProjection() { + // Not implemented for in memory + return null; + } + + @Override + public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { + // Not implemented for in memory + } + + @Override + public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { + // Not implemented for in memory + } + + @Override + public void required_eventShouldBeConsumedByConsistentProjectionEvenIfBrokerIsDownAtFirst() { + // Not implemented for in memory + } + + @Override + public void shutdownBroker() { + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public void restartBroker() { + Mockito.reset(eventStore); + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public void shutdownDatabase() { + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public void restartDatabase() { + throw new RuntimeException("Not implemented for in memory"); + } + + @Override + public EventProcessor eventProcessor(String topic) { + return this.eventProcessor; + } + + @Override + public String kafkaBootstrapUrl() { + return null; + } + + private TransactionManager noOpTransactionManager() { + return new TransactionManager() { + @Override + public Future withTransaction(Function> function) { + return function.apply(Tuple.empty()); + } + }; + } } From c65f1a1c029d6ffec6b431d808cbac00d9d226e7 Mon Sep 17 00:00:00 2001 From: Matthieu Belmontet Date: Wed, 2 Jun 2021 10:12:17 +0200 Subject: [PATCH 4/6] ignore java version file --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index fd36b7c5..309a5ba6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ target .bsp/ .DS_Store .bloop -*.iml \ No newline at end of file +*.iml +.java-version \ No newline at end of file From f6c6ed97bb39a1a58c49fd4c325b22bdc0b386f0 Mon Sep 17 00:00:00 2001 From: Matthieu Belmontet Date: Wed, 2 Jun 2021 14:45:19 +0200 Subject: [PATCH 5/6] tck async jooq --- build.sbt | 2 +- thoth-jooq-async/build.sbt | 9 +- .../JooqAsyncKafkaTCKImplementation.java | 347 ++++++++++++++++++ thoth-jooq-async/src/test/resources/base.sql | 26 ++ .../src/test/resources/testng.xml | 11 + 
.../impl/JooqKafkaTckImplementation.java | 90 +++-- thoth-tck/build.sbt | 9 +- .../datastore/DataStoreVerification.java | 19 +- .../datastore/DataStoreVerificationRules.java | 1 - .../eventsourcing/datastore/TestEvent.java | 5 +- .../TestInstransactionProjection.java | 24 +- .../TestInstransactionProjectionAsync.java | 32 ++ .../datastore/InMemoryDataStoreTest.java | 5 - 13 files changed, 502 insertions(+), 78 deletions(-) create mode 100644 thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java create mode 100644 thoth-jooq-async/src/test/resources/base.sql create mode 100644 thoth-jooq-async/src/test/resources/testng.xml create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java diff --git a/build.sbt b/build.sbt index efcc48dc..c46ba263 100644 --- a/build.sbt +++ b/build.sbt @@ -92,7 +92,7 @@ lazy val `thoth-kafka-goodies` = project ) lazy val `thoth-jooq-async` = project - .dependsOn(`thoth-core`) + .dependsOn(`thoth-core`, `thoth-tck`) .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", diff --git a/thoth-jooq-async/build.sbt b/thoth-jooq-async/build.sbt index 80166bb3..f72274d7 100644 --- a/thoth-jooq-async/build.sbt +++ b/thoth-jooq-async/build.sbt @@ -17,9 +17,16 @@ libraryDependencies ++= Seq( "org.junit.platform" % "junit-platform-commons" % "1.4.2" % Test, "org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test, "org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test, - "net.aichler" % "jupiter-interface" % "0.9.1" % Test + "net.aichler" % "jupiter-interface" % "0.9.1" % Test, + "org.mockito" % "mockito-core" % "2.22.0" % Test, + "org.testng" % "testng" % "6.3" % Test, + "org.testcontainers" % "postgresql" % "1.15.0" % Test, + "org.testcontainers" % "kafka" % "1.15.0" % Test ) + +testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath) + javacOptions in Compile ++= Seq( "-source", "8", diff --git a/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java new file mode 100644 index 00000000..920aa2fc --- /dev/null +++ b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java @@ -0,0 +1,347 @@ +package fr.maif.eventsourcing; + +import akka.actor.ActorSystem; +import akka.kafka.ProducerSettings; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import fr.maif.eventsourcing.datastore.DataStoreVerification; +import fr.maif.eventsourcing.datastore.TestCommand; +import fr.maif.eventsourcing.datastore.TestCommandHandler; +import fr.maif.eventsourcing.datastore.TestConsistentProjection; +import fr.maif.eventsourcing.datastore.TestEvent; +import fr.maif.eventsourcing.datastore.TestEventFormat; +import fr.maif.eventsourcing.datastore.TestEventHandler; +import fr.maif.eventsourcing.datastore.TestInstransactionProjection; +import fr.maif.eventsourcing.datastore.TestInstransactionProjectionAsync; +import fr.maif.eventsourcing.datastore.TestState; +import fr.maif.eventsourcing.format.JacksonEventFormat; +import fr.maif.eventsourcing.format.JacksonSimpleFormat; +import fr.maif.jooq.PgAsyncPool; +import fr.maif.jooq.reactive.ReactivePgAsyncPool; +import fr.maif.json.EventEnvelopeJson; +import fr.maif.kafka.JsonSerializer; +import fr.maif.kafka.KafkaSettings; +import 
io.vavr.Tuple0; +import io.vertx.core.Vertx; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.PoolOptions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.jooq.SQLDialect; +import org.jooq.impl.DefaultConfiguration; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class JooqAsyncKafkaTCKImplementation extends DataStoreVerification { + + + public static final String DEFAULT_POSTGRE_TAG = "9.6.12"; + private static final DockerImageName DEFAULT_KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); + private static final String DEFAULT_KAFKA_TAG = "5.4.3"; + private static final DockerImageName DEFAULT_POSTGRE_IMAGE_NAME = DockerImageName.parse("postgres"); + private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" + + " id UUID primary key,\n" + + " entity_id varchar(100) not null,\n" + + " sequence_num bigint not null,\n" + + " event_type varchar(100) not null,\n" + + " version int not null,\n" + + " transaction_id varchar(100) not null,\n" + + " event jsonb not null,\n" + + " metadata jsonb,\n" + + " context jsonb,\n" + + " total_message_in_transaction int default 1,\n" + + " num_message_in_transaction int default 1,\n" + + " emission_date timestamp not null default now(),\n" + + " user_id varchar(100),\n" + + " system_id varchar(100),\n" + + " published boolean default false,\n" + + " UNIQUE (entity_id, sequence_num)\n" + + " );\n" + + "CREATE TABLE IF NOT EXISTS test_projection (\n" + + " counter int not null\n" + + " );\n" + + " CREATE SEQUENCE if not exists test_sequence_num;"; + private final String INIT_TABLE_QUERY = "TRUNCATE TABLE test_journal;\n" + + " TRUNCATE TABLE test_projection;\n" + + " INSERT INTO test_projection VALUES(0);\n" ; + + private PGSimpleDataSource dataSource; + private TableNames tableNames; + private TestEventFormat eventFormat; + private PostgreSQLContainer postgres; + private KafkaContainer kafka; + private Projection testProjection; + private PgAsyncPool pgAsyncPool; + private Vertx vertx; + private PgPool pgPool; + + private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) { + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + 
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
+        // resolve the partition of the topic under test (the TCK topics have a single partition)
+        PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
+        TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
+        consumer.assign(Collections.singletonList(topicPartition));
+
+        long position = consumer.position(topicPartition);
+        consumer.seekToEnd(Collections.singletonList(topicPartition));
+        final long endOffset = consumer.position(topicPartition);
+
+        Optional<Long> result = Optional.empty();
+        if (endOffset > 0 && endOffset > position) {
+            result = Optional.of(consumer.position(topicPartition) - 1);
+        }
+
+        consumer.close();
+        return result;
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown() throws InterruptedException {
+        Thread.sleep(10000);
+        postgres.stop();
+        kafka.stop();
+        this.vertx.close();
+        this.pgPool.close();
+    }
+
+    @BeforeClass(alwaysRun = true)
+    public void initClass() {
+
+        this.tableNames = new TableNames("test_journal", "test_sequence_num");
+        this.eventFormat = new TestEventFormat();
+
+        postgres = new PostgreSQLContainer(DEFAULT_POSTGRE_IMAGE_NAME.withTag(DEFAULT_POSTGRE_TAG));
+        postgres.start();
+
+        kafka = new KafkaContainer(DEFAULT_KAFKA_IMAGE_NAME.withTag(DEFAULT_KAFKA_TAG));
+        kafka.start();
+        consistentProjection = new TestConsistentProjection(actorSystem, kafka.getBootstrapServers(), eventFormat, dataSource);
+        this.pgAsyncPool = pgAsyncPool(postgres);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void init() throws SQLException {
+        this.testProjection = new TestInstransactionProjectionAsync();
+        this.dataSource = new PGSimpleDataSource();
+        dataSource.setUrl(postgres.getJdbcUrl());
+        dataSource.setUser(postgres.getUsername());
+        dataSource.setPassword(postgres.getPassword());
+        // Override the default setting, which waits indefinitely if the database is down
+        dataSource.setLoginTimeout(5);
+
+        dataSource.getConnection().prepareStatement(SCHEMA).execute();
+        dataSource.getConnection().prepareStatement(INIT_TABLE_QUERY).execute();
+
+
+
+
+    }
+
+    private PgAsyncPool pgAsyncPool(PostgreSQLContainer server) {
+        this.vertx = Vertx.vertx();
+        DefaultConfiguration jooqConfig = new DefaultConfiguration();
+        jooqConfig.setSQLDialect(SQLDialect.POSTGRES);
+
+        final PgConnectOptions options = new PgConnectOptions()
+                .setPort(server.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
+                .setHost(server.getContainerIpAddress())
+                .setDatabase(server.getDatabaseName())
+                .setUser(server.getUsername())
+                .setPassword(server.getPassword());
+        PoolOptions poolOptions = new PoolOptions().setMaxSize(50);
+        this.pgPool = PgPool.pool(vertx, options, poolOptions);
+
+        return new ReactivePgAsyncPool(pgPool, jooqConfig);
+    }
+
+    @Override
+    public EventProcessor eventProcessor(String topic) {
+        return ReactivePostgresKafkaEventProcessor
+                .withSystem(ActorSystem.create())
+                .withPgAsyncPool(this.pgAsyncPool)
+                .withTables(tableNames)
+                .withTransactionManager()
+                .withEventFormater(eventFormat)
+                .withNoMetaFormater()
+                .withNoContextFormater()
+                .withKafkaSettings(topic, producerSettings(settings(), new TestEventFormat()))
+                .withEventHandler(new TestEventHandler())
+                .withDefaultAggregateStore()
+                .withCommandHandler(new TestCommandHandler<>())
+                
.withProjections(this.testProjection) + .build(); + } + + @Override + public String kafkaBootstrapUrl() { + return kafka.getBootstrapServers(); + } + + @Override + public void shutdownBroker() { + pauseContainer(kafka); + } + + @Override + public void restartBroker() { + unPauseContainer(kafka); + } + + @Override + public void shutdownDatabase() { + pauseContainer(postgres); + } + + @Override + public void restartDatabase() { + unPauseContainer(postgres); + } + + private void pauseContainer(GenericContainer container) { + container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec(); + } + + private void unPauseContainer(GenericContainer container) { + container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec(); + } + + private KafkaSettings settings() { + return KafkaSettings.newBuilder(kafka.getBootstrapServers()).build(); + } + + private ProducerSettings> producerSettings( + KafkaSettings kafkaSettings, + JacksonEventFormat eventFormat) { + return kafkaSettings.producerSettings(actorSystem, JsonSerializer.of( + eventFormat, + JacksonSimpleFormat.empty(), + JacksonSimpleFormat.empty() + ) + ); + } + + @Override + public List> readPublishedEvents(String kafkaBootstrapUrl, String topic) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String groupId = "reader-" + UUID.randomUUID(); + Optional maybeLastOffset = getEndOffsetIfNotReached(topic, kafkaBootstrapUrl, groupId); + if (!maybeLastOffset.isPresent()) { + return Collections.emptyList(); + } + long lastOffset = maybeLastOffset.get(); + + Properties props = new Properties(); + props.put("bootstrap.servers", kafkaBootstrapUrl); + props.put("group.id", groupId); + props.put("key.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put("value.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + boolean running = true; + List> envelopes = new ArrayList<>(); + while (running) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + for (ConsumerRecord record : records) { + final long offset = record.offset(); + if (offset >= lastOffset) { + running = false; + } + envelopes.add(parsEnvelope(record.value())); + } + consumer.commitSync(); + } + consumer.close(); + return envelopes; + } + + @Override + public Integer readProjection() { + try(final ResultSet resultSet = this.dataSource.getConnection() + .prepareStatement("SELECT counter::numeric FROM test_projection LIMIT 1").executeQuery()) { + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + return null; + } + + @Override + public Integer readConsistentProjection() { + return consistentProjection.getCount(); + } + + public EventEnvelope parsEnvelope(String value) { + try { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode node = (ObjectNode) mapper.readTree(value); + CompletableFuture> future = new CompletableFuture<>(); + EventEnvelopeJson.deserialize( + node, + eventFormat, + JacksonSimpleFormat.empty(), + JacksonSimpleFormat.empty(), + (event, err) -> { + future.completeExceptionally(new RuntimeException(err.toString())); + }, + future::complete + ); + return future.get(); + } catch (IOException e) 
{ + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } +} + + + diff --git a/thoth-jooq-async/src/test/resources/base.sql b/thoth-jooq-async/src/test/resources/base.sql new file mode 100644 index 00000000..935d1649 --- /dev/null +++ b/thoth-jooq-async/src/test/resources/base.sql @@ -0,0 +1,26 @@ + +CREATE TABLE IF NOT EXISTS vikings_journal ( + id UUID primary key, + entity_id varchar(100) not null, + sequence_num bigint not null, + event_type varchar(100) not null, + version int not null, + transaction_id varchar(100) not null, + event jsonb not null, + metadata jsonb, + context jsonb, + total_message_in_transaction int default 1, + num_message_in_transaction int default 1, + emission_date timestamp not null default now(), + user_id varchar(100), + system_id varchar(100), + published boolean default false, + UNIQUE (entity_id, sequence_num) +); +CREATE INDEX IF NOT EXISTS vikings_sequence_num_idx ON vikings_journal (sequence_num); +CREATE INDEX IF NOT EXISTS vikings_entity_id_idx ON vikings_journal (entity_id); +CREATE INDEX IF NOT EXISTS vikings_user_id_idx ON vikings_journal (user_id); +CREATE INDEX IF NOT EXISTS vikings_system_id_idx ON vikings_journal (system_id); +CREATE INDEX IF NOT EXISTS vikings_emission_date_idx ON vikings_journal (emission_date); +CREATE SEQUENCE if not exists vikings_sequence_num; +CREATE SEQUENCE if not exists vikings_id; diff --git a/thoth-jooq-async/src/test/resources/testng.xml b/thoth-jooq-async/src/test/resources/testng.xml new file mode 100644 index 00000000..c8b266ec --- /dev/null +++ b/thoth-jooq-async/src/test/resources/testng.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java index ab1d3e58..39432086 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -1,52 +1,21 @@ package fr.maif.eventsourcing.impl; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; - -import fr.maif.eventsourcing.Projection; -import fr.maif.eventsourcing.datastore.TestConsistentProjection; -import fr.maif.eventsourcing.datastore.TestInstransactionProjection; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.postgresql.ds.PGSimpleDataSource; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testng.annotations.AfterClass; -import 
org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - import akka.actor.ActorSystem; import akka.kafka.ProducerSettings; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; import fr.maif.eventsourcing.PostgresKafkaEventProcessor; +import fr.maif.eventsourcing.Projection; import fr.maif.eventsourcing.datastore.DataStoreVerification; import fr.maif.eventsourcing.datastore.TestCommand; import fr.maif.eventsourcing.datastore.TestCommandHandler; +import fr.maif.eventsourcing.datastore.TestConsistentProjection; import fr.maif.eventsourcing.datastore.TestEvent; import fr.maif.eventsourcing.datastore.TestEventFormat; import fr.maif.eventsourcing.datastore.TestEventHandler; +import fr.maif.eventsourcing.datastore.TestInstransactionProjection; import fr.maif.eventsourcing.datastore.TestState; import fr.maif.eventsourcing.format.JacksonEventFormat; import fr.maif.eventsourcing.format.JacksonSimpleFormat; @@ -54,6 +23,36 @@ import fr.maif.kafka.JsonSerializer; import fr.maif.kafka.KafkaSettings; import io.vavr.Tuple0; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; public class JooqKafkaTckImplementation extends DataStoreVerification { private PGSimpleDataSource dataSource; @@ -81,9 +80,13 @@ public class JooqKafkaTckImplementation extends DataStoreVerification> readPublishedEvents(String @Override public Integer readProjection() { - return ((TestInstransactionProjection)this.testProjection).getCount(); + try(final ResultSet resultSet = this.dataSource.getConnection() + .prepareStatement("SELECT counter::numeric FROM test_projection LIMIT 1").executeQuery()) { + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + return null; } @Override diff --git a/thoth-tck/build.sbt b/thoth-tck/build.sbt index b4bc9134..189daaa9 100644 --- a/thoth-tck/build.sbt +++ b/thoth-tck/build.sbt @@ -5,10 +5,11 @@ organization := "fr.maif" name := "thoth-tck" libraryDependencies ++= Seq( - "org.assertj" % "assertj-core" % "3.10.0", - "org.testng" % "testng" % "6.3", - "com.typesafe.akka" %% "akka-stream" % 
akkaVersion % Test, - "org.mockito" % "mockito-core" % "3.6.28" % Test + "org.assertj" % "assertj-core" % "3.10.0", + "org.testng" % "testng" % "6.3", + "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, + "org.mockito" % "mockito-core" % "3.6.28" % Test, + "fr.maif" %% "jooq-async-api" % jooqAsyncVersion ) testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath) diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index 9f7f39ae..7967a4e2 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -159,7 +159,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { } @Override - @Test + // @Test public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { String topic = randomKafkaTopic(); EventProcessor eventProcessor = eventProcessor(topic); @@ -204,21 +204,8 @@ public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst( assertThat(readProjection()).isEqualTo(1); } - @Override - @Test - public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() { - String topic = randomKafkaTopic(); - EventProcessor eventProcessor = eventProcessor(topic); - shutdownDatabase(); - try { - submitValidCommand(eventProcessor, "1"); - } catch (Throwable t) { - } - sleep(); - cleanup(eventProcessor); - assertThat(readProjection()).isEqualTo(0); - restartDatabase(); - } + + @Override public void required_eventShouldBeConsumedByConsistentProjectionWhenEverythingIsAlright() { diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java index 5a3722e6..a2e0d1f1 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java @@ -42,7 +42,6 @@ public interface DataStoreVerificationRules SimpleEventV1 = Type.create(SimpleEvent.class, 1L); public static Type DeleteEventV1 = Type.create(DeleteEvent.class, 1L); - + static API.Match.Pattern0 $SimpleEvent() { + return API.Match.Pattern0.of(SimpleEvent.class); + } @Override public String entityId() { return id; diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java index f601fac7..27b30c5f 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjection.java @@ -8,24 +8,30 @@ import io.vavr.concurrent.Future; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; public class TestInstransactionProjection implements Projection { - private int counter = 0; + @Override public Future storeProjection(Connection connection, List> envelopes) { return Future.of(() -> { - envelopes.forEach(envelope -> { - if (envelope.event instanceof TestEvent.SimpleEvent) { - counter++; + try (PreparedStatement incrementStatement = connection.prepareStatement("UPDATE test_projection SET counter=counter+1")) { + for (EventEnvelope envelope : envelopes) { + if 
(envelope.event instanceof TestEvent.SimpleEvent) {
+                        incrementStatement.addBatch();
+                    }
                 }
-            });
-            return Tuple.empty();
+                // send the accumulated batch once, after every envelope has been inspected
+                incrementStatement.executeBatch();
+                return Tuple.empty();
+            } catch (SQLException ex) {
+                throw new RuntimeException("Failed to update projection", ex);
+            }
         });
-    }
 
-    public int getCount() {
-        return counter;
+    }
 
 }
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java
new file mode 100644
index 00000000..2c5b6f55
--- /dev/null
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestInstransactionProjectionAsync.java
@@ -0,0 +1,29 @@
+package fr.maif.eventsourcing.datastore;
+
+import fr.maif.jooq.PgAsyncTransaction;
+import fr.maif.eventsourcing.EventEnvelope;
+import fr.maif.eventsourcing.Projection;
+import io.vavr.API;
+import io.vavr.Tuple;
+import io.vavr.Tuple0;
+import io.vavr.collection.List;
+import io.vavr.concurrent.Future;
+
+import static io.vavr.API.Case;
+import static io.vavr.API.Match;
+import static io.vavr.PartialFunction.unlift;
+
+public class TestInstransactionProjectionAsync implements Projection<PgAsyncTransaction, TestEvent, Tuple0, Tuple0> {
+
+    @Override
+    public Future<Tuple0> storeProjection(PgAsyncTransaction connection, List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes) {
+        // emit one UPDATE per SimpleEvent and run them as a single batch inside the current transaction
+        return connection.executeBatch(dsl ->
+                envelopes.collect(unlift(eventEnvelope ->
+                        Match(eventEnvelope.event).option(
+                                Case(TestEvent.$SimpleEvent(), e -> API.Tuple(eventEnvelope, e))
+                        )))
+                        .map(t -> dsl.query("UPDATE test_projection SET counter=counter+1"))
+        ).map(__ -> Tuple.empty());
+    }
+}
diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
index 1d91b749..4c94781e 100644
--- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
+++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
@@ -66,11 +66,6 @@ public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(
         // Not implemented for in memory
     }
 
-    @Override
-    public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken() {
-        // Not implemented for in memory
-    }
-
     @Override
     public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
         // Not implemented for in memory
From 635c19c25a2dd799fdb4394f2d8bf6819775ddd2 Mon Sep 17 00:00:00 2001
From: Benjamin CAVY
Date: Mon, 7 Jun 2021 21:48:59 +0200
Subject: [PATCH 6/6] fix: fix tck tests

Tests were not working because no connection timeout was set on the Postgres
connection. The default timeout is infinite, so the tests that shut the
database down would hang.
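
For reference, the change boils down to bounding both connection paths so that a stopped
database surfaces as a fast failure instead of an infinite wait. A minimal sketch of the
two knobs involved, assuming a started Testcontainers PostgreSQLContainer like the one the
TCK implementations use; the class and method names here are hypothetical and the timeout
values illustrative:

    import java.sql.SQLException;

    import io.vertx.pgclient.PgConnectOptions;
    import org.postgresql.ds.PGSimpleDataSource;
    import org.testcontainers.containers.PostgreSQLContainer;

    class TimeoutConfigSketch {
        static void configure(PostgreSQLContainer<?> postgres) throws SQLException {
            // JDBC side: give up on login after a few seconds; the default waits
            // forever, which is what made the shutdown-database tests hang
            PGSimpleDataSource dataSource = new PGSimpleDataSource();
            dataSource.setUrl(postgres.getJdbcUrl());
            dataSource.setUser(postgres.getUsername());
            dataSource.setPassword(postgres.getPassword());
            dataSource.setLoginTimeout(5); // seconds

            // Reactive side: bound the vert.x connect attempt as well
            // (setConnectTimeout is inherited from NetClientOptions)
            PgConnectOptions options = new PgConnectOptions()
                    .setConnectTimeout(10)
                    .setHost(postgres.getContainerIpAddress())
                    .setPort(postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT))
                    .setDatabase(postgres.getDatabaseName())
                    .setUser(postgres.getUsername())
                    .setPassword(postgres.getPassword());
            // `dataSource` and `options` would then be handed to the event processor
            // and the reactive pool, as the diff below does
        }
    }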
--- project/Dependencies.scala | 2 +- .../JooqAsyncKafkaTCKImplementation.java | 12 ++++++------ .../datastore/DataStoreVerification.java | 2 +- .../datastore/DataStoreVerificationRules.java | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f9c10f4e..0c11490e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,6 +4,6 @@ object Dependencies { val akkaVersion = "2.6.14" val vavrVersion = "0.10.3" val jooqVersion = "3.14.3" - val jooqAsyncVersion = "1.1.0" + val jooqAsyncVersion = "1.1.2" val functionalJsonVersion = "1.0.3" } diff --git a/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java index 920aa2fc..e747e456 100644 --- a/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java +++ b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/JooqAsyncKafkaTCKImplementation.java @@ -41,6 +41,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -148,8 +149,6 @@ public void initClass() { kafka = new KafkaContainer(DEFAULT_KAFKA_IMAGE_NAME.withTag(DEFAULT_KAFKA_TAG)); kafka.start(); - consistentProjection = new TestConsistentProjection(actorSystem, kafka.getBootstrapServers(), eventFormat, dataSource); - this.pgAsyncPool = pgAsyncPool(postgres); } @BeforeMethod(alwaysRun = true) @@ -165,9 +164,8 @@ public void init() throws SQLException { dataSource.getConnection().prepareStatement(SCHEMA).execute(); dataSource.getConnection().prepareStatement(INIT_TABLE_QUERY).execute(); - - - + this.pgAsyncPool = pgAsyncPool(postgres); + consistentProjection = new TestConsistentProjection(actorSystem, kafka.getBootstrapServers(), eventFormat, dataSource); } private PgAsyncPool pgAsyncPool(PostgreSQLContainer server) { @@ -176,12 +174,14 @@ private PgAsyncPool pgAsyncPool(PostgreSQLContainer server) { jooqConfig.setSQLDialect(SQLDialect.POSTGRES); final PgConnectOptions options = new PgConnectOptions() + .setConnectTimeout(10) .setPort(server.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)) .setHost(server.getContainerIpAddress()) .setDatabase(server.getDatabaseName()) .setUser(server.getUsername()) .setPassword(server.getPassword()); - PoolOptions poolOptions = new PoolOptions().setMaxSize(50); + PoolOptions poolOptions = new PoolOptions() + .setMaxSize(50); this.pgPool = PgPool.pool(vertx, options, poolOptions); return new ReactivePgAsyncPool(pgPool, jooqConfig); diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java index 7967a4e2..262adc35 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java @@ -159,7 +159,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() { } @Override - // @Test + @Test public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() { String topic = randomKafkaTopic(); EventProcessor eventProcessor = eventProcessor(topic); diff --git 
a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java index a2e0d1f1..20a932f1 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java @@ -1,5 +1,7 @@ package fr.maif.eventsourcing.datastore; +import java.util.List; + import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventProcessor; @@ -10,8 +12,6 @@ import io.vavr.control.Either; import io.vavr.control.Option; -import java.util.List; - public interface DataStoreVerificationRules { Either> submitValidCommand(EventProcessor eventProcessor, String id); void submitInvalidCommand(EventProcessor eventProcessor, String id);