From c50b8e49c453748405a480a7bf466ae01eda6cca Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 15 Jul 2013 14:21:23 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-4595 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1503263 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 81 ++++----- .../region/QueueBrowserSubscription.java | 19 +++ .../org/apache/activemq/bugs/AMQ4595Test.java | 156 ++++++++++++++++++ 3 files changed, 211 insertions(+), 45 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 14d7d29264..5e5f6026a5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -108,8 +108,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { protected PendingMessageCursor messages; private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); private final LinkedHashMap pagedInMessages = new LinkedHashMap(); - // Messages that are paged in but have not yet been targeted at a - // subscription + // Messages that are paged in but have not yet been targeted at a subscription private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); protected PendingList pagedInPendingDispatch = new OrderedPendingList(); protected PendingList redeliveredWaitingDispatch = new OrderedPendingList(); @@ -177,7 +176,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } return returnValue; } - } DelayQueue flowControlTimeoutMessages = new DelayQueue(); @@ -246,7 +244,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { consumersLock.readLock().lock(); try { return new ArrayList(consumers); - }finally { + } finally { consumersLock.readLock().unlock(); } } @@ -447,10 +445,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { sub.add(context, this); // needs to be synchronized - so no contention with dispatching - // consumersLock. + // consumersLock. consumersLock.writeLock().lock(); try { - // set a flag if this is a first consumer if (consumers.size() == 0) { firstConsumer = true; @@ -474,7 +471,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } dispatchSelector.setExclusiveConsumer(exclusiveConsumer); } - }finally { + } finally { consumersLock.writeLock().unlock(); } @@ -488,7 +485,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (!this.optimizedDispatch) { wakeup(); } - }finally { + } finally { pagedInPendingDispatchLock.writeLock().unlock(); } if (this.optimizedDispatch) { @@ -593,13 +590,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (!redeliveredWaitingDispatch.isEmpty()) { doDispatch(new OrderedPendingList()); } - }finally { + } finally { consumersLock.writeLock().unlock(); } if (!this.optimizedDispatch) { wakeup(); } - }finally { + } finally { pagedInPendingDispatchLock.writeLock().unlock(); } if (this.optimizedDispatch) { @@ -639,13 +636,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (isProducerFlowControl() && context.isProducerFlowControl()) { if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG - .info("Usage Manager Memory Limit (" - + memoryUsage.getLimit() - + ") reached on " - + getActiveMQDestination().getQualifiedName() - + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." - + " See http://activemq.apache.org/producer-flow-control.html for more info"); + LOG.info("Usage Manager Memory Limit (" + + memoryUsage.getLimit() + + ") reached on " + + getActiveMQDestination().getQualifiedName() + + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { @@ -979,9 +975,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { public String toString() { int size = 0; messagesLock.readLock().lock(); - try{ + try { size = messages.size(); - }finally { + } finally { messagesLock.readLock().unlock(); } return destination.getQualifiedName() + ", subscriptions=" + consumers.size() @@ -1120,7 +1116,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { return allConsumersExclusiveByDefault; } - // Implementation methods // ------------------------------------------------------------------------- private QueueMessageReference createMessageReference(Message message) { @@ -1209,7 +1204,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { messagesLock.writeLock().unlock(); } } - } catch (Exception e) { LOG.error("Problem retrieving message for browse", e); } @@ -1230,12 +1224,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { public QueueMessageReference getMessage(String id) { MessageId msgId = new MessageId(id); pagedInMessagesLock.readLock().lock(); - try{ + try { QueueMessageReference ref = this.pagedInMessages.get(msgId); if (ref != null) { return ref; } - }finally { + } finally { pagedInMessagesLock.readLock().unlock(); } messagesLock.readLock().lock(); @@ -1282,6 +1276,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // don't spin/hang if stats are out and there is nothing left in the // store } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); + if (this.destinationStatistics.getMessages().getCount() > 0) { LOG.warn(getActiveMQDestination().getQualifiedName() + " after purge complete, message count stats report: " @@ -1346,9 +1341,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { do { doPageIn(true); pagedInMessagesLock.readLock().lock(); - try{ + try { set.addAll(pagedInMessages.values()); - }finally { + } finally { pagedInMessagesLock.readLock().unlock(); } List list = new ArrayList(set); @@ -1415,7 +1410,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { pagedInMessagesLock.readLock().lock(); try { set.addAll(pagedInMessages.values()); - }finally { + } finally { pagedInMessagesLock.readLock().unlock(); } List list = new ArrayList(set); @@ -1505,9 +1500,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { do { doPageIn(true); pagedInMessagesLock.readLock().lock(); - try{ + try { set.addAll(pagedInMessages.values()); - }finally { + } finally { pagedInMessagesLock.readLock().unlock(); } List list = new ArrayList(set); @@ -1534,9 +1529,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { do { doPageIn(true); pagedInMessagesLock.readLock().lock(); - try{ + try { set.addAll(pagedInMessages.values()); - }finally { + } finally { pagedInMessagesLock.readLock().unlock(); } List list = new ArrayList(set); @@ -1615,7 +1610,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - messagesLock.readLock().lock(); try{ pageInMoreMessages |= !messages.isEmpty(); @@ -1640,7 +1634,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) { try { pageInMessages(hasBrowsers); - } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } @@ -1670,7 +1663,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } boolean added = false; for (QueueMessageReference node : alreadyDispatchedMessages) { - if (!node.isAcked() && !browser.getPending().getMessageAudit().isDuplicate(node.getMessageId())) { + if (!node.isAcked() && !browser.isDuplicate(node.getMessageId())) { msgContext.setMessageReference(node); if (browser.matches(node, msgContext)) { browser.add(node); @@ -1750,7 +1743,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { pagedInPendingDispatchLock.writeLock().unlock(); } - } protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException { @@ -1793,9 +1785,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) { // message gone to DLQ, is ok to allow redelivery messagesLock.writeLock().lock(); - try{ + try { messages.rollback(reference.getMessageId()); - }finally { + } finally { messagesLock.writeLock().unlock(); } } @@ -1841,9 +1833,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { final void sendMessage(final Message msg) throws Exception { messagesLock.writeLock().lock(); - try{ + try { messages.addMessageLast(msg); - }finally { + } finally { messagesLock.writeLock().unlock(); } } @@ -2044,7 +2036,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { return list; } consumers = new ArrayList(this.consumers); - }finally { + } finally { consumersLock.writeLock().unlock(); } @@ -2104,7 +2096,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { addToConsumerList(target); consumers = new ArrayList(this.consumers); } - }finally { + } finally { consumersLock.writeLock().unlock(); } } @@ -2149,7 +2141,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } return result; - } protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { @@ -2187,13 +2178,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { int total = 0; boolean zeroPrefetch = false; consumersLock.readLock().lock(); - try{ + try { for (Subscription s : consumers) { zeroPrefetch |= s.getPrefetchSize() == 0; int countBeforeFull = s.countBeforeFull(); total += countBeforeFull; } - }finally { + } finally { consumersLock.readLock().unlock(); } if (total == 0 && zeroPrefetch) { @@ -2306,7 +2297,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { break; } } - }finally { + } finally { consumersLock.readLock().unlock(); } return sub; @@ -2346,7 +2337,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } } - }finally { + } finally { consumersLock.readLock().unlock(); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index 74a673d960..9bc3c1d102 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -18,7 +18,9 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.jms.JMSException; @@ -26,15 +28,22 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.usage.SystemUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class QueueBrowserSubscription extends QueueSubscription { + protected static final Logger LOG = LoggerFactory.getLogger(QueueBrowserSubscription.class); + int queueRefs; boolean browseDone; boolean destinationsAdded; + private final Map audit = new HashMap(); + public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { super(broker, usageManager, context, info); } @@ -56,6 +65,16 @@ public class QueueBrowserSubscription extends QueueSubscription { checkDone(); } + public boolean isDuplicate(MessageId messageId) { + + if (!audit.containsKey(messageId)) { + audit.put(messageId, Boolean.TRUE); + return false; + } + + return true; + } + private void checkDone() throws Exception { if (!browseDone && queueRefs == 0 && destinationsAdded) { browseDone = true; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java new file mode 100644 index 0000000000..04d3620df0 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import java.net.URI; +import java.util.Date; +import java.util.Enumeration; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4595Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class); + + private BrokerService broker; + private URI connectUri; + private ActiveMQConnectionFactory factory; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + TransportConnector connector = broker.addConnector("vm://localhost"); + broker.deleteAllMessages(); + +// PolicyEntry policy = new PolicyEntry(); +// policy.setQueue(">"); +// policy.setMaxAuditDepth(16384); +// policy.setCursorMemoryHighWaterMark(95); // More breathing room. +// PolicyMap pMap = new PolicyMap(); +// pMap.setDefaultEntry(policy); +// broker.setDestinationPolicy(pMap); + + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); + broker.start(); + broker.waitUntilStarted(); + connectUri = connector.getConnectUri(); + factory = new ActiveMQConnectionFactory(connectUri); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test(timeout=120000) + public void testBrowsingSmallBatch() throws JMSException { + doTestBrowsing(100); + } + + @Test(timeout=160000) + public void testBrowsingMediumBatch() throws JMSException { + doTestBrowsing(1000); + } + + @Test(timeout=300000) + public void testBrowsingLargeBatch() throws JMSException { + doTestBrowsing(10000); + } + + private void doTestBrowsing(int messageToSend) throws JMSException { + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + + // Send the messages to the Queue. + ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); + producerConnection.setUseAsyncSend(true); + producerConnection.start(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for (int i = 1; i <= messageToSend; i++) { + String msgStr = provideMessageText(i, 8192); + producer.send(producerSession.createTextMessage(msgStr)); + if ((i % 1000) == 0) { + LOG.info("P&C: {}", msgStr.substring(0, 100)); + } + } + producerConnection.close(); + + // Browse the queue. + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int browsed = 0; + while (enumeration.hasMoreElements()) { + TextMessage m = (TextMessage) enumeration.nextElement(); + browsed++; + if ((browsed % 1000) == 0) { + LOG.info("B[{}]: {}", browsed, m.getText().substring(0, 100)); + } + } + browser.close(); + session.close(); + connection.close(); + + // The number of messages browsed should be equal to the number of messages sent. + assertEquals(messageToSend, browsed); + + browser.close(); + } + + public String provideMessageText(int messageNumber, int messageSize) { + StringBuilder buf = new StringBuilder(); + buf.append("Message: "); + if (messageNumber > 0) { + buf.append(messageNumber); + } + buf.append(" sent at: ").append(new Date()); + + if (buf.length() > messageSize) { + return buf.substring(0, messageSize); + } + for (int i = buf.length(); i < messageSize; i++) { + buf.append(' '); + } + return buf.toString(); + } + +} \ No newline at end of file