diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index 372bb80283..64a4c2792c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -64,7 +64,7 @@ public class QueueView extends DestinationView implements QueueViewMBean { LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount); } - public boolean removeMessage(String messageId) throws Exception { + public synchronized boolean removeMessage(String messageId) throws Exception { return ((Queue)destination).removeMessage(messageId); } @@ -76,25 +76,25 @@ public class QueueView extends DestinationView implements QueueViewMBean { return ((Queue)destination).removeMatchingMessages(selector, maximumMessages); } - public boolean copyMessageTo(String messageId, String destinationName) throws Exception { + public synchronized boolean copyMessageTo(String messageId, String destinationName) throws Exception { ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); return ((Queue)destination).copyMessageTo(context, messageId, toDestination); } - public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception { + public synchronized int copyMatchingMessagesTo(String selector, String destinationName) throws Exception { ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination); } - public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { + public synchronized int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages); } - public boolean moveMessageTo(String messageId, String destinationName) throws Exception { + public synchronized boolean moveMessageTo(String messageId, String destinationName) throws Exception { ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); return ((Queue)destination).moveMessageTo(context, messageId, toDestination); @@ -123,6 +123,9 @@ public class QueueView extends DestinationView implements QueueViewMBean { public boolean retryMessage(String messageId) throws Exception { Queue queue = (Queue) destination; QueueMessageReference ref = queue.getMessage(messageId); + if (ref == null) { + throw new JMSException("Could not find message reference: "+ messageId); + } Message rc = ref.getMessage(); if (rc != null) { ActiveMQDestination originalDestination = rc.getOriginalDestination(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index fa75752df1..86b8c6dccf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1405,7 +1405,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index doPageIn(true); pagedInMessagesLock.readLock().lock(); try { - set.addAll(pagedInMessages.values()); + if (!set.addAll(pagedInMessages.values())) { + // nothing new to check - mem constraint on page in + return movedCounter; + }; } finally { pagedInMessagesLock.readLock().unlock(); } @@ -1474,7 +1477,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index doPageIn(true, false, (messages.isCacheEnabled() || !broker.getBrokerService().isPersistent()) ? messages.size() : getMaxBrowsePageSize()); pagedInMessagesLock.readLock().lock(); try { - set.addAll(pagedInMessages.values()); + if (!set.addAll(pagedInMessages.values())) { + // nothing new to check - mem constraint on page in + return movedCounter; + } } finally { pagedInMessagesLock.readLock().unlock(); } @@ -1591,7 +1597,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index doPageIn(true); pagedInMessagesLock.readLock().lock(); try { - set.addAll(pagedInMessages.values()); + if (!set.addAll(pagedInMessages.values())) { + // nothing new to check - mem constraint on page in + return movedCounter; + } } finally { pagedInMessagesLock.readLock().unlock(); } @@ -1623,7 +1632,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index doPageIn(true); pagedInMessagesLock.readLock().lock(); try { - set.addAll(pagedInMessages.values()); + if (!set.addAll(pagedInMessages.values())) { + // nothing new to check - mem constraint on page in + return restoredCounter; + } } finally { pagedInMessagesLock.readLock().unlock(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java new file mode 100644 index 0000000000..459bec117e --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.*; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import static org.junit.Assert.*; + +// https://issues.apache.org/jira/browse/AMQ-7302 +public class JmxOpPageInOnMemoryLimit { + + BrokerService broker; + protected MBeanServer mbeanServer; + protected String domain = "org.apache.activemq"; + + protected Connection connection; + protected int messageCount = 4000; + ActiveMQQueue destination = new ActiveMQQueue("QUEUE_TO_FILL_PAST_MEM_LIMIT"); + String lastMessageId = ""; + + @Test(timeout = 60*1000) + public void testNoHangOnPageInForJmxOps() throws Exception { + + // Now get the QueueViewMBean and ... + String objectNameStr = broker.getBrokerObjectName().toString(); + objectNameStr += ",destinationType=Queue,destinationName="+destination.getQueueName(); + ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr); + final QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + assertFalse("limit reached, cache disabled", proxy.isCacheEnabled()); + + proxy.removeMessage(lastMessageId); + + proxy.copyMessageTo(lastMessageId, "someOtherQ"); + + proxy.moveMatchingMessagesTo("JMSMessageID = '" + lastMessageId + "'","someOtherQ"); + + + // flick dlq flag to allow retry work + proxy.setDLQ(true); + proxy.retryMessages(); + + try { + proxy.retryMessage(lastMessageId); + } catch (JMSException expected) { + assertTrue("Could not find", expected.getMessage().contains("find")); + } + + long count = proxy.getQueueSize(); + boolean cursorFull = proxy.getCursorPercentUsage() >= 70; + assertTrue("Cursor full", cursorFull); + + assertEquals("Queue size", messageCount, count); + } + + private String produceMessages() throws Exception { + connection = createConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String trackLastMessageId = ""; + MessageProducer producer = session.createProducer(destination); + final byte[] payload = new byte[1024]; + for (int i = 0; i < messageCount; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload); + producer.send(message); + trackLastMessageId = message.getJMSMessageID(); + } + producer.close(); + connection.close(); + return trackLastMessageId; + } + + + protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException { + ObjectName objectName = new ObjectName(name); + if (!mbeanServer.isRegistered(objectName)) { + fail("Could not find MBean!: " + objectName); + } + return objectName; + } + + @Before + public void setUp() throws Exception { + createBroker(); + mbeanServer = broker.getManagementContext().getMBeanServer(); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + broker.stop(); + } + + protected BrokerService createBroker() throws Exception { + broker = new BrokerService(); + broker.setUseJmx(true); + broker.setEnableStatistics(true); + broker.addConnector("tcp://localhost:0"); + ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false); + + broker.deleteAllMessages(); + + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setMemoryLimit(1024*1024); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); + lastMessageId = produceMessages(); + return broker; + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + +}