diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index 9cc8e59517..361105ef7a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -206,7 +206,7 @@ public final class AsyncDataManager { } } - private ByteSequence marshallState() throws IOException { + private synchronized ByteSequence marshallState() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); @@ -338,9 +338,7 @@ public final class AsyncDataManager { synchronized void removeInterestInFile(DataFile dataFile) throws IOException{ if(dataFile!=null){ if(dataFile.decrement()<=0){ - if(dataFile!=currentWriteFile){ - removeDataFile(dataFile); - } + removeDataFile(dataFile); } } } @@ -355,21 +353,18 @@ public final class AsyncDataManager { List purgeList=new ArrayList(); for (Integer key : unUsed) { DataFile dataFile=(DataFile) fileMap.get(key); - if( dataFile!=currentWriteFile ) { - purgeList.add(dataFile); - } + purgeList.add(dataFile); } for (DataFile dataFile : purgeList) { - removeDataFile(dataFile); + removeDataFile(dataFile); } - } public synchronized void consolidateDataFiles() throws IOException{ List purgeList=new ArrayList(); for (DataFile dataFile : fileMap.values()) { - if(dataFile.isUnused() && dataFile != currentWriteFile){ + if( dataFile.isUnused() ){ purgeList.add(dataFile); } } @@ -379,12 +374,21 @@ public final class AsyncDataManager { } private void removeDataFile(DataFile dataFile) throws IOException{ + + // Make sure we don't delete too much data. + if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) { + return; + } + + accessorPool.disposeDataFileAccessors(dataFile); + fileMap.remove(dataFile.getDataFileId()); dataFile.unlink(); - accessorPool.disposeDataFileAccessors(dataFile); boolean result=dataFile.delete(); log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); + } + /** * @return the maxFileLength @@ -479,8 +483,10 @@ public final class AsyncDataManager { return rc; } - public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException { - mark = location; + public void setMark(Location location, boolean sync) throws IOException, IllegalStateException { + synchronized(this) { + mark = location; + } storeState(sync); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java index ea7ef2969b..4a8bc41d68 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java @@ -36,9 +36,12 @@ public class DataFileAccessorPool { int MAX_OPEN_READERS_PER_FILE=5; class Pool { + private final DataFile file; private final ArrayList pool = new ArrayList(); private boolean used; + private int openCounter; + private boolean disposed; public Pool(DataFile file) { this.file = file; @@ -52,12 +55,14 @@ public class DataFileAccessorPool { rc = (DataFileAccessor) pool.remove(pool.size()-1); } used=true; + openCounter++; return rc; } public void closeDataFileReader(DataFileAccessor reader) { + openCounter--; used=true; - if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) { + if(pool.size() >= MAX_OPEN_READERS_PER_FILE || disposed) { reader.dispose(); } else { pool.add(reader); @@ -77,6 +82,11 @@ public class DataFileAccessorPool { reader.dispose(); } pool.clear(); + disposed=true; + } + + public int getOpenCounter() { + return openCounter; } } @@ -102,17 +112,17 @@ public class DataFileAccessorPool { } } - synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException { + synchronized void disposeDataFileAccessors(DataFile dataFile) { if( closed ) { - throw new IOException("Closed."); + throw new IllegalStateException("Closed."); } Pool pool = pools.get(dataFile.getDataFileId()); if( pool != null ) { - if( !pool.isUsed() ) { + if( pool.getOpenCounter()==0 ) { pool.dispose(); pools.remove(dataFile.getDataFileId()); } else { - throw new IOException("The data file is still in use: "+dataFile); + throw new IllegalStateException("The data file is still in use: "+dataFile+", use count: "+pool.getOpenCounter()); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 0d80ed54fc..691137e90d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -77,7 +77,7 @@ public class KahaReferenceStore extends KahaMessageStore implements ReferenceSto }else{ for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry); - if(msg.messageId.equals(identity)){ + if(msg.messageId.equals(identity.toString())){ result=msg; cache.put(identity,entry); break; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java index 759d86c325..073062c173 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java @@ -18,12 +18,15 @@ package org.apache.activemq.store.quick; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -38,6 +41,8 @@ import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStore.ReferenceData; +import org.apache.activemq.thread.Task; +import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.Callback; import org.apache.activemq.util.TransactionTemplate; @@ -66,14 +71,26 @@ public class QuickMessageStore implements MessageStore { private LinkedHashMap cpAddedMessageIds; protected Location lastLocation; + protected Location lastWrittenLocation; + protected HashSet inFlightTxLocations = new HashSet(); + protected final TaskRunner asyncWriteTask; + protected CountDownLatch flushLatch; + private final AtomicReference mark = new AtomicReference(); + public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { this.peristenceAdapter = adapter; this.transactionStore = adapter.getTransactionStore(); this.referenceStore = referenceStore; this.destination = destination; this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); + + asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task(){ + public boolean iterate() { + asyncWrite(); + return false; + }}, "Checkpoint: "+destination); } public void setUsageManager(UsageManager usageManager) { @@ -123,7 +140,7 @@ public class QuickMessageStore implements MessageStore { } } - private void addMessage(final Message message, final Location location) { + private void addMessage(final Message message, final Location location) throws InterruptedIOException { ReferenceData data = new ReferenceData(); data.setExpiration(message.getExpiration()); data.setFileId(location.getDataFileId()); @@ -132,6 +149,11 @@ public class QuickMessageStore implements MessageStore { lastLocation = location; messages.put(message.getMessageId(), data); } + try { + asyncWriteTask.wakeup(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } } public void replayAddMessage(ConnectionContext context, Message message, Location location) { @@ -193,15 +215,24 @@ public class QuickMessageStore implements MessageStore { } } - private void removeMessage(final MessageAck ack, final Location location) { - synchronized (this) { + private void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { + ReferenceData data; + synchronized (this) { lastLocation = location; MessageId id = ack.getLastMessageId(); - ReferenceData data = messages.remove(id); + data = messages.remove(id); if (data == null) { messageAcks.add(ack); } } + + if (data == null) { + try { + asyncWriteTask.wakeup(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } } public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { @@ -216,34 +247,77 @@ public class QuickMessageStore implements MessageStore { log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); } } + + /** + * Waits till the lastest data has landed on the referenceStore + * @throws InterruptedIOException + */ + public void flush() throws InterruptedIOException { + log.debug("flush"); + CountDownLatch countDown; + synchronized(this) { + if( lastWrittenLocation == lastLocation ) { + return; + } + if( flushLatch== null ) { + flushLatch = new CountDownLatch(1); + } + countDown = flushLatch; + } + try { + asyncWriteTask.wakeup(); + countDown.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } /** * @return * @throws IOException */ - public Location checkpoint() throws IOException { - return checkpoint(null); + private void asyncWrite() { + try { + CountDownLatch countDown; + synchronized(this) { + countDown = flushLatch; + flushLatch = null; + } + + mark.set(doAsyncWrite()); + + if ( countDown != null ) { + countDown.countDown(); + } + } catch (IOException e) { + log.error("Checkpoint failed: "+e, e); + } } /** * @return * @throws IOException */ - public Location checkpoint(final Callback postCheckpointTest) throws IOException { + protected Location doAsyncWrite() throws IOException { final ArrayList cpRemovedMessageLocations; final ArrayList cpActiveJournalLocations; final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); - + final Location lastLocation; + // swap out the message hash maps.. synchronized (this) { cpAddedMessageIds = this.messages; cpRemovedMessageLocations = this.messageAcks; cpActiveJournalLocations=new ArrayList(inFlightTxLocations); this.messages = new LinkedHashMap(); - this.messageAcks = new ArrayList(); + this.messageAcks = new ArrayList(); + lastLocation = this.lastLocation; } + if( log.isDebugEnabled() ) + log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "+cpRemovedMessageLocations.size()+" "); + transactionTemplate.run(new Callback() { public void execute() throws Exception { @@ -284,15 +358,15 @@ public class QuickMessageStore implements MessageStore { } } - if( postCheckpointTest!= null ) { - postCheckpointTest.execute(); - } } }); + + log.debug("Batch update done."); synchronized (this) { cpAddedMessageIds = null; + lastWrittenLocation = lastLocation; } if( cpActiveJournalLocations.size() > 0 ) { @@ -338,7 +412,7 @@ public class QuickMessageStore implements MessageStore { } /** - * Replays the checkpointStore first as those messages are the oldest ones, + * Replays the referenceStore first as those messages are the oldest ones, * then messages are replayed from the transaction log and then the cache is * updated. * @@ -346,7 +420,7 @@ public class QuickMessageStore implements MessageStore { * @throws Exception */ public void recover(final MessageRecoveryListener listener) throws Exception { - peristenceAdapter.checkpoint(true); + flush(); referenceStore.recover(new RecoveryListenerAdapter(this, listener)); } @@ -355,6 +429,7 @@ public class QuickMessageStore implements MessageStore { } public void stop() throws Exception { + asyncWriteTask.shutdown(); referenceStore.stop(); } @@ -369,7 +444,7 @@ public class QuickMessageStore implements MessageStore { * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) */ public void removeAllMessages(ConnectionContext context) throws IOException { - peristenceAdapter.checkpoint(true); + flush(); referenceStore.removeAllMessages(context); } @@ -391,13 +466,12 @@ public class QuickMessageStore implements MessageStore { * @see org.apache.activemq.store.MessageStore#getMessageCount() */ public int getMessageCount() throws IOException{ - peristenceAdapter.checkpoint(true); + flush(); return referenceStore.getMessageCount(); } - - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ - peristenceAdapter.checkpoint(true); + public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + flush(); referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener)); } @@ -408,4 +482,8 @@ public class QuickMessageStore implements MessageStore { } + public Location getMark() { + return mark.get(); + } + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java index 0f1a276709..bdff7506f4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java @@ -19,19 +19,12 @@ package org.apache.activemq.store.quick; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activeio.journal.Journal; @@ -94,12 +87,14 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene private WireFormat wireFormat = new OpenWireFormat(); private UsageManager usageManager; - private long checkpointInterval = 1000 * 30; + + private long cleanupInterval = 1000 * 10; + private long checkpointInterval = 1000 * 10; + private int maxCheckpointWorkers = 1; private int maxCheckpointMessageAddSize = 1024*4; private QuickTransactionStore transactionStore = new QuickTransactionStore(this); - private ThreadPoolExecutor checkpointExecutor; private TaskRunner checkpointTask; private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); @@ -111,6 +106,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene private boolean deleteAllMessages; private File directory = new File("activemq-data/quick"); + public synchronized void start() throws Exception { if( !started.compareAndSet(false, true) ) @@ -152,7 +148,13 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene Set files = referenceStoreAdapter.getReferenceFileIdsInUse(); for (Integer fileId : files) { - asyncDataManager.addInterestInFile(fileId); + try { + asyncDataManager.addInterestInFile(fileId); + } catch (IOException e) { + // We can expect these since referenceStoreAdapter is a litle behind in updates + // and it might think it has references to data files that have allready come and gone.. + // This should get resolved once recovery kicks in. + } } checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ @@ -161,15 +163,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene return false; } }, "ActiveMQ Journal Checkpoint Worker"); - - checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { - public Thread newThread(Runnable runable) { - Thread t = new Thread(runable, "Journal checkpoint worker"); - t.setPriority(7); - return t; - } - }); - + createTransactionStore(); recover(); @@ -187,7 +181,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene cleanup(); } }; - Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval); + Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval); } @@ -200,11 +194,22 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene this.usageManager.removeUsageListener(this); Scheduler.cancel(periodicCheckpointTask); + + Iterator iterator = queues.values().iterator(); + while (iterator.hasNext()) { + QuickMessageStore ms = iterator.next(); + ms.stop(); + } + + iterator = topics.values().iterator(); + while (iterator.hasNext()) { + final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next(); + ms.stop(); + } + // Take one final checkpoint and stop checkpoint processing. checkpoint(true); - checkpointTask.shutdown(); - log.debug("Checkpoint task shutdown"); - checkpointExecutor.shutdown(); + checkpointTask.shutdown(); queues.clear(); topics.clear(); @@ -268,55 +273,24 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene log.debug("Checkpoint started."); Location newMark = null; - ArrayList futureTasks = new ArrayList(queues.size()+topics.size()); - - // Iterator iterator = queues.values().iterator(); while (iterator.hasNext()) { - try { - final QuickMessageStore ms = iterator.next(); - FutureTask task = new FutureTask(new Callable() { - public Location call() throws Exception { - return ms.checkpoint(); - }}); - futureTasks.add(task); - checkpointExecutor.execute(task); - } - catch (Exception e) { - log.error("Failed to checkpoint a message store: " + e, e); + final QuickMessageStore ms = iterator.next(); + Location mark = (Location) ms.getMark(); + if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + newMark = mark; } } iterator = topics.values().iterator(); while (iterator.hasNext()) { - try { - final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next(); - FutureTask task = new FutureTask(new Callable() { - public Location call() throws Exception { - return ms.checkpoint(); - }}); - futureTasks.add(task); - checkpointExecutor.execute(task); - } - catch (Exception e) { - log.error("Failed to checkpoint a message store: " + e, e); + final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next(); + Location mark = (Location) ms.getMark(); + if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { + newMark = mark; } } - try { - for (Iterator iter = futureTasks.iterator(); iter.hasNext();) { - FutureTask ft = iter.next(); - Location mark = (Location) ft.get(); - // We only set a newMark on full checkpoints. - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; - } - } - } catch (Throwable e) { - log.error("Failed to checkpoint a message store: " + e, e); - } - - try { if (newMark != null) { log.debug("Marking journal at: " + newMark); @@ -354,10 +328,8 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene public void cleanup() { try { - Set inUse = referenceStoreAdapter.getReferenceFileIdsInUse(); asyncDataManager.consolidateDataFilesNotIn(inUse); - } catch (IOException e) { log.error("Could not cleanup data files: "+e, e); } @@ -386,6 +358,11 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene if (store == null) { ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); store = new QuickMessageStore(this, checkpointStore, destination); + try { + store.start(); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } queues.put(destination, store); } return store; @@ -396,6 +373,11 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene if (store == null) { TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); store = new QuickTopicMessageStore(this, checkpointStore, destinationName); + try { + store.start(); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } topics.put(destinationName, store); } return store; @@ -445,7 +427,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene * @throws InvalidLocationException * @throws IllegalStateException */ - private void recover() throws IllegalStateException, IOException, IOException { + private void recover() throws IllegalStateException, IOException { Location pos = null; int transactionCounter = 0; @@ -594,8 +576,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene newPercentUsage = ((newPercentUsage)/10)*10; oldPercentUsage = ((oldPercentUsage)/10)*10; if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { - boolean sync = newPercentUsage >= 90; - checkpoint(sync); + checkpoint(false); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java index 51bacda4fa..e42839040e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java @@ -18,13 +18,13 @@ package org.apache.activemq.store.quick; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.HashMap; import java.util.Iterator; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.impl.async.Location; @@ -49,18 +49,18 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe private TopicReferenceStore topicReferenceStore; private HashMap ackedLastAckLocations = new HashMap(); - public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore, ActiveMQTopic destinationName) { - super(adapter, checkpointStore, destinationName); - this.topicReferenceStore = checkpointStore; + public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { + super(adapter, topicReferenceStore, destinationName); + this.topicReferenceStore = topicReferenceStore; } public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { - this.peristenceAdapter.checkpoint(true); + flush(); topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); } public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{ - this.peristenceAdapter.checkpoint(true); + flush(); topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener)); } @@ -69,14 +69,10 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe } public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { - this.peristenceAdapter.checkpoint(true); + flush(); topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive); } - public void addMessage(ConnectionContext context, Message message) throws IOException { - super.addMessage(context, message); - } - /** */ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { @@ -141,27 +137,35 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe * @param messageId * @param location * @param key + * @throws InterruptedIOException */ - private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) { + private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException { synchronized(this) { lastLocation = location; ackedLastAckLocations.put(key, messageId); } + try { + asyncWriteTask.wakeup(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } } - public Location checkpoint() throws IOException { - - final HashMap cpAckedLastAckLocations; + @Override + protected Location doAsyncWrite() throws IOException { + + final HashMap cpAckedLastAckLocations; // swap out the hash maps.. synchronized (this) { cpAckedLastAckLocations = this.ackedLastAckLocations; this.ackedLastAckLocations = new HashMap(); } - - return super.checkpoint( new Callback() { + + Location location = super.doAsyncWrite(); + + transactionTemplate.run(new Callback() { public void execute() throws Exception { - // Checkpoint the acknowledged messages. Iterator iterator = cpAckedLastAckLocations.keySet().iterator(); while (iterator.hasNext()) { @@ -169,12 +173,12 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); } - } - }); - + } ); + + return location; } - + /** * @return Returns the longTermStore. */ @@ -192,7 +196,7 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe public int getMessageCount(String clientId,String subscriberName) throws IOException{ - this.peristenceAdapter.checkpoint(true); + flush(); return topicReferenceStore.getMessageCount(clientId,subscriberName); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java index 8ac9db0f23..377836eaf5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java @@ -21,9 +21,12 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; final class RecoveryListenerAdapter implements MessageRecoveryListener { - + static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class); + private final MessageStore store; private final MessageRecoveryListener listener; @@ -45,6 +48,12 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener { } public void recoverMessageReference(MessageId ref) throws Exception { - listener.recoverMessage( this.store.getMessage(ref) ); + Message message = this.store.getMessage(ref); + if( message !=null ){ + listener.recoverMessage( message ); + } else { + log.error("Message id "+ref+" could not be recovered from the data store!"); + } + } } \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java index f80462e3ba..08c7404c43 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java @@ -17,6 +17,7 @@ */ package org.apache.activemq; +import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -32,8 +33,6 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; @@ -105,6 +104,12 @@ public class JmsTestSupport extends CombinationTestSupport { protected void setUp() throws Exception { super.setUp(); + + if(System.getProperty("basedir")==null){ + File file=new File("."); + System.setProperty("basedir",file.getAbsolutePath()); + } + broker = createBroker(); broker.start(); factory = createConnectionFactory(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java b/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java index 4c4d134c9d..3a88601d7c 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java @@ -45,6 +45,9 @@ import org.apache.activemq.command.ActiveMQQueue; */ public class LoadTester extends JmsTestSupport { + protected int MESSAGE_SIZE=1024*64; + protected int PRODUCE_COUNT=10000; + protected BrokerService createBroker() throws Exception { return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml")); } @@ -56,8 +59,6 @@ public class LoadTester extends JmsTestSupport { } public void testQueueSendThenAddConsumer() throws Exception { - int MESSAGE_SIZE=1024*64; - int PRODUCE_COUNT=10000; ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT, 20); ActiveMQDestination destination = new ActiveMQQueue("TEST"); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java similarity index 94% rename from activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java rename to activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java index f9300783c1..9cdd923e5c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java @@ -28,7 +28,7 @@ import org.apache.activemq.store.quick.QuickPersistenceAdapter; * * @version $Revision$ */ -public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest { +public class QuickStoreRecoveryBrokerTest extends RecoveryBrokerTest { protected BrokerService createBroker() throws Exception { BrokerService service = new BrokerService(); @@ -46,7 +46,7 @@ public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest { } public static Test suite() { - return suite(QuickJournalRecoveryBrokerTest.class); + return suite(QuickStoreRecoveryBrokerTest.class); } public static void main(String[] args) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java similarity index 74% rename from activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java rename to activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java index b5b4750676..c36e292b41 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java @@ -21,17 +21,17 @@ import junit.framework.Test; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.XARecoveryBrokerTest; -import org.apache.activemq.store.DefaultPersistenceAdapterFactory; +import org.apache.activemq.store.quick.QuickPersistenceAdapter; /** * Used to verify that recovery works correctly against * * @version $Revision$ */ -public class QuickJournalXARecoveryBrokerTest extends XARecoveryBrokerTest { +public class QuickStoreXARecoveryBrokerTest extends XARecoveryBrokerTest { public static Test suite() { - return suite(QuickJournalXARecoveryBrokerTest.class); + return suite(QuickStoreXARecoveryBrokerTest.class); } public static void main(String[] args) { @@ -41,15 +41,15 @@ public class QuickJournalXARecoveryBrokerTest extends XARecoveryBrokerTest { protected BrokerService createBroker() throws Exception { BrokerService service = new BrokerService(); service.setDeleteAllMessagesOnStartup(true); - DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory(); - factory.setUseQuickJournal(true); + QuickPersistenceAdapter pa = new QuickPersistenceAdapter(); + service.setPersistenceAdapter(pa); return service; } protected BrokerService createRestartedBroker() throws Exception { BrokerService service = new BrokerService(); - DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory(); - factory.setUseQuickJournal(true); + QuickPersistenceAdapter pa = new QuickPersistenceAdapter(); + service.setPersistenceAdapter(pa); return service; }