Changes to the test for AMQ-2324 and AMQ-2484; trying to get it to pass consistently

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@881174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bruce Snyder 2009-11-17 06:43:17 +00:00
parent f6cf0bafc2
commit a15b39569b
3 changed files with 120 additions and 25 deletions

View File

@ -212,6 +212,11 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jmdns_1.0</artifactId>

View File

@ -1,17 +1,25 @@
package org.apache.activemq.network;
import java.io.File;
import java.io.IOException;
import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,17 +37,67 @@ public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setBrokerName("local");
config.setDispatchAsync(false);
bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport());
Transport localTransport = createTransport();
localTransport.setTransportListener(new TransportListener() {
Command command = null;
public void onCommand(Object o) {
this.command = (Command) o;
LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
}
public void onException(IOException error) {
LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
LOG.info("Exception: " + error);
}
public void transportInterupted() {
LOG.info("Interruption on local transport");
}
public void transportResumed() {
LOG.info("Resumption on local transport");
}
});
Transport remoteTransport = createRemoteTransport();
remoteTransport.setTransportListener(new TransportListener() {
Command command = null;
public void onCommand(Object o) {
this.command = (Command) o;
LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
}
public void onException(IOException error) {
LOG.info("Command from [" + command.getFrom() + "] to [" + command.getTo() + "]");
LOG.info("Exception: " + error);
}
public void transportInterupted() {
LOG.info("Interruption on remote transport");
}
public void transportResumed() {
LOG.info("Resumption on remote transport");
}
});
bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
bridge.setBrokerService(broker);
bridge.start();
// Enable JMX support on the local and remote brokers
broker.setUseJmx(true);
remoteBroker.setUseJmx(true);
// broker.setUseJmx(true);
// remoteBroker.setUseJmx(true);
// Set the names of teh local and remote brokers
broker.setBrokerName("local");
remoteBroker.setBrokerName("remote");
// Make sure persistence is disabled
broker.setPersistent(false);
broker.setPersistenceAdapter(null);
remoteBroker.setPersistent(false);
remoteBroker.setPersistenceAdapter(null);
// Remove the activemq-data directory from the creation of the remote broker
FileUtils.deleteDirectory(new File("activemq-data"));
}
protected void tearDown() throws Exception {
@ -66,31 +124,42 @@ public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
for (int i = 0; i < sendNumMessages; ++i) {
destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
connection1.send(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
// connection1.send(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
}
// Ensure that there are 10 messages on the local broker
assertTrue(countMessagesInQueue(connection1, connectionInfo1, destinationInfo1) == 10);
int messageCount1 = countMessagesInQueue(connection1, connectionInfo1, destinationInfo1);
assertEquals(10, messageCount1);
// Create a consumer on the remote broker
StubConnection connection2 = createRemoteConnection();
final StubConnection connection2 = createRemoteConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
ActiveMQDestination destinationInfo2 =
createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
connection2.send(consumerInfo2);
// Consume 5 of the messages from the remote broker and ack them.
// Because the prefetch size is set to 1000, this will cause the
// messages on the local broker to be forwarded to the remote broker.
// Because the prefetch size is set to 1000 in the createConsumerInfo()
// method, this will cause the messages on the local broker to be
// forwarded to the remote broker.
for (int i = 0; i < receiveNumMessages; ++i) {
Message message1 = receiveMessage(connection2);
assertNotNull(message1);
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
assertTrue("Message " + i + " was not received", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
Message message1 = receiveMessage(connection2);
assertNotNull(message1);
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
return message1 != null;
}
}));
// Message message1 = receiveMessage(connection2);
// assertNotNull(message1);
// connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
}
// Close the consumer on the remote broker
@ -99,10 +168,13 @@ public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
// Ensure that there are zero messages on the local broker. This tells
// us that those messages have been prefetched to the remote broker
// where the demand exists.
assertTrue(countMessagesInQueue(connection1, connectionInfo1, destinationInfo1) == 0);
int messageCount2 = countMessagesInQueue(connection1, connectionInfo1, destinationInfo1);
// Sometimes it fails here
assertEquals(0, messageCount2);
// There should now be 5 messages stuck on the remote broker
assertTrue(countMessagesInQueue(connection2, connectionInfo2, destinationInfo1) == 5);
int messageCount3 = countMessagesInQueue(connection2, connectionInfo2, destinationInfo2);
assertEquals(5, messageCount3);
// Create a consumer on the local broker just to confirm that it doesn't
// receive any messages
@ -113,27 +185,38 @@ public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport {
//////////////////////////////////////////////////////
// An assertNull() is done here because this is currently the correct
// behavior. This is actually the purpose of this test - to prove that
// messages are stuck on the remote broker. AMQ-2324 aims to fix this
// situation so that messages don't get stuck.
// messages are stuck on the remote broker. AMQ-2324 and AMQ-2484 aim
// to fix this situation so that messages don't get stuck.
assertNull(message1);
//////////////////////////////////////////////////////
consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
connection2.send(consumerInfo2);
ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2, destinationInfo2);
connection2.send(consumerInfo3);
// Consume the last 5 messages from the remote broker and ack them just
// to clean up the queue.
int counter = 0;
for (int i = 0; i < receiveNumMessages; ++i) {
message1 = receiveMessage(connection2);
assertNotNull(message1);
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.STANDARD_ACK_TYPE));
connection2.send(createAck(consumerInfo3, message1, 1, MessageAck.STANDARD_ACK_TYPE));
++counter;
}
// Ensure that 5 messages were received
assertEquals(receiveNumMessages, counter);
// Close the consumer on the remote broker
connection2.send(consumerInfo2.createRemoveCommand());
Thread.sleep(2000);
// Ensure that the queue on the remote broker is empty
assertTrue(countMessagesInQueue(connection2, connectionInfo2, destinationInfo2) == 0);
int messageCount4 = countMessagesInQueue(connection2, connectionInfo2, destinationInfo1);
// Sometimes it fails here
assertEquals(0, messageCount4);
// Close the consumer on the remote broker
connection2.send(consumerInfo3.createRemoveCommand());
connection1.stop();
connection2.stop();
}
public static Test suite() {

View File

@ -50,6 +50,7 @@
<openjpa-version>1.2.0</openjpa-version>
<commons-dbcp-version>1.2.2</commons-dbcp-version>
<commons-httpclient-version>3.1</commons-httpclient-version>
<commons-io-version>1.4</commons-io-version>
<commons-logging-version>1.1</commons-logging-version>
<commons-pool-version>1.4</commons-pool-version>
<commons-primitives-version>1.0</commons-primitives-version>
@ -814,6 +815,12 @@
<version>${annogen-version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io-version}</version>
</dependency>
<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>