refactored the reliable transport into the reliable package and added a test case demonstrating the transport handling duplicate packets and dealing with reordering OK

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385478 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-13 09:17:43 +00:00
parent c8b8fdde4f
commit e6954f1fd4
5 changed files with 85 additions and 37 deletions

View File

@ -14,25 +14,40 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.replay; package org.apache.activemq.transport.reliable;
import org.apache.activemq.transport.ReliableTransport;
import java.io.IOException; import java.io.IOException;
/** /**
* Throws an exception if packets are dropped causing the transport to be closed. * Throws an exception if packets are dropped causing the transport to be
* closed.
* *
* @version $Revision$ * @version $Revision$
*/ */
public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy { public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy {
public void onDroppedPackets(ReliableTransport transport, long expectedCounter, long actualCounter) throws IOException { private int maximumDifference = 5;
long count = actualCounter - expectedCounter;
throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter); public void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
long count = Math.abs(actualCounter - expectedCounter);
if (count > maximumDifference) {
throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
}
} }
public void onReceivedPacket(ReliableTransport transport, long expectedCounter) { public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
} }
public int getMaximumDifference() {
return maximumDifference;
}
/**
* Sets the maximum allowed difference between an expected packet and an
* actual packet before an error occurs
*/
public void setMaximumDifference(int maximumDifference) {
this.maximumDifference = maximumDifference;
}
} }

View File

@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport; package org.apache.activemq.transport.reliable;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.CommandIdComparator; import org.apache.activemq.openwire.CommandIdComparator;
import org.apache.activemq.transport.replay.ReplayStrategy; import org.apache.activemq.transport.Transport;
import org.apache.commons.logging.Log; import org.apache.activemq.transport.TransportFilter;
import org.apache.commons.logging.LogFactory;
import java.io.IOException; import java.io.IOException;
import java.util.SortedSet; import java.util.SortedSet;
@ -33,10 +32,8 @@ import java.util.TreeSet;
* @version $Revision$ * @version $Revision$
*/ */
public class ReliableTransport extends TransportFilter { public class ReliableTransport extends TransportFilter {
private static final Log log = LogFactory.getLog(ReliableTransport.class);
private ReplayStrategy replayStrategy; private ReplayStrategy replayStrategy;
private SortedSet headers = new TreeSet(new CommandIdComparator()); private SortedSet commands = new TreeSet(new CommandIdComparator());
private int expectedCounter = 1; private int expectedCounter = 1;
public ReliableTransport(Transport next, ReplayStrategy replayStrategy) { public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
@ -49,28 +46,48 @@ public class ReliableTransport extends TransportFilter {
boolean valid = expectedCounter == actualCounter; boolean valid = expectedCounter == actualCounter;
if (!valid) { if (!valid) {
// lets add it to the list for later on synchronized (commands) {
headers.add(command); // lets add it to the list for later on
commands.add(command);
try { try {
replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
} }
catch (IOException e) { catch (IOException e) {
getTransportListener().onException(e); getTransportListener().onException(e);
} }
if (!headers.isEmpty()) { if (!commands.isEmpty()) {
// lets see if the first item in the set is the next header // lets see if the first item in the set is the next
command = (Command) headers.first(); // expected
valid = expectedCounter == command.getCommandId(); command = (Command) commands.first();
valid = expectedCounter == command.getCommandId();
if (valid) {
commands.remove(command);
}
}
} }
} }
if (valid) { while (valid) {
// we've got a valid header so increment counter // we've got a valid header so increment counter
replayStrategy.onReceivedPacket(this, expectedCounter); replayStrategy.onReceivedPacket(this, expectedCounter);
expectedCounter++; expectedCounter++;
getTransportListener().onCommand(command); getTransportListener().onCommand(command);
synchronized (commands) {
// we could have more commands left
valid = !commands.isEmpty();
if (valid) {
// lets see if the first item in the set is the next
// expected
command = (Command) commands.first();
valid = expectedCounter == command.getCommandId();
if (valid) {
commands.remove(command);
}
}
}
} }
} }

View File

@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.replay; package org.apache.activemq.transport.reliable;
import org.apache.activemq.transport.ReliableTransport;
import java.io.IOException; import java.io.IOException;
@ -27,7 +26,7 @@ import java.io.IOException;
*/ */
public interface ReplayStrategy { public interface ReplayStrategy {
void onDroppedPackets(ReliableTransport transport, long expectedCounter, long actualCounter) throws IOException; void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException;
void onReceivedPacket(ReliableTransport transport, long expectedCounter); void onReceivedPacket(ReliableTransport transport, long expectedCounter);

View File

@ -22,8 +22,8 @@ import org.apache.activemq.command.Endpoint;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.replay.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View File

@ -19,8 +19,9 @@ package org.apache.activemq.transport;
import edu.emory.mathcs.backport.java.util.Queue; import edu.emory.mathcs.backport.java.util.Queue;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.replay.ReplayStrategy; import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -36,17 +37,33 @@ public class ReliableTransportTest extends TestCase {
public void testValidSequenceOfPackets() throws Exception { public void testValidSequenceOfPackets() throws Exception {
int[] sequenceNumbers = { 1, 2, 3, 4, 5, 6, 7 }; int[] sequenceNumbers = { 1, 2, 3, 4, 5, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true);
}
public void testDuplicatePacketsDropped() throws Exception {
int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true, 7);
}
public void testWrongOrderOfPackets() throws Exception {
int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 };
sendStreamOfCommands(sequenceNumbers, true); sendStreamOfCommands(sequenceNumbers, true);
} }
public void testInvalidSequenceOfPackets() throws Exception { public void testMissingPacketsFails() throws Exception {
int[] sequenceNumbers = { 1, 2, /* 3, */ 4, 5, 6, 7 }; int[] sequenceNumbers = { 1, 2, /* 3, */ 4, 5, 6, 7, 8, 9, 10 };
sendStreamOfCommands(sequenceNumbers, false); sendStreamOfCommands(sequenceNumbers, false);
} }
protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected) { protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected) {
sendStreamOfCommands(sequenceNumbers, expected, sequenceNumbers.length);
}
protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected, int expectedCount) {
for (int i = 0; i < sequenceNumbers.length; i++) { for (int i = 0; i < sequenceNumbers.length; i++) {
int commandId = sequenceNumbers[i]; int commandId = sequenceNumbers[i];
@ -65,7 +82,7 @@ public class ReliableTransportTest extends TestCase {
e.printStackTrace(); e.printStackTrace();
fail("Caught exception: " + e); fail("Caught exception: " + e);
} }
assertEquals("number of messages received", sequenceNumbers.length, commands.size()); assertEquals("number of messages received", expectedCount, commands.size());
} }
else { else {
assertTrue("Should have received an exception!", exceptions.size() > 0); assertTrue("Should have received an exception!", exceptions.size() > 0);