From 2b6f36dbfd3bc2bd4f9e155d6528ac7346e95c6d Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Thu, 10 Jun 2010 16:43:38 +0000 Subject: [PATCH] first stab at resolving https://issues.apache.org/activemq/browse/AMQ-2695 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@953384 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/cursors/TopicStorePrefetch.java | 8 +- .../usecases/SubscriptionSelectorTest.java | 162 ++++++++++++++++++ 2 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/SubscriptionSelectorTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index c1bda53588..839f48541c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -71,13 +71,13 @@ class TopicStorePrefetch extends AbstractStoreCursor { return false; } - - + @Override protected synchronized int getStoreSize() { try { - return store.getMessageCount(clientId, subscriberName); - } catch (IOException e) { + this.store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); + return size; + } catch (Exception e) { LOG.error(this + " Failed to get the outstanding message count from the store", e); throw new RuntimeException(e); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/SubscriptionSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/SubscriptionSelectorTest.java new file mode 100644 index 0000000000..52f0d4e728 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/SubscriptionSelectorTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.lang.management.ManagementFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; + +public class SubscriptionSelectorTest extends org.apache.activemq.TestSupport { + + MBeanServer mbs; + BrokerService broker = null; + ActiveMQTopic topic; + + ActiveMQConnection consumerConnection = null, producerConnection = null; + Session producerSession; + MessageProducer producer; + + private int received = 0; + + public void testSubscription() throws Exception { + openConsumer(); + for (int i = 0; i < 4000; i++) { + sendMessage(false); + } + Thread.sleep(1000); + + assertEquals("Invalid message received.", 0, received); + + closeProducer(); + closeConsumer(); + stopBroker(); + + startBroker(false); + openConsumer(); + + sendMessage(true); + Thread.sleep(1000); + + assertEquals("Message is not recieved.", 1, received); + + sendMessage(true); + Thread.sleep(100); + + assertEquals("Message is not recieved.", 2, received); + } + + private void openConsumer() throws Exception { + consumerConnection = (ActiveMQConnection) createConnection(); + consumerConnection.setClientID("cliID"); + consumerConnection.start(); + Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subName", "filter=true", false); + + subscriber.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + received++; + } + }); + } + + private void closeConsumer() throws JMSException { + if (consumerConnection != null) + consumerConnection.close(); + consumerConnection = null; + } + + private void sendMessage(boolean filter) throws Exception { + if (producerConnection == null) { + producerConnection = (ActiveMQConnection) createConnection(); + producerConnection.start(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = producerSession.createProducer(topic); + } + + Message message = producerSession.createMessage(); + message.setBooleanProperty("filter", filter); + producer.send(message); + } + + private void closeProducer() throws JMSException { + if (producerConnection != null) + producerConnection.close(); + producerConnection = null; + } + + private int getPendingQueueSize() throws Exception { + ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers(); + for (ObjectName sub: subs) { + if ("cliID".equals(mbs.getAttribute(sub, "ClientId"))) { + Integer size = (Integer) mbs.getAttribute(sub, "PendingQueueSize"); + return size != null ? size : 0; + } + } + assertTrue(false); + return -1; + } + + private void startBroker(boolean deleteMessages) throws Exception { + broker = new BrokerService(); + broker.setBrokerName("test-broker"); + + //TODO create variants for different stores + //broker.setPersistenceAdapter(new AMQPersistenceAdapter()); + if (deleteMessages) { + broker.setDeleteAllMessagesOnStartup(true); + } + broker.start(); + } + + private void stopBroker() throws Exception { + if (broker != null) + broker.stop(); + broker = null; + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false"); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + startBroker(true); + topic = (ActiveMQTopic) createDestination(); + mbs = ManagementFactory.getPlatformMBeanServer(); + } + + @Override + protected void tearDown() throws Exception { + stopBroker(); + super.tearDown(); + } +}