diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index b7f897b12d..b1e003ff01 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -19,7 +19,6 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map.Entry; - import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; @@ -47,6 +46,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i this.regionDestination=destination; } + @Override public final synchronized void start() throws Exception{ if (!isStarted()) { super.start(); @@ -60,6 +60,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } + @Override public final synchronized void stop() throws Exception { resetBatch(); super.stop(); @@ -91,6 +92,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return recovered; } + @Override public final void reset() { if (batchList.isEmpty()) { try { @@ -104,6 +106,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i size(); } + @Override public synchronized void release() { clearIterator(false); } @@ -127,6 +130,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public final void finished() { } + @Override public final synchronized boolean hasNext() { if (batchList.isEmpty()) { try { @@ -140,6 +144,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return this.iterator.hasNext(); } + @Override public final synchronized MessageReference next() { MessageReference result = null; if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { @@ -149,6 +154,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return result; } + @Override public final synchronized void addMessageLast(MessageReference node) throws Exception { if (cacheEnabled && hasSpace()) { recoverMessage(node.getMessage(),true); @@ -171,11 +177,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected void setBatch(MessageId messageId) throws Exception { } + @Override public final synchronized void addMessageFirst(MessageReference node) throws Exception { cacheEnabled=false; size++; } + @Override public final synchronized void remove() { size--; if (iterator!=null) { @@ -184,7 +192,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (last != null) { last.decrementReferenceCount(); } - if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) { + if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove"); } @@ -192,16 +200,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } + @Override public final synchronized void remove(MessageReference node) { size--; cacheEnabled=false; batchList.remove(node.getMessageId()); } + @Override public final synchronized void clear() { gc(); } + @Override public final synchronized void gc() { for (Message msg : batchList.values()) { rollback(msg.getMessageId()); @@ -218,6 +229,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } + @Override protected final synchronized void fillBatch() { if (batchResetNeeded) { resetBatch(); @@ -237,15 +249,18 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } + @Override public final synchronized boolean isEmpty() { // negative means more messages added to store through queue.send since last reset return size == 0; } + @Override public final synchronized boolean hasMessagesBufferedToDeliver() { return !batchList.isEmpty(); } + @Override public final synchronized int size() { if (size < 0) { this.size = getStoreSize(); @@ -259,4 +274,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected abstract void resetBatch(); protected abstract int getStoreSize(); + + protected abstract boolean isStoreEmpty(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index d3e963d8b7..77ee08c91b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -17,7 +17,6 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; - import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; @@ -33,7 +32,7 @@ import org.apache.commons.logging.LogFactory; */ class QueueStorePrefetch extends AbstractStoreCursor { private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class); - private MessageStore store; + private final MessageStore store; /** * Construct it @@ -41,7 +40,7 @@ class QueueStorePrefetch extends AbstractStoreCursor { */ public QueueStorePrefetch(Queue queue) { super(queue); - this.store = (MessageStore)queue.getMessageStore(); + this.store = queue.getMessageStore(); } @@ -58,29 +57,47 @@ class QueueStorePrefetch extends AbstractStoreCursor { + @Override protected synchronized int getStoreSize() { try { - return this.store.getMessageCount(); + int result = this.store.getMessageCount(); + return result; + } catch (IOException e) { LOG.error("Failed to get message count", e); throw new RuntimeException(e); } } + @Override + protected synchronized boolean isStoreEmpty() { + try { + return this.store.isEmpty(); + + } catch (Exception e) { + LOG.error("Failed to get message count", e); + throw new RuntimeException(e); + } + } + + @Override protected void resetBatch() { this.store.resetBatching(); } + @Override protected void setBatch(MessageId messageId) throws Exception { store.setBatch(messageId); batchResetNeeded = false; } + @Override protected void doFillBatch() throws Exception { this.store.recoverNextMessages(this.maxBatchSize, this); } + @Override public String toString() { return "QueueStorePrefetch" + System.identityHashCode(this); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 9a7c329dd4..79bccaf54b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -17,7 +17,6 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; - import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; @@ -36,10 +35,10 @@ import org.apache.commons.logging.LogFactory; */ class TopicStorePrefetch extends AbstractStoreCursor { private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class); - private TopicMessageStore store; - private String clientId; - private String subscriberName; - private Subscription subscription; + private final TopicMessageStore store; + private final String clientId; + private final String subscriberName; + private final Subscription subscription; /** * @param topic @@ -62,6 +61,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { } + @Override public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); messageEvaluationContext.setMessageReference(message); @@ -73,6 +73,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { } + @Override protected synchronized int getStoreSize() { try { return store.getMessageCount(clientId, subscriberName); @@ -81,17 +82,31 @@ class TopicStorePrefetch extends AbstractStoreCursor { throw new RuntimeException(e); } } + + @Override + protected synchronized boolean isStoreEmpty() { + try { + return this.store.isEmpty(); + + } catch (Exception e) { + LOG.error("Failed to get message count", e); + throw new RuntimeException(e); + } + } + @Override protected void resetBatch() { this.store.resetBatching(clientId, subscriberName); } + @Override protected void doFillBatch() throws Exception { this.store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); } + @Override public String toString() { return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index f5bf1db7dd..f0e58a0959 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -17,10 +17,9 @@ package org.apache.activemq.store; import java.io.IOException; - +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.MessageId; -import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.usage.MemoryUsage; abstract public class AbstractMessageStore implements MessageStore { @@ -48,4 +47,13 @@ abstract public class AbstractMessageStore implements MessageStore { public void setBatch(MessageId messageId) throws IOException, Exception { } + + /** + * flag to indicate if the store is empty + * @return true if the message count is 0 + * @throws Exception + */ + public boolean isEmpty() throws Exception{ + return getMessageCount()==0; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index afdd64c3ca..493b80fb74 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -17,7 +17,6 @@ package org.apache.activemq.store; import java.io.IOException; - import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -25,7 +24,6 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; /** * Represents a message store which is used by the persistent implementations @@ -114,7 +112,15 @@ public interface MessageStore extends Service { /** * allow caching cursors to set the current batch offset when cache is exhausted * @param messageId + * @throws Exception */ void setBatch(MessageId messageId) throws Exception; + /** + * flag to indicate if the store is empty + * @return true if the message count is 0 + * @throws Exception + */ + boolean isEmpty() throws Exception; + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 97212bf4b8..27c89aaebb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -96,4 +96,8 @@ public class ProxyMessageStore implements MessageStore { public void setBatch(MessageId messageId) throws Exception { delegate.setBatch(messageId); } + + public boolean isEmpty() throws Exception { + return delegate.isEmpty(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 4acb4f1dd2..0d4d0d257e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -17,7 +17,6 @@ package org.apache.activemq.store; import java.io.IOException; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -25,7 +24,6 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; /** * A simple proxy that delegates to another MessageStore. @@ -138,4 +136,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public void setBatch(MessageId messageId) throws Exception { delegate.setBatch(messageId); } + + public boolean isEmpty() throws Exception { + return delegate.isEmpty(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 020ce01f44..1dddef7963 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.Map.Entry; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -69,7 +68,7 @@ import org.apache.kahadb.page.Transaction; public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { - private WireFormat wireFormat = new OpenWireFormat(); + private final WireFormat wireFormat = new OpenWireFormat(); public void setBrokerName(String brokerName) { } @@ -128,6 +127,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.dest = convert( destination ); } + @Override public ActiveMQDestination getDestination() { return destination; } @@ -200,6 +200,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { }); } } + + public boolean isEmpty() throws IOException { + synchronized(indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure(){ + public Boolean execute(Transaction tx) throws IOException { + // Iterate through all index entries to get a count of messages in the destination. + StoredDestination sd = getStoredDestination(dest, tx); + return sd.locationIndex.isEmpty(tx); + } + }); + } + } + public void recover(final MessageRecoveryListener listener) throws Exception { synchronized(indexMutex) { @@ -266,10 +279,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } + @Override public void setMemoryUsage(MemoryUsage memoeyUSage) { } + @Override public void start() throws Exception { } + @Override public void stop() throws Exception { } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java new file mode 100644 index 0000000000..669066e1ee --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.IOHelper; + +public class AMQ2512Test extends EmbeddedBrokerTestSupport { + private static Connection connection; + private final static String QUEUE_NAME = "dee.q"; + private final static int INITIAL_MESSAGES_CNT = 1000; + private final static int WORKER_INTERNAL_ITERATIONS = 100; + private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + + INITIAL_MESSAGES_CNT; + private final static byte[] payload = new byte[5 * 1024]; + private final static String TEXT = new String(payload); + + private final static String PRP_INITIAL_ID = "initial-id"; + private final static String PRP_WORKER_ID = "worker-id"; + + private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); + + private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger(); + + public void testKahaDBFailure() throws Exception { + final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress); + connection = fac.createConnection(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue(QUEUE_NAME); + final MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + final long startTime = System.nanoTime(); + + final List consumers = new ArrayList(); + for (int i = 0; i < 20; i++) { + consumers.add(new Consumer("worker-" + i)); + } + + for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) { + final TextMessage msg = session.createTextMessage(TEXT); + msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i); + producer.send(msg); + } + + LATCH.await(); + final long endTime = System.nanoTime(); + System.out.println("Total execution time = " + + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms]."); + System.out.println("Rate = " + TOTAL_MESSAGES_CNT + / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s]."); + + for (Consumer c : consumers) { + c.close(); + } + connection.close(); + } + + private final static class Consumer implements MessageListener { + private final String name; + private final Session session; + private final MessageProducer producer; + + private Consumer(String name) { + this.name = name; + try { + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10"); + producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + final MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void onMessage(Message message) { + final TextMessage msg = (TextMessage) message; + try { + if (!msg.propertyExists(PRP_WORKER_ID)) { + for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) { + final TextMessage newMsg = session.createTextMessage(msg.getText()); + newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i); + newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID)); + producer.send(newMsg); + } + } + msg.acknowledge(); + + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement(); + if (onMsgCounter % 1000 == 0) { + System.out.println("message received: " + onMsgCounter); + } + LATCH.countDown(); + } + } + + private void close() { + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://0.0.0.0:61617"; + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + File dataFileDir = new File("target/test-amq-2512/datadb"); + IOHelper.mkdirs(dataFileDir); + IOHelper.deleteChildren(dataFileDir); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(dataFileDir); + BrokerService answer = new BrokerService(); + answer.setPersistenceAdapter(kaha); + + kaha.setEnableJournalDiskSyncs(false); + //kaha.setIndexCacheSize(10); + answer.setDataDirectoryFile(dataFileDir); + answer.setUseJmx(false); + answer.addConnector(bindAddress); + return answer; + } +} diff --git a/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java b/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java index 4cba413a87..350a654ce2 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java @@ -220,6 +220,10 @@ public class BTreeIndex implements Index { pw.flush(); } + synchronized public boolean isEmpty(final Transaction tx) throws IOException { + return getRoot(tx).isEmpty(tx); + } + synchronized public Iterator> iterator(final Transaction tx) throws IOException { return getRoot(tx).iterator(tx); } diff --git a/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java b/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java index 372f430223..a32de749ce 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java @@ -507,6 +507,10 @@ public final class BTreeNode { } } + public boolean isEmpty(final Transaction tx) throws IOException { + return keys.length==0; + } + public void visit(Transaction tx, BTreeVisitor visitor) throws IOException { if (visitor == null) { throw new IllegalArgumentException("Visitor cannot be null");