diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java index 84872cb5da..7649c1c514 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java @@ -28,11 +28,15 @@ public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy { private int maximumDifference = 5; - public void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException { - long count = Math.abs(actualCounter - expectedCounter); + public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException { + int difference = actualCounter - expectedCounter; + long count = Math.abs(difference); if (count > maximumDifference) { throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter); } + + // lets discard old commands + return difference > 0; } public void onReceivedPacket(ReliableTransport transport, long expectedCounter) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java index 05dbcbf2e4..72db68dfec 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java @@ -47,11 +47,13 @@ public class ReliableTransport extends TransportFilter { if (!valid) { synchronized (commands) { - // lets add it to the list for later on - commands.add(command); - try { - replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); + boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); + + if (keep) { + // lets add it to the list for later on + commands.add(command); + } } catch (IOException e) { getTransportListener().onException(e); @@ -91,6 +93,23 @@ public class ReliableTransport extends TransportFilter { } } + public int getBufferedCommandCount() { + synchronized (commands) { + return commands.size(); + } + } + + public int getExpectedCounter() { + return expectedCounter; + } + + /** + * This property should never really be set - but is mutable primarily for test cases + */ + public void setExpectedCounter(int expectedCounter) { + this.expectedCounter = expectedCounter; + } + public String toString() { return next.toString(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java index 70d55e0c65..720925b591 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java @@ -26,7 +26,15 @@ import java.io.IOException; */ public interface ReplayStrategy { - void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException; + /** + * Deals with a dropped packet. + * + * @param transport the transport on which the packet was dropped + * @param expectedCounter the expected command counter + * @param actualCounter the actual command counter + * @return true if the command should be buffered or false if it should be discarded + */ + boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException; void onReceivedPacket(ReliableTransport transport, long expectedCounter); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java index 5bd275a11c..f7a66c4ead 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java @@ -31,7 +31,7 @@ import junit.framework.TestCase; */ public class ReliableTransportTest extends TestCase { - protected TransportFilter transport; + protected ReliableTransport transport; protected StubTransportListener listener = new StubTransportListener(); protected ReplayStrategy replayStrategy; @@ -41,12 +41,40 @@ public class ReliableTransportTest extends TestCase { sendStreamOfCommands(sequenceNumbers, true); } + public void testValidWrapAroundPackets() throws Exception { + int[] sequenceNumbers = new int[10]; + + int value = Integer.MAX_VALUE - 3; + transport.setExpectedCounter(value); + + for (int i = 0; i < 10; i++) { + System.out.println("command: " + i + " = " + value); + sequenceNumbers[i] = value++; + } + + sendStreamOfCommands(sequenceNumbers, true); + } + public void testDuplicatePacketsDropped() throws Exception { int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 }; sendStreamOfCommands(sequenceNumbers, true, 7); } + public void testOldDuplicatePacketsDropped() throws Exception { + int[] sequenceNumbers = { 1, 2, 3, 4, 5, 2, 6, 7 }; + + sendStreamOfCommands(sequenceNumbers, true, 7); + } + + public void testOldDuplicatePacketsDroppedUsingNegativeCounters() throws Exception { + int[] sequenceNumbers = { -3, -1, -3, -2, -1, 0, 1, -1, 3, 2, 0, 2, 4 }; + + transport.setExpectedCounter(-3); + + sendStreamOfCommands(sequenceNumbers, true, 8); + } + public void testWrongOrderOfPackets() throws Exception { int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 }; @@ -83,13 +111,14 @@ public class ReliableTransportTest extends TestCase { fail("Caught exception: " + e); } assertEquals("number of messages received", expectedCount, commands.size()); - } + + assertEquals("Should have no buffered commands", 0, transport.getBufferedCommandCount()); + } else { assertTrue("Should have received an exception!", exceptions.size() > 0); Exception e = (Exception) exceptions.remove(); System.out.println("Caught expected response: " + e); } - } protected void setUp() throws Exception {