diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 27d748edf8..0e28bfa7ad 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -609,7 +609,6 @@ org/apache/activemq/transport/failover/FailoverDuplicateTest.* org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.* org/apache/activemq/ExpiryHogTest.* - org/apache/activemq/bugs/AMQ2801Test.* org/apache/activemq/usecases/BrowseOverNetworkTest.* org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.* org/apache/kahadb/index/HashIndexTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index ffd3a8d1f1..4e30d35889 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1169,6 +1169,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { store.resetBatching(); } messages.gc(); + messages.reset(); asyncWakeup(); } finally { messagesLock.writeLock().unlock(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 55e293c83b..eecb794fd3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -253,8 +253,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i batchList.clear(); clearIterator(false); batchResetNeeded = true; - // wonder do we need to determine size here, it may change before restart - resetSize(); setCacheEnabled(false); } @@ -269,6 +267,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i LOG.trace(this + " - fillBatch"); } if (batchResetNeeded) { + resetSize(); + setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size)); resetBatch(); this.batchResetNeeded = false; } @@ -305,7 +305,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i @Override public String toString() { return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded - + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled(); + + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() + + ",maxBatchSize:" + maxBatchSize; } protected abstract void doFillBatch() throws Exception; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 38b08c2be6..7a32fd812c 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -24,8 +24,6 @@ import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import junit.framework.Test; - -import org.apache.activemq.broker.jmx.DestinationView; import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -43,7 +41,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.util.JMXSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,9 +230,10 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { } // We should get the committed transactions. - for (int i = 0; i < expectedMessageCount(4, destination); i++) { + final int countToReceive = expectedMessageCount(4, destination); + for (int i = 0; i < countToReceive ; i++) { Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); - assertNotNull(m); + assertNotNull("Got non null message: " + i, m); } assertNoMessagesLeft(connection); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java index 3d5bbbdd42..a1d0bc1cdf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java @@ -181,9 +181,9 @@ public class AMQ2801Test DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); - LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize()); + LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize() + ", dispatched: " + sub.getDispatchedQueueSize()); if(sub.getSubscriptionName().equals(SUBSCRIPTION1)) { - assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize()); + assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize() + sub.getDispatchedQueueSize()); } else { assertEquals("Incorrect number of pending messages", 0, sub.getPendingQueueSize()); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java new file mode 100644 index 0000000000..c91058add0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Date; +import java.util.Vector; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicDurableConnectStatsTest extends org.apache.activemq.TestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(TopicDurableConnectStatsTest.class); + private BrokerService broker; + private ActiveMQTopic topic; + private Vector exceptions = new Vector(); + private int messageSize = 4000; + protected MBeanServerConnection mbeanServer; + protected String domain = "org.apache.activemq"; + private ActiveMQConnectionFactory connectionFactory = null; + final int numMessages = 20; + + private static Session session2 = null; + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + + connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); + + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(10); + connectionFactory.setPrefetchPolicy(prefetchPolicy); + + connectionFactory.setWatchTopicAdvisories(false); + return connectionFactory; + } + + @Override + protected Connection createConnection() throws Exception { + return createConnection("cliName"); + } + + protected Connection createConnection(String name) throws Exception { + Connection con = super.createConnection(); + con.setClientID(name); + con.start(); + return con; + } + + public static Test suite() { + return suite(TopicDurableConnectStatsTest.class); + } + + protected void setUp() throws Exception { + exceptions.clear(); + topic = (ActiveMQTopic) createDestination(); + + createBroker(); + mbeanServer = ManagementFactory.getPlatformMBeanServer(); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + destroyBroker(); + } + + private void createBroker() throws Exception { + createBroker(true); + } + + private void createBroker(boolean deleteAllMessages) throws Exception { + broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")"); + broker.setBrokerName(getName(true)); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + broker.setAdvisorySupport(false); + broker.addConnector("tcp://0.0.0.0:0"); + + setDefaultPersistenceAdapter(broker); + broker.start(); + } + + private void destroyBroker() throws Exception { + if (broker != null) + broker.stop(); + } + + protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { + ObjectName objectName = new ObjectName(name); + + LOG.info("** Looking for " + name); + try { + if (mbeanServer.isRegistered(objectName)) { + LOG.info("Bean Registered: " + objectName); + } else { + LOG.info("Couldn't find Mbean! " + objectName); + + } + } catch (IOException e) { + e.printStackTrace(); + } + return objectName; + } + + public void testPendingTopicStat() throws Exception { + + Connection consumerCon = createConnection("cliId1"); + Session consumerSession = consumerCon.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = consumerSession.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + + DurableSubscriptionViewMBean subscriber1 = null; + + ObjectName subscriberObjName1 = assertRegisteredObjectName(domain + ":BrokerName=" + getName(true) + ",Type=Subscription,persistentMode=Durable,subscriptionID=SubsId,destinationType=Topic,destinationName=" + topic.getTopicName() + ",clientId=cliId1"); + subscriber1 = (DurableSubscriptionViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, subscriberObjName1, DurableSubscriptionViewMBean.class, true); + + LOG.info("Beginning Pending Queue Size count: " + subscriber1.getPendingQueueSize()); + LOG.info("Prefetch Limit: " + subscriber1.getPrefetchSize()); + + assertEquals("no pending", 0, subscriber1.getPendingQueueSize()); + assertEquals("Prefetch Limit ", 10, subscriber1.getPrefetchSize()); + + + Connection producerCon = createConnection("x"); + Session producerSessions = producerCon.createSession(true, Session.AUTO_ACKNOWLEDGE); + + + MessageProducer producer = producerSessions.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + int i = 0; + for (; i < numMessages; i++) { + + if (i == 15) { + // kill consumer + + LOG.info("Killing consumer at 15"); + consumerSession.close(); + consumerCon.close(); + } + + TextMessage message = producerSessions.createTextMessage(createMessageText(i)); + message.setJMSExpiration(0); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + producerSessions.commit(); + + } + LOG.info("Sent " + i + " messages in total"); + producerCon.close(); + + LOG.info("Pending Queue Size count: " + subscriber1.getPendingQueueSize()); + assertEquals("pending as expected", 20, subscriber1.getPendingQueueSize()); + + LOG.info("Re-connect client and consume messages"); + Connection con2 = createConnection("cliId1"); + session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + + + final Listener listener = new Listener(); + consumer2.setMessageListener(listener); + + assertTrue("received all sent", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return numMessages == listener.count; + } + })); + + LOG.info("Received: " + listener.count); + + int pq = subscriber1.getPendingQueueSize(); + LOG.info("Pending Queue Size count: " + pq); + assertEquals("Pending queue after consumed", 0, pq); + + session2.close(); + con2.close(); + LOG.info("FINAL Pending Queue Size count (after consumer close): " + subscriber1.getPendingQueueSize()); + } + + + private String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } + + + public static class Listener implements MessageListener { + int count = 0; + String id = null; + + Listener() { + } + + public void onMessage(Message message) { + count++; + try { + session2.commit(); + } catch (JMSException e1) { + e1.printStackTrace(); + } + + if (id != null) { + try { + LOG.info(id + ", " + message.getJMSMessageID()); + } catch (Exception ignored) { + } + } + + try { + Thread.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} + +