Fixed a ton of Quick store bugs that were found when running the QuickStoreLoadTester.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@492471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-01-04 09:34:46 +00:00
parent 158dbc66e7
commit 75b1c94ddc
11 changed files with 237 additions and 143 deletions

View File

@ -206,7 +206,7 @@ public final class AsyncDataManager {
} }
} }
private ByteSequence marshallState() throws IOException { private synchronized ByteSequence marshallState() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos); DataOutputStream dos = new DataOutputStream(baos);
@ -338,9 +338,7 @@ public final class AsyncDataManager {
synchronized void removeInterestInFile(DataFile dataFile) throws IOException{ synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
if(dataFile!=null){ if(dataFile!=null){
if(dataFile.decrement()<=0){ if(dataFile.decrement()<=0){
if(dataFile!=currentWriteFile){ removeDataFile(dataFile);
removeDataFile(dataFile);
}
} }
} }
} }
@ -355,21 +353,18 @@ public final class AsyncDataManager {
List<DataFile> purgeList=new ArrayList<DataFile>(); List<DataFile> purgeList=new ArrayList<DataFile>();
for (Integer key : unUsed) { for (Integer key : unUsed) {
DataFile dataFile=(DataFile) fileMap.get(key); DataFile dataFile=(DataFile) fileMap.get(key);
if( dataFile!=currentWriteFile ) { purgeList.add(dataFile);
purgeList.add(dataFile);
}
} }
for (DataFile dataFile : purgeList) { for (DataFile dataFile : purgeList) {
removeDataFile(dataFile); removeDataFile(dataFile);
} }
} }
public synchronized void consolidateDataFiles() throws IOException{ public synchronized void consolidateDataFiles() throws IOException{
List<DataFile> purgeList=new ArrayList<DataFile>(); List<DataFile> purgeList=new ArrayList<DataFile>();
for (DataFile dataFile : fileMap.values()) { for (DataFile dataFile : fileMap.values()) {
if(dataFile.isUnused() && dataFile != currentWriteFile){ if( dataFile.isUnused() ){
purgeList.add(dataFile); purgeList.add(dataFile);
} }
} }
@ -379,12 +374,21 @@ public final class AsyncDataManager {
} }
private void removeDataFile(DataFile dataFile) throws IOException{ private void removeDataFile(DataFile dataFile) throws IOException{
// Make sure we don't delete too much data.
if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) {
return;
}
accessorPool.disposeDataFileAccessors(dataFile);
fileMap.remove(dataFile.getDataFileId()); fileMap.remove(dataFile.getDataFileId());
dataFile.unlink(); dataFile.unlink();
accessorPool.disposeDataFileAccessors(dataFile);
boolean result=dataFile.delete(); boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
} }
/** /**
* @return the maxFileLength * @return the maxFileLength
@ -479,8 +483,10 @@ public final class AsyncDataManager {
return rc; return rc;
} }
public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException { public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
mark = location; synchronized(this) {
mark = location;
}
storeState(sync); storeState(sync);
} }

View File

@ -36,9 +36,12 @@ public class DataFileAccessorPool {
int MAX_OPEN_READERS_PER_FILE=5; int MAX_OPEN_READERS_PER_FILE=5;
class Pool { class Pool {
private final DataFile file; private final DataFile file;
private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>(); private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
private boolean used; private boolean used;
private int openCounter;
private boolean disposed;
public Pool(DataFile file) { public Pool(DataFile file) {
this.file = file; this.file = file;
@ -52,12 +55,14 @@ public class DataFileAccessorPool {
rc = (DataFileAccessor) pool.remove(pool.size()-1); rc = (DataFileAccessor) pool.remove(pool.size()-1);
} }
used=true; used=true;
openCounter++;
return rc; return rc;
} }
public void closeDataFileReader(DataFileAccessor reader) { public void closeDataFileReader(DataFileAccessor reader) {
openCounter--;
used=true; used=true;
if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) { if(pool.size() >= MAX_OPEN_READERS_PER_FILE || disposed) {
reader.dispose(); reader.dispose();
} else { } else {
pool.add(reader); pool.add(reader);
@ -77,6 +82,11 @@ public class DataFileAccessorPool {
reader.dispose(); reader.dispose();
} }
pool.clear(); pool.clear();
disposed=true;
}
public int getOpenCounter() {
return openCounter;
} }
} }
@ -102,17 +112,17 @@ public class DataFileAccessorPool {
} }
} }
synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException { synchronized void disposeDataFileAccessors(DataFile dataFile) {
if( closed ) { if( closed ) {
throw new IOException("Closed."); throw new IllegalStateException("Closed.");
} }
Pool pool = pools.get(dataFile.getDataFileId()); Pool pool = pools.get(dataFile.getDataFileId());
if( pool != null ) { if( pool != null ) {
if( !pool.isUsed() ) { if( pool.getOpenCounter()==0 ) {
pool.dispose(); pool.dispose();
pools.remove(dataFile.getDataFileId()); pools.remove(dataFile.getDataFileId());
} else { } else {
throw new IOException("The data file is still in use: "+dataFile); throw new IllegalStateException("The data file is still in use: "+dataFile+", use count: "+pool.getOpenCounter());
} }
} }
} }

View File

@ -77,7 +77,7 @@ public class KahaReferenceStore extends KahaMessageStore implements ReferenceSto
}else{ }else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry); ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
if(msg.messageId.equals(identity)){ if(msg.messageId.equals(identity.toString())){
result=msg; result=msg;
cache.put(identity,entry); cache.put(identity,entry);
break; break;

View File

@ -18,12 +18,15 @@
package org.apache.activemq.store.quick; package org.apache.activemq.store.quick;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -38,6 +41,8 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData; import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback; import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate; import org.apache.activemq.util.TransactionTemplate;
@ -66,14 +71,26 @@ public class QuickMessageStore implements MessageStore {
private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds; private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
protected Location lastLocation; protected Location lastLocation;
protected Location lastWrittenLocation;
protected HashSet<Location> inFlightTxLocations = new HashSet<Location>(); protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
private final AtomicReference<Location> mark = new AtomicReference<Location>();
public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter; this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore; this.referenceStore = referenceStore;
this.destination = destination; this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
public boolean iterate() {
asyncWrite();
return false;
}}, "Checkpoint: "+destination);
} }
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager) {
@ -123,7 +140,7 @@ public class QuickMessageStore implements MessageStore {
} }
} }
private void addMessage(final Message message, final Location location) { private void addMessage(final Message message, final Location location) throws InterruptedIOException {
ReferenceData data = new ReferenceData(); ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration()); data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId()); data.setFileId(location.getDataFileId());
@ -132,6 +149,11 @@ public class QuickMessageStore implements MessageStore {
lastLocation = location; lastLocation = location;
messages.put(message.getMessageId(), data); messages.put(message.getMessageId(), data);
} }
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} }
public void replayAddMessage(ConnectionContext context, Message message, Location location) { public void replayAddMessage(ConnectionContext context, Message message, Location location) {
@ -193,15 +215,24 @@ public class QuickMessageStore implements MessageStore {
} }
} }
private void removeMessage(final MessageAck ack, final Location location) { private void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
synchronized (this) { ReferenceData data;
synchronized (this) {
lastLocation = location; lastLocation = location;
MessageId id = ack.getLastMessageId(); MessageId id = ack.getLastMessageId();
ReferenceData data = messages.remove(id); data = messages.remove(id);
if (data == null) { if (data == null) {
messageAcks.add(ack); messageAcks.add(ack);
} }
} }
if (data == null) {
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
} }
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
@ -216,34 +247,77 @@ public class QuickMessageStore implements MessageStore {
log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
} }
} }
/**
* Waits till the lastest data has landed on the referenceStore
* @throws InterruptedIOException
*/
public void flush() throws InterruptedIOException {
log.debug("flush");
CountDownLatch countDown;
synchronized(this) {
if( lastWrittenLocation == lastLocation ) {
return;
}
if( flushLatch== null ) {
flushLatch = new CountDownLatch(1);
}
countDown = flushLatch;
}
try {
asyncWriteTask.wakeup();
countDown.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
/** /**
* @return * @return
* @throws IOException * @throws IOException
*/ */
public Location checkpoint() throws IOException { private void asyncWrite() {
return checkpoint(null); try {
CountDownLatch countDown;
synchronized(this) {
countDown = flushLatch;
flushLatch = null;
}
mark.set(doAsyncWrite());
if ( countDown != null ) {
countDown.countDown();
}
} catch (IOException e) {
log.error("Checkpoint failed: "+e, e);
}
} }
/** /**
* @return * @return
* @throws IOException * @throws IOException
*/ */
public Location checkpoint(final Callback postCheckpointTest) throws IOException { protected Location doAsyncWrite() throws IOException {
final ArrayList<MessageAck> cpRemovedMessageLocations; final ArrayList<MessageAck> cpRemovedMessageLocations;
final ArrayList<Location> cpActiveJournalLocations; final ArrayList<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation;
// swap out the message hash maps.. // swap out the message hash maps..
synchronized (this) { synchronized (this) {
cpAddedMessageIds = this.messages; cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks; cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations); cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>(); this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>(); this.messageAcks = new ArrayList<MessageAck>();
lastLocation = this.lastLocation;
} }
if( log.isDebugEnabled() )
log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "+cpRemovedMessageLocations.size()+" ");
transactionTemplate.run(new Callback() { transactionTemplate.run(new Callback() {
public void execute() throws Exception { public void execute() throws Exception {
@ -284,15 +358,15 @@ public class QuickMessageStore implements MessageStore {
} }
} }
if( postCheckpointTest!= null ) {
postCheckpointTest.execute();
}
} }
}); });
log.debug("Batch update done.");
synchronized (this) { synchronized (this) {
cpAddedMessageIds = null; cpAddedMessageIds = null;
lastWrittenLocation = lastLocation;
} }
if( cpActiveJournalLocations.size() > 0 ) { if( cpActiveJournalLocations.size() > 0 ) {
@ -338,7 +412,7 @@ public class QuickMessageStore implements MessageStore {
} }
/** /**
* Replays the checkpointStore first as those messages are the oldest ones, * Replays the referenceStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is * then messages are replayed from the transaction log and then the cache is
* updated. * updated.
* *
@ -346,7 +420,7 @@ public class QuickMessageStore implements MessageStore {
* @throws Exception * @throws Exception
*/ */
public void recover(final MessageRecoveryListener listener) throws Exception { public void recover(final MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true); flush();
referenceStore.recover(new RecoveryListenerAdapter(this, listener)); referenceStore.recover(new RecoveryListenerAdapter(this, listener));
} }
@ -355,6 +429,7 @@ public class QuickMessageStore implements MessageStore {
} }
public void stop() throws Exception { public void stop() throws Exception {
asyncWriteTask.shutdown();
referenceStore.stop(); referenceStore.stop();
} }
@ -369,7 +444,7 @@ public class QuickMessageStore implements MessageStore {
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/ */
public void removeAllMessages(ConnectionContext context) throws IOException { public void removeAllMessages(ConnectionContext context) throws IOException {
peristenceAdapter.checkpoint(true); flush();
referenceStore.removeAllMessages(context); referenceStore.removeAllMessages(context);
} }
@ -391,13 +466,12 @@ public class QuickMessageStore implements MessageStore {
* @see org.apache.activemq.store.MessageStore#getMessageCount() * @see org.apache.activemq.store.MessageStore#getMessageCount()
*/ */
public int getMessageCount() throws IOException{ public int getMessageCount() throws IOException{
peristenceAdapter.checkpoint(true); flush();
return referenceStore.getMessageCount(); return referenceStore.getMessageCount();
} }
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ flush();
peristenceAdapter.checkpoint(true);
referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener)); referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener));
} }
@ -408,4 +482,8 @@ public class QuickMessageStore implements MessageStore {
} }
public Location getMark() {
return mark.get();
}
} }

