updated the reliable transport to be able to deal with out of order messages within a certain range and discarding duplicates within a range

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385480 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-13 09:52:15 +00:00
parent e6954f1fd4
commit 65fdf0762a
4 changed files with 70 additions and 10 deletions

View File

@ -28,11 +28,15 @@ public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy {
private int maximumDifference = 5; private int maximumDifference = 5;
public void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException { public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
long count = Math.abs(actualCounter - expectedCounter); int difference = actualCounter - expectedCounter;
long count = Math.abs(difference);
if (count > maximumDifference) { if (count > maximumDifference) {
throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter); throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
} }
// lets discard old commands
return difference > 0;
} }
public void onReceivedPacket(ReliableTransport transport, long expectedCounter) { public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {

View File

@ -47,11 +47,13 @@ public class ReliableTransport extends TransportFilter {
if (!valid) { if (!valid) {
synchronized (commands) { synchronized (commands) {
// lets add it to the list for later on
commands.add(command);
try { try {
replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
if (keep) {
// lets add it to the list for later on
commands.add(command);
}
} }
catch (IOException e) { catch (IOException e) {
getTransportListener().onException(e); getTransportListener().onException(e);
@ -91,6 +93,23 @@ public class ReliableTransport extends TransportFilter {
} }
} }
public int getBufferedCommandCount() {
synchronized (commands) {
return commands.size();
}
}
public int getExpectedCounter() {
return expectedCounter;
}
/**
* This property should never really be set - but is mutable primarily for test cases
*/
public void setExpectedCounter(int expectedCounter) {
this.expectedCounter = expectedCounter;
}
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }

View File

@ -26,7 +26,15 @@ import java.io.IOException;
*/ */
public interface ReplayStrategy { public interface ReplayStrategy {
void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException; /**
* Deals with a dropped packet.
*
* @param transport the transport on which the packet was dropped
* @param expectedCounter the expected command counter
* @param actualCounter the actual command counter
* @return true if the command should be buffered or false if it should be discarded
*/
boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException;
void onReceivedPacket(ReliableTransport transport, long expectedCounter); void onReceivedPacket(ReliableTransport transport, long expectedCounter);

View File

@ -31,7 +31,7 @@ import junit.framework.TestCase;
*/ */
public class ReliableTransportTest extends TestCase { public class ReliableTransportTest extends TestCase {
protected TransportFilter transport; protected ReliableTransport transport;
protected StubTransportListener listener = new StubTransportListener(); protected StubTransportListener listener = new StubTransportListener();
protected ReplayStrategy replayStrategy; protected ReplayStrategy replayStrategy;
@ -41,12 +41,40 @@ public class ReliableTransportTest extends TestCase {
sendStreamOfCommands(sequenceNumbers, true); sendStreamOfCommands(sequenceNumbers, true);
} }
public void testValidWrapAroundPackets() throws Exception {
int[] sequenceNumbers = new int[10];
int value = Integer.MAX_VALUE - 3;
transport.setExpectedCounter(value);
for (int i = 0; i < 10; i++) {
System.out.println("command: " + i + " = " + value);
sequenceNumbers[i] = value++;
}
sendStreamOfCommands(sequenceNumbers, true);
}
public void testDuplicatePacketsDropped() throws Exception { public void testDuplicatePacketsDropped() throws Exception {
int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 }; int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true, 7); sendStreamOfCommands(sequenceNumbers, true, 7);
} }
public void testOldDuplicatePacketsDropped() throws Exception {
int[] sequenceNumbers = { 1, 2, 3, 4, 5, 2, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true, 7);
}
public void testOldDuplicatePacketsDroppedUsingNegativeCounters() throws Exception {
int[] sequenceNumbers = { -3, -1, -3, -2, -1, 0, 1, -1, 3, 2, 0, 2, 4 };
transport.setExpectedCounter(-3);
sendStreamOfCommands(sequenceNumbers, true, 8);
}
public void testWrongOrderOfPackets() throws Exception { public void testWrongOrderOfPackets() throws Exception {
int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 }; int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 };
@ -83,13 +111,14 @@ public class ReliableTransportTest extends TestCase {
fail("Caught exception: " + e); fail("Caught exception: " + e);
} }
assertEquals("number of messages received", expectedCount, commands.size()); assertEquals("number of messages received", expectedCount, commands.size());
}
assertEquals("Should have no buffered commands", 0, transport.getBufferedCommandCount());
}
else { else {
assertTrue("Should have received an exception!", exceptions.size() > 0); assertTrue("Should have received an exception!", exceptions.size() > 0);
Exception e = (Exception) exceptions.remove(); Exception e = (Exception) exceptions.remove();
System.out.println("Caught expected response: " + e); System.out.println("Caught expected response: " + e);
} }
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {