Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -2352,7 +2352,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
state.get(beginSeqNo, endSeqNo, messages);
} catch (final IOException e) {
if (forceResendWhenCorruptedStore) {
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
getLog().onErrorEvent("Cannot read messages from stores, resend HeartBeats: " + e.getMessage());
for (int i = beginSeqNo; i < endSeqNo; i++) {
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
MsgType.HEARTBEAT);
Expand Down Expand Up @@ -2391,11 +2391,33 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN

final String msgType = msg.getHeader().getString(MsgType.FIELD);

if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
if (begin == 0) {
begin = msgSeqNum;
// Check if message is an admin message
// According to FIX spec, only Reject messages should be resent among admin messages
if (MessageUtils.isAdminMessage(msgType)) {
if (MsgType.REJECT.equals(msgType)) {
// Reject messages should be resent
// Note: We don't call resendApproved() here to avoid calling toApp() on admin messages
initializeResendFields(msg);
if (begin != 0) {
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending Reject message: " + msgSeqNum);
boolean sent = send(msg.toString());
if (!sent) {
// Abort resend operation immediately - don't send any more messages
getLog().onWarnEvent("Resending messages aborted.");
return;
}
begin = 0;
appMessageJustSent = true;
} else {
// Other admin messages should NOT be resent, mark for gap fill
if (begin == 0) {
begin = msgSeqNum;
}
}
} else {
// Application message - resend normally
initializeResendFields(msg);
if (resendApproved(msg)) {
if (begin != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static final class Builder {
private final boolean rejectInvalidMessage = true;
private final boolean rejectMessageOnUnhandledException = false;
private final boolean requiresOrigSendingTime = true;
private final boolean forceResendWhenCorruptedStore = false;
private boolean forceResendWhenCorruptedStore = false;
private final Set<InetAddress> allowedRemoteAddresses = null;
private final boolean validateIncomingMessage = true;
private final int resendRequestChunkSize = 0;
Expand Down Expand Up @@ -245,5 +245,10 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs
this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum;
return this;
}

public Builder setForceResendWhenCorruptedStore(final boolean forceResendWhenCorruptedStore) {
this.forceResendWhenCorruptedStore = forceResendWhenCorruptedStore;
return this;
}
}
}
144 changes: 144 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3239,4 +3239,148 @@ public void testResendAbortsWhenSendReturnsFalse() throws Exception {
assertEquals("Only 2 messages should succeed", 2, responder.sentMessages.size());
}
}

// Test for issue #597: Session-level messages should not be resent when ForceResendWhenCorruptedStore is enabled
@Test
public void testResendDoesNotResendSessionLevelMessagesExceptReject() throws Exception {
final UnitTestApplication application = new UnitTestApplication();
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");

// Create a session with ForceResendWhenCorruptedStore enabled
final Session session = new SessionFactoryTestSupport.Builder()
.setSessionId(sessionID)
.setApplication(application)
.setIsInitiator(false)
.setPersistMessages(true)
.setForceResendWhenCorruptedStore(true)
.build();

// Use a capturing responder to track all sent messages
final List<String> sentMessages = new ArrayList<>();
final Responder capturingResponder = new Responder() {
@Override
public boolean send(String data) {
sentMessages.add(data);
return true;
}

@Override
public String getRemoteAddress() {
return null;
}

@Override
public void disconnect() {
}
};
session.setResponder(capturingResponder);

try {
// Logon to establish session
logonTo(session);
assertTrue(session.isLoggedOn());

// Clear lists after logon
application.clear();
sentMessages.clear();

// Get the message store
final MessageStore messageStore = session.getStore();

// Store some messages in the message store:
// Seq 2: Heartbeat (session-level, should NOT be resent)
String heartbeatMsg = "8=FIX.4.4\0019=60\00135=0\00134=2\00149=SENDER\00156=TARGET\001" +
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + "\00110=000\001";
messageStore.set(2, heartbeatMsg);

// Seq 3: Application message (should be resent)
String appMsg1 = "8=FIX.4.4\0019=100\00135=D\00134=3\00149=SENDER\00156=TARGET\001" +
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) +
"\00155=EUR/USD\00154=1\00138=1000000\00140=1\00110=000\001";
messageStore.set(3, appMsg1);

// Seq 4: Logout (session-level, should NOT be resent)
String logoutMsg = "8=FIX.4.4\0019=60\00135=5\00134=4\00149=SENDER\00156=TARGET\001" +
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + "\00110=000\001";
messageStore.set(4, logoutMsg);

// Seq 5: Reject (session-level, SHOULD be resent)
String rejectMsg = "8=FIX.4.4\0019=80\00135=3\00134=5\00149=SENDER\00156=TARGET\001" +
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) +
"\00145=100\00158=Invalid message\00110=000\001";
messageStore.set(5, rejectMsg);

// Seq 6: Application message (should be resent)
String appMsg2 = "8=FIX.4.4\0019=100\00135=D\00134=6\00149=SENDER\00156=TARGET\001" +
"52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) +
"\00155=GBP/USD\00154=2\00138=2000000\00140=1\00110=000\001";
messageStore.set(6, appMsg2);

// Update the next sender sequence number to 7
messageStore.setNextSenderMsgSeqNum(7);

// Receive a ResendRequest for messages 2-6
final ResendRequest resendRequest = new ResendRequest();
resendRequest.getHeader().setString(SenderCompID.FIELD, "TARGET");
resendRequest.getHeader().setString(TargetCompID.FIELD, "SENDER");
resendRequest.getHeader().setInt(MsgSeqNum.FIELD, 2);
resendRequest.getHeader().setUtcTimeStamp(SendingTime.FIELD, LocalDateTime.now(ZoneOffset.UTC));
resendRequest.setInt(BeginSeqNo.FIELD, 2);
resendRequest.setInt(EndSeqNo.FIELD, 6);
resendRequest.toString(); // calculate length/checksum

session.next(resendRequest);

// Verify expectations:
// 1. toApp() should only be called for application messages (seq 3 and 6)
assertEquals("toApp should be called twice for app messages", 2, application.toAppMessages.size());

// 2. Parse all sent messages and verify what was sent
boolean rejectWasResent = false;
boolean heartbeatWasResent = false;
boolean logoutWasResent = false;
int sequenceResetCount = 0;
int appMessageCount = 0;

for (String sentMsg : sentMessages) {
try {
Message msg = new Message(sentMsg);
String msgType = msg.getHeader().getString(MsgType.FIELD);
int seqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);

if (msgType.equals(MsgType.SEQUENCE_RESET)) {
sequenceResetCount++;
} else if (msgType.equals(MsgType.REJECT) && seqNum == 5) {
rejectWasResent = true;
// Verify Reject has PossDupFlag
assertTrue("Reject should have PossDupFlag set",
msg.getHeader().isSetField(PossDupFlag.FIELD) &&
msg.getHeader().getBoolean(PossDupFlag.FIELD));
} else if (msgType.equals(MsgType.HEARTBEAT) && seqNum == 2) {
heartbeatWasResent = true;
} else if (msgType.equals(MsgType.LOGOUT) && seqNum == 4) {
logoutWasResent = true;
} else if (msgType.equals("D")) { // NewOrderSingle
appMessageCount++;
// Verify app messages have PossDupFlag
assertTrue("Application messages should have PossDupFlag set",
msg.getHeader().isSetField(PossDupFlag.FIELD) &&
msg.getHeader().getBoolean(PossDupFlag.FIELD));
}
} catch (Exception e) {
// Skip unparseable messages
}
}

// Verify results
assertTrue("Reject message should have been resent", rejectWasResent);
assertFalse("Heartbeat should NOT have been resent", heartbeatWasResent);
assertFalse("Logout should NOT have been resent", logoutWasResent);
assertEquals("Two application messages should have been resent", 2, appMessageCount);
assertTrue("At least one SequenceReset should be sent for skipped admin messages", sequenceResetCount >= 1);

} finally {
session.close();
}
}
}
Loading