added some test cases (that are not yet working) for reliable UDP

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-28 16:19:23 +00:00
parent 5a392bee0a
commit 2a076d0710
7 changed files with 444 additions and 6 deletions

View File

@ -162,7 +162,7 @@
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>${xbean_spring_version}</version>
<version>2.2</version>
<url>http://www.xbean.org</url>
<properties>
<war.bundle>true</war.bundle>
@ -344,28 +344,34 @@
<!-- This test currently fails -->
<exclude>**/ItStillMarshallsTheSameTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-522 -->
<!-- https://issues.apache.org/activemq/browse/AMQ-522 -->
<exclude>**/ProxyConnectorTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-594 -->
<!-- https://issues.apache.org/activemq/browse/AMQ-594 -->
<exclude>**/SimpleNetworkTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-583 -->
<!-- https://issues.apache.org/activemq/browse/AMQ-583 -->
<exclude>**/DiscoveryTransportBrokerTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-610 -->
<!-- https://issues.apache.org/activemq/browse/AMQ-610 -->
<exclude>**/FanoutTransportBrokerTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-626 -->
<!-- https://issues.apache.org/activemq/browse/AMQ-626 -->
<exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
<!-- https://issues.apache.org/activemq/browse/AMQ-667 -->
<exclude>**/SslTransportBrokerTest.*</exclude>
<!-- TODO FIX ME ASAP -->
<exclude>**/MulticastNetworkTest.*</exclude>
<exclude>**/UnreliableUdpTransportTest.*</exclude>
<exclude>**/MulticastTransportTest.*</exclude>
<exclude>**/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.*</exclude>
<exclude>**/UnreliableUdpTransportTest.*</exclude>
</excludes>
</unitTest>
<resources>

View File

@ -0,0 +1,32 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import java.net.SocketAddress;
/**
*
* @version $Revision: $
*/
public interface DropCommandStrategy {
/**
* Returns true if the command should be dropped for
* the given command ID and address
*/
boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery);
}

View File

