Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
HELP.md
project_overview.md

target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
Expand Down Expand Up @@ -34,3 +36,13 @@ system-tests/environment/node/node-docker-compose.yml
system-tests/environment/resources/secrets/
!system-tests/environment/resources/secrets/setup-secrets.sh
system-tests/tests/mb-test-cli
system-tests/test-setup/*
system-tests/curl

message-broker-sink-server

# secrets
*.crt
.env*
privatekey*.pem
*secret*.txt
2 changes: 1 addition & 1 deletion dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
image: mongo:7.0.5@sha256:fcde2d71bf00b592c9cabab1d7d01defde37d69b3d788c53c3bc7431b6b15de8
ports:
- "17017:27017"
restart: always
restart: unless-stopped
environment:
MONGO_INITDB_DATABASE: "message-broker-db"
TZ: "Europe/Berlin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,7 @@ public Mono<OIDCTokenPair> authenticate() {
.with("secret", clientSecret))
.retrieve()
.onStatus(HttpStatusCode::is5xxServerError,
response -> {
var err = new HubAuthException("could not fetch hub access token");

log.warn("retrying token request after failed attempt", err);
return Mono.error(err);
})
response -> Mono.error(new HubAuthException("could not fetch hub access token")))
.bodyToMono(HubAuthTokenResponse.class)
.flatMap(hubAuthTokenResponse -> {
try {
Expand All @@ -83,12 +78,15 @@ public Mono<OIDCTokenPair> authenticate() {
"trying to fetch access token", e));
}
})
.doOnError(err -> log.error(err.getMessage(), err))
.doOnError(err -> {
log.error("authentication request failed: {}", err.getMessage());
log.debug("authentication request error details", err);
})
.retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs()))
.jitter(0.75)
.filter(err -> err instanceof HubAuthException)
.onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'"
.onRetryExhaustedThrow(((retryBackoffSpec,
retrySignal) -> new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'"
.formatted(retryConfig.maxRetries())))));
}

Expand All @@ -102,12 +100,8 @@ public Mono<OIDCTokenPair> refresh(OAuth2Token refreshToken) {
.with("refresh_token", refreshToken.getTokenValue()))
.retrieve()
.onStatus(HttpStatusCode::is5xxServerError,
response -> {
var err = new HubAuthException("could not fetch new access token using refresh token");

log.warn("retrying token refresh request after failed attempt", err);
return Mono.error(err);
})
response -> Mono
.error(new HubAuthException("could not fetch new access token using refresh token")))
.bodyToMono(HubAuthTokenResponse.class)
.flatMap(hubAuthTokenResponse -> {
try {
Expand All @@ -120,12 +114,15 @@ public Mono<OIDCTokenPair> refresh(OAuth2Token refreshToken) {
"trying to refresh access token", e));
}
})
.doOnError(err -> log.error(err.getMessage(), err))
.doOnError(err -> {
log.error("token refresh request failed: {}", err.getMessage());
log.debug("token refresh error details", err);
})
.retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs()))
.jitter(0.75)
.filter(err -> err instanceof HubAuthException)
.onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'"
.onRetryExhaustedThrow(((retryBackoffSpec,
retrySignal) -> new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'"
.formatted(retryConfig.maxRetries())))));
}

Expand Down Expand Up @@ -211,7 +208,6 @@ public Instant getExpiresAt() {
}
}


public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ OkHttpClient decoratedSocketBaseClient(@Qualifier("COMMON_JAVA_SSL_CONTEXT") SSL
@Qualifier("COMMON_TRUST_MANAGER_FACTORY") TrustManagerFactory tmf) {

var clientBuilder = new OkHttpClient.Builder()
.readTimeout(1, TimeUnit.MINUTES);
.pingInterval(4, TimeUnit.SECONDS);
decorateClientWithSSLContext(clientBuilder, sslCtx, tmf);
decorateClientWithProxySettings(clientBuilder);

Expand Down Expand Up @@ -146,47 +146,103 @@ public Socket underlyingMessengerSocket(
@Qualifier("HUB_AUTHENTICATOR") OIDCAuthenticator hubAuthenticator,
@Qualifier("HUB_MESSAGE_RECEIVER") MessageReceiver messageReceiver,
@Qualifier("HUB_MESSENGER_UNDERLYING_SOCKET_SECURE_CLIENT") OkHttpClient secureBaseClient) {

URI messengerUri = URI.create(hubMessengerBaseUrl);

String socketHost = messengerUri.getScheme() + "://" + messengerUri.getHost();
if (messengerUri.getPort() != -1) {
socketHost += ":" + messengerUri.getPort();
}
String socketPath = "/socket.io/";
if (messengerUri.getPath() != null && !messengerUri.getPath().isEmpty()
&& !messengerUri.getPath().equals("/")) {
socketPath = messengerUri.getPath() + "/socket.io/";
}

IO.Options options = IO.Options.builder()
.setPath(null)
.setPath(socketPath)
.setAuth(new HashMap<>())
// Configure robust reconnection: infinite attempts with exponential backoff
.setReconnection(true)
.setReconnectionAttempts(Integer.MAX_VALUE)
.setReconnectionDelay(1000) // Start with 1 second delay
.setReconnectionDelayMax(30000) // Max 30 seconds between attempts
.setRandomizationFactor(0.5) // Add jitter to prevent thundering herd
.build();

// this is used for SSL backed connections that need to trust additional certificates
// this is used for SSL backed connections that need to trust additional
// certificates
options.callFactory = secureBaseClient;
options.webSocketFactory = secureBaseClient;

final Socket socket = IO.socket(URI.create(hubMessengerBaseUrl), options);
// socket.io expects the base URL to be without the path
final Socket socket = IO.socket(URI.create(socketHost), options);
log.info("created socket for hub messenger at `{}`", socketHost);

socket.on(Socket.EVENT_CONNECT_ERROR, objects -> {
log.error("cannot connect to hub messenger at `{}`", hubMessengerBaseUrl);
String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error";
log.error("cannot connect to hub messenger at `{}` - error: {}", hubMessengerBaseUrl, errorMsg);

// we block here since this is a crucial component
var oidcTokenPair = hubAuthenticator.authenticate().block();
if (oidcTokenPair == null) {
throw new RuntimeException("authentication failed - cannot connect to hub messenger at `%s`"
.formatted(hubMessengerBaseUrl));
// Try to refresh the authentication token and reconnect
// IMPORTANT: Do not throw exceptions here - it would kill the EventThread and stop reconnection
try {
var oidcTokenPair = hubAuthenticator.authenticate().block();
if (oidcTokenPair != null) {
options.auth.put("token", oidcTokenPair.accessToken().getTokenValue());
log.info("refreshed authentication token, reconnecting to hub messenger");
socket.connect();
} else {
log.warn("authentication returned null - will retry on next reconnection attempt");
}
} catch (Exception e) {
log.warn("failed to refresh authentication token - will retry on next reconnection attempt: {}",
e.getMessage());
}
options.auth.put("token", oidcTokenPair.accessToken().getTokenValue());

log.info("reconnecting to hub messenger at `{}` with new authentication token", hubMessengerBaseUrl);
socket.connect();
});

socket.on(Socket.EVENT_CONNECT,
objects -> log.info("connected to hub messenger at `{}`", hubMessengerBaseUrl));

socket.io().on(Manager.EVENT_RECONNECT_ATTEMPT,
objects -> {
log.info("trying to reconnect to hub messenger via socket at `{}", hubMessengerBaseUrl);
// we block here since this is a crucial component
var oidcTokenPair = hubAuthenticator.authenticate().block();
if (oidcTokenPair == null) {
throw new RuntimeException("authentication failed - cannot connect to hub messenger at `%s`"
.formatted(hubMessengerBaseUrl));
}
socket.on(Socket.EVENT_DISCONNECT, objects -> {
String reason = objects.length > 0 ? objects[0].toString() : "unknown";
log.warn("disconnected from hub messenger at `{}` - reason: {}", hubMessengerBaseUrl, reason);
});

socket.io().on(Manager.EVENT_RECONNECT_ATTEMPT, objects -> {
int attemptNumber = objects.length > 0 ? (int) objects[0] : -1;
log.info("reconnection attempt #{} to hub messenger at `{}`", attemptNumber, hubMessengerBaseUrl);

// Try to refresh the authentication token before reconnecting
// IMPORTANT: Do not throw exceptions here - it would kill the EventThread and stop reconnection
try {
var oidcTokenPair = hubAuthenticator.authenticate().block();
if (oidcTokenPair != null) {
options.auth.put("token", oidcTokenPair.accessToken().getTokenValue());
});
log.debug("refreshed authentication token for reconnection attempt #{}", attemptNumber);
} else {
log.warn("authentication returned null on attempt #{} - will use existing token", attemptNumber);
}
} catch (Exception e) {
log.warn("failed to refresh authentication token on attempt #{} - will use existing token: {}",
attemptNumber, e.getMessage());
}
});

socket.io().on(Manager.EVENT_RECONNECT, objects -> {
int attemptNumber = objects.length > 0 ? (int) objects[0] : -1;
log.info("successfully reconnected to hub messenger at `{}` after {} attempts",
hubMessengerBaseUrl, attemptNumber);
});

socket.io().on(Manager.EVENT_RECONNECT_ERROR, objects -> {
String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error";
log.warn("reconnection error to hub messenger at `{}`: {}", hubMessengerBaseUrl, errorMsg);
});

socket.io().on(Manager.EVENT_ERROR, objects -> {
String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error";
log.error("socket manager error for hub messenger at `{}`: {}", hubMessengerBaseUrl, errorMsg);
});

socket.on(SOCKET_RECEIVE_HUB_MESSAGE_IDENTIFIER, objects -> {
log.debug("processing incoming message");
Expand All @@ -196,6 +252,20 @@ public Socket underlyingMessengerSocket(
}
);

// Get auth token before first connection attempt to avoid an expected initial failure
try {
var oidcTokenPair = hubAuthenticator.authenticate().block();
if (oidcTokenPair != null) {
options.auth.put("token", oidcTokenPair.accessToken().getTokenValue());
log.info("obtained initial authentication token for hub messenger");
} else {
log.warn("initial authentication returned null - will authenticate on first connection error");
}
} catch (Exception e) {
log.warn("failed to obtain initial authentication token - will authenticate on first connection error: {}",
e.getMessage());
}

socket.connect();
return socket;
}
Expand Down
Loading