mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@743320 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
69e5f769b2
commit
b2148c5e7a
|
@ -425,11 +425,7 @@
|
|||
<!-- m2 tests failing since move from assembly -->
|
||||
<exclude>**/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.*</exclude>
|
||||
<exclude>**/TwoBrokerQueueClientsReconnectTest.*</exclude>
|
||||
<exclude>**/ThreeBrokerQueueNetworkUsingTcpTest.*</exclude>
|
||||
<exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>
|
||||
<exclude>**/ThreeBrokerQueueNetworkTest.*</exclude>
|
||||
<exclude>**/ThreeBrokerTopicNetworkTest.*</exclude>
|
||||
<exclude>**/ThreeBrokerTopicNetworkUsingTcpTest.*</exclude>
|
||||
<exclude>**/TwoBrokerMulticastQueueTest.*</exclude>
|
||||
|
||||
<!-- TODO move to optional module... -->
|
||||
|
|
|
@ -37,7 +37,7 @@ public class DemandSubscription {
|
|||
DemandSubscription(ConsumerInfo info) {
|
||||
remoteInfo = info;
|
||||
localInfo = info.copy();
|
||||
localInfo.setSelector(null);
|
||||
localInfo.setSelector(info.getSelector());
|
||||
localInfo.setBrokerPath(info.getBrokerPath());
|
||||
localInfo.setNetworkSubscription(true);
|
||||
remoteSubsIds.add(info.getConsumerId());
|
||||
|
|
|
@ -189,13 +189,21 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
}
|
||||
|
||||
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
|
||||
return createConsumer(brokerName, dest, null);
|
||||
return createConsumer(brokerName, dest, null, null);
|
||||
}
|
||||
|
||||
protected MessageConsumer createConsumer(String brokerName, Destination dest, String messageSelector) throws Exception {
|
||||
return createConsumer(brokerName, dest, null, messageSelector);
|
||||
}
|
||||
|
||||
protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
|
||||
return createConsumer(brokerName, dest, latch, null);
|
||||
}
|
||||
|
||||
protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
|
||||
BrokerItem brokerItem = brokers.get(brokerName);
|
||||
if (brokerItem != null) {
|
||||
return brokerItem.createConsumer(dest, latch);
|
||||
return brokerItem.createConsumer(dest, latch, messageSelector);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -257,6 +265,10 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
|
||||
|
||||
protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
|
||||
sendMessages(brokerName, destination, count, null);
|
||||
}
|
||||
|
||||
protected void sendMessages(String brokerName, Destination destination, int count, HashMap<String, Object>properties) throws Exception {
|
||||
BrokerItem brokerItem = brokers.get(brokerName);
|
||||
|
||||
Connection conn = brokerItem.createConnection();
|
||||
|
@ -268,6 +280,11 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
|
||||
for (int i = 0; i < count; i++) {
|
||||
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
|
||||
if (properties != null) {
|
||||
for (String propertyName : properties.keySet()) {
|
||||
msg.setObjectProperty(propertyName, properties.get(propertyName));
|
||||
}
|
||||
}
|
||||
producer.send(msg);
|
||||
onSend(i, msg);
|
||||
}
|
||||
|
@ -368,22 +385,26 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
}
|
||||
|
||||
public MessageConsumer createConsumer(Destination dest) throws Exception {
|
||||
return createConsumer(dest, null);
|
||||
return createConsumer(dest, null, null);
|
||||
}
|
||||
|
||||
public MessageConsumer createConsumer(Destination dest, String messageSelector) throws Exception {
|
||||
return createConsumer(dest, null, messageSelector);
|
||||
}
|
||||
|
||||
public MessageConsumer createConsumer(Destination dest, CountDownLatch latch) throws Exception {
|
||||
public MessageConsumer createConsumer(Destination dest, CountDownLatch latch, String messageSelector) throws Exception {
|
||||
Connection c = createConnection();
|
||||
c.start();
|
||||
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
return createConsumerWithSession(dest, s, latch);
|
||||
return createConsumerWithSession(dest, s, latch, messageSelector);
|
||||
}
|
||||
|
||||
public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
|
||||
return createConsumerWithSession(dest, sess, null);
|
||||
return createConsumerWithSession(dest, sess, null, null);
|
||||
}
|
||||
|
||||
public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch) throws Exception {
|
||||
MessageConsumer client = sess.createConsumer(dest);
|
||||
public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch, String messageSelector) throws Exception {
|
||||
MessageConsumer client = sess.createConsumer(dest, messageSelector);
|
||||
MessageIdList messageIdList = new MessageIdList();
|
||||
messageIdList.setCountDownLatch(latch);
|
||||
messageIdList.setParent(allMessages);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.usecases;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -87,6 +88,88 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
|||
// Total received should be 100
|
||||
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount() + msgsC.getMessageCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* BrokerA <- BrokerB -> BrokerC
|
||||
*/
|
||||
public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
|
||||
// Setup broker networks
|
||||
bridgeBrokers("BrokerB", "BrokerA");
|
||||
bridgeBrokers("BrokerB", "BrokerC");
|
||||
|
||||
startAllBrokers();
|
||||
|
||||
// Setup destination
|
||||
Destination dest = createDestination("TEST.FOO", false);
|
||||
|
||||
|
||||
// Send messages for broker A
|
||||
HashMap<String, Object> props = new HashMap<String, Object>();
|
||||
props.put("broker", "BROKER_A");
|
||||
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
|
||||
|
||||
//Send messages for broker C
|
||||
props.clear();
|
||||
props.put("broker", "BROKER_C");
|
||||
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
|
||||
|
||||
// Setup consumers
|
||||
MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
|
||||
MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
|
||||
Thread.sleep(2000); //et subscriptions get propagated
|
||||
|
||||
// Let's try to wait for any messages.
|
||||
//Thread.sleep(1000);
|
||||
|
||||
// Get message count
|
||||
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||
|
||||
// Total received should be 100
|
||||
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
|
||||
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* BrokerA <- BrokerB -> BrokerC
|
||||
*/
|
||||
public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
|
||||
// Setup broker networks
|
||||
bridgeBrokers("BrokerB", "BrokerA");
|
||||
bridgeBrokers("BrokerB", "BrokerC");
|
||||
|
||||
startAllBrokers();
|
||||
|
||||
// Setup destination
|
||||
Destination dest = createDestination("TEST.FOO", false);
|
||||
|
||||
// Setup consumers
|
||||
MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
|
||||
MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
|
||||
Thread.sleep(2000); //et subscriptions get propagated
|
||||
|
||||
|
||||
// Send messages for broker A
|
||||
HashMap<String, Object> props = new HashMap<String, Object>();
|
||||
props.put("broker", "BROKER_A");
|
||||
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
|
||||
|
||||
//Send messages for broker C
|
||||
props.clear();
|
||||
props.put("broker", "BROKER_C");
|
||||
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
|
||||
|
||||
// Let's try to wait for any messages.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Get message count
|
||||
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
|
||||
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
|
||||
|
||||
// Total received should be 100
|
||||
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
|
||||
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* BrokerA -> BrokerB <- BrokerC
|
||||
|
|
Loading…
Reference in New Issue