@ -0,0 +1,135 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import edu.emory.mathcs.backport.java.util.Queue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.StubTransport;
import org.apache.activemq.transport.StubTransportListener;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import junit.framework.TestCase;
/**
*
* @version $Revision$
*/
public class ReliableTransportTest extends TestCase {
protected ReliableTransport transport;
protected StubTransportListener listener = new StubTransportListener();
protected ReplayStrategy replayStrategy;
public void testValidSequenceOfPackets() throws Exception {
int[] sequenceNumbers = { 1, 2, 3, 4, 5, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true);
}
public void testValidWrapAroundPackets() throws Exception {
int[] sequenceNumbers = new int[10];
int value = Integer.MAX_VALUE - 3;
transport.setExpectedCounter(value);
for (int i = 0; i < 10; i++) {
System.out.println("command: " + i + " = " + value);
sequenceNumbers[i] = value++;
}
sendStreamOfCommands(sequenceNumbers, true);
}
public void testDuplicatePacketsDropped() throws Exception {
int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true, 7);
}
public void testOldDuplicatePacketsDropped() throws Exception {
int[] sequenceNumbers = { 1, 2, 3, 4, 5, 2, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true, 7);
}
public void testOldDuplicatePacketsDroppedUsingNegativeCounters() throws Exception {
int[] sequenceNumbers = { -3, -1, -3, -2, -1, 0, 1, -1, 3, 2, 0, 2, 4 };
transport.setExpectedCounter(-3);
sendStreamOfCommands(sequenceNumbers, true, 8);
}
public void testWrongOrderOfPackets() throws Exception {
int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 };
sendStreamOfCommands(sequenceNumbers, true);
}
public void testMissingPacketsFails() throws Exception {
int[] sequenceNumbers = { 1, 2, /* 3, */ 4, 5, 6, 7, 8, 9, 10 };
sendStreamOfCommands(sequenceNumbers, false);
}
protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected) {
sendStreamOfCommands(sequenceNumbers, expected, sequenceNumbers.length);
}
protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected, int expectedCount) {
for (int i = 0; i < sequenceNumbers.length; i++) {
int commandId = sequenceNumbers[i];
ConsumerInfo info = new ConsumerInfo();
info.setSelector("Cheese: " + commandId);
info.setCommandId(commandId);
transport.onCommand(info);
}
Queue exceptions = listener.getExceptions();
Queue commands = listener.getCommands();
if (expected) {
if (!exceptions.isEmpty()) {
Exception e = (Exception) exceptions.remove();
e.printStackTrace();
fail("Caught exception: " + e);
}
assertEquals("number of messages received", expectedCount, commands.size());
assertEquals("Should have no buffered commands", 0, transport.getBufferedCommandCount());
}
else {
assertTrue("Should have received an exception!", exceptions.size() > 0);
Exception e = (Exception) exceptions.remove();
System.out.println("Caught expected response: " + e);
}
}
protected void setUp() throws Exception {
if (replayStrategy == null) {
replayStrategy = new ExceptionIfDroppedReplayStrategy();
}
transport = new ReliableTransport(new StubTransport(), replayStrategy);
transport.setTransportListener(listener);
transport.start();
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.ByteBufferPool;
import org.apache.activemq.transport.udp.CommandDatagramChannel;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
/**
*
* @version $Revision: $
*/
public class UnreliableCommandDatagramChannel extends CommandDatagramChannel {
private static final Log log = LogFactory.getLog(UnreliableCommandDatagramChannel.class);
private DropCommandStrategy dropCommandStrategy;
public UnreliableCommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, ReplayBuffer replayBuffer, DatagramChannel channel,
ByteBufferPool bufferPool, DropCommandStrategy strategy) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller, channel, bufferPool);
this.dropCommandStrategy = strategy;
}
protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
if (dropCommandStrategy.shouldDropCommand(commandId, address, redelivery)) {
writeBuffer.flip();
log.info("Dropping datagram with command: " + commandId);
// lets still add it to the replay buffer though!
getReplayBuffer().addBuffer(commandId, writeBuffer);
}
else {
super.sendWriteBuffer(commandId, address, writeBuffer, redelivery);
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.CommandDatagramSocket;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.SocketAddress;
/**
*
* @version $Revision: $
*/
public class UnreliableCommandDatagramSocket extends CommandDatagramSocket {
private static final Log log = LogFactory.getLog(UnreliableCommandDatagramSocket.class);
private DropCommandStrategy dropCommandStrategy;
public UnreliableCommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel,
DropCommandStrategy strategy) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller, channel);
this.dropCommandStrategy = strategy;
}
protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
throws IOException {
if (dropCommandStrategy.shouldDropCommand(commandId, address, redelivery)) {
log.info("Dropping datagram with command: " + commandId);
// lets still add it to the replay buffer though!
ReplayBuffer bufferCache = getReplayBuffer();
if (bufferCache != null && !redelivery) {
bufferCache.addBuffer(commandId, data);
}
}
else {
super.sendWriteBuffer(commandId, address, data, redelivery);
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.CommandChannel;
import org.apache.activemq.transport.udp.CommandDatagramChannel;
import org.apache.activemq.transport.udp.UdpTransport;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
/**
* An unreliable UDP transport that will randomly discard packets to simulate a
* bad network (or UDP buffers being flooded).
*
* @version $Revision: $
*/
public class UnreliableUdpTransport extends UdpTransport {
private DropCommandStrategy dropCommandStrategy;
public UnreliableUdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
super(wireFormat, port);
}
public UnreliableUdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
super(wireFormat, socketAddress);
}
public UnreliableUdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException,
IOException {
super(wireFormat, remoteLocation);
}
public UnreliableUdpTransport(OpenWireFormat wireFormat) throws IOException {
super(wireFormat);
}
public DropCommandStrategy getDropCommandStrategy() {
return dropCommandStrategy;
}
public void setDropCommandStrategy(DropCommandStrategy dropCommandStrategy) {
this.dropCommandStrategy = dropCommandStrategy;
}
protected CommandChannel createCommandDatagramChannel() {
return new UnreliableCommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(),
createDatagramHeaderMarshaller(), getReplayBuffer(), getChannel(), getBufferPool(), dropCommandStrategy);
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.udp.ResponseRedirectInterceptor;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.transport.udp.UdpTransportTest;
import java.net.SocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
/**
*
* @version $Revision: $
*/
public class UnreliableUdpTransportTest extends UdpTransportTest {
protected DropCommandStrategy dropStrategy = new DropCommandStrategy() {
public boolean shouldDropCommand(int commandId, SocketAddress address, boolean redelivery) {
if (redelivery) {
return false;
}
return commandId % 3 == 2;
}
};
protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI);
OpenWireFormat wireFormat = createWireFormat();
UnreliableUdpTransport transport = new UnreliableUdpTransport(wireFormat, new URI(producerURI));
transport.setDropCommandStrategy(dropStrategy);
ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
Replayer replayer = reliableTransport.getReplayer();
reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
return new CommandJoiner(reliableTransport, wireFormat);
}
protected Transport createConsumer() throws Exception {
System.out.println("Consumer on port: " + consumerPort);
OpenWireFormat wireFormat = createWireFormat();
UdpTransport transport = new UdpTransport(wireFormat, consumerPort);
ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
Replayer replayer = reliableTransport.getReplayer();
reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
ResponseRedirectInterceptor redirectInterceptor = new ResponseRedirectInterceptor(reliableTransport, transport);
return new CommandJoiner(redirectInterceptor, wireFormat);
}
protected ReplayStrategy createReplayStrategy(Replayer replayer) {
assertNotNull("Should have a replayer!", replayer);
return new DefaultReplayStrategy(1);
}
}