mirror of https://github.com/apache/activemq.git
Fix another test that fails because of https://issues.apache.org/jira/browse/AMQ-4237
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1431123 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3dd24195e6
commit
ac23437828
|
@ -39,7 +39,6 @@ import javax.management.ObjectName;
|
|||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerTestSupport;
|
||||
import org.apache.activemq.broker.StubConnection;
|
||||
|
@ -68,87 +67,87 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class duplicates most of the functionality in {@link NetworkTestSupport}
|
||||
* and {@link BrokerTestSupport} because more control was needed over how brokers
|
||||
* and connectors are created. Also, this test asserts message counts via JMX on
|
||||
* each broker.
|
||||
*
|
||||
* @author bsnyder
|
||||
*
|
||||
* This class duplicates most of the functionality in {@link NetworkTestSupport}
|
||||
* and {@link BrokerTestSupport} because more control was needed over how brokers
|
||||
* and connectors are created. Also, this test asserts message counts via JMX on
|
||||
* each broker.
|
||||
*/
|
||||
public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSupport*/ {
|
||||
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class);
|
||||
|
||||
private BrokerService localBroker;
|
||||
private BrokerService remoteBroker;
|
||||
|
||||
private BrokerService localBroker;
|
||||
private BrokerService remoteBroker;
|
||||
private DemandForwardingBridge bridge;
|
||||
|
||||
|
||||
protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
|
||||
protected ArrayList connections = new ArrayList();
|
||||
|
||||
protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
|
||||
|
||||
protected TransportConnector connector;
|
||||
protected TransportConnector remoteConnector;
|
||||
|
||||
|
||||
protected long idGenerator;
|
||||
protected int msgIdGenerator;
|
||||
protected int tempDestGenerator;
|
||||
protected int maxWait = 4000;
|
||||
protected String queueName = "TEST";
|
||||
|
||||
|
||||
protected String amqDomain = "org.apache.activemq";
|
||||
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
|
||||
// For those who want visual confirmation:
|
||||
// Uncomment the following to enable JMX support on a port number to use
|
||||
// Jconsole to view each broker. You will need to add some calls to
|
||||
// Thread.sleep() to be able to actually slow things down so that you
|
||||
// can manually see JMX attrs.
|
||||
|
||||
// For those who want visual confirmation:
|
||||
// Uncomment the following to enable JMX support on a port number to use
|
||||
// Jconsole to view each broker. You will need to add some calls to
|
||||
// Thread.sleep() to be able to actually slow things down so that you
|
||||
// can manually see JMX attrs.
|
||||
// System.setProperty("com.sun.management.jmxremote", "");
|
||||
// System.setProperty("com.sun.management.jmxremote.port", "1099");
|
||||
// System.setProperty("com.sun.management.jmxremote.authenticate", "false");
|
||||
// System.setProperty("com.sun.management.jmxremote.ssl", "false");
|
||||
|
||||
// Create the local broker
|
||||
|
||||
// Create the local broker
|
||||
createBroker();
|
||||
// Create the remote broker
|
||||
// Create the remote broker
|
||||
createRemoteBroker();
|
||||
|
||||
|
||||
// Remove the activemq-data directory from the creation of the remote broker
|
||||
FileUtils.deleteDirectory(new File("activemq-data"));
|
||||
|
||||
// Create a network bridge between the local and remote brokers so that
|
||||
|
||||
// Create a network bridge between the local and remote brokers so that
|
||||
// demand-based forwarding can take place
|
||||
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
|
||||
config.setBrokerName("local");
|
||||
config.setDispatchAsync(false);
|
||||
config.setDuplex(true);
|
||||
|
||||
Transport localTransport = createTransport();
|
||||
Transport localTransport = createTransport();
|
||||
Transport remoteTransport = createRemoteTransport();
|
||||
|
||||
// Create a network bridge between the two brokers
|
||||
|
||||
// Create a network bridge between the two brokers
|
||||
bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
|
||||
bridge.setBrokerService(localBroker);
|
||||
bridge.start();
|
||||
|
||||
|
||||
waitForBridgeFormation();
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected void waitForBridgeFormation() throws Exception {
|
||||
for (final BrokerService broker : brokers.values()) {
|
||||
if (!broker.getNetworkConnectors().isEmpty()) {
|
||||
// Max wait here is 30 secs
|
||||
// Max wait here is 30 secs
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
|
||||
}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
bridge.stop();
|
||||
localBroker.stop();
|
||||
|
@ -156,11 +155,11 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
}
|
||||
|
||||
public void testBrokerNetworkWithStuckMessages() throws Exception {
|
||||
|
||||
|
||||
int sendNumMessages = 10;
|
||||
int receiveNumMessages = 5;
|
||||
|
||||
// Create a producer
|
||||
|
||||
// Create a producer
|
||||
StubConnection connection1 = createConnection();
|
||||
ConnectionInfo connectionInfo1 = createConnectionInfo();
|
||||
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
|
||||
|
@ -168,41 +167,41 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
connection1.send(connectionInfo1);
|
||||
connection1.send(sessionInfo1);
|
||||
connection1.send(producerInfo);
|
||||
|
||||
// Create a destination on the local broker
|
||||
|
||||
// Create a destination on the local broker
|
||||
ActiveMQDestination destinationInfo1 = null;
|
||||
|
||||
// Send a 10 messages to the local broker
|
||||
|
||||
// Send a 10 messages to the local broker
|
||||
for (int i = 0; i < sendNumMessages; ++i) {
|
||||
destinationInfo1 = createDestinationInfo(connection1, connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
|
||||
connection1.request(createMessage(producerInfo, destinationInfo1, DeliveryMode.NON_PERSISTENT));
|
||||
}
|
||||
|
||||
// Ensure that there are 10 messages on the local broker
|
||||
|
||||
// Ensure that there are 10 messages on the local broker
|
||||
Object[] messages = browseQueueWithJmx(localBroker);
|
||||
assertEquals(sendNumMessages, messages.length);
|
||||
|
||||
|
||||
// Create a synchronous consumer on the remote broker
|
||||
|
||||
|
||||
// Create a synchronous consumer on the remote broker
|
||||
StubConnection connection2 = createRemoteConnection();
|
||||
ConnectionInfo connectionInfo2 = createConnectionInfo();
|
||||
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
|
||||
connection2.send(connectionInfo2);
|
||||
connection2.send(sessionInfo2);
|
||||
ActiveMQDestination destinationInfo2 =
|
||||
ActiveMQDestination destinationInfo2 =
|
||||
createDestinationInfo(connection2, connectionInfo2, ActiveMQDestination.QUEUE_TYPE);
|
||||
final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationInfo2);
|
||||
connection2.send(consumerInfo2);
|
||||
|
||||
// Consume 5 of the messages from the remote broker and ack them.
|
||||
|
||||
// Consume 5 of the messages from the remote broker and ack them.
|
||||
for (int i = 0; i < receiveNumMessages; ++i) {
|
||||
Message message1 = receiveMessage(connection2, 20000);
|
||||
assertNotNull(message1);
|
||||
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
|
||||
}
|
||||
|
||||
// Ensure that there are zero messages on the local broker. This tells
|
||||
// us that those messages have been prefetched to the remote broker
|
||||
|
||||
// Ensure that there are zero messages on the local broker. This tells
|
||||
// us that those messages have been prefetched to the remote broker
|
||||
// where the demand exists.
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
|
@ -215,7 +214,7 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
assertEquals(0, messages.length);
|
||||
|
||||
LOG.info("Closing consumer on remote");
|
||||
// Close the consumer on the remote broker
|
||||
// Close the consumer on the remote broker
|
||||
connection2.send(consumerInfo2.createRemoveCommand());
|
||||
// also close connection etc.. so messages get dropped from the local consumer q
|
||||
connection2.send(connectionInfo2.createRemoveCommand());
|
||||
|
@ -259,7 +258,7 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
assertNull("Messages have migrated back: " + message1, message1);
|
||||
|
||||
// Consume the last 4 messages from the local broker and ack them just
|
||||
// to clean up the queue.
|
||||
// to clean up the queue.
|
||||
int counter = 1;
|
||||
for (; counter < receiveNumMessages; counter++) {
|
||||
message1 = receiveMessage(connection1);
|
||||
|
@ -289,15 +288,15 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
messages = browseQueueWithJmx(localBroker);
|
||||
assertEquals(0, messages.length);
|
||||
|
||||
// Close the consumer on the remote broker
|
||||
// Close the consumer on the remote broker
|
||||
connection2.send(consumerInfo3.createRemoveCommand());
|
||||
|
||||
|
||||
connection1.stop();
|
||||
connection2.stop();
|
||||
}
|
||||
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
localBroker = new BrokerService();
|
||||
localBroker = new BrokerService();
|
||||
localBroker.setBrokerName("localhost");
|
||||
localBroker.setUseJmx(true);
|
||||
localBroker.setPersistenceAdapter(null);
|
||||
|
@ -307,11 +306,11 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
configureBroker(localBroker);
|
||||
localBroker.start();
|
||||
localBroker.waitUntilStarted();
|
||||
|
||||
|
||||
localBroker.getManagementContext().setConnectorPort(2221);
|
||||
|
||||
|
||||
brokers.put(localBroker.getBrokerName(), localBroker);
|
||||
|
||||
|
||||
return localBroker;
|
||||
}
|
||||
|
||||
|
@ -337,32 +336,32 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
configureBroker(remoteBroker);
|
||||
remoteBroker.start();
|
||||
remoteBroker.waitUntilStarted();
|
||||
|
||||
|
||||
remoteBroker.getManagementContext().setConnectorPort(2222);
|
||||
|
||||
|
||||
brokers.put(remoteBroker.getBrokerName(), remoteBroker);
|
||||
|
||||
|
||||
return remoteBroker;
|
||||
}
|
||||
|
||||
|
||||
protected Transport createTransport() throws Exception {
|
||||
Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
|
||||
return transport;
|
||||
}
|
||||
|
||||
|
||||
protected Transport createRemoteTransport() throws Exception {
|
||||
Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI());
|
||||
return transport;
|
||||
}
|
||||
|
||||
|
||||
protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
|
||||
return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
|
||||
}
|
||||
|
||||
|
||||
protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
|
||||
return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
|
||||
}
|
||||
|
||||
|
||||
protected String getRemoteURI() {
|
||||
return "vm://remotehost";
|
||||
}
|
||||
|
@ -370,7 +369,7 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
protected String getLocalURI() {
|
||||
return "vm://localhost";
|
||||
}
|
||||
|
||||
|
||||
protected StubConnection createConnection() throws Exception {
|
||||
Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
|
||||
StubConnection connection = new StubConnection(transport);
|
||||
|
@ -384,74 +383,75 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
connections.add(connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
|
||||
Object[] messages = null;
|
||||
Connection connection = null;
|
||||
Session session = null;
|
||||
|
||||
try {
|
||||
URI brokerUri = connector.getUri();
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(queueName);
|
||||
QueueBrowser browser = session.createBrowser(destination);
|
||||
List<Message> list = new ArrayList<Message>();
|
||||
for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
|
||||
list.add(enumn.nextElement());
|
||||
}
|
||||
messages = list.toArray();
|
||||
}
|
||||
finally {
|
||||
if (session != null) {
|
||||
session.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
LOG.info("+Browsed with JMS: " + messages.length);
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
|
||||
Hashtable<String, String> params = new Hashtable<String, String>();
|
||||
params.put("BrokerName", broker.getBrokerName());
|
||||
params.put("Type", "Queue");
|
||||
params.put("Destination", queueName);
|
||||
ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params);
|
||||
|
||||
ManagementContext mgmtCtx = broker.getManagementContext();
|
||||
QueueViewMBean queueView = (QueueViewMBean)mgmtCtx.newProxyInstance(queueObjectName, QueueViewMBean.class, true);
|
||||
|
||||
Object[] messages = (Object[]) queueView.browse();
|
||||
|
||||
LOG.info("+Browsed with JMX: " + messages.length);
|
||||
|
||||
@SuppressWarnings({ "unchecked", "unused" })
|
||||
private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
|
||||
Object[] messages = null;
|
||||
Connection connection = null;
|
||||
Session session = null;
|
||||
|
||||
try {
|
||||
URI brokerUri = connector.getUri();
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri.toString());
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(queueName);
|
||||
QueueBrowser browser = session.createBrowser(destination);
|
||||
List<Message> list = new ArrayList<Message>();
|
||||
for (Enumeration<Message> enumn = browser.getEnumeration(); enumn.hasMoreElements();) {
|
||||
list.add(enumn.nextElement());
|
||||
}
|
||||
messages = list.toArray();
|
||||
}
|
||||
finally {
|
||||
if (session != null) {
|
||||
session.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
LOG.info("+Browsed with JMS: " + messages.length);
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
|
||||
private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
|
||||
Hashtable<String, String> params = new Hashtable<String, String>();
|
||||
params.put("brokerName", broker.getBrokerName());
|
||||
params.put("type", "Broker");
|
||||
params.put("destinationType", "Queue");
|
||||
params.put("destinationName", queueName);
|
||||
ObjectName queueObjectName = ObjectName.getInstance(amqDomain, params);
|
||||
|
||||
ManagementContext mgmtCtx = broker.getManagementContext();
|
||||
QueueViewMBean queueView = (QueueViewMBean)mgmtCtx.newProxyInstance(queueObjectName, QueueViewMBean.class, true);
|
||||
|
||||
Object[] messages = queueView.browse();
|
||||
|
||||
LOG.info("+Browsed with JMX: " + messages.length);
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
protected ConnectionInfo createConnectionInfo() throws Exception {
|
||||
ConnectionInfo info = new ConnectionInfo();
|
||||
info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
|
||||
info.setClientId(info.getConnectionId().getValue());
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
|
||||
SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
|
||||
ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
|
||||
ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
|
||||
info.setBrowser(false);
|
||||
|
@ -460,7 +460,7 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
info.setDispatchAsync(false);
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
|
||||
DestinationInfo info = new DestinationInfo();
|
||||
info.setConnectionId(connectionInfo.getConnectionId());
|
||||
|
@ -468,7 +468,7 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() + ":" + (++tempDestGenerator), destinationType));
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
|
||||
if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
|
||||
DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType);
|
||||
|
@ -478,13 +478,13 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
return ActiveMQDestination.createDestination(queueName, destinationType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
|
||||
Message message = createMessage(producerInfo, destination);
|
||||
message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
|
||||
return message;
|
||||
}
|
||||
|
||||
|
||||
protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
|
||||
ActiveMQTextMessage message = new ActiveMQTextMessage();
|
||||
message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
|
||||
|
@ -506,7 +506,7 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
ack.setMessageCount(count);
|
||||
return ack;
|
||||
}
|
||||
|
||||
|
||||
public Message receiveMessage(StubConnection connection) throws InterruptedException {
|
||||
return receiveMessage(connection, maxWait);
|
||||
}
|
||||
|
@ -530,5 +530,4 @@ public class BrokerNetworkWithStuckMessagesTest extends TestCase /*NetworkTestSu
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue