[AMQ-6643] refine fix to allow wildcard subs to non wildcard subscription queues, enable simple wildcard sub to drain all subscription queues

This commit is contained in:
gtully 2017-08-08 16:38:11 +01:00
parent a15626193c
commit a67c75a9e1
2 changed files with 20 additions and 7 deletions

View File

@ -48,8 +48,9 @@ public class MappedQueueFilter extends DestinationFilter {
// recover messages for first consumer only // recover messages for first consumer only
boolean noSubs = getConsumers().isEmpty(); boolean noSubs = getConsumers().isEmpty();
// for virtual consumer wildcard dests, only subscribe to exact match to ensure no duplicates // for virtual consumer wildcard dests, only subscribe to exact match or non wildcard dests to ensure no duplicates
if (sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) { int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
super.addSubscription(context, sub); super.addSubscription(context, sub);
} }
if (noSubs && !getConsumers().isEmpty()) { if (noSubs && !getConsumers().isEmpty()) {

View File

@ -22,9 +22,11 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
@ -63,19 +65,21 @@ public class DrainBridgeTest {
System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount()); System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount());
assertEquals("local messages", 20, drainingBroker.getAdminView().getTotalMessageCount()); assertEquals("local messages", 22, drainingBroker.getAdminView().getTotalMessageCount());
assertEquals("no remote messages", 0, target.getAdminView().getTotalMessageCount()); assertEquals("no remote messages", 0, target.getAdminView().getTotalMessageCount());
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
System.out.println("Local count: " + drainingBroker.getAdminView().getTotalMessageCount() + ", target count:" + target.getAdminView().getTotalMessageCount());
return drainingBroker.getAdminView().getTotalMessageCount() == 0l; return drainingBroker.getAdminView().getTotalMessageCount() == 0l;
} }
}); });
assertEquals("no local messages", 0, drainingBroker.getAdminView().getTotalMessageCount()); assertEquals("no local messages", 0, drainingBroker.getAdminView().getTotalMessageCount());
assertEquals("remote messages", 20, target.getAdminView().getTotalMessageCount()); assertEquals("remote messages", 22, target.getAdminView().getTotalMessageCount());
assertEquals("number of queues match", drainingBroker.getAdminView().getQueues().length, target.getAdminView().getQueues().length);
drainingBroker.stop(); drainingBroker.stop();
target.stop(); target.stop();
} }
@ -99,10 +103,18 @@ public class DrainBridgeTest {
conn.start(); conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TextMessage msg = session.createTextMessage("This is a message."); TextMessage msg = session.createTextMessage("This is a message.");
MessageProducer producer = session.createProducer(new ActiveMQQueue("Q.Foo,Bar")); MessageProducer producer = session.createProducer(null);
ActiveMQQueue queue = new ActiveMQQueue("Q.Foo,Bar");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
producer.send(msg); producer.send(queue, msg);
} }
// add virtual topic consumer Q
MessageConsumer messageConsumerA = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.Y"));
MessageConsumer messageConsumeB = session.createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.Y"));
producer.send(new ActiveMQTopic("VirtualTopic.Y"), msg);
conn.close(); conn.close();
broker.stop(); broker.stop();
} }