diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 3e7cab77ba..c1b5f3c4e1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -24,8 +24,8 @@ import org.apache.activemq.command.MessageId; * Keeps track of a message that is flowing through the Broker. This object may * hold a hard reference to the message or only hold the id of the message if * the message has been persisted on in a MessageStore. - * - * + * + * */ public class IndirectMessageReference implements QueueMessageReference { @@ -38,7 +38,7 @@ public class IndirectMessageReference implements QueueMessageReference { /** Direct reference to the message */ private final Message message; private final MessageId messageId; - + /** * @param message */ @@ -50,44 +50,70 @@ public class IndirectMessageReference implements QueueMessageReference { message.getGroupSequence(); } + @Override public Message getMessageHardRef() { return message; } + @Override public int getReferenceCount() { return message.getReferenceCount(); } + @Override public int incrementReferenceCount() { return message.incrementReferenceCount(); } + @Override public int decrementReferenceCount() { return message.decrementReferenceCount(); } + @Override public Message getMessage() { return message; } + @Override public String toString() { return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null); } + @Override public void incrementRedeliveryCounter() { message.incrementRedeliveryCounter(); } + @Override public synchronized boolean isDropped() { return dropped; } + @Override public synchronized void drop() { dropped = true; lockOwner = null; message.decrementReferenceCount(); } + /** + * Check if the message has already been dropped before + * dropping. Return true if dropped, else false. + * This method exists so that this can be done atomically + * under the intrinisic lock + */ + @Override + public synchronized boolean dropIfLive() { + if (isDropped()) { + return false; + } else { + drop(); + return true; + } + } + + @Override public boolean lock(LockOwner subscription) { synchronized (this) { if (dropped || lockOwner != null) { @@ -98,28 +124,34 @@ public class IndirectMessageReference implements QueueMessageReference { } } + @Override public synchronized boolean unlock() { boolean result = lockOwner != null; lockOwner = null; return result; } + @Override public synchronized LockOwner getLockOwner() { return lockOwner; } + @Override public int getRedeliveryCounter() { return message.getRedeliveryCounter(); } + @Override public MessageId getMessageId() { return messageId; } + @Override public Message.MessageDestination getRegionDestination() { return message.getRegionDestination(); } + @Override public boolean isPersistent() { return message.isPersistent(); } @@ -128,38 +160,47 @@ public class IndirectMessageReference implements QueueMessageReference { return lockOwner != null; } + @Override public synchronized boolean isAcked() { return acked; } + @Override public synchronized void setAcked(boolean b) { acked = b; } + @Override public String getGroupID() { return message.getGroupID(); } + @Override public int getGroupSequence() { return message.getGroupSequence(); } + @Override public ConsumerId getTargetConsumerId() { return message.getTargetConsumerId(); } + @Override public long getExpiration() { return message.getExpiration(); } + @Override public boolean isExpired() { return message.isExpired(); } + @Override public synchronized int getSize() { return message.getSize(); } + @Override public boolean isAdvisory() { return message.isAdvisory(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index 95109724d6..bef9b230e6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -29,98 +29,127 @@ public final class NullMessageReference implements QueueMessageReference { private final ActiveMQMessage message = new ActiveMQMessage(); private volatile int references; + @Override public void drop() { throw new RuntimeException("not implemented"); } + @Override + public synchronized boolean dropIfLive() { + throw new RuntimeException("not implemented"); + } + + @Override public LockOwner getLockOwner() { throw new RuntimeException("not implemented"); } + @Override public boolean isAcked() { return false; } + @Override public boolean isDropped() { return false; } + @Override public boolean lock(LockOwner subscription) { return true; } + @Override public void setAcked(boolean b) { throw new RuntimeException("not implemented"); } + @Override public boolean unlock() { return true; } + @Override public int decrementReferenceCount() { return --references; } + @Override public long getExpiration() { throw new RuntimeException("not implemented"); } + @Override public String getGroupID() { return null; } + @Override public int getGroupSequence() { return 0; } + @Override public Message getMessage() { return message; } + @Override public Message getMessageHardRef() { throw new RuntimeException("not implemented"); } + @Override public MessageId getMessageId() { return message.getMessageId(); } + @Override public int getRedeliveryCounter() { throw new RuntimeException("not implemented"); } + @Override public int getReferenceCount() { return references; } + @Override public Destination getRegionDestination() { return null; } + @Override public int getSize() { throw new RuntimeException("not implemented"); } + @Override public ConsumerId getTargetConsumerId() { throw new RuntimeException("not implemented"); } + @Override public void incrementRedeliveryCounter() { throw new RuntimeException("not implemented"); } + @Override public int incrementReferenceCount() { return ++references; } + @Override public boolean isExpired() { return false; } + @Override public boolean isPersistent() { throw new RuntimeException("not implemented"); } + @Override public boolean isAdvisory() { return false; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 0d060226b0..f025998ba3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1748,7 +1748,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // This sends the ack the the journal.. if (!ack.isInTransaction()) { acknowledge(context, sub, ack, reference); - getDestinationStatistics().getDequeues().increment(); dropMessage(reference); } else { try { @@ -1758,7 +1757,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index @Override public void afterCommit() throws Exception { - getDestinationStatistics().getDequeues().increment(); dropMessage(reference); wakeup(); } @@ -1788,9 +1786,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } private void dropMessage(QueueMessageReference reference) { - if (!reference.isDropped()) { - reference.drop(); - destinationStatistics.getMessages().decrement(); + //use dropIfLive so we only process the statistics at most one time + if (reference.dropIfLive()) { + getDestinationStatistics().getDequeues().increment(); + getDestinationStatistics().getMessages().decrement(); pagedInMessagesLock.writeLock().lock(); try { pagedInMessages.remove(reference); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java index 21d43e493f..89bcd6a517 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java @@ -18,25 +18,28 @@ package org.apache.activemq.broker.region; /** * Queue specific MessageReference. - * + * * @author fateev@amazon.com - * + * */ public interface QueueMessageReference extends MessageReference { QueueMessageReference NULL_MESSAGE = new NullMessageReference(); boolean isAcked(); - + void setAcked(boolean b); - + void drop(); - + + boolean dropIfLive(); + + @Override boolean isDropped(); - + boolean lock(LockOwner subscription); - + boolean unlock(); - + LockOwner getLockOwner(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java new file mode 100644 index 0000000000..07a87aa792 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AMQ6293Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ6293Test.class); + + private BrokerService brokerService; + private String connectionUri; + private ExecutorService service = Executors.newFixedThreadPool(6); + private final ActiveMQQueue queue = new ActiveMQQueue("test"); + private final int numMessages = 10000; + private Connection connection; + private Session session; + private final AtomicBoolean isException = new AtomicBoolean(); + + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + @Before + public void before() throws Exception { + brokerService = new BrokerService(); + TransportConnector connector = brokerService.addConnector("tcp://localhost:0"); + connectionUri = connector.getPublishableConnectString(); + brokerService.setPersistent(true); + brokerService.setDataDirectoryFile(dataFileDir.getRoot()); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + policyMap.setDefaultEntry(entry); + brokerService.setDestinationPolicy(policyMap); + entry.setQueuePrefetch(100); + + brokerService.start(); + brokerService.waitUntilStarted(); + + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = null; + connection = factory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @After + public void after() throws Exception { + if (connection != null) { + connection.stop(); + } + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + @Test(timeout=90000) + public void testDestinationStatisticsOnPurge() throws Exception { + //send messages to the store + sendTestMessages(numMessages); + + //Start up 5 consumers + final Queue regionQueue = (Queue) brokerService.getRegionBroker().getDestinationMap().get(queue); + for (int i = 0; i < 5; i++) { + service.submit(new TestConsumer(session.createConsumer(queue))); + } + + //Start a purge task at the same time as the consumers + for (int i = 0; i < 1; i++) { + service.submit(new Runnable() { + + @Override + public void run() { + try { + regionQueue.purge(); + } catch (Exception e) { + isException.set(true); + LOG.warn(e.getMessage(), e); + throw new RuntimeException(e); + } + } + }); + } + + service.shutdown(); + assertTrue("Took too long to shutdown service", service.awaitTermination(1, TimeUnit.MINUTES)); + assertFalse("Exception encountered", isException.get()); + + //Verify dequeue and message counts + assertEquals(0, regionQueue.getDestinationStatistics().getMessages().getCount()); + assertEquals(numMessages, regionQueue.getDestinationStatistics().getDequeues().getCount()); + } + + private void sendTestMessages(int numMessages) throws JMSException { + MessageProducer producer = session.createProducer(queue); + + final TextMessage textMessage = session.createTextMessage(); + textMessage.setText("Message"); + for (int i = 1; i <= numMessages; i++) { + producer.send(textMessage); + if (i % 1000 == 0) { + LOG.info("Sent {} messages", i); + } + } + } + + private class TestConsumer implements Runnable { + private final MessageConsumer consumer; + + public TestConsumer(final MessageConsumer consumer) throws JMSException { + this.consumer = consumer; + } + + @Override + public void run() { + try { + int i = 0; + while (consumer.receive(1000) != null) { + i++; + if (i % 1000 == 0) { + LOG.info("Received {} messages", i); + } + } + } catch (Exception e) { + isException.set(true); + LOG.warn(e.getMessage(), e); + throw new RuntimeException(e); + } + } + }; +} \ No newline at end of file