move setBatch to MessageStore interface to keep cursors store agnostic - http://issues.apache.org/activemq/browse/AMQ-2020 - some store specific tests to follow

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@740765 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-04 15:19:56 +00:00
parent 9a1f9c2fe3
commit 37c2a955a2
7 changed files with 41 additions and 18 deletions

View File

@ -75,15 +75,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
} }
protected void setBatch(MessageId messageId) { protected void setBatch(MessageId messageId) {
AMQMessageStore amqStore = (AMQMessageStore) store; store.setBatch(messageId);
try {
amqStore.flush();
} catch (InterruptedIOException e) {
LOG.debug("flush on setBatch resulted in exception", e);
}
KahaReferenceStore kahaStore =
(KahaReferenceStore) amqStore.getReferenceStore();
kahaStore.setBatch(messageId);
batchResetNeeded = false; batchResetNeeded = false;
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
@ -42,4 +43,7 @@ abstract public class AbstractMessageStore implements MessageStore {
public void setMemoryUsage(MemoryUsage memoryUsage) { public void setMemoryUsage(MemoryUsage memoryUsage) {
} }
public void setBatch(MessageId messageId) {
}
} }

View File

@ -110,4 +110,11 @@ public interface MessageStore extends Service {
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception; void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
void dispose(ConnectionContext context); void dispose(ConnectionContext context);
/**
* allow caching cursors to set the current batch offset when cache is exhausted
* @param messageId
*/
void setBatch(MessageId messageId);
} }

View File

@ -92,4 +92,8 @@ public class ProxyMessageStore implements MessageStore {
delegate.resetBatching(); delegate.resetBatching();
} }
public void setBatch(MessageId messageId) {
delegate.setBatch(messageId);
}
} }

View File

@ -134,4 +134,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
delegate.resetBatching(); delegate.resetBatching();
} }
public void setBatch(MessageId messageId) {
delegate.setBatch(messageId);
}
} }

View File

@ -558,4 +558,14 @@ public class AMQMessageStore extends AbstractMessageStore {
referenceStore.dispose(context); referenceStore.dispose(context);
super.dispose(context); super.dispose(context);
} }
public void setBatch(MessageId messageId) {
try {
flush();
} catch (InterruptedIOException e) {
LOG.debug("flush on setBatch resulted in exception", e);
}
getReferenceStore().setBatch(messageId);
}
} }

View File

@ -46,6 +46,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -70,15 +71,19 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
final int ackWindow = 50; final int ackWindow = 50;
final int ackBatchSize = 50; final int ackBatchSize = 50;
final int fullWindow = 200; final int fullWindow = 200;
final int count = 20000; protected int count = 20000;
public void setUp() throws Exception { public void setUp() throws Exception {
brokerService = new BrokerService(); brokerService = createBroker();
brokerService.setUseJmx(false); brokerService.setUseJmx(false);
brokerService.deleteAllMessages(); brokerService.deleteAllMessages();
brokerService.start(); brokerService.start();
} }
protected BrokerService createBroker() throws Exception {
return new BrokerService();
}
public void tearDown() throws Exception { public void tearDown() throws Exception {
brokerService.stop(); brokerService.stop();
} }
@ -92,8 +97,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
} }
public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
final AMQPersistenceAdapter persistenceAdapter = final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
(AMQPersistenceAdapter) brokerService.getPersistenceAdapter();
final MessageStore queueMessageStore = final MessageStore queueMessageStore =
persistenceAdapter.createQueueMessageStore(destination); persistenceAdapter.createQueueMessageStore(destination);
final ConnectionContext contextNotInTx = new ConnectionContext(); final ConnectionContext contextNotInTx = new ConnectionContext();
@ -128,8 +132,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
queue.send(producerExchange, message); queue.send(producerExchange, message);
} }
assertEquals("store count is correct", count, queueMessageStore assertEquals("store count is correct", count, queueMessageStore.getMessageCount());
.getMessageCount());
// pull from store in small windows // pull from store in small windows
Subscription subscription = new Subscription() { Subscription subscription = new Subscription() {
@ -305,7 +308,6 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
if (removeIndex % 1000 == 0) { if (removeIndex % 1000 == 0) {
LOG.info("acked: " + removeIndex); LOG.info("acked: " + removeIndex);
persistenceAdapter.checkpoint(true); persistenceAdapter.checkpoint(true);
persistenceAdapter.cleanup();
} }
} }
} }