diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 19bede7b8..9f41b2268 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2352,7 +2352,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN state.get(beginSeqNo, endSeqNo, messages); } catch (final IOException e) { if (forceResendWhenCorruptedStore) { - LOG.error("Cannot read messages from stores, resend HeartBeats", e); + getLog().onErrorEvent("Cannot read messages from stores, resend HeartBeats: " + e.getMessage()); for (int i = beginSeqNo; i < endSeqNo; i++) { final Message heartbeat = messageFactory.create(sessionID.getBeginString(), MsgType.HEARTBEAT); @@ -2391,11 +2391,33 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN final String msgType = msg.getHeader().getString(MsgType.FIELD); - if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) { - if (begin == 0) { - begin = msgSeqNum; + // Check if message is an admin message + // According to FIX spec, only Reject messages should be resent among admin messages + if (MessageUtils.isAdminMessage(msgType)) { + if (MsgType.REJECT.equals(msgType)) { + // Reject messages should be resent + // Note: We don't call resendApproved() here to avoid calling toApp() on admin messages + initializeResendFields(msg); + if (begin != 0) { + generateSequenceReset(receivedMessage, begin, msgSeqNum); + } + getLog().onEvent("Resending Reject message: " + msgSeqNum); + boolean sent = send(msg.toString()); + if (!sent) { + // Abort resend operation immediately - don't send any more messages + getLog().onWarnEvent("Resending messages aborted."); + return; + } + begin = 0; + appMessageJustSent = true; + } else { + // Other admin messages should NOT be resent, mark for gap fill + if (begin == 0) { + begin = msgSeqNum; + } } } else { + // Application message - resend normally initializeResendFields(msg); if (resendApproved(msg)) { if (begin != 0) { diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index effecd882..70bc140f5 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -106,7 +106,7 @@ public static final class Builder { private final boolean rejectInvalidMessage = true; private final boolean rejectMessageOnUnhandledException = false; private final boolean requiresOrigSendingTime = true; - private final boolean forceResendWhenCorruptedStore = false; + private boolean forceResendWhenCorruptedStore = false; private final Set allowedRemoteAddresses = null; private final boolean validateIncomingMessage = true; private final int resendRequestChunkSize = 0; @@ -245,5 +245,10 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum; return this; } + + public Builder setForceResendWhenCorruptedStore(final boolean forceResendWhenCorruptedStore) { + this.forceResendWhenCorruptedStore = forceResendWhenCorruptedStore; + return this; + } } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index aa393eb06..e42d6497a 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -3239,4 +3239,148 @@ public void testResendAbortsWhenSendReturnsFalse() throws Exception { assertEquals("Only 2 messages should succeed", 2, responder.sentMessages.size()); } } + + // Test for issue #597: Session-level messages should not be resent when ForceResendWhenCorruptedStore is enabled + @Test + public void testResendDoesNotResendSessionLevelMessagesExceptReject() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + + // Create a session with ForceResendWhenCorruptedStore enabled + final Session session = new SessionFactoryTestSupport.Builder() + .setSessionId(sessionID) + .setApplication(application) + .setIsInitiator(false) + .setPersistMessages(true) + .setForceResendWhenCorruptedStore(true) + .build(); + + // Use a capturing responder to track all sent messages + final List sentMessages = new ArrayList<>(); + final Responder capturingResponder = new Responder() { + @Override + public boolean send(String data) { + sentMessages.add(data); + return true; + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public void disconnect() { + } + }; + session.setResponder(capturingResponder); + + try { + // Logon to establish session + logonTo(session); + assertTrue(session.isLoggedOn()); + + // Clear lists after logon + application.clear(); + sentMessages.clear(); + + // Get the message store + final MessageStore messageStore = session.getStore(); + + // Store some messages in the message store: + // Seq 2: Heartbeat (session-level, should NOT be resent) + String heartbeatMsg = "8=FIX.4.4\0019=60\00135=0\00134=2\00149=SENDER\00156=TARGET\001" + + "52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + "\00110=000\001"; + messageStore.set(2, heartbeatMsg); + + // Seq 3: Application message (should be resent) + String appMsg1 = "8=FIX.4.4\0019=100\00135=D\00134=3\00149=SENDER\00156=TARGET\001" + + "52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + + "\00155=EUR/USD\00154=1\00138=1000000\00140=1\00110=000\001"; + messageStore.set(3, appMsg1); + + // Seq 4: Logout (session-level, should NOT be resent) + String logoutMsg = "8=FIX.4.4\0019=60\00135=5\00134=4\00149=SENDER\00156=TARGET\001" + + "52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + "\00110=000\001"; + messageStore.set(4, logoutMsg); + + // Seq 5: Reject (session-level, SHOULD be resent) + String rejectMsg = "8=FIX.4.4\0019=80\00135=3\00134=5\00149=SENDER\00156=TARGET\001" + + "52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + + "\00145=100\00158=Invalid message\00110=000\001"; + messageStore.set(5, rejectMsg); + + // Seq 6: Application message (should be resent) + String appMsg2 = "8=FIX.4.4\0019=100\00135=D\00134=6\00149=SENDER\00156=TARGET\001" + + "52=" + UtcTimestampConverter.convert(LocalDateTime.now(ZoneOffset.UTC), UtcTimestampPrecision.MILLIS) + + "\00155=GBP/USD\00154=2\00138=2000000\00140=1\00110=000\001"; + messageStore.set(6, appMsg2); + + // Update the next sender sequence number to 7 + messageStore.setNextSenderMsgSeqNum(7); + + // Receive a ResendRequest for messages 2-6 + final ResendRequest resendRequest = new ResendRequest(); + resendRequest.getHeader().setString(SenderCompID.FIELD, "TARGET"); + resendRequest.getHeader().setString(TargetCompID.FIELD, "SENDER"); + resendRequest.getHeader().setInt(MsgSeqNum.FIELD, 2); + resendRequest.getHeader().setUtcTimeStamp(SendingTime.FIELD, LocalDateTime.now(ZoneOffset.UTC)); + resendRequest.setInt(BeginSeqNo.FIELD, 2); + resendRequest.setInt(EndSeqNo.FIELD, 6); + resendRequest.toString(); // calculate length/checksum + + session.next(resendRequest); + + // Verify expectations: + // 1. toApp() should only be called for application messages (seq 3 and 6) + assertEquals("toApp should be called twice for app messages", 2, application.toAppMessages.size()); + + // 2. Parse all sent messages and verify what was sent + boolean rejectWasResent = false; + boolean heartbeatWasResent = false; + boolean logoutWasResent = false; + int sequenceResetCount = 0; + int appMessageCount = 0; + + for (String sentMsg : sentMessages) { + try { + Message msg = new Message(sentMsg); + String msgType = msg.getHeader().getString(MsgType.FIELD); + int seqNum = msg.getHeader().getInt(MsgSeqNum.FIELD); + + if (msgType.equals(MsgType.SEQUENCE_RESET)) { + sequenceResetCount++; + } else if (msgType.equals(MsgType.REJECT) && seqNum == 5) { + rejectWasResent = true; + // Verify Reject has PossDupFlag + assertTrue("Reject should have PossDupFlag set", + msg.getHeader().isSetField(PossDupFlag.FIELD) && + msg.getHeader().getBoolean(PossDupFlag.FIELD)); + } else if (msgType.equals(MsgType.HEARTBEAT) && seqNum == 2) { + heartbeatWasResent = true; + } else if (msgType.equals(MsgType.LOGOUT) && seqNum == 4) { + logoutWasResent = true; + } else if (msgType.equals("D")) { // NewOrderSingle + appMessageCount++; + // Verify app messages have PossDupFlag + assertTrue("Application messages should have PossDupFlag set", + msg.getHeader().isSetField(PossDupFlag.FIELD) && + msg.getHeader().getBoolean(PossDupFlag.FIELD)); + } + } catch (Exception e) { + // Skip unparseable messages + } + } + + // Verify results + assertTrue("Reject message should have been resent", rejectWasResent); + assertFalse("Heartbeat should NOT have been resent", heartbeatWasResent); + assertFalse("Logout should NOT have been resent", logoutWasResent); + assertEquals("Two application messages should have been resent", 2, appMessageCount); + assertTrue("At least one SequenceReset should be sent for skipped admin messages", sequenceResetCount >= 1); + + } finally { + session.close(); + } + } }