mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4271 - virtualSelectorCacheBrokerPlugin support for consumers with no selectors
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1438032 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9fece8f3bc
commit
61cbe46fc6
|
@ -86,9 +86,15 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
|
||||||
@Override
|
@Override
|
||||||
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||||
LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
|
LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
|
||||||
if (info.getSelector() != null) {
|
String selector = info.getSelector();
|
||||||
subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector());
|
|
||||||
} //if
|
// As ConcurrentHashMap doesn't support null values, use always true expression
|
||||||
|
if (selector == null) {
|
||||||
|
selector = "TRUE";
|
||||||
|
}
|
||||||
|
|
||||||
|
subSelectorCache.put(info.getDestination().getQualifiedName(), selector);
|
||||||
|
|
||||||
return super.addConsumer(context, info);
|
return super.addConsumer(context, info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,10 +44,16 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
|
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
|
||||||
protected Connection connection;
|
protected Connection connection;
|
||||||
protected int total = 3000;
|
|
||||||
protected String messageSelector;
|
|
||||||
|
|
||||||
public void testVirtualTopicDisconnect() throws Exception {
|
public void testVirtualTopicSelectorDisconnect() throws Exception {
|
||||||
|
testVirtualTopicDisconnect("odd = 'no'", 3000, 1500);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testVirtualTopicNoSelectorDisconnect() throws Exception {
|
||||||
|
testVirtualTopicDisconnect(null, 3000, 3000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testVirtualTopicDisconnect(String messageSelector, int total , int expected) throws Exception {
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
connection = createConnection();
|
connection = createConnection();
|
||||||
}
|
}
|
||||||
|
@ -63,7 +69,7 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
|
||||||
LOG.info("Sending to: " + producerDestination);
|
LOG.info("Sending to: " + producerDestination);
|
||||||
LOG.info("Consuming from: " + destination );
|
LOG.info("Consuming from: " + destination );
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(destination, messageSelector);
|
MessageConsumer consumer = createConsumer(session, destination, messageSelector);
|
||||||
|
|
||||||
MessageListener listener = new MessageListener(){
|
MessageListener listener = new MessageListener(){
|
||||||
public void onMessage(Message message){
|
public void onMessage(Message message){
|
||||||
|
@ -93,12 +99,12 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
|
||||||
consumer.close();
|
consumer.close();
|
||||||
}
|
}
|
||||||
if (i==reconnectCount){
|
if (i==reconnectCount){
|
||||||
consumer = session.createConsumer(destination, messageSelector);
|
consumer = createConsumer(session, destination, messageSelector);
|
||||||
consumer.setMessageListener(listener);
|
consumer.setMessageListener(listener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertMessagesArrived(messageList,total/2,10000);
|
assertMessagesArrived(messageList, expected ,10000);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Destination getConsumerDsetination() {
|
protected Destination getConsumerDsetination() {
|
||||||
|
@ -112,7 +118,14 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
messageSelector = "odd = 'no'";
|
}
|
||||||
|
|
||||||
|
protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector) throws JMSException {
|
||||||
|
if (messageSelector != null) {
|
||||||
|
return session.createConsumer(destination, messageSelector);
|
||||||
|
} else {
|
||||||
|
return session.createConsumer(destination);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TextMessage createMessage(Session session, int i) throws JMSException {
|
protected TextMessage createMessage(Session session, int i) throws JMSException {
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
</virtualDestinationInterceptor>
|
</virtualDestinationInterceptor>
|
||||||
</destinationInterceptors>
|
</destinationInterceptors>
|
||||||
<plugins>
|
<plugins>
|
||||||
<virtualSelectorCacheBrokerPlugin persistFile = "selectorcache.data"/>
|
<virtualSelectorCacheBrokerPlugin persistFile = "target/selectorcache.data"/>
|
||||||
</plugins>
|
</plugins>
|
||||||
</broker>
|
</broker>
|
||||||
</beans>
|
</beans>
|
||||||
|
|
Loading…
Reference in New Issue