From 37c2a955a2465df01749cf0d5da3ed19d55df3a1 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 4 Feb 2009 15:19:56 +0000 Subject: [PATCH] move setBatch to MessageStore interface to keep cursors store agnostic - http://issues.apache.org/activemq/browse/AMQ-2020 - some store specific tests to follow git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@740765 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/cursors/QueueStorePrefetch.java | 10 +--------- .../activemq/store/AbstractMessageStore.java | 4 ++++ .../apache/activemq/store/MessageStore.java | 7 +++++++ .../activemq/store/ProxyMessageStore.java | 4 ++++ .../store/ProxyTopicMessageStore.java | 4 ++++ .../activemq/store/amq/AMQMessageStore.java | 10 ++++++++++ .../region/QueueDuplicatesFromStoreTest.java | 20 ++++++++++--------- 7 files changed, 41 insertions(+), 18 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index b528bfaf07..ade026e2e8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -75,15 +75,7 @@ class QueueStorePrefetch extends AbstractStoreCursor { } protected void setBatch(MessageId messageId) { - AMQMessageStore amqStore = (AMQMessageStore) store; - try { - amqStore.flush(); - } catch (InterruptedIOException e) { - LOG.debug("flush on setBatch resulted in exception", e); - } - KahaReferenceStore kahaStore = - (KahaReferenceStore) amqStore.getReferenceStore(); - kahaStore.setBatch(messageId); + store.setBatch(messageId); batchResetNeeded = false; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index e97b0d6d01..4434302d87 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -17,6 +17,7 @@ package org.apache.activemq.store; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.MessageId; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.usage.MemoryUsage; @@ -42,4 +43,7 @@ abstract public class AbstractMessageStore implements MessageStore { public void setMemoryUsage(MemoryUsage memoryUsage) { } + + public void setBatch(MessageId messageId) { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index 9cb1be2f65..3757c34ecd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -110,4 +110,11 @@ public interface MessageStore extends Service { void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception; void dispose(ConnectionContext context); + + /** + * allow caching cursors to set the current batch offset when cache is exhausted + * @param messageId + */ + void setBatch(MessageId messageId); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 70549f9db2..cad63e2a24 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -92,4 +92,8 @@ public class ProxyMessageStore implements MessageStore { delegate.resetBatching(); } + + public void setBatch(MessageId messageId) { + delegate.setBatch(messageId); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 8e827c3226..1069078c0b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -134,4 +134,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore { delegate.resetBatching(); } + + public void setBatch(MessageId messageId) { + delegate.setBatch(messageId); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 304bf21e27..5297c0057d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -558,4 +558,14 @@ public class AMQMessageStore extends AbstractMessageStore { referenceStore.dispose(context); super.dispose(context); } + + public void setBatch(MessageId messageId) { + try { + flush(); + } catch (InterruptedIOException e) { + LOG.debug("flush on setBatch resulted in exception", e); + } + getReferenceStore().setBatch(messageId); + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index c97df930e1..2dbd17369f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -46,6 +46,7 @@ import org.apache.activemq.command.Response; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,16 +71,20 @@ public class QueueDuplicatesFromStoreTest extends TestCase { final int ackWindow = 50; final int ackBatchSize = 50; final int fullWindow = 200; - final int count = 20000; + protected int count = 20000; public void setUp() throws Exception { - brokerService = new BrokerService(); + brokerService = createBroker(); brokerService.setUseJmx(false); brokerService.deleteAllMessages(); brokerService.start(); } - public void tearDown() throws Exception { + protected BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + public void tearDown() throws Exception { brokerService.stop(); } @@ -92,8 +97,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { - final AMQPersistenceAdapter persistenceAdapter = - (AMQPersistenceAdapter) brokerService.getPersistenceAdapter(); + final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(destination); final ConnectionContext contextNotInTx = new ConnectionContext(); @@ -127,10 +131,9 @@ public class QueueDuplicatesFromStoreTest extends TestCase { Message message = getMessage(i); queue.send(producerExchange, message); } - - assertEquals("store count is correct", count, queueMessageStore - .getMessageCount()); + assertEquals("store count is correct", count, queueMessageStore.getMessageCount()); + // pull from store in small windows Subscription subscription = new Subscription() { @@ -305,7 +308,6 @@ public class QueueDuplicatesFromStoreTest extends TestCase { if (removeIndex % 1000 == 0) { LOG.info("acked: " + removeIndex); persistenceAdapter.checkpoint(true); - persistenceAdapter.cleanup(); } } }