mirror of https://github.com/apache/activemq.git
Made the peer test a little more reliable by using consumer advisory messages to know when the peers have fully connected in the cluster.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386258 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
70f9d61313
commit
38f7b126d6
|
@ -17,9 +17,13 @@
|
|||
|
||||
package org.apache.activemq.transport.peer;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -28,6 +32,7 @@ import javax.jms.Connection;
|
|||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
@ -40,7 +45,7 @@ import junit.framework.TestCase;
|
|||
*/
|
||||
public class PeerTransportTest extends TestCase {
|
||||
protected Log log = LogFactory.getLog(getClass());
|
||||
protected Destination destination;
|
||||
protected ActiveMQDestination destination;
|
||||
protected boolean topic = true;
|
||||
protected static int MESSAGE_COUNT = 50;
|
||||
protected static int NUMBER_IN_CLUSTER = 3;
|
||||
|
@ -54,10 +59,11 @@ public class PeerTransportTest extends TestCase {
|
|||
connections = new Connection[NUMBER_IN_CLUSTER];
|
||||
producers = new MessageProducer[NUMBER_IN_CLUSTER];
|
||||
messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
|
||||
Destination destination = createDestination();
|
||||
ActiveMQDestination destination = createDestination();
|
||||
|
||||
String root = System.getProperty("activemq.store.dir");
|
||||
|
||||
|
||||
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
|
||||
connections[i] = createConnection(i);
|
||||
connections[i].setClientID("ClusterTest" + i);
|
||||
|
@ -70,9 +76,30 @@ public class PeerTransportTest extends TestCase {
|
|||
messageIdList[i] = new MessageIdList();
|
||||
consumer.setMessageListener(messageIdList[i]);
|
||||
}
|
||||
System.out.println("Sleeping to ensure cluster is fully connected");
|
||||
Thread.sleep(10000);
|
||||
System.out.println("Finished sleeping");
|
||||
|
||||
System.out.println("Waiting for cluster to be fully connected");
|
||||
|
||||
// Each connection should see that NUMBER_IN_CLUSTER consumers get registered on the destination.
|
||||
ActiveMQDestination advisoryDest = AdvisorySupport.getConsumerAdvisoryTopic(destination);
|
||||
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
|
||||
Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = createMessageConsumer(session, advisoryDest);
|
||||
|
||||
int j=0;
|
||||
while(j < NUMBER_IN_CLUSTER) {
|
||||
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(1000);
|
||||
if( message == null ) {
|
||||
fail("Connection "+i+" saw "+j+" consumers, expected: "+NUMBER_IN_CLUSTER);
|
||||
}
|
||||
if( message.getDataStructure()!=null && message.getDataStructure().getDataStructureType()==ConsumerInfo.DATA_STRUCTURE_TYPE ) {
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
System.out.println("Cluster is online.");
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
|
@ -93,11 +120,11 @@ public class PeerTransportTest extends TestCase {
|
|||
return fac.createConnection();
|
||||
}
|
||||
|
||||
protected Destination createDestination() {
|
||||
protected ActiveMQDestination createDestination() {
|
||||
return createDestination(getClass().getName());
|
||||
}
|
||||
|
||||
protected Destination createDestination(String name) {
|
||||
protected ActiveMQDestination createDestination(String name) {
|
||||
if (topic) {
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue