amq store - just like the quick store (cut n' paste) - difference is optimizations for recoverying in flight messages

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@504999 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-08 18:46:54 +00:00
parent feef358a16
commit a324484867
14 changed files with 1906 additions and 19 deletions

View File

@ -208,4 +208,11 @@ public interface MapContainer<K, V> extends Map<K, V>{
* @return a refreshed StoreEntry
*/
public StoreEntry refresh(StoreEntry entry);
/**
* Get the StoreEntry associated with the key
* @param key
* @return the StoreEntry
*/
public StoreEntry getEntry(K key);
}

View File

@ -194,6 +194,23 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
return result;
}
/**
* Get the StoreEntry associated with the key
* @param key
* @return the StoreEntry
*/
public synchronized StoreEntry getEntry(Object key) {
load();
StoreEntry item=null;
try{
item=index.get(key);
}catch(IOException e){
log.error("Failed trying to get key: "+key,e);
throw new RuntimeException(e);
}
return item;
}
/*
* (non-Javadoc)
*

View File

@ -138,6 +138,9 @@ class HashBin{
}
}
if(!replace){
if (low > size()) {
System.out.println("SIZE() " + size() + " low = " + low);
}
addHashEntry(low,newEntry);
size++;
}

View File

@ -98,21 +98,21 @@ public class HashIndex implements Index{
*
* @param marshaller
*/
public void setKeyMarshaller(Marshaller marshaller){
public synchronized void setKeyMarshaller(Marshaller marshaller){
this.keyMarshaller=marshaller;
}
/**
* @return the keySize
*/
public int getKeySize(){
public synchronized int getKeySize(){
return this.keySize;
}
/**
* @param keySize the keySize to set
*/
public void setKeySize(int keySize){
public synchronized void setKeySize(int keySize){
this.keySize=keySize;
if(loaded.get()){
throw new RuntimeException("Pages already loaded - can't reset key size");
@ -122,14 +122,14 @@ public class HashIndex implements Index{
/**
* @return the pageSize
*/
public int getPageSize(){
public synchronized int getPageSize(){
return this.pageSize;
}
/**
* @param pageSize the pageSize to set
*/
public void setPageSize(int pageSize){
public synchronized void setPageSize(int pageSize){
if(loaded.get()&&pageSize!=this.pageSize){
throw new RuntimeException("Pages already loaded - can't reset page size");
}
@ -140,38 +140,38 @@ public class HashIndex implements Index{
/**
* @return the enablePageCaching
*/
public boolean isEnablePageCaching(){
public synchronized boolean isEnablePageCaching(){
return this.enablePageCaching;
}
/**
* @param enablePageCaching the enablePageCaching to set
*/
public void setEnablePageCaching(boolean enablePageCaching){
public synchronized void setEnablePageCaching(boolean enablePageCaching){
this.enablePageCaching=enablePageCaching;
}
/**
* @return the pageCacheSize
*/
public int getPageCacheSize(){
public synchronized int getPageCacheSize(){
return this.pageCacheSize;
}
/**
* @param pageCacheSize the pageCacheSize to set
*/
public void setPageCacheSize(int pageCacheSize){
public synchronized void setPageCacheSize(int pageCacheSize){
this.pageCacheSize=pageCacheSize;
pageCache.setMaxCacheSize(pageCacheSize);
}
public boolean isTransient(){
public synchronized boolean isTransient(){
return false;
}
public void load(){
public synchronized void load(){
if(loaded.compareAndSet(false,true)){
keysPerPage=pageSize/keySize;
dataIn=new DataByteArrayInputStream();
@ -211,7 +211,7 @@ public class HashIndex implements Index{
}
}
public void unload() throws IOException{
public synchronized void unload() throws IOException{
if(loaded.compareAndSet(true,false)){
if(indexFile!=null){
indexFile.close();
@ -222,7 +222,7 @@ public class HashIndex implements Index{
}
}
public void store(Object key,StoreEntry value) throws IOException{
public synchronized void store(Object key,StoreEntry value) throws IOException{
load();
HashEntry entry=new HashEntry();
entry.setKey((Comparable)key);
@ -230,7 +230,7 @@ public class HashIndex implements Index{
getBin(key).put(entry);
}
public StoreEntry get(Object key) throws IOException{
public synchronized StoreEntry get(Object key) throws IOException{
load();
HashEntry entry=new HashEntry();
entry.setKey((Comparable)key);
@ -238,7 +238,7 @@ public class HashIndex implements Index{
return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
}
public StoreEntry remove(Object key) throws IOException{
public synchronized StoreEntry remove(Object key) throws IOException{
load();
HashEntry entry=new HashEntry();
entry.setKey((Comparable)key);
@ -246,18 +246,18 @@ public class HashIndex implements Index{
return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
}
public boolean containsKey(Object key) throws IOException{
public synchronized boolean containsKey(Object key) throws IOException{
return get(key)!=null;
}
public void clear() throws IOException{
public synchronized void clear() throws IOException{
unload();
delete();
openIndexFile();
load();
}
public void delete() throws IOException{
public synchronized void delete() throws IOException{
unload();
if(file.exists()){
boolean result=file.delete();

View File

@ -192,6 +192,8 @@ class HashPage{
void addHashEntry(int index,HashEntry entry) throws IOException{
//index = index >= 0 ? index : 0;
//index = (index == 0 || index< size()) ? index : size()-1;
hashIndexEntries.add(index,entry);
}

View File

@ -72,4 +72,11 @@ public interface ReferenceStore extends MessageStore {
*/
public ReferenceData getMessageReference(MessageId identity) throws IOException;
/**
* @return true if it supports external batch control
*/
public boolean supportsExternalBatchControl();
public void setBatch(MessageId startAfter);
}

View File

@ -0,0 +1,496 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.14 $
*/
public class AMQMessageStore implements MessageStore{
private static final Log log=LogFactory.getLog(AMQMessageStore.class);
protected final AMQPersistenceAdapter peristenceAdapter;
protected final AMQTransactionStore transactionStore;
protected final ReferenceStore referenceStore;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
private LinkedHashMap<MessageId,ReferenceData> messages=new LinkedHashMap<MessageId,ReferenceData>();
private ArrayList<MessageAck> messageAcks=new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
private LinkedHashMap<MessageId,ReferenceData> cpAddedMessageIds;
protected Location lastLocation;
protected Location lastWrittenLocation;
protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
private final AtomicReference<Location> mark=new AtomicReference<Location>();
public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){
this.peristenceAdapter=adapter;
this.transactionStore=adapter.getTransactionStore();
this.referenceStore=referenceStore;
this.destination=destination;
this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext());
asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
public boolean iterate(){
asyncWrite();
return false;
}
},"Checkpoint: "+destination);
}
public void setUsageManager(UsageManager usageManager){
referenceStore.setUsageManager(usageManager);
}
/**
* Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it
* is doing.
*/
public void addMessage(ConnectionContext context,final Message message) throws IOException{
final MessageId id=message.getMessageId();
final boolean debug=log.isDebugEnabled();
final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
if(!context.isInTransaction()){
if(debug)
log.debug("Journalled message add for: "+id+", at: "+location);
addMessage(message,location);
}else{
if(debug)
log.debug("Journalled transacted message add for: "+id+", at: "+location);
synchronized(this){
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this,message,location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
if(debug)
log.debug("Transacted message add commit for: "+id+", at: "+location);
synchronized(AMQMessageStore.this){
inFlightTxLocations.remove(location);
addMessage(message,location);
}
}
public void afterRollback() throws Exception{
if(debug)
log.debug("Transacted message add rollback for: "+id+", at: "+location);
synchronized(AMQMessageStore.this){
inFlightTxLocations.remove(location);
}
}
});
}
}
private void addMessage(final Message message,final Location location) throws InterruptedIOException{
ReferenceData data=new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
synchronized(this){
lastLocation=location;
messages.put(message.getMessageId(),data);
}
try{
asyncWriteTask.wakeup();
}catch(InterruptedException e){
throw new InterruptedIOException();
}
}
public boolean replayAddMessage(ConnectionContext context,Message message,Location location){
MessageId id=message.getMessageId();
try{
// Only add the message if it has not already been added.
ReferenceData data=referenceStore.getMessageReference(id);
if(data==null){
data=new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context,id,data);
return true;
}
}catch(Throwable e){
log.warn("Could not replay add for message '"+id+"'. Message may have already been added. reason: "+e,e);
}
return false;
}
/**
*/
public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
final boolean debug=log.isDebugEnabled();
JournalQueueAck remove=new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired());
if(!context.isInTransaction()){
if(debug)
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
removeMessage(ack,location);
}else{
if(debug)
log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
synchronized(this){
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this,ack,location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
if(debug)
log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
synchronized(AMQMessageStore.this){
inFlightTxLocations.remove(location);
removeMessage(ack,location);
}
}
public void afterRollback() throws Exception{
if(debug)
log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
synchronized(AMQMessageStore.this){
inFlightTxLocations.remove(location);
}
}
});
}
}
private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
ReferenceData data;
synchronized(this){
lastLocation=location;
MessageId id=ack.getLastMessageId();
data=messages.remove(id);
if(data==null){
messageAcks.add(ack);
}
}
if(data==null){
try{
asyncWriteTask.wakeup();
}catch(InterruptedException e){
throw new InterruptedIOException();
}
}
}
public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){
try{
// Only remove the message if it has not already been removed.
ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId());
if(t!=null){
referenceStore.removeMessage(context,messageAck);
return true;
}
}catch(Throwable e){
log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId()
+"'. Message may have already been acknowledged. reason: "+e);
}
return false;
}
/**
* Waits till the lastest data has landed on the referenceStore
*
* @throws InterruptedIOException
*/
public void flush() throws InterruptedIOException{
if(log.isDebugEnabled()){
log.debug("flush starting ...");
}
CountDownLatch countDown;
synchronized(this){
if(lastWrittenLocation==lastLocation){
return;
}
if(flushLatch==null){
flushLatch=new CountDownLatch(1);
}
countDown=flushLatch;
}
try{
asyncWriteTask.wakeup();
countDown.await();
}catch(InterruptedException e){
throw new InterruptedIOException();
}
if(log.isDebugEnabled()){
log.debug("flush finished");
}
}
/**
* @return
* @throws IOException
*/
private void asyncWrite(){
try{
CountDownLatch countDown;
synchronized(this){
countDown=flushLatch;
flushLatch=null;
}
mark.set(doAsyncWrite());
if(countDown!=null){
countDown.countDown();
}
}catch(IOException e){
log.error("Checkpoint failed: "+e,e);
}
}
/**
* @return
* @throws IOException
*/
protected Location doAsyncWrite() throws IOException{
final ArrayList<MessageAck> cpRemovedMessageLocations;
final ArrayList<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation;
// swap out the message hash maps..
synchronized(this){
cpAddedMessageIds=this.messages;
cpRemovedMessageLocations=this.messageAcks;
cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
this.messages=new LinkedHashMap<MessageId,ReferenceData>();
this.messageAcks=new ArrayList<MessageAck>();
lastLocation=this.lastLocation;
}
if(log.isDebugEnabled())
log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "
+cpRemovedMessageLocations.size()+" ");
transactionTemplate.run(new Callback(){
public void execute() throws Exception{
int size=0;
PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter();
ConnectionContext context=transactionTemplate.getContext();
// Checkpoint the added messages.
Iterator<Entry<MessageId,ReferenceData>> iterator=cpAddedMessageIds.entrySet().iterator();
while(iterator.hasNext()){
Entry<MessageId,ReferenceData> entry=iterator.next();
try{
referenceStore.addMessageReference(context,entry.getKey(),entry.getValue());
}catch(Throwable e){
log.warn("Message could not be added to long term store: "+e.getMessage(),e);
}
size++;
// Commit the batch if it's getting too big
if(size>=maxCheckpointMessageAddSize){
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
size=0;
}
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
for(MessageAck ack:cpRemovedMessageLocations){
try{
referenceStore.removeMessage(transactionTemplate.getContext(),ack);
}catch(Throwable e){
e.printStackTrace();
log.debug("Message could not be removed from long term store: "+e.getMessage(),e);
}
}
}
});
log.debug("Batch update done.");
synchronized(this){
cpAddedMessageIds=null;
lastWrittenLocation=lastLocation;
}
if(cpActiveJournalLocations.size()>0){
Collections.sort(cpActiveJournalLocations);
return cpActiveJournalLocations.get(0);
}else{
return lastLocation;
}
}
/**
*
*/
public Message getMessage(MessageId identity) throws IOException{
ReferenceData data=null;
synchronized(this){
// Is it still in flight???
data=messages.get(identity);
if(data==null&&cpAddedMessageIds!=null){
data=cpAddedMessageIds.get(identity);
}
}
if(data==null){
data=referenceStore.getMessageReference(identity);
if(data==null){
return null;
}
}
Location location=new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
DataStructure rc=peristenceAdapter.readCommand(location);
try{
return (Message)rc;
}catch(ClassCastException e){
throw new IOException("Could not read message "+identity+" at location "+location
+", expected a message, but got: "+rc);
}
}
/**
* Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
* transaction log and then the cache is updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception{
flush();
referenceStore.recover(new RecoveryListenerAdapter(this,listener));
}
public void start() throws Exception{
referenceStore.start();
}
public void stop() throws Exception{
asyncWriteTask.shutdown();
referenceStore.stop();
}
/**
* @return Returns the longTermStore.
*/
public ReferenceStore getReferenceStore(){
return referenceStore;
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException{
flush();
referenceStore.removeAllMessages(context);
}
public ActiveMQDestination getDestination(){
return destination;
}
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
throw new IOException("The journal does not support message references.");
}
public String getMessageReference(MessageId identity) throws IOException{
throw new IOException("The journal does not support message references.");
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException{
flush();
return referenceStore.getMessageCount();
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
if(referenceStore.supportsExternalBatchControl()){
synchronized(this){
referenceStore.recoverNextMessages(maxReturned,recoveryListener);
if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
// check for inflight messages
int count=0;
Iterator<Entry<MessageId,ReferenceData>> iterator=messages.entrySet().iterator();
while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
Entry<MessageId,ReferenceData> entry=iterator.next();
ReferenceData data=entry.getValue();
Message message=getMessage(data);
recoveryListener.recoverMessage(message);
count++;
}
referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId());
}
}
}else{
flush();
referenceStore.recoverNextMessages(maxReturned,recoveryListener);
}
}
Message getMessage(ReferenceData data) throws IOException{
Location location=new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
DataStructure rc=peristenceAdapter.readCommand(location);
try{
return (Message)rc;
}catch(ClassCastException e){
throw new IOException("Could not read message at location "+location+", expected a message, but got: "+rc);
}
}
public void resetBatching(){
referenceStore.resetBatching();
}
public Location getMark(){
return mark.get();
}
}

View File

@ -0,0 +1,679 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener {
private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager;
private long cleanupInterval = 1000 * 60;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024*4;
private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);
private Runnable periodicCheckpointTask;
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
private File directory = new File("activemq-data/quick");
public synchronized void start() throws Exception {
if( !started.compareAndSet(false, true) )
return;
this.usageManager.addUsageListener(this);
if( asyncDataManager == null ) {
asyncDataManager = createAsyncDataManager();
}
if( referenceStoreAdapter==null ) {
referenceStoreAdapter = createReferenceStoreAdapter();
}
referenceStoreAdapter.setUsageManager(usageManager);
if( taskRunnerFactory==null ) {
taskRunnerFactory = createTaskRunnerFactory();
}
asyncDataManager.start();
if( deleteAllMessages ) {
asyncDataManager.delete();
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED "+new Date());
Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
asyncDataManager.setMark(location, true);
log.info("Journal deleted: ");
deleteAllMessages=false;
} catch (IOException e) {
throw e;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
referenceStoreAdapter.deleteAllMessages();
}
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
log.info("Active data files: "+files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate() {
doCheckpoint();
return false;
}
}, "ActiveMQ Journal Checkpoint Worker");
createTransactionStore();
recover();
// Do a checkpoint periodically.
periodicCheckpointTask = new Runnable() {
public void run() {
checkpoint(false);
}
};
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
periodicCleanupTask = new Runnable() {
public void run() {
cleanup();
}
};
Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
}
public void stop() throws Exception {
if( !started.compareAndSet(true, false) )
return;
this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
Iterator<AMQMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
AMQMessageStore ms = iterator.next();
ms.stop();
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
ms.stop();
}
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
checkpointTask.shutdown();
queues.clear();
topics.clear();
IOException firstException = null;
referenceStoreAdapter.stop();
try {
log.debug("Journal close");
asyncDataManager.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
if (firstException != null) {
throw firstException;
}
}
/**
* When we checkpoint we move all the journalled data to long term storage.
* @param stopping
*
* @param b
*/
public void checkpoint(boolean sync) {
try {
if (asyncDataManager == null )
throw new IllegalStateException("Journal is closed.");
CountDownLatch latch = null;
synchronized(this) {
latch = nextCheckpointCountDownLatch;
}
checkpointTask.wakeup();
if (sync) {
log.debug("Waitng for checkpoint to complete.");
latch.await();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Request to start checkpoint failed: " + e, e);
}
}
/**
* This does the actual checkpoint.
* @return
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
synchronized(this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
}
try {
log.debug("Checkpoint started.");
Location newMark = null;
Iterator<AMQMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
final AMQMessageStore ms = iterator.next();
Location mark = (Location) ms.getMark();
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
Location mark = (Location) ms.getMark();
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
try {
if (newMark != null) {
log.debug("Marking journal at: " + newMark);
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT "+new Date(), true);
}
}
catch (Exception e) {
log.error("Failed to mark the Journal: " + e, e);
}
// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
// // We may be check pointing more often than the checkpointInterval if under high use
// // But we don't want to clean up the db that often.
// long now = System.currentTimeMillis();
// if( now > lastCleanup+checkpointInterval ) {
// lastCleanup = now;
// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
// }
// }
log.debug("Checkpoint done.");
}
finally {
latch.countDown();
}
return true;
}
/**
* Cleans up the data files
* @return
* @throws IOException
*/
public void cleanup() {
try {
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse);
} catch (IOException e) {
log.error("Could not cleanup data files: "+e, e);
}
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue) destination);
}
else {
return createTopicMessageStore((ActiveMQTopic) destination);
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
AMQMessageStore store = queues.get(destination);
if (store == null) {
ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
store = new AMQMessageStore(this, checkpointStore, destination);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
queues.put(destination, store);
}
return store;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
AMQTopicMessageStore store = (AMQTopicMessageStore) topics.get(destinationName);
if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
try {
store.start();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
topics.put(destinationName, store);
}
return store;
}
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
public long getLastMessageBrokerSequenceId() throws IOException {
return referenceStoreAdapter.getLastMessageBrokerSequenceId();
}
public void beginTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.beginTransaction(context);
}
public void commitTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.commitTransaction(context);
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
referenceStoreAdapter.rollbackTransaction(context);
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(Location location) throws IOException {
try {
ByteSequence packet = asyncDataManager.read(location);
return (DataStructure) wireFormat.unmarshal(packet);
} catch (IOException e) {
throw createReadException(location, e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
Location pos = null;
int redoCounter = 0;
log.info("Journal Recovery Started from: " + asyncDataManager);
long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
ByteSequence data = asyncDataManager.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
if (c instanceof Message ) {
Message message = (Message) c;
AMQMessageStore store = (AMQMessageStore) createMessageStore(message.getDestination());
if ( message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
}
else {
if( store.replayAddMessage(context, message, pos) ) {
redoCounter++;
}
}
} else {
switch (c.getDataStructureType()) {
case JournalQueueAck.DATA_STRUCTURE_TYPE:
{
JournalQueueAck command = (JournalQueueAck) c;
AMQMessageStore store = (AMQMessageStore) createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
}
else {
if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
redoCounter++;
}
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE:
{
JournalTopicAck command = (JournalTopicAck) c;
AMQTopicMessageStore store = (AMQTopicMessageStore) createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
}
else {
if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
redoCounter++;
}
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE:
{
JournalTransaction command = (JournalTransaction) c;
try {
// Try to replay the packet.
switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null)
break; // We may be trying to replay a commit that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
redoCounter++;
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
redoCounter++;
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck) op.data;
if( ((AMQTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
redoCounter++;
}
}
}
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
}
}
catch (IOException e) {
log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace) c;
log.debug("TRACE Entry: " + trace.getMessage());
break;
default:
log.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
Location location = writeTraceMessage("RECOVERED "+new Date(), true);
asyncDataManager.setMark(location, true);
long end = System.currentTimeMillis();
log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
}
private IOException createReadException(Location location, Exception e) {
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
protected IOException createWriteException(DataStructure packet, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
protected IOException createWriteException(String command, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
protected IOException createRecoveryFailedException(Exception e) {
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
*
* @param command
* @param sync
* @return
* @throws IOException
*/
public Location writeCommand(DataStructure command, boolean sync) throws IOException {
return asyncDataManager.write(wireFormat.marshal(command), sync);
}
private Location writeTraceMessage(String message, boolean sync) throws IOException {
JournalTrace trace = new JournalTrace();
trace.setMessage(message);
return writeCommand(trace, sync);
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = ((newPercentUsage)/10)*10;
oldPercentUsage = ((oldPercentUsage)/10)*10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
checkpoint(false);
}
}
public AMQTransactionStore getTransactionStore() {
return transactionStore;
}
public void deleteAllMessages() throws IOException {
deleteAllMessages=true;
}
public String toString(){
return "JournalPersistenceAdapator(" + referenceStoreAdapter + ")";
}
///////////////////////////////////////////////////////////////////
// Subclass overridables
///////////////////////////////////////////////////////////////////
protected AsyncDataManager createAsyncDataManager() {
AsyncDataManager manager = new AsyncDataManager();
manager.setDirectory(new File(directory, "journal"));
return manager;
}
protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory);
return adaptor;
}
protected TaskRunnerFactory createTaskRunnerFactory() {
return DefaultThreadPools.getDefaultTaskRunnerFactory();
}
///////////////////////////////////////////////////////////////////
// Property Accessors
///////////////////////////////////////////////////////////////////
public AsyncDataManager getAsyncDataManager() {
return asyncDataManager;
}
public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
this.asyncDataManager = asyncDataManager;
}
public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
}
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat() {
return wireFormat;
}
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
public UsageManager getUsageManager() {
return usageManager;
}
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
}
public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
public int getMaxCheckpointWorkers() {
return maxCheckpointWorkers;
}
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
this.maxCheckpointWorkers = maxCheckpointWorkers;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
}

View File

@ -0,0 +1,205 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.13 $
*/
public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore{
private static final Log log=LogFactory.getLog(AMQTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey,MessageId> ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore,
ActiveMQTopic destinationName){
super(adapter,topicReferenceStore,destinationName);
this.topicReferenceStore=topicReferenceStore;
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
flush();
topicReferenceStore.recoverSubscription(clientId,subscriptionName,new RecoveryListenerAdapter(this,listener));
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
final MessageRecoveryListener listener) throws Exception{
RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
if(recoveryListener.size()==0){
flush();
topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
}
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
}
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive);
}
/**
*/
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
throws IOException{
final boolean debug=log.isDebugEnabled();
JournalTopicAck ack=new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
final Location location=peristenceAdapter.writeCommand(ack,false);
final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
if(!context.isInTransaction()){
if(debug)
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
acknowledge(messageId,location,key);
}else{
if(debug)
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
synchronized(this){
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this,ack,location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
if(debug)
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
synchronized(AMQTopicMessageStore.this){
inFlightTxLocations.remove(location);
acknowledge(messageId,location,key);
}
}
public void afterRollback() throws Exception{
if(debug)
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
synchronized(AMQTopicMessageStore.this){
inFlightTxLocations.remove(location);
}
}
});
}
}
public boolean replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,
MessageId messageId){
try{
SubscriptionInfo sub=topicReferenceStore.lookupSubscription(clientId,subscritionName);
if(sub!=null){
topicReferenceStore.acknowledge(context,clientId,subscritionName,messageId);
return true;
}
}catch(Throwable e){
log.debug("Could not replay acknowledge for message '"+messageId
+"'. Message may have already been acknowledged. reason: "+e);
}
return false;
}
/**
* @param messageId
* @param location
* @param key
* @throws InterruptedIOException
*/
private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
synchronized(this){
lastLocation=location;
ackedLastAckLocations.put(key,messageId);
}
try{
asyncWriteTask.wakeup();
}catch(InterruptedException e){
throw new InterruptedIOException();
}
}
@Override protected Location doAsyncWrite() throws IOException{
final HashMap<SubscriptionKey,MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized(this){
cpAckedLastAckLocations=this.ackedLastAckLocations;
this.ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
}
Location location=super.doAsyncWrite();
transactionTemplate.run(new Callback(){
public void execute() throws Exception{
// Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator=cpAckedLastAckLocations.keySet().iterator();
while(iterator.hasNext()){
SubscriptionKey subscriptionKey=iterator.next();
MessageId identity=cpAckedLastAckLocations.get(subscriptionKey);
topicReferenceStore.acknowledge(transactionTemplate.getContext(),subscriptionKey.clientId,
subscriptionKey.subscriptionName,identity);
}
}
});
return location;
}
/**
* @return Returns the longTermStore.
*/
public TopicReferenceStore getTopicReferenceStore(){
return topicReferenceStore;
}
public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
topicReferenceStore.deleteSubscription(clientId,subscriptionName);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return topicReferenceStore.getAllSubscriptions();
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
flush();
return topicReferenceStore.getMessageCount(clientId,subscriberName);
}
public void resetBatching(String clientId,String subscriptionName){
topicReferenceStore.resetBatching(clientId,subscriptionName);
}
}

View File

@ -0,0 +1,340 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
/**
*/
public class AMQTransactionStore implements TransactionStore {
private final AMQPersistenceAdapter peristenceAdapter;
Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>();
Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private boolean doingRecover;
public static class TxOperation {
static final byte ADD_OPERATION_TYPE = 0;
static final byte REMOVE_OPERATION_TYPE = 1;
static final byte ACK_OPERATION_TYPE = 3;
public byte operationType;
public AMQMessageStore store;
public Object data;
public Location location;
public TxOperation(byte operationType, AMQMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
this.location=location;
}
}
/**
* Operations
* @version $Revision: 1.6 $
*/
public static class Tx {
private final Location location;
private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(Location location) {
this.location=location;
}
public void add(AMQMessageStore store, Message msg, Location location) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
}
public void add(AMQMessageStore store, MessageAck ack) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
}
public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
}
public Message[] getMessages() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
list.add(op.data);
}
}
Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
list.add(op.data);
}
}
MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
public ArrayList<TxOperation> getOperations() {
return operations;
}
}
public AMQTransactionStore(AMQPersistenceAdapter adapter) {
this.peristenceAdapter = adapter;
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx==null)
return;
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx==null)
return;
synchronized(preparedTransactions){
preparedTransactions.put(txid,tx);
}
}
public Tx getTx(TransactionId txid,Location location){
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.get(txid);
}
if(tx==null){
tx=new Tx(location);
inflightTransactions.put(txid,tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
Tx tx;
if(wasPrepared){
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
}
}else{
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
}
if(tx==null)
return;
if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
true);
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
if(wasPrepared){
synchronized(preparedTransactions){
return preparedTransactions.remove(txid);
}
}else{
synchronized(inflightTransactions){
return inflightTransactions.remove(txid);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException{
Tx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
if(tx!=null)
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
}
if(tx!=null){
if(txid.isXATransaction()){
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
}else{
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
true);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void replayRollback(TransactionId txid) throws IOException{
boolean inflight=false;
synchronized(inflightTransactions){
inflight=inflightTransactions.remove(txid)!=null;
}
if(inflight){
synchronized(preparedTransactions){
preparedTransactions.remove(txid);
}
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
// All the in-flight transactions get rolled back..
synchronized(inflightTransactions){
inflightTransactions.clear();
}
this.doingRecover=true;
try{
Map<TransactionId, Tx> txs=null;
synchronized(preparedTransactions){
txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
}
for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
Object txid=iter.next();
Tx tx=txs.get(txid);
listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
}finally{
this.doingRecover=false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message, location);
}
/**
* @param ack
* @throws IOException
*/
public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public Location checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
Location rc=null;
synchronized(inflightTransactions){
for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
}
synchronized(preparedTransactions){
for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
return rc;
}
}
public boolean isDoingRecover() {
return doingRecover;
}
}

View File

@ -0,0 +1,73 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.amq;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
final class RecoveryListenerAdapter implements MessageRecoveryListener{
static final private Log log=LogFactory.getLog(RecoveryListenerAdapter.class);
private final MessageStore store;
private final MessageRecoveryListener listener;
private int count=0;
private MessageId lastRecovered;
RecoveryListenerAdapter(MessageStore store,MessageRecoveryListener listener){
this.store=store;
this.listener=listener;
}
public void finished(){
listener.finished();
}
public boolean hasSpace(){
return listener.hasSpace();
}
public void recoverMessage(Message message) throws Exception{
listener.recoverMessage(message);
lastRecovered=message.getMessageId();
count++;
}
public void recoverMessageReference(MessageId ref) throws Exception{
Message message=this.store.getMessage(ref);
if(message!=null){
listener.recoverMessage(message);
count++;
lastRecovered=ref;
}else{
log.error("Message id "+ref+" could not be recovered from the data store!");
}
}
MessageId getLastRecoveredMessageId() {
return lastRecovered;
}
int size(){
return count;
}
void reset(){
count=0;
}
}

View File

@ -0,0 +1,27 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
<p>
</p>
</body>
</html>

View File

@ -99,6 +99,7 @@ public class KahaReferenceStore implements ReferenceStore{
throws IOException{
ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
messageContainer.put(messageId,record);
addInterest(record);
}
public ReferenceData getMessageReference(MessageId identity) throws IOException{
@ -120,7 +121,8 @@ public class KahaReferenceStore implements ReferenceStore{
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
messageContainer.remove(msgId);
ReferenceRecord rr = messageContainer.remove(msgId);
removeInterest(rr);
if(messageContainer.isEmpty()){
resetBatching();
}
@ -152,4 +154,28 @@ public class KahaReferenceStore implements ReferenceStore{
public boolean isSupportForCursors(){
return true;
}
/**
* @param startAfter
* @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
*/
public void setBatch(MessageId startAfter){
resetBatching();
if (startAfter != null) {
batchEntry = messageContainer.getEntry(startAfter);
}
}
public boolean supportsExternalBatchControl(){
return true;
}
void removeInterest(ReferenceRecord rr) {
}
void addInterest(ReferenceRecord rr) {
}
}

View File

@ -75,6 +75,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry messageEntry=messageContainer.place(messageId,record);
addInterest(record);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
@ -129,7 +130,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
ReferenceRecord rr = messageContainer.get(messageId);
if (rr != null) {
messageContainer.remove(tsa.getMessageEntry());
removeInterest(rr);
}
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}