From 89994afbbef25134c5d36b163cd4f5c0f142b4c6 Mon Sep 17 00:00:00 2001 From: Max Schaible Date: Fri, 28 Nov 2025 14:08:51 +0100 Subject: [PATCH 1/4] improve socket.io reconnection to hub --- .../common/hub/auth/HubOIDCAuthenticator.java | 24 +++- .../message/MessageSpringConfig.java | 116 ++++++++++++++---- 2 files changed, 115 insertions(+), 25 deletions(-) diff --git a/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java b/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java index 5b97603..a4502ea 100644 --- a/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java +++ b/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java @@ -83,7 +83,13 @@ public Mono authenticate() { "trying to fetch access token", e)); } }) - .doOnError(err -> log.error(err.getMessage(), err)) + .doOnError(err -> { + if (isNetworkError(err)) { + log.warn("authentication request failed due to network issue: {}", err.getMessage()); + } else { + log.error(err.getMessage(), err); + } + }) .retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs())) .jitter(0.75) .filter(err -> err instanceof HubAuthException) @@ -120,7 +126,13 @@ public Mono refresh(OAuth2Token refreshToken) { "trying to refresh access token", e)); } }) - .doOnError(err -> log.error(err.getMessage(), err)) + .doOnError(err -> { + if (isNetworkError(err)) { + log.warn("token refresh request failed due to network issue: {}", err.getMessage()); + } else { + log.error(err.getMessage(), err); + } + }) .retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs())) .jitter(0.75) .filter(err -> err instanceof HubAuthException) @@ -129,6 +141,14 @@ public Mono refresh(OAuth2Token refreshToken) { .formatted(retryConfig.maxRetries()))))); } + private boolean isNetworkError(Throwable err) { + // Check if this is a network-related error that doesn't need a full stack trace + return err instanceof org.springframework.web.reactive.function.client.WebClientRequestException + || err instanceof java.net.ConnectException + || err instanceof java.net.SocketException + || err instanceof java.io.IOException; + } + private OAuth2AccessToken parseAccessToken(@NonNull String tokenValue) { try { var jwt = decodeJwt(tokenValue); diff --git a/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java b/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java index 6a62250..72f224d 100644 --- a/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java +++ b/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java @@ -146,47 +146,103 @@ public Socket underlyingMessengerSocket( @Qualifier("HUB_AUTHENTICATOR") OIDCAuthenticator hubAuthenticator, @Qualifier("HUB_MESSAGE_RECEIVER") MessageReceiver messageReceiver, @Qualifier("HUB_MESSENGER_UNDERLYING_SOCKET_SECURE_CLIENT") OkHttpClient secureBaseClient) { + + URI messengerUri = URI.create(hubMessengerBaseUrl); + + String socketHost = messengerUri.getScheme() + "://" + messengerUri.getHost(); + if (messengerUri.getPort() != -1) { + socketHost += ":" + messengerUri.getPort(); + } + String socketPath = "/socket.io/"; + if (messengerUri.getPath() != null && !messengerUri.getPath().isEmpty() + && !messengerUri.getPath().equals("/")) { + socketPath = messengerUri.getPath() + "/socket.io/"; + } + IO.Options options = IO.Options.builder() - .setPath(null) + .setPath(socketPath) .setAuth(new HashMap<>()) + // Configure robust reconnection: infinite attempts with exponential backoff + .setReconnection(true) + .setReconnectionAttempts(Integer.MAX_VALUE) + .setReconnectionDelay(1000) // Start with 1 second delay + .setReconnectionDelayMax(30000) // Max 30 seconds between attempts + .setRandomizationFactor(0.5) // Add jitter to prevent thundering herd .build(); - // this is used for SSL backed connections that need to trust additional certificates + // this is used for SSL backed connections that need to trust additional + // certificates options.callFactory = secureBaseClient; options.webSocketFactory = secureBaseClient; - final Socket socket = IO.socket(URI.create(hubMessengerBaseUrl), options); + // socket.io expects the base URL to be without the path + final Socket socket = IO.socket(URI.create(socketHost), options); + log.info("created socket for hub messenger at `{}`", socketHost); socket.on(Socket.EVENT_CONNECT_ERROR, objects -> { - log.error("cannot connect to hub messenger at `{}`", hubMessengerBaseUrl); + String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error"; + log.error("cannot connect to hub messenger at `{}` - error: {}", hubMessengerBaseUrl, errorMsg); - // we block here since this is a crucial component - var oidcTokenPair = hubAuthenticator.authenticate().block(); - if (oidcTokenPair == null) { - throw new RuntimeException("authentication failed - cannot connect to hub messenger at `%s`" - .formatted(hubMessengerBaseUrl)); + // Try to refresh the authentication token and reconnect + // IMPORTANT: Do not throw exceptions here - it would kill the EventThread and stop reconnection + try { + var oidcTokenPair = hubAuthenticator.authenticate().block(); + if (oidcTokenPair != null) { + options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); + log.info("refreshed authentication token, reconnecting to hub messenger"); + socket.connect(); + } else { + log.warn("authentication returned null - will retry on next reconnection attempt"); + } + } catch (Exception e) { + log.warn("failed to refresh authentication token - will retry on next reconnection attempt: {}", + e.getMessage()); } - options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); - - log.info("reconnecting to hub messenger at `{}` with new authentication token", hubMessengerBaseUrl); - socket.connect(); }); socket.on(Socket.EVENT_CONNECT, objects -> log.info("connected to hub messenger at `{}`", hubMessengerBaseUrl)); - socket.io().on(Manager.EVENT_RECONNECT_ATTEMPT, - objects -> { - log.info("trying to reconnect to hub messenger via socket at `{}", hubMessengerBaseUrl); - // we block here since this is a crucial component - var oidcTokenPair = hubAuthenticator.authenticate().block(); - if (oidcTokenPair == null) { - throw new RuntimeException("authentication failed - cannot connect to hub messenger at `%s`" - .formatted(hubMessengerBaseUrl)); - } + socket.on(Socket.EVENT_DISCONNECT, objects -> { + String reason = objects.length > 0 ? objects[0].toString() : "unknown"; + log.warn("disconnected from hub messenger at `{}` - reason: {}", hubMessengerBaseUrl, reason); + }); + socket.io().on(Manager.EVENT_RECONNECT_ATTEMPT, objects -> { + int attemptNumber = objects.length > 0 ? (int) objects[0] : -1; + log.info("reconnection attempt #{} to hub messenger at `{}`", attemptNumber, hubMessengerBaseUrl); + + // Try to refresh the authentication token before reconnecting + // IMPORTANT: Do not throw exceptions here - it would kill the EventThread and stop reconnection + try { + var oidcTokenPair = hubAuthenticator.authenticate().block(); + if (oidcTokenPair != null) { options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); - }); + log.debug("refreshed authentication token for reconnection attempt #{}", attemptNumber); + } else { + log.warn("authentication returned null on attempt #{} - will use existing token", attemptNumber); + } + } catch (Exception e) { + log.warn("failed to refresh authentication token on attempt #{} - will use existing token: {}", + attemptNumber, e.getMessage()); + } + }); + + socket.io().on(Manager.EVENT_RECONNECT, objects -> { + int attemptNumber = objects.length > 0 ? (int) objects[0] : -1; + log.info("successfully reconnected to hub messenger at `{}` after {} attempts", + hubMessengerBaseUrl, attemptNumber); + }); + + socket.io().on(Manager.EVENT_RECONNECT_ERROR, objects -> { + String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error"; + log.warn("reconnection error to hub messenger at `{}`: {}", hubMessengerBaseUrl, errorMsg); + }); + + socket.io().on(Manager.EVENT_ERROR, objects -> { + String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error"; + log.error("socket manager error for hub messenger at `{}`: {}", hubMessengerBaseUrl, errorMsg); + }); socket.on(SOCKET_RECEIVE_HUB_MESSAGE_IDENTIFIER, objects -> { log.debug("processing incoming message"); @@ -196,6 +252,20 @@ public Socket underlyingMessengerSocket( } ); + // Get auth token before first connection attempt to avoid an expected initial failure + try { + var oidcTokenPair = hubAuthenticator.authenticate().block(); + if (oidcTokenPair != null) { + options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); + log.info("obtained initial authentication token for hub messenger"); + } else { + log.warn("initial authentication returned null - will authenticate on first connection error"); + } + } catch (Exception e) { + log.warn("failed to obtain initial authentication token - will authenticate on first connection error: {}", + e.getMessage()); + } + socket.connect(); return socket; } From 7fb2b165dde26781a2ca1b557a8983d95a329be5 Mon Sep 17 00:00:00 2001 From: Max Schaible Date: Mon, 8 Dec 2025 16:08:44 +0100 Subject: [PATCH 2/4] little adjustments for local development --- .gitignore | 12 ++++++++++++ dev/docker-compose.yml | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 103423a..914de3f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ HELP.md +project_overview.md + target/ !.mvn/wrapper/maven-wrapper.jar !**/src/main/**/target/ @@ -34,3 +36,13 @@ system-tests/environment/node/node-docker-compose.yml system-tests/environment/resources/secrets/ !system-tests/environment/resources/secrets/setup-secrets.sh system-tests/tests/mb-test-cli +system-tests/test-setup/* +system-tests/curl + +message-broker-sink-server + +# secrets +*.crt +.env* +privatekey*.pem +*secret*.txt diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index f737ddd..03abd28 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -4,7 +4,7 @@ services: image: mongo:7.0.5@sha256:fcde2d71bf00b592c9cabab1d7d01defde37d69b3d788c53c3bc7431b6b15de8 ports: - "17017:27017" - restart: always + restart: unless-stopped environment: MONGO_INITDB_DATABASE: "message-broker-db" TZ: "Europe/Berlin" From 785b57c15685b6b0c2f9e0a87715d2a93d97d87b Mon Sep 17 00:00:00 2001 From: Max Schaible Date: Wed, 17 Dec 2025 12:30:03 +0100 Subject: [PATCH 3/4] simplilfy error handling for reconnection errors --- .../common/hub/auth/HubOIDCAuthenticator.java | 46 +++++-------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java b/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java index a4502ea..b4f3c5f 100644 --- a/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java +++ b/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java @@ -65,12 +65,7 @@ public Mono authenticate() { .with("secret", clientSecret)) .retrieve() .onStatus(HttpStatusCode::is5xxServerError, - response -> { - var err = new HubAuthException("could not fetch hub access token"); - - log.warn("retrying token request after failed attempt", err); - return Mono.error(err); - }) + response -> Mono.error(new HubAuthException("could not fetch hub access token"))) .bodyToMono(HubAuthTokenResponse.class) .flatMap(hubAuthTokenResponse -> { try { @@ -84,17 +79,14 @@ public Mono authenticate() { } }) .doOnError(err -> { - if (isNetworkError(err)) { - log.warn("authentication request failed due to network issue: {}", err.getMessage()); - } else { - log.error(err.getMessage(), err); - } + log.error("authentication request failed: {}", err.getMessage()); + log.debug("authentication request error details", err); }) .retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs())) .jitter(0.75) .filter(err -> err instanceof HubAuthException) - .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) -> - new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" + .onRetryExhaustedThrow(((retryBackoffSpec, + retrySignal) -> new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" .formatted(retryConfig.maxRetries()))))); } @@ -108,12 +100,8 @@ public Mono refresh(OAuth2Token refreshToken) { .with("refresh_token", refreshToken.getTokenValue())) .retrieve() .onStatus(HttpStatusCode::is5xxServerError, - response -> { - var err = new HubAuthException("could not fetch new access token using refresh token"); - - log.warn("retrying token refresh request after failed attempt", err); - return Mono.error(err); - }) + response -> Mono + .error(new HubAuthException("could not fetch new access token using refresh token"))) .bodyToMono(HubAuthTokenResponse.class) .flatMap(hubAuthTokenResponse -> { try { @@ -127,28 +115,17 @@ public Mono refresh(OAuth2Token refreshToken) { } }) .doOnError(err -> { - if (isNetworkError(err)) { - log.warn("token refresh request failed due to network issue: {}", err.getMessage()); - } else { - log.error(err.getMessage(), err); - } + log.error("token refresh request failed: {}", err.getMessage()); + log.debug("token refresh error details", err); }) .retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs())) .jitter(0.75) .filter(err -> err instanceof HubAuthException) - .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) -> - new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" + .onRetryExhaustedThrow(((retryBackoffSpec, + retrySignal) -> new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" .formatted(retryConfig.maxRetries()))))); } - private boolean isNetworkError(Throwable err) { - // Check if this is a network-related error that doesn't need a full stack trace - return err instanceof org.springframework.web.reactive.function.client.WebClientRequestException - || err instanceof java.net.ConnectException - || err instanceof java.net.SocketException - || err instanceof java.io.IOException; - } - private OAuth2AccessToken parseAccessToken(@NonNull String tokenValue) { try { var jwt = decodeJwt(tokenValue); @@ -231,7 +208,6 @@ public Instant getExpiresAt() { } } - public static Builder builder() { return new Builder(); } From 4c15419eefdc9404f6fca03acc90e7538829fd70 Mon Sep 17 00:00:00 2001 From: Max Schaible Date: Wed, 17 Dec 2025 13:09:58 +0100 Subject: [PATCH 4/4] detect broken tcp connection more quickly --- .../node_message_broker/message/MessageSpringConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java b/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java index 72f224d..0205450 100644 --- a/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java +++ b/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java @@ -90,7 +90,7 @@ OkHttpClient decoratedSocketBaseClient(@Qualifier("COMMON_JAVA_SSL_CONTEXT") SSL @Qualifier("COMMON_TRUST_MANAGER_FACTORY") TrustManagerFactory tmf) { var clientBuilder = new OkHttpClient.Builder() - .readTimeout(1, TimeUnit.MINUTES); + .pingInterval(4, TimeUnit.SECONDS); decorateClientWithSSLContext(clientBuilder, sslCtx, tmf); decorateClientWithProxySettings(clientBuilder);