View File

@ -19,19 +19,12 @@ package org.apache.activemq.store.quick;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
@ -94,12 +87,14 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
private WireFormat wireFormat = new OpenWireFormat(); private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager; private UsageManager usageManager;
private long checkpointInterval = 1000 * 30;
private long cleanupInterval = 1000 * 10;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1; private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024*4; private int maxCheckpointMessageAddSize = 1024*4;
private QuickTransactionStore transactionStore = new QuickTransactionStore(this); private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask; private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
@ -111,6 +106,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
private boolean deleteAllMessages; private boolean deleteAllMessages;
private File directory = new File("activemq-data/quick"); private File directory = new File("activemq-data/quick");
public synchronized void start() throws Exception { public synchronized void start() throws Exception {
if( !started.compareAndSet(false, true) ) if( !started.compareAndSet(false, true) )
@ -152,7 +148,13 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse(); Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
for (Integer fileId : files) { for (Integer fileId : files) {
asyncDataManager.addInterestInFile(fileId); try {
asyncDataManager.addInterestInFile(fileId);
} catch (IOException e) {
// We can expect these since referenceStoreAdapter is a litle behind in updates
// and it might think it has references to data files that have allready come and gone..
// This should get resolved once recovery kicks in.
}
} }
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
@ -161,15 +163,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
return false; return false;
} }
}, "ActiveMQ Journal Checkpoint Worker"); }, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
return t;
}
});
createTransactionStore(); createTransactionStore();
recover(); recover();
@ -187,7 +181,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
cleanup(); cleanup();
} }
}; };
Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval); Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
} }
@ -200,11 +194,22 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
this.usageManager.removeUsageListener(this); this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask); Scheduler.cancel(periodicCheckpointTask);
Iterator<QuickMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
QuickMessageStore ms = iterator.next();
ms.stop();
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
ms.stop();
}
// Take one final checkpoint and stop checkpoint processing. // Take one final checkpoint and stop checkpoint processing.
checkpoint(true); checkpoint(true);
checkpointTask.shutdown(); checkpointTask.shutdown();
log.debug("Checkpoint task shutdown");
checkpointExecutor.shutdown();
queues.clear(); queues.clear();
topics.clear(); topics.clear();
@ -268,55 +273,24 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
log.debug("Checkpoint started."); log.debug("Checkpoint started.");
Location newMark = null; Location newMark = null;
ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(queues.size()+topics.size());
//
Iterator<QuickMessageStore> iterator = queues.values().iterator(); Iterator<QuickMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
try { final QuickMessageStore ms = iterator.next();
final QuickMessageStore ms = iterator.next(); Location mark = (Location) ms.getMark();
FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() { if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
public Location call() throws Exception { newMark = mark;
return ms.checkpoint();
}});
futureTasks.add(task);
checkpointExecutor.execute(task);
}
catch (Exception e) {
log.error("Failed to checkpoint a message store: " + e, e);
} }
} }
iterator = topics.values().iterator(); iterator = topics.values().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
try { final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next(); Location mark = (Location) ms.getMark();
FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() { if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
public Location call() throws Exception { newMark = mark;
return ms.checkpoint();
}});
futureTasks.add(task);
checkpointExecutor.execute(task);
}
catch (Exception e) {
log.error("Failed to checkpoint a message store: " + e, e);
} }
} }
try {
for (Iterator<FutureTask> iter = futureTasks.iterator(); iter.hasNext();) {
FutureTask ft = iter.next();
Location mark = (Location) ft.get();
// We only set a newMark on full checkpoints.
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
} catch (Throwable e) {
log.error("Failed to checkpoint a message store: " + e, e);
}
try { try {
if (newMark != null) { if (newMark != null) {
log.debug("Marking journal at: " + newMark); log.debug("Marking journal at: " + newMark);
@ -354,10 +328,8 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
public void cleanup() { public void cleanup() {
try { try {
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse(); Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse); asyncDataManager.consolidateDataFilesNotIn(inUse);
} catch (IOException e) { } catch (IOException e) {
log.error("Could not cleanup data files: "+e, e); log.error("Could not cleanup data files: "+e, e);
} }
@ -386,6 +358,11 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
if (store == null) { if (store == null) {
ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination); ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
store = new QuickMessageStore(this, checkpointStore, destination); store = new QuickMessageStore(this, checkpointStore, destination);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
queues.put(destination, store); queues.put(destination, store);
} }
return store; return store;
@ -396,6 +373,11 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
if (store == null) { if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new QuickTopicMessageStore(this, checkpointStore, destinationName); store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
topics.put(destinationName, store); topics.put(destinationName, store);
} }
return store; return store;
@ -445,7 +427,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
* @throws InvalidLocationException * @throws InvalidLocationException
* @throws IllegalStateException * @throws IllegalStateException
*/ */
private void recover() throws IllegalStateException, IOException, IOException { private void recover() throws IllegalStateException, IOException {
Location pos = null; Location pos = null;
int transactionCounter = 0; int transactionCounter = 0;
@ -594,8 +576,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
newPercentUsage = ((newPercentUsage)/10)*10; newPercentUsage = ((newPercentUsage)/10)*10;
oldPercentUsage = ((oldPercentUsage)/10)*10; oldPercentUsage = ((oldPercentUsage)/10)*10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
boolean sync = newPercentUsage >= 90; checkpoint(false);
checkpoint(sync);
} }
} }

View File

@ -18,13 +18,13 @@
package org.apache.activemq.store.quick; package org.apache.activemq.store.quick;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.kaha.impl.async.Location;
@ -49,18 +49,18 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe
private TopicReferenceStore topicReferenceStore; private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore, ActiveMQTopic destinationName) { public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
super(adapter, checkpointStore, destinationName); super(adapter, topicReferenceStore, destinationName);
this.topicReferenceStore = checkpointStore; this.topicReferenceStore = topicReferenceStore;
} }
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
this.peristenceAdapter.checkpoint(true); flush();
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener)); topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
} }
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true); flush();
topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener)); topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener));
} }
@ -69,14 +69,10 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe
} }
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
this.peristenceAdapter.checkpoint(true); flush();
topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive); topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
} }
public void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
}
/** /**
*/ */
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
@ -141,27 +137,35 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe
* @param messageId * @param messageId
* @param location * @param location
* @param key * @param key
* @throws InterruptedIOException
*/ */
private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) { private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
synchronized(this) { synchronized(this) {
lastLocation = location; lastLocation = location;
ackedLastAckLocations.put(key, messageId); ackedLastAckLocations.put(key, messageId);
} }
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} }
public Location checkpoint() throws IOException { @Override
protected Location doAsyncWrite() throws IOException {
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps.. // swap out the hash maps..
synchronized (this) { synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations; cpAckedLastAckLocations = this.ackedLastAckLocations;
this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>(); this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
} }
return super.checkpoint( new Callback() { Location location = super.doAsyncWrite();
transactionTemplate.run(new Callback() {
public void execute() throws Exception { public void execute() throws Exception {
// Checkpoint the acknowledged messages. // Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator(); Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
@ -169,12 +173,12 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
} }
} }
}); } );
return location;
} }
/** /**
* @return Returns the longTermStore. * @return Returns the longTermStore.
*/ */
@ -192,7 +196,7 @@ public class QuickTopicMessageStore extends QuickMessageStore implements TopicMe
public int getMessageCount(String clientId,String subscriberName) throws IOException{ public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true); flush();
return topicReferenceStore.getMessageCount(clientId,subscriberName); return topicReferenceStore.getMessageCount(clientId,subscriberName);
} }

View File

@ -21,9 +21,12 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
final class RecoveryListenerAdapter implements MessageRecoveryListener { final class RecoveryListenerAdapter implements MessageRecoveryListener {
static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
private final MessageStore store; private final MessageStore store;
private final MessageRecoveryListener listener; private final MessageRecoveryListener listener;
@ -45,6 +48,12 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
} }
public void recoverMessageReference(MessageId ref) throws Exception { public void recoverMessageReference(MessageId ref) throws Exception {
listener.recoverMessage( this.store.getMessage(ref) ); Message message = this.store.getMessage(ref);
if( message !=null ){
listener.recoverMessage( message );
} else {
log.error("Message id "+ref+" could not be recovered from the data store!");
}
} }
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
@ -32,8 +33,6 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -105,6 +104,12 @@ public class JmsTestSupport extends CombinationTestSupport {
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
if(System.getProperty("basedir")==null){
File file=new File(".");
System.setProperty("basedir",file.getAbsolutePath());
}
broker = createBroker(); broker = createBroker();
broker.start(); broker.start();
factory = createConnectionFactory(); factory = createConnectionFactory();

View File

@ -45,6 +45,9 @@ import org.apache.activemq.command.ActiveMQQueue;
*/ */
public class LoadTester extends JmsTestSupport { public class LoadTester extends JmsTestSupport {
protected int MESSAGE_SIZE=1024*64;
protected int PRODUCE_COUNT=10000;
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml")); return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
} }
@ -56,8 +59,6 @@ public class LoadTester extends JmsTestSupport {
} }
public void testQueueSendThenAddConsumer() throws Exception { public void testQueueSendThenAddConsumer() throws Exception {
int MESSAGE_SIZE=1024*64;
int PRODUCE_COUNT=10000;
ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT, 20); ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT, 20);
ActiveMQDestination destination = new ActiveMQQueue("TEST"); ActiveMQDestination destination = new ActiveMQQueue("TEST");

View File

@ -28,7 +28,7 @@ import org.apache.activemq.store.quick.QuickPersistenceAdapter;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest { public class QuickStoreRecoveryBrokerTest extends RecoveryBrokerTest {
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService(); BrokerService service = new BrokerService();
@ -46,7 +46,7 @@ public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest {
} }
public static Test suite() { public static Test suite() {
return suite(QuickJournalRecoveryBrokerTest.class); return suite(QuickStoreRecoveryBrokerTest.class);
} }
public static void main(String[] args) { public static void main(String[] args) {

View File

@ -21,17 +21,17 @@ import junit.framework.Test;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest; import org.apache.activemq.broker.XARecoveryBrokerTest;
import org.apache.activemq.store.DefaultPersistenceAdapterFactory; import org.apache.activemq.store.quick.QuickPersistenceAdapter;
/** /**
* Used to verify that recovery works correctly against * Used to verify that recovery works correctly against
* *
* @version $Revision$ * @version $Revision$
*/ */
public class QuickJournalXARecoveryBrokerTest extends XARecoveryBrokerTest { public class QuickStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
public static Test suite() { public static Test suite() {
return suite(QuickJournalXARecoveryBrokerTest.class); return suite(QuickStoreXARecoveryBrokerTest.class);
} }
public static void main(String[] args) { public static void main(String[] args) {
@ -41,15 +41,15 @@ public class QuickJournalXARecoveryBrokerTest extends XARecoveryBrokerTest {
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService(); BrokerService service = new BrokerService();
service.setDeleteAllMessagesOnStartup(true); service.setDeleteAllMessagesOnStartup(true);
DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory(); QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
factory.setUseQuickJournal(true); service.setPersistenceAdapter(pa);
return service; return service;
} }
protected BrokerService createRestartedBroker() throws Exception { protected BrokerService createRestartedBroker() throws Exception {
BrokerService service = new BrokerService(); BrokerService service = new BrokerService();
DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory(); QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
factory.setUseQuickJournal(true); service.setPersistenceAdapter(pa);
return service; return service;
} }