From ce6c7027a946f01e43e3ddfaca1616158f5f843d Mon Sep 17 00:00:00 2001 From: YoEight Date: Wed, 17 May 2023 11:34:45 -0400 Subject: [PATCH] Fix client error when connecting on esdb >= 22.10. --- .../main/protobuf/EventStoreMessages.proto | 403 ++++++++++-------- .../scala/eventstore/core/EsException.scala | 3 +- .../scala/eventstore/core/SystemError.scala | 10 + ...eatePersistentSubscriptionInspection.scala | 1 + ...letePersistentSubscriptionInspection.scala | 3 +- .../operations/DeleteStreamInspection.scala | 1 + .../operations/ReadAllEventsInspection.scala | 1 + .../core/operations/ReadEventInspection.scala | 1 + .../ReadStreamEventsInspection.scala | 1 + .../ScavengeDatabaseInspection.scala | 1 + .../TransactionCommitInspection.scala | 1 + .../TransactionStartInspection.scala | 1 + .../TransactionWriteInspection.scala | 1 + ...datePersistentSubscriptionInspection.scala | 1 + .../operations/WriteEventsInspection.scala | 1 + .../core/tcp/EventStoreProtoFormats.scala | 71 +-- 16 files changed, 298 insertions(+), 203 deletions(-) diff --git a/core/src/main/protobuf/EventStoreMessages.proto b/core/src/main/protobuf/EventStoreMessages.proto index 1faa4192..1e40b152 100644 --- a/core/src/main/protobuf/EventStoreMessages.proto +++ b/core/src/main/protobuf/EventStoreMessages.proto @@ -1,7 +1,8 @@ -syntax = "proto2"; +syntax = "proto3"; package eventstore.proto; -enum OperationResult { +enum OperationResult +{ Success = 0; PrepareTimeout = 1; CommitTimeout = 2; @@ -13,114 +14,115 @@ enum OperationResult { } message NewEvent { - required bytes event_id = 1; - required string event_type = 2; - required int32 data_content_type = 3; - required int32 metadata_content_type = 4; - required bytes data = 5; - optional bytes metadata = 6; + bytes event_id = 1; + string event_type = 2; + int32 data_content_type = 3; + int32 metadata_content_type = 4; + bytes data = 5; + bytes metadata = 6; } message EventRecord { - required string event_stream_id = 1; - required int64 event_number = 2; - required bytes event_id = 3; - required string event_type = 4; - required int32 data_content_type = 5; - required int32 metadata_content_type = 6; - required bytes data = 7; - optional bytes metadata = 8; - optional int64 created = 9; - optional int64 created_epoch = 10; + string event_stream_id = 1; + int64 event_number = 2; + bytes event_id = 3; + string event_type = 4; + int32 data_content_type = 5; + int32 metadata_content_type = 6; + bytes data = 7; + bytes metadata = 8; + int64 created = 9; + int64 created_epoch = 10; } message ResolvedIndexedEvent { - optional EventRecord event = 1; // marked as optional to make compatible with protoc 2.5.0 - optional EventRecord link = 2; + EventRecord event = 1; + EventRecord link = 2; } message ResolvedEvent { - optional EventRecord event = 1; - optional EventRecord link = 2; - required int64 commit_position = 3; - required int64 prepare_position = 4; + EventRecord event = 1; + EventRecord link = 2; + int64 commit_position = 3; + int64 prepare_position = 4; } message WriteEvents { - required string event_stream_id = 1; - required int64 expected_version = 2; + string event_stream_id = 1; + int64 expected_version = 2; repeated NewEvent events = 3; - required bool require_master = 4; + bool require_leader = 4; } message WriteEventsCompleted { - required OperationResult result = 1; - optional string message = 2; - required int64 first_event_number = 3; - required int64 last_event_number = 4; - optional int64 prepare_position = 5; - optional int64 commit_position = 6; - // optional int64 current_version = 7; -- Add later, https://github.com/EventStore/EventStore/commit/95832cf1320ae74d3a79c3950547b54e28b8f3b6#diff-fa5234643c26663dd9d0e5ff7c836e16 + OperationResult result = 1; + string message = 2; + int64 first_event_number = 3; + int64 last_event_number = 4; + int64 prepare_position = 5; + int64 commit_position = 6; + int64 current_version = 7; } message DeleteStream { - required string event_stream_id = 1; - required int64 expected_version = 2; - required bool require_master = 3; - optional bool hard_delete = 4; + string event_stream_id = 1; + int64 expected_version = 2; + bool require_leader = 3; + bool hard_delete = 4; } message DeleteStreamCompleted { - required OperationResult result = 1; - optional string message = 2; - optional int64 prepare_position = 3; - optional int64 commit_position = 4; + OperationResult result = 1; + string message = 2; + int64 prepare_position = 3; + int64 commit_position = 4; + int64 current_version = 5; } message TransactionStart { - required string event_stream_id = 1; - required int64 expected_version = 2; - required bool require_master = 3; + string event_stream_id = 1; + int64 expected_version = 2; + bool require_leader = 3; } message TransactionStartCompleted { - required int64 transaction_id = 1; - required OperationResult result = 2; - optional string message = 3; + int64 transaction_id = 1; + OperationResult result = 2; + string message = 3; } message TransactionWrite { - required int64 transaction_id = 1; + int64 transaction_id = 1; repeated NewEvent events = 2; - required bool require_master = 3; + bool require_leader = 3; } message TransactionWriteCompleted { - required int64 transaction_id = 1; - required OperationResult result = 2; - optional string message = 3; + int64 transaction_id = 1; + OperationResult result = 2; + string message = 3; } message TransactionCommit { - required int64 transaction_id = 1; - required bool require_master = 2; + int64 transaction_id = 1; + bool require_leader = 2; } message TransactionCommitCompleted { - required int64 transaction_id = 1; - required OperationResult result = 2; - optional string message = 3; - required int64 first_event_number = 4; - required int64 last_event_number = 5; - optional int64 prepare_position = 6; - optional int64 commit_position = 7; + int64 transaction_id = 1; + OperationResult result = 2; + string message = 3; + int64 first_event_number = 4; + int64 last_event_number = 5; + int64 prepare_position = 6; + int64 commit_position = 7; } message ReadEvent { - required string event_stream_id = 1; - required int64 event_number = 2; - required bool resolve_link_tos = 3; - required bool require_master = 4; + string event_stream_id = 1; + int64 event_number = 2; + bool resolve_link_tos = 3; + bool require_leader = 4; } message ReadEventCompleted { @@ -134,18 +136,18 @@ message ReadEventCompleted { AccessDenied = 5; } - required ReadEventResult result = 1; - required ResolvedIndexedEvent event = 2; + ReadEventResult result = 1; + ResolvedIndexedEvent event = 2; - optional string error = 3; + string error = 3; } message ReadStreamEvents { - required string event_stream_id = 1; - required int64 from_event_number = 2; - required int32 max_count = 3; - required bool resolve_link_tos = 4; - required bool require_master = 5; + string event_stream_id = 1; + int64 from_event_number = 2; + int32 max_count = 3; + bool resolve_link_tos = 4; + bool require_leader = 5; } message ReadStreamEventsCompleted { @@ -160,21 +162,21 @@ message ReadStreamEventsCompleted { } repeated ResolvedIndexedEvent events = 1; - required ReadStreamResult result = 2; - required int64 next_event_number = 3; - required int64 last_event_number = 4; - required bool is_end_of_stream = 5; - required int64 last_commit_position = 6; + ReadStreamResult result = 2; + int64 next_event_number = 3; + int64 last_event_number = 4; + bool is_end_of_stream = 5; + int64 last_commit_position = 6; - optional string error = 7; + string error = 7; } message ReadAllEvents { - required int64 commit_position = 1; - required int64 prepare_position = 2; - required int32 max_count = 3; - required bool resolve_link_tos = 4; - required bool require_master = 5; + int64 commit_position = 1; + int64 prepare_position = 2; + int32 max_count = 3; + bool resolve_link_tos = 4; + bool require_leader = 5; } message ReadAllEventsCompleted { @@ -186,57 +188,105 @@ message ReadAllEventsCompleted { AccessDenied = 3; } - required int64 commit_position = 1; - required int64 prepare_position = 2; + int64 commit_position = 1; + int64 prepare_position = 2; repeated ResolvedEvent events = 3; - required int64 next_commit_position = 4; - required int64 next_prepare_position = 5; + int64 next_commit_position = 4; + int64 next_prepare_position = 5; - optional ReadAllResult result = 6 [default = Success]; - optional string error = 7; + ReadAllResult result = 6; + string error = 7; +} + +message Filter{ + + enum FilterContext { + StreamId = 0; + EventType = 1; + + } + + enum FilterType { + Regex = 0; + Prefix = 1; + } + + FilterContext context = 1; + FilterType type = 2; + repeated string data = 3; +} + +message FilteredReadAllEvents { + int64 commit_position = 1; + int64 prepare_position = 2; + int32 max_count = 3; + int32 max_search_window = 4; + bool resolve_link_tos = 5; + bool require_leader = 6; + Filter filter = 7; +} + +message FilteredReadAllEventsCompleted { + + enum FilteredReadAllResult { + Success = 0; + NotModified = 1; + Error = 2; + AccessDenied = 3; + } + + int64 commit_position = 1; + int64 prepare_position = 2; + repeated ResolvedEvent events = 3; + int64 next_commit_position = 4; + int64 next_prepare_position = 5; + bool is_end_of_stream = 6; + + FilteredReadAllResult result = 7; + string error = 8; } message CreatePersistentSubscription { - required string subscription_group_name = 1; - required string event_stream_id = 2; - required bool resolve_link_tos = 3; - required int64 start_from = 4; - required int32 message_timeout_milliseconds = 5; - required bool record_statistics = 6; - required int32 live_buffer_size = 7; - required int32 read_batch_size = 8; - required int32 buffer_size = 9; - required int32 max_retry_count = 10; - required bool prefer_round_robin = 11; - required int32 checkpoint_after_time = 12; - required int32 checkpoint_max_count = 13; - required int32 checkpoint_min_count = 14; - required int32 subscriber_max_count = 15; - optional string named_consumer_strategy = 16; + string subscription_group_name = 1; + string event_stream_id = 2; + bool resolve_link_tos = 3; + int64 start_from = 4; + int32 message_timeout_milliseconds = 5; + bool record_statistics = 6; + int32 live_buffer_size = 7; + int32 read_batch_size = 8; + int32 buffer_size = 9; + int32 max_retry_count = 10; + bool prefer_round_robin = 11; + int32 checkpoint_after_time = 12; + int32 checkpoint_max_count = 13; + int32 checkpoint_min_count = 14; + int32 subscriber_max_count = 15; + string named_consumer_strategy = 16; } message DeletePersistentSubscription { - required string subscription_group_name = 1; - required string event_stream_id = 2; + string subscription_group_name = 1; + string event_stream_id = 2; } message UpdatePersistentSubscription { - required string subscription_group_name = 1; - required string event_stream_id = 2; - required bool resolve_link_tos = 3; - required int64 start_from = 4; - required int32 message_timeout_milliseconds = 5; - required bool record_statistics = 6; - required int32 live_buffer_size = 7; - required int32 read_batch_size = 8; - required int32 buffer_size = 9; - required int32 max_retry_count = 10; - required bool prefer_round_robin = 11; - required int32 checkpoint_after_time = 12; - required int32 checkpoint_max_count = 13; - required int32 checkpoint_min_count = 14; - required int32 subscriber_max_count = 15; - optional string named_consumer_strategy = 16; + string subscription_group_name = 1; + string event_stream_id = 2; + bool resolve_link_tos = 3; + int64 start_from = 4; + int32 message_timeout_milliseconds = 5; + bool record_statistics = 6; + int32 live_buffer_size = 7; + int32 read_batch_size = 8; + int32 buffer_size = 9; + int32 max_retry_count = 10; + bool prefer_round_robin = 11; + int32 checkpoint_after_time = 12; + int32 checkpoint_max_count = 13; + int32 checkpoint_min_count = 14; + int32 subscriber_max_count = 15; + string named_consumer_strategy = 16; } message UpdatePersistentSubscriptionCompleted { @@ -244,10 +294,10 @@ message UpdatePersistentSubscriptionCompleted { Success = 0; DoesNotExist = 1; Fail = 2; - AccessDenied = 3; + AccessDenied=3; } - required UpdatePersistentSubscriptionResult result = 1 [default = Success]; - optional string reason = 2; + UpdatePersistentSubscriptionResult result = 1; + string reason = 2; } message CreatePersistentSubscriptionCompleted { @@ -255,10 +305,10 @@ message CreatePersistentSubscriptionCompleted { Success = 0; AlreadyExists = 1; Fail = 2; - AccessDenied = 3; + AccessDenied=3; } - required CreatePersistentSubscriptionResult result = 1 [default = Success]; - optional string reason = 2; + CreatePersistentSubscriptionResult result = 1; + string reason = 2; } message DeletePersistentSubscriptionCompleted { @@ -268,19 +318,19 @@ message DeletePersistentSubscriptionCompleted { Fail = 2; AccessDenied = 3; } - required DeletePersistentSubscriptionResult result = 1 [default = Success]; - optional string reason = 2; + DeletePersistentSubscriptionResult result = 1; + string reason = 2; } message ConnectToPersistentSubscription { - required string subscription_id = 1; - required string event_stream_id = 2; - required int32 allowed_in_flight_messages = 3; + string subscription_id = 1; + string event_stream_id = 2; + int32 allowed_in_flight_messages = 3; } message PersistentSubscriptionAckEvents { - required string subscription_id = 1; + string subscription_id = 1; repeated bytes processed_event_ids = 2; } @@ -293,34 +343,47 @@ message PersistentSubscriptionNakEvents { Stop = 4; } - required string subscription_id = 1; + string subscription_id = 1; repeated bytes processed_event_ids = 2; - optional string message = 3; - required NakAction action = 4 [default = Unknown]; + string message = 3; + NakAction action = 4; } message PersistentSubscriptionConfirmation { - required int64 last_commit_position = 1; - required string subscription_id = 2; - optional int64 last_event_number = 3; + int64 last_commit_position = 1; + string subscription_id = 2; + int64 last_event_number = 3; } message PersistentSubscriptionStreamEventAppeared { - required ResolvedIndexedEvent event = 1; + ResolvedIndexedEvent event = 1; + int32 retryCount = 2; } message SubscribeToStream { - required string event_stream_id = 1; - required bool resolve_link_tos = 2; + string event_stream_id = 1; + bool resolve_link_tos = 2; +} + +message FilteredSubscribeToStream { + string event_stream_id = 1; + bool resolve_link_tos = 2; + Filter filter = 3; + int32 checkpoint_interval = 4; +} + +message CheckpointReached { + int64 commit_position = 1; + int64 prepare_position = 2; } message SubscriptionConfirmation { - required int64 last_commit_position = 1; - optional int64 last_event_number = 2; + int64 last_commit_position = 1; + int64 last_event_number = 2; } message StreamEventAppeared { - required ResolvedEvent event = 1; + ResolvedEvent event = 1; } message UnsubscribeFromStream { @@ -331,12 +394,12 @@ message SubscriptionDropped { enum SubscriptionDropReason { Unsubscribed = 0; AccessDenied = 1; - NotFound = 2; - PersistentSubscriptionDeleted = 3; - SubscriberMaxCountReached = 4; + NotFound=2; + PersistentSubscriptionDeleted=3; + SubscriberMaxCountReached=4; } - optional SubscriptionDropReason reason = 1 [default = Unsubscribed]; + SubscriptionDropReason reason = 1; } message NotHandled { @@ -344,20 +407,20 @@ message NotHandled { enum NotHandledReason { NotReady = 0; TooBusy = 1; - NotMaster = 2; + NotLeader = 2; IsReadOnly = 3; } - required NotHandledReason reason = 1; - optional MasterInfo additional_info = 2; // changed in jvmclient by purpose + NotHandledReason reason = 1; + bytes additional_info = 2; - message MasterInfo { - required string external_tcp_address = 1; - required int32 external_tcp_port = 2; - required string external_http_address = 3; - required int32 external_http_port = 4; - optional string external_secure_tcp_address = 5; - optional int32 external_secure_tcp_port = 6; + message LeaderInfo { + string external_tcp_address = 1; + int32 external_tcp_port = 2; + string http_address = 3; + int32 http_port = 4; + string external_secure_tcp_address = 5; + int32 external_secure_tcp_port = 6; } } @@ -367,19 +430,19 @@ message ScavengeDatabase { message ScavengeDatabaseResponse { enum ScavengeResult { - Started = 0; - InProgress = 1; + Started = 0; + InProgress = 1; Unauthorized = 2; } - required ScavengeResult result = 1; - optional string scavengeId = 2; -} + ScavengeResult result = 1; + string scavengeId = 2; +} message IdentifyClient { - required int32 version = 1; // 1 to identify as 64-bit, 0 as legacy 32-bit - optional string connection_name = 2; + int32 version = 1; + string connection_name = 2; } message ClientIdentified { diff --git a/core/src/main/scala/eventstore/core/EsException.scala b/core/src/main/scala/eventstore/core/EsException.scala index c79ce2b5..7a88fc6b 100644 --- a/core/src/main/scala/eventstore/core/EsException.scala +++ b/core/src/main/scala/eventstore/core/EsException.scala @@ -29,4 +29,5 @@ object CannotEstablishConnectionException { @SerialVersionUID(1L) case object ScavengeUnauthorizedException extends EsException("Not authorized to scavenge") @SerialVersionUID(1L) final case class CommandNotExpectedException(message: String) extends EsException(message) @SerialVersionUID(1L) final case class RetriesLimitReachedException(message: String) extends EsException(message) -@SerialVersionUID(1L) final case class InvalidOperationException(message: String) extends EsException(message) \ No newline at end of file +@SerialVersionUID(1L) final case class InvalidOperationException(message: String) extends EsException(message) +@SerialVersionUID(1L) case object UnrecognizedException extends EsException("Unrecognized error from the server") \ No newline at end of file diff --git a/core/src/main/scala/eventstore/core/SystemError.scala b/core/src/main/scala/eventstore/core/SystemError.scala index 2ed20035..460973d3 100644 --- a/core/src/main/scala/eventstore/core/SystemError.scala +++ b/core/src/main/scala/eventstore/core/SystemError.scala @@ -23,6 +23,7 @@ object OperationError { case object StreamDeleted extends OperationError case object InvalidTransaction extends OperationError case object AccessDenied extends OperationError + case object Unrecognized extends OperationError } sealed trait ReadEventError extends ServerError @@ -32,6 +33,7 @@ object ReadEventError { case object StreamNotFound extends ReadEventError case object StreamDeleted extends ReadEventError case object AccessDenied extends ReadEventError + case object Unrecognized extends ReadEventError final case class Error(message: Option[String]) extends ReadEventError { override def toString = s"ReadEventError($message)" } @@ -43,6 +45,7 @@ object ReadStreamEventsError { case object StreamNotFound extends ReadStreamEventsError case object StreamDeleted extends ReadStreamEventsError case object AccessDenied extends ReadStreamEventsError + case object Unrecognized extends ReadStreamEventsError final case class Error(message: Option[String]) extends ReadStreamEventsError { override def toString = s"ReadStreamEventsError($message)" } @@ -52,6 +55,7 @@ sealed trait ReadAllEventsError extends ServerError object ReadAllEventsError { case object AccessDenied extends ReadAllEventsError + case object Unrecognized extends ReadAllEventsError final case class Error(message: Option[String]) extends ReadAllEventsError { override def toString = s"ReadAllEventsError($message)" } @@ -68,6 +72,7 @@ object NotHandled { case object TooBusy extends Reason final case class NotMaster(mi: MasterInfo) extends Reason case object IsReadOnly extends Reason + case object Unrecognized extends Reason final case class MasterInfo( tcpAddress: InetSocketAddress, @@ -87,6 +92,7 @@ object SubscriptionDropped { case object NotFound extends SubscriptionDropped case object PersistentSubscriptionDeleted extends SubscriptionDropped case object SubscriberMaxCountReached extends SubscriptionDropped + case object Unrecognized extends SubscriptionDropped } sealed trait ScavengeError extends SystemError @@ -94,6 +100,7 @@ sealed trait ScavengeError extends SystemError object ScavengeError { case object InProgress extends ScavengeError case object Unauthorized extends ScavengeError + case object Unrecognized extends ScavengeError } sealed trait CreatePersistentSubscriptionError extends SystemError @@ -101,6 +108,7 @@ sealed trait CreatePersistentSubscriptionError extends SystemError object CreatePersistentSubscriptionError { case object AccessDenied extends CreatePersistentSubscriptionError case object AlreadyExists extends CreatePersistentSubscriptionError + case object Unrecognized extends CreatePersistentSubscriptionError final case class Error(message: Option[String]) extends CreatePersistentSubscriptionError { override def toString = s"CreatePersistentSubscriptionError($message)" } @@ -111,6 +119,7 @@ sealed trait UpdatePersistentSubscriptionError extends SystemError object UpdatePersistentSubscriptionError { case object AccessDenied extends UpdatePersistentSubscriptionError case object DoesNotExist extends UpdatePersistentSubscriptionError + case object Unrecognized extends UpdatePersistentSubscriptionError final case class Error(message: Option[String]) extends UpdatePersistentSubscriptionError { override def toString = s"UpdatePersistentSubscriptionError($message)" } @@ -121,6 +130,7 @@ sealed trait DeletePersistentSubscriptionError extends SystemError object DeletePersistentSubscriptionError { case object AccessDenied extends DeletePersistentSubscriptionError case object DoesNotExist extends DeletePersistentSubscriptionError + case object Unrecognized extends DeletePersistentSubscriptionError final case class Error(message: Option[String]) extends DeletePersistentSubscriptionError { override def toString = s"DeletePersistentSubscriptionError($message)" } diff --git a/core/src/main/scala/eventstore/core/operations/CreatePersistentSubscriptionInspection.scala b/core/src/main/scala/eventstore/core/operations/CreatePersistentSubscriptionInspection.scala index 1b8dbb5b..bcc08c87 100644 --- a/core/src/main/scala/eventstore/core/operations/CreatePersistentSubscriptionInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/CreatePersistentSubscriptionInspection.scala @@ -13,6 +13,7 @@ private[eventstore] final case class CreatePersistentSubscriptionInspection(out: val result = error match { case AccessDenied => AccessDeniedException(s"Read access denied for $streamId") case AlreadyExists => InvalidOperationException(s"Subscription group ${out.groupName} on stream $streamId already exists") + case Unrecognized => UnrecognizedException case e: Error => ServerErrorException(e.message.getOrElse(e.toString)) } Fail(result) diff --git a/core/src/main/scala/eventstore/core/operations/DeletePersistentSubscriptionInspection.scala b/core/src/main/scala/eventstore/core/operations/DeletePersistentSubscriptionInspection.scala index 20ef4cda..ae850e4e 100644 --- a/core/src/main/scala/eventstore/core/operations/DeletePersistentSubscriptionInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/DeletePersistentSubscriptionInspection.scala @@ -2,7 +2,7 @@ package eventstore package core package operations -import DeletePersistentSubscriptionError.{Error, DoesNotExist, AccessDenied} +import DeletePersistentSubscriptionError.{Error, DoesNotExist, AccessDenied, Unrecognized} import PersistentSubscription.{Delete, DeleteCompleted} import Inspection.Decision.Fail @@ -13,6 +13,7 @@ private[eventstore] final case class DeletePersistentSubscriptionInspection(out: val result = error match { case AccessDenied => AccessDeniedException(s"Write access denied for stream $streamId") case DoesNotExist => InvalidOperationException(s"Subscription group ${out.groupName} on stream $streamId does not exist") + case Unrecognized => UnrecognizedException case e: Error => ServerErrorException(e.message.getOrElse(e.toString)) } Fail(result) diff --git a/core/src/main/scala/eventstore/core/operations/DeleteStreamInspection.scala b/core/src/main/scala/eventstore/core/operations/DeleteStreamInspection.scala index fb39c9f2..02ecca02 100644 --- a/core/src/main/scala/eventstore/core/operations/DeleteStreamInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/DeleteStreamInspection.scala @@ -17,6 +17,7 @@ private[eventstore] final case class DeleteStreamInspection(out: DeleteStream) case StreamDeleted => Fail(streamDeletedException) case InvalidTransaction => Fail(InvalidTransactionException) case AccessDenied => Fail(new AccessDeniedException(s"Write access denied for $streamId")) + case Unrecognized => Fail(UnrecognizedException) } } diff --git a/core/src/main/scala/eventstore/core/operations/ReadAllEventsInspection.scala b/core/src/main/scala/eventstore/core/operations/ReadAllEventsInspection.scala index dafc10a9..1dca2e59 100644 --- a/core/src/main/scala/eventstore/core/operations/ReadAllEventsInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/ReadAllEventsInspection.scala @@ -12,6 +12,7 @@ private[eventstore] final case class ReadAllEventsInspection(out: ReadAllEvents) error match { case e: Error => Fail(ServerErrorException(e.message.getOrElse(e.toString))) case AccessDenied => Fail(AccessDeniedException(s"Read access denied for $streamId")) + case Unrecognized => Fail(UnrecognizedException) } } diff --git a/core/src/main/scala/eventstore/core/operations/ReadEventInspection.scala b/core/src/main/scala/eventstore/core/operations/ReadEventInspection.scala index bdb5de48..6d56e50a 100644 --- a/core/src/main/scala/eventstore/core/operations/ReadEventInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/ReadEventInspection.scala @@ -15,6 +15,7 @@ private[eventstore] final case class ReadEventInspection(out: ReadEvent) case StreamDeleted => StreamDeletedException(s"Read failed due to $streamId has been deleted") case e: Error => ServerErrorException(e.message.getOrElse(e.toString)) case AccessDenied => AccessDeniedException(s"Read access denied for $streamId") + case Unrecognized => UnrecognizedException } Fail(result) diff --git a/core/src/main/scala/eventstore/core/operations/ReadStreamEventsInspection.scala b/core/src/main/scala/eventstore/core/operations/ReadStreamEventsInspection.scala index da8a967f..ebede580 100644 --- a/core/src/main/scala/eventstore/core/operations/ReadStreamEventsInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/ReadStreamEventsInspection.scala @@ -14,6 +14,7 @@ private[eventstore] final case class ReadStreamEventsInspection(out: ReadStreamE case StreamDeleted => StreamDeletedException(s"Read failed due to $streamId has been deleted") case e: Error => ServerErrorException(e.message.getOrElse(e.toString)) case AccessDenied => AccessDeniedException(s"Read access denied for $streamId") + case Unrecognized => UnrecognizedException } Fail(result) diff --git a/core/src/main/scala/eventstore/core/operations/ScavengeDatabaseInspection.scala b/core/src/main/scala/eventstore/core/operations/ScavengeDatabaseInspection.scala index 2325ca98..5463eb34 100644 --- a/core/src/main/scala/eventstore/core/operations/ScavengeDatabaseInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/ScavengeDatabaseInspection.scala @@ -12,6 +12,7 @@ private[eventstore] object ScavengeDatabaseInspection val result = error match { case InProgress => ScavengeInProgressException case Unauthorized => ScavengeUnauthorizedException + case Unrecognized => UnrecognizedException } Fail(result) } diff --git a/core/src/main/scala/eventstore/core/operations/TransactionCommitInspection.scala b/core/src/main/scala/eventstore/core/operations/TransactionCommitInspection.scala index 12df11b1..1ca964e5 100644 --- a/core/src/main/scala/eventstore/core/operations/TransactionCommitInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/TransactionCommitInspection.scala @@ -19,6 +19,7 @@ private[eventstore] final case class TransactionCommitInspection(out: Transactio case StreamDeleted => Fail(streamDeletedException) case InvalidTransaction => Fail(InvalidTransactionException) case AccessDenied => Fail(AccessDeniedException(s"Write access denied")) + case Unrecognized => Fail(UnrecognizedException) } } diff --git a/core/src/main/scala/eventstore/core/operations/TransactionStartInspection.scala b/core/src/main/scala/eventstore/core/operations/TransactionStartInspection.scala index 58c08bd9..37ec43a9 100644 --- a/core/src/main/scala/eventstore/core/operations/TransactionStartInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/TransactionStartInspection.scala @@ -20,6 +20,7 @@ private[eventstore] final case class TransactionStartInspection(out: Transaction case StreamDeleted => Fail(streamDeletedException) case InvalidTransaction => Fail(InvalidTransactionException) case AccessDenied => Fail(AccessDeniedException(s"Write access denied for $streamId")) + case Unrecognized => Fail(UnrecognizedException) } } diff --git a/core/src/main/scala/eventstore/core/operations/TransactionWriteInspection.scala b/core/src/main/scala/eventstore/core/operations/TransactionWriteInspection.scala index 8d364ee9..20fecb92 100644 --- a/core/src/main/scala/eventstore/core/operations/TransactionWriteInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/TransactionWriteInspection.scala @@ -17,6 +17,7 @@ private[eventstore] final case class TransactionWriteInspection(out: Transaction case StreamDeleted => Unexpected case InvalidTransaction => Unexpected case AccessDenied => Fail(AccessDeniedException(s"Write access denied")) + case Unrecognized => Fail(UnrecognizedException) } } } \ No newline at end of file diff --git a/core/src/main/scala/eventstore/core/operations/UpdatePersistentSubscriptionInspection.scala b/core/src/main/scala/eventstore/core/operations/UpdatePersistentSubscriptionInspection.scala index 9c5fe3db..0a5398c9 100644 --- a/core/src/main/scala/eventstore/core/operations/UpdatePersistentSubscriptionInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/UpdatePersistentSubscriptionInspection.scala @@ -16,6 +16,7 @@ private[eventstore] final case class UpdatePersistentSubscriptionInspection(out: case AccessDenied => AccessDeniedException(s"Write access denied for stream $streamId") case DoesNotExist => InvalidOperationException(s"Subscription group ${out.groupName} on stream $streamId does not exist") case e: Error => ServerErrorException(e.message.getOrElse(e.toString)) + case Unrecognized => UnrecognizedException } Fail(result) } diff --git a/core/src/main/scala/eventstore/core/operations/WriteEventsInspection.scala b/core/src/main/scala/eventstore/core/operations/WriteEventsInspection.scala index c0e8e19a..46a2c36a 100644 --- a/core/src/main/scala/eventstore/core/operations/WriteEventsInspection.scala +++ b/core/src/main/scala/eventstore/core/operations/WriteEventsInspection.scala @@ -19,6 +19,7 @@ private[eventstore] final case class WriteEventsInspection(out: WriteEvents) case StreamDeleted => Fail(streamDeleted) case InvalidTransaction => Fail(InvalidTransactionException) case AccessDenied => Fail(accessDenied) + case Unrecognized => Fail(UnrecognizedException) } def accessDenied: AccessDeniedException = AccessDeniedException( diff --git a/core/src/main/scala/eventstore/core/tcp/EventStoreProtoFormats.scala b/core/src/main/scala/eventstore/core/tcp/EventStoreProtoFormats.scala index da7694b2..afd93ae1 100644 --- a/core/src/main/scala/eventstore/core/tcp/EventStoreProtoFormats.scala +++ b/core/src/main/scala/eventstore/core/tcp/EventStoreProtoFormats.scala @@ -42,6 +42,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { case StreamDeleted => failure(E.StreamDeleted) case InvalidTransaction => failure(E.InvalidTransaction) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } @@ -88,7 +89,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { metadata = Content(byteString(x.getMetadata), ContentType(x.getMetadataContentType)) ), created = option( - x.hasCreatedEpoch, ZonedDateTime.ofInstant(Instant.ofEpochMilli(x.getCreatedEpoch), ZoneId.systemDefault) + x.getCreatedEpoch != 0L, ZonedDateTime.ofInstant(Instant.ofEpochMilli(x.getCreatedEpoch), ZoneId.systemDefault) ) ) } @@ -129,7 +130,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { builder.setEventStreamId(x.streamId.streamId) builder.setExpectedVersion(expectedVersion(x.expectedVersion)) builder.addAllEvents(x.events.map(EventDataWriter.toProto(_).build()).asJava) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -145,7 +146,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { val numbersRange = EventNumber.Range.opt(x.getFirstEventNumber, x.getLastEventNumber) - val position = if(x.hasCommitPosition && x.hasPreparePosition) { + val position = if(x.getCommitPosition != 0L && x.getPreparePosition != 0L) { positionOpt(x.getCommitPosition, x.getPreparePosition) } else None @@ -158,7 +159,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { val builder = j.DeleteStream.newBuilder() builder.setEventStreamId(x.streamId.streamId) builder.setExpectedVersion(expectedVersion(x.expectedVersion)) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder.setHardDelete(x.hard) builder } @@ -170,7 +171,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { def parse: Array[Byte] => j.DeleteStreamCompleted = j.DeleteStreamCompleted.parseFrom def success(x: j.DeleteStreamCompleted): DeleteStreamCompleted = { - val position = if(x.hasCommitPosition && x.hasPreparePosition) { + val position = if(x.getCommitPosition != 0L && x.getPreparePosition != 0L) { positionOpt(x.getCommitPosition, x.getPreparePosition) } else None @@ -183,7 +184,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { val builder = j.TransactionStart.newBuilder() builder.setEventStreamId(x.streamId.streamId) builder.setExpectedVersion(expectedVersion(x.expectedVersion)) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -200,7 +201,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { val builder = j.TransactionWrite.newBuilder() builder.setTransactionId(x.transactionId) builder.addAllEvents(x.events.map(EventDataWriter.toProto(_).build()).asJava) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -216,7 +217,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { def toProto(x: TransactionCommit): j.TransactionCommit.Builder = { val builder = j.TransactionCommit.newBuilder() builder.setTransactionId(x.transactionId) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -230,7 +231,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { val numbersRange = EventNumber.Range.opt(x.getFirstEventNumber ,x.getLastEventNumber) - val position = if(x.hasCommitPosition && x.hasPreparePosition) { + val position = if(x.getCommitPosition != 0L && x.getPreparePosition != 0L) { positionOpt(x.getCommitPosition, x.getPreparePosition) } else None @@ -244,7 +245,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { builder.setEventStreamId(x.streamId.streamId) builder.setEventNumber(EventNumberConverter.from(x.eventNumber)) builder.setResolveLinkTos(x.resolveLinkTos) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -262,8 +263,9 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { case NotFound => failure(E.EventNotFound) case NoStream => failure(E.StreamNotFound) case StreamDeleted => failure(E.StreamDeleted) - case Error => failure(E.Error(message(option(x.hasError, x.getError)))) + case Error => failure(E.Error(message(option(!x.getError.isBlank, x.getError)))) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -275,7 +277,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { builder.setFromEventNumber(EventNumberConverter.from(x.fromNumber)) builder.setMaxCount(x.maxCount) builder.setResolveLinkTos(x.resolveLinkTos) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -305,8 +307,9 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { case NoStream => failure(E.StreamNotFound) case StreamDeleted => failure(E.StreamDeleted) case NotModified => this.failure(new IllegalArgumentException("ReadStreamEventsCompleted.NotModified is not supported")) - case Error => failure(E.Error(message(option(x.hasError, x.getError)))) + case Error => failure(E.Error(message(option(!x.getError.isBlank, x.getError)))) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -325,7 +328,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { builder.setPreparePosition(preparePosition) builder.setMaxCount(x.maxCount) builder.setResolveLinkTos(x.resolveLinkTos) - builder.setRequireMaster(x.requireMaster) + builder.setRequireLeader(x.requireMaster) builder } } @@ -348,13 +351,14 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { direction = direction ) - val result = if (x.hasResult) x.getResult else Success + val result = if (x.getResult != null) x.getResult else Success result match { case Success => Try(readAllEventsCompleted) case NotModified => this.failure(new IllegalArgumentException("ReadAllEventsCompleted.NotModified is not supported")) - case Error => failure(E.Error(message(option(x.hasError, x.getError)))) + case Error => failure(E.Error(message(option(!x.getError.isBlank, x.getError)))) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -406,8 +410,9 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { x.getResult match { case Success => Try(Ps.CreateCompleted) case AlreadyExists => failure(E.AlreadyExists) - case Fail => failure(E.Error(message(option(x.hasReason, x.getReason)))) + case Fail => failure(E.Error(message(option(!x.getReason.isBlank, x.getReason)))) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -435,8 +440,9 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { x.getResult match { case Success => Try(Ps.DeleteCompleted) case DoesNotExist => failure(E.DoesNotExist) - case Fail => failure(E.Error(message(option(x.hasReason, x.getReason)))) + case Fail => failure(E.Error(message(option(!x.getReason.isBlank, x.getReason)))) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -484,8 +490,9 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { x.getResult match { case Success => Try(Ps.UpdateCompleted) case DoesNotExist => failure(E.DoesNotExist) - case Fail => failure(E.Error(message(option(x.hasReason, x.getReason)))) + case Fail => failure(E.Error(message(option(!x.getReason.isBlank, x.getReason)))) case AccessDenied => failure(E.AccessDenied) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -509,7 +516,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { def fromProto(x: j.PersistentSubscriptionConfirmation): PersistentSubscription.Connected = { val eventNumber = for { - x <- option(x.hasLastEventNumber, x.getLastEventNumber) + x <- option(x.getLastEventNumber != 0L, x.getLastEventNumber) y <- EventNumber.Exact.opt(x) } yield y @@ -577,7 +584,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { j.SubscriptionConfirmation.parseFrom def fromProto(x: j.SubscriptionConfirmation): SubscribeCompleted = - option(x.hasLastEventNumber, x.getLastEventNumber) match { + option(x.getLastEventNumber != 0L, x.getLastEventNumber) match { case None => SubscribeToAllCompleted(x.getLastCommitPosition) case Some(eventNumber) => SubscribeToStreamCompleted( lastCommit = x.getLastCommitPosition, @@ -600,13 +607,14 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { import j.SubscriptionDropped.{SubscriptionDropReason => P} def unsubscribed = Try(Unsubscribed) - if (!x.hasReason) unsubscribed + if (x.getReason == null) unsubscribed else x.getReason match { case P.Unsubscribed => unsubscribed case P.AccessDenied => failure(SubscriptionDropped.AccessDenied) case P.NotFound => failure(SubscriptionDropped.NotFound) case P.PersistentSubscriptionDeleted => failure(SubscriptionDropped.PersistentSubscriptionDeleted) case P.SubscriberMaxCountReached => failure(SubscriptionDropped.SubscriberMaxCountReached) + case P.UNRECOGNIZED => failure(SubscriptionDropped.Unrecognized) } } } @@ -621,7 +629,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { import eventstore.core.{ ScavengeError => E } def scavengeDatabaseResponse = ScavengeDatabaseResponse( - option(x.hasScavengeId, x.getScavengeId) + option(!x.getScavengeId.isBlank, x.getScavengeId) ) def failure(x: ScavengeError) = this.failure(x) @@ -631,6 +639,7 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { case Started => Try(scavengeDatabaseResponse) case InProgress => failure(E.InProgress) case Unauthorized => failure(E.Unauthorized) + case UNRECOGNIZED => failure(E.Unrecognized) } } } @@ -640,26 +649,26 @@ trait EventStoreProtoFormats extends DefaultProtoFormats with DefaultFormats { def parse: Array[Byte] => j.NotHandled = j.NotHandled.parseFrom - def masterInfo(x: j.NotHandled.MasterInfo): NotHandled.MasterInfo = NotHandled.MasterInfo( + private def masterInfo(x: j.NotHandled.LeaderInfo): NotHandled.MasterInfo = NotHandled.MasterInfo( tcpAddress = x.getExternalTcpAddress :: x.getExternalTcpPort, - httpAddress = x.getExternalHttpAddress :: x.getExternalHttpPort, + httpAddress = x.getHttpAddress :: x.getHttpPort, tcpSecureAddress = for { - h <- option(x.hasExternalSecureTcpAddress, x.getExternalSecureTcpAddress) - p <- option(x.hasExternalSecureTcpPort, x.getExternalSecureTcpPort) + h <- option(!x.getExternalTcpAddress.isBlank, x.getExternalSecureTcpAddress) + p <- option(x.getExternalSecureTcpPort != 0, x.getExternalSecureTcpPort) } yield h :: p ) - def masterInfo(x: Option[j.NotHandled.MasterInfo]): NotHandled.MasterInfo = { - require(x.isDefined, "additionalInfo is not provided for NotHandled.NotMaster") - masterInfo(x.get) + private def masterInfo(x: com.google.protobuf.ByteString): NotHandled.MasterInfo = { + masterInfo(j.NotHandled.LeaderInfo.parseFrom(x)) } def fromProto(x: j.NotHandled): NotHandled = { val reason = x.getReason match { case NotReady => NotHandled.NotReady case TooBusy => NotHandled.TooBusy - case NotMaster => NotHandled.NotMaster(masterInfo(x.getAdditionalInfo)) + case NotLeader => NotHandled.NotMaster(masterInfo(x.getAdditionalInfo)) case IsReadOnly => NotHandled.IsReadOnly + case UNRECOGNIZED => NotHandled.Unrecognized } NotHandled(reason) }