Updated Rapid Persistence Adaptor to do batching for cursors

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@477567 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-21 08:11:03 +00:00
parent ecb50234ec
commit 878b9645ce
9 changed files with 764 additions and 733 deletions

View File

@ -91,7 +91,7 @@ public class KahaMessageStore implements MessageStore, UsageListener{
Message msg=(Message)messageContainer.get(entry); Message msg=(Message)messageContainer.get(entry);
if(msg.getMessageId().equals(identity)){ if(msg.getMessageId().equals(identity)){
result=msg; result=msg;
cache.put(identity,msg); cache.put(identity,entry);
break; break;
} }
} }
@ -186,7 +186,7 @@ public class KahaMessageStore implements MessageStore, UsageListener{
* @throws Exception * @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener) * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener)
*/ */
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry = batchEntry; StoreEntry entry = batchEntry;
if (entry == null) { if (entry == null) {
entry= messageContainer.getFirst(); entry= messageContainer.getFirst();
@ -239,10 +239,9 @@ public class KahaMessageStore implements MessageStore, UsageListener{
* @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int) * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int)
*/ */
public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if (newPercentUsage == 100) { if(newPercentUsage==100){
cache.clear(); cache.clear();
} }
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.rapid.RapidMessageReference;
/** /**
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
@ -149,11 +150,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
listener.recoverMessage((Message)msg); listener.recoverMessage((Message)msg);
} }
} }
listener.finished();
} }
}else{
listener.finished();
} }
listener.finished();
} }
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
@ -236,31 +235,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
messageContainer.add(messageRef); messageContainer.add(messageRef);
} }
/**
* @return the destination
* @see org.apache.activemq.store.MessageStore#getDestination()
*/
public ActiveMQDestination getDestination(){
return destination;
}
/**
* @param identity
* @return the Message
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId)
*/
public Message getMessage(MessageId identity) throws IOException{
Message result=null;
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(identity)){
result=msg;
break;
}
}
return result;
}
/** /**
* @param identity * @param identity
@ -272,22 +247,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
return null; return null;
} }
/**
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recover(org.apache.activemq.store.MessageRecoveryListener)
*/
public void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
Object msg=iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String)msg);
}else{
listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
/** /**
* @param context * @param context
* @throws IOException * @throws IOException
@ -302,23 +262,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
} }
} }
/**
* @param context
* @param ack
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext,
* org.apache.activemq.command.MessageAck)
*/
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(ack.getLastMessageId())){
i.remove();
break;
}
}
}
public synchronized void resetBatching(String clientId,String subscriptionName){ public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName); String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key); TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
@ -326,25 +270,4 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
topicSubContainer.reset(); topicSubContainer.reset();
} }
} }
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName) throws IOException{
// TODO Auto-generated method stub
return null;
}
/**
* @param clientId
* @param subscriptionName
* @param id
* @return previous messageId
* @throws IOException
* @see org.apache.activemq.store.TopicMessageStore#getPreviousMessageIdToDeliver(java.lang.String,
* java.lang.String, org.apache.activemq.command.MessageId)
*/
public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
throws IOException{
// TODO Auto-generated method stub
return null;
}
} }

View File

@ -22,43 +22,43 @@ import org.apache.activemq.kaha.StoreEntry;
* *
* @version $Revision: 1.10 $ * @version $Revision: 1.10 $
*/ */
class TopicSubContainer{ public class TopicSubContainer{
private ListContainer listContainer; private ListContainer listContainer;
private StoreEntry batchEntry; private StoreEntry batchEntry;
TopicSubContainer(ListContainer container){ public TopicSubContainer(ListContainer container){
this.listContainer = container; this.listContainer = container;
} }
/** /**
* @return the batchEntry * @return the batchEntry
*/ */
StoreEntry getBatchEntry(){ public StoreEntry getBatchEntry(){
return this.batchEntry; return this.batchEntry;
} }
/** /**
* @param batchEntry the batchEntry to set * @param batchEntry the batchEntry to set
*/ */
void setBatchEntry(StoreEntry batchEntry){ public void setBatchEntry(StoreEntry batchEntry){
this.batchEntry=batchEntry; this.batchEntry=batchEntry;
} }
/** /**
* @return the listContainer * @return the listContainer
*/ */
ListContainer getListContainer(){ public ListContainer getListContainer(){
return this.listContainer; return this.listContainer;
} }
/** /**
* @param listContainer the listContainer to set * @param listContainer the listContainer to set
*/ */
void setListContainer(ListContainer container){ public void setListContainer(ListContainer container){
this.listContainer=container; this.listContainer=container;
} }
void reset() { public void reset() {
batchEntry = null; batchEntry = null;
} }

View File

@ -26,6 +26,10 @@ public class RapidMessageReference {
public final MessageId messageId; public final MessageId messageId;
public final Location location; public final Location location;
public RapidMessageReference(MessageId messageId, Location location) {
this.messageId = messageId;
this.location=location;
}
public RapidMessageReference(Message message, Location location) { public RapidMessageReference(Message message, Location location) {
this.messageId = message.getMessageId(); this.messageId = message.getMessageId();
this.location=location; this.location=location;

View File

@ -0,0 +1,46 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.rapid;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activeio.journal.active.Location;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.Marshaller;
public class RapidMessageReferenceMarshaller implements Marshaller{
public Object readPayload(DataInput dataIn) throws IOException{
MessageId mid = new MessageId(dataIn.readUTF());
Location loc = new Location(dataIn.readInt(),dataIn.readInt());
RapidMessageReference rmr = new RapidMessageReference(mid,loc);
return rmr;
}
public void writePayload(Object object,DataOutput dataOut) throws IOException{
RapidMessageReference rmr = (RapidMessageReference)object;
dataOut.writeUTF(rmr.getMessageId().toString());
dataOut.writeInt(rmr.getLocation().getLogFileId());
dataOut.writeInt(rmr.getLocation().getLogFileOffset());
}
}

View File

@ -30,11 +30,15 @@ import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.TransactionTemplate; import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -44,31 +48,42 @@ import org.apache.commons.logging.LogFactory;
* *
* @version $Revision: 1.14 $ * @version $Revision: 1.14 $
*/ */
public class RapidMessageStore implements MessageStore { public class RapidMessageStore implements MessageStore, UsageListener {
private static final Log log = LogFactory.getLog(RapidMessageStore.class); private static final Log log = LogFactory.getLog(RapidMessageStore.class);
protected final RapidPersistenceAdapter peristenceAdapter; protected final RapidPersistenceAdapter peristenceAdapter;
protected final RapidTransactionStore transactionStore; protected final RapidTransactionStore transactionStore;
protected final MapContainer messageContainer; protected final ListContainer messageContainer;
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate; protected final TransactionTemplate transactionTemplate;
protected final LRUCache cache;
// private LinkedHashMap messages = new LinkedHashMap(); protected UsageManager usageManager;
// private ArrayList messageAcks = new ArrayList(); protected StoreEntry batchEntry = null;
// /** A MessageStore that we can use to retrieve messages quickly. */
// private LinkedHashMap cpAddedMessageIds;
protected Location lastLocation; protected Location lastLocation;
protected HashSet inFlightTxLocations = new HashSet(); protected HashSet inFlightTxLocations = new HashSet();
public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) { public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, ListContainer container, int maximumCacheSize) {
this.peristenceAdapter = adapter; this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
this.messageContainer = container; this.messageContainer = container;
this.destination = destination; this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false);
// populate the cache
StoreEntry entry=messageContainer.getFirst();
int count = 0;
if(entry!=null){
do{
RapidMessageReference msg = (RapidMessageReference)messageContainer.get(entry);
cache.put(msg.getMessageId(),entry);
entry = messageContainer.getNext(entry);
count++;
}while(entry!=null && count < maximumCacheSize);
}
} }
@ -76,7 +91,7 @@ public class RapidMessageStore implements MessageStore {
* Not synchronized since the Journal has better throughput if you increase * Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing. * the number of concurrent writes that it is doing.
*/ */
public void addMessage(ConnectionContext context, final Message message) throws IOException { public synchronized void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId(); final MessageId id = message.getMessageId();
@ -118,12 +133,9 @@ public class RapidMessageStore implements MessageStore {
} }
} }
private void addMessage(final RapidMessageReference messageReference) { private synchronized void addMessage(final RapidMessageReference messageReference){
synchronized (this) { StoreEntry item=messageContainer.placeLast(messageReference);
lastLocation = messageReference.getLocation(); cache.put(messageReference.getMessageId(),item);
MessageId id = messageReference.getMessageId();
messageContainer.put(id.toString(), messageReference);
}
} }
static protected String toString(Location location) { static protected String toString(Location location) {
@ -141,7 +153,7 @@ public class RapidMessageStore implements MessageStore {
public void replayAddMessage(ConnectionContext context, Message message, Location location) { public void replayAddMessage(ConnectionContext context, Message message, Location location) {
try { try {
RapidMessageReference messageReference = new RapidMessageReference(message, location); RapidMessageReference messageReference = new RapidMessageReference(message, location);
messageContainer.put(message.getMessageId().toString(), messageReference); addMessage(messageReference);
} }
catch (Throwable e) { catch (Throwable e) {
log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
@ -160,7 +172,7 @@ public class RapidMessageStore implements MessageStore {
if( !context.isInTransaction() ) { if( !context.isInTransaction() ) {
if( debug ) if( debug )
log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
removeMessage(ack, location); removeMessage(ack.getLastMessageId());
} else { } else {
if( debug ) if( debug )
log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
@ -174,7 +186,7 @@ public class RapidMessageStore implements MessageStore {
log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
synchronized( RapidMessageStore.this ) { synchronized( RapidMessageStore.this ) {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
removeMessage(ack, location); removeMessage(ack.getLastMessageId());
} }
} }
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
@ -189,32 +201,53 @@ public class RapidMessageStore implements MessageStore {
} }
} }
private void removeMessage(final MessageAck ack, final Location location) {
synchronized (this) { public synchronized void removeMessage(MessageId msgId) throws IOException{
lastLocation = location; StoreEntry entry=(StoreEntry)cache.remove(msgId);
MessageId id = ack.getLastMessageId(); if(entry!=null){
messageContainer.remove(id.toString()); entry = messageContainer.refresh(entry);
messageContainer.remove(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry);
if(msg.getMessageId().equals(msgId)){
messageContainer.remove(entry);
break;
}
}
} }
} }
public void replayRemoveMessage(ConnectionContext context, MessageAck ack) { public void replayRemoveMessage(ConnectionContext context, MessageAck ack) {
try { try {
MessageId id = ack.getLastMessageId(); MessageId id = ack.getLastMessageId();
messageContainer.remove(id.toString()); removeMessage(id);
} }
catch (Throwable e) { catch (Throwable e) {
log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
} }
} }
/**
* public synchronized Message getMessage(MessageId identity) throws IOException{
*/ RapidMessageReference result=null;
public Message getMessage(MessageId id) throws IOException { StoreEntry entry=(StoreEntry)cache.get(identity);
RapidMessageReference messageReference = (RapidMessageReference) messageContainer.get(id.toString()); if(entry!=null){
if (messageReference == null ) entry = messageContainer.refresh(entry);
result = (RapidMessageReference)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry);
if(msg.getMessageId().equals(identity)){
result=msg;
cache.put(identity,entry);
break;
}
}
}
if (result == null )
return null; return null;
return (Message) peristenceAdapter.readCommand(messageReference.getLocation()); return (Message) peristenceAdapter.readCommand(result.getLocation());
} }
/** /**
@ -225,28 +258,32 @@ public class RapidMessageStore implements MessageStore {
* @param listener * @param listener
* @throws Exception * @throws Exception
*/ */
public void recover(final MessageRecoveryListener listener) throws Exception {
for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){ public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
RapidMessageReference messageReference=(RapidMessageReference) iter.next(); RapidMessageReference messageReference=(RapidMessageReference) iter.next();
Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation()); Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation());
listener.recoverMessage(m); listener.recoverMessage(m);
} }
listener.finished(); listener.finished();
} }
public void start() throws Exception { public void start() {
if( this.usageManager != null )
this.usageManager.addUsageListener(this);
} }
public void stop() throws Exception { public void stop() {
if( this.usageManager != null )
this.usageManager.removeUsageListener(this);
} }
/** /**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/ */
public void removeAllMessages(ConnectionContext context) throws IOException { public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
messageContainer.clear(); messageContainer.clear();
cache.clear();
} }
public ActiveMQDestination getDestination() { public ActiveMQDestination getDestination() {
@ -254,15 +291,16 @@ public class RapidMessageStore implements MessageStore {
} }
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references."); throw new IOException("Does not support message references.");
} }
public String getMessageReference(MessageId identity) throws IOException { public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("The journal does not support message references."); throw new IOException("Does not support message references.");
} }
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
} }
/** /**
@ -289,13 +327,50 @@ public class RapidMessageStore implements MessageStore {
public int getMessageCount(){ public int getMessageCount(){
return 0; return messageContainer.size();
} }
public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry=batchEntry;
if(entry==null){
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry);
}
if(entry!=null){
int count=0;
do{
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(entry);
Message msg=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
if(msg!=null){
Message message=(Message)msg;
listener.recoverMessage(message);
count++;
}
batchEntry=entry;
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned);
}
listener.finished();
} }
public void resetBatching(){ public void resetBatching(){
batchEntry = null;
}
/**
* @return true if the store supports cursors
*/
public boolean isSupportForCursors() {
return true;
}
public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if (newPercentUsage == 100) {
cache.clear();
}
} }
} }

View File

@ -1,20 +1,17 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.store.rapid; package org.apache.activemq.store.rapid;
import java.io.File; import java.io.File;
@ -32,7 +29,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener; import org.apache.activeio.journal.JournalEventListener;
@ -52,6 +48,7 @@ import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory; import org.apache.activemq.kaha.StoreFactory;
@ -65,6 +62,8 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller; import org.apache.activemq.store.kahadaptor.AtomicIntegerMarshaller;
import org.apache.activemq.store.kahadaptor.CommandMarshaller; import org.apache.activemq.store.kahadaptor.CommandMarshaller;
import org.apache.activemq.store.kahadaptor.KahaTopicMessageStore;
import org.apache.activemq.store.kahadaptor.TopicSubAckMarshaller;
import org.apache.activemq.store.rapid.RapidTransactionStore.Tx; import org.apache.activemq.store.rapid.RapidTransactionStore.Tx;
import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation; import org.apache.activemq.store.rapid.RapidTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
@ -78,101 +77,89 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* An implementation of {@link PersistenceAdapter} designed for use with a * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
* {@link Journal} and then check pointing asynchronously on a timeout with some * asynchronously on a timeout with some other long term persistent storage.
* other long term persistent storage.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
* @version $Revision: 1.17 $ * @version $Revision: 1.17 $
*/ */
public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener { public class RapidPersistenceAdapter implements PersistenceAdapter,JournalEventListener,UsageListener{
private static final Log log = LogFactory.getLog(RapidPersistenceAdapter.class); private static final Log log=LogFactory.getLog(RapidPersistenceAdapter.class);
private final Journal journal; private final Journal journal;
private final WireFormat wireFormat=new OpenWireFormat();
private final WireFormat wireFormat = new OpenWireFormat(); private final ConcurrentHashMap queues=new ConcurrentHashMap();
private final ConcurrentHashMap topics=new ConcurrentHashMap();
private final ConcurrentHashMap queues = new ConcurrentHashMap(); private long checkpointInterval=1000*60*5;
private final ConcurrentHashMap topics = new ConcurrentHashMap(); private long lastCheckpointRequest=System.currentTimeMillis();
private int maxCheckpointWorkers=10;
private long checkpointInterval = 1000 * 60 * 5; private int maxCheckpointMessageAddSize=5000;
private long lastCheckpointRequest = System.currentTimeMillis(); private RapidTransactionStore transactionStore=new RapidTransactionStore(this);
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 5000;
private RapidTransactionStore transactionStore = new RapidTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor; private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask; private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
private boolean fullCheckPoint; private boolean fullCheckPoint;
private AtomicBoolean started=new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
Store store; Store store;
private boolean useExternalMessageReferences; private boolean useExternalMessageReferences;
private final Runnable periodicCheckpointTask=createPeriodicCheckpointTask();
private int maximumDestinationCacheSize=2000;
final Runnable createPeriodicCheckpointTask(){
return new Runnable(){
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); public void run(){
if(System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval){
final Runnable createPeriodicCheckpointTask() { checkpoint(false,true);
return new Runnable() { }
public void run() { }
if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) { };
checkpoint(false, true);
}
}
};
} }
public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException {
this.journal = journal; public RapidPersistenceAdapter(Journal journal,TaskRunnerFactory taskRunnerFactory) throws IOException{
this.journal=journal;
journal.setJournalEventListener(this); journal.setJournalEventListener(this);
File dir=((JournalImpl)journal).getLogDirectory();
File dir = ((JournalImpl)journal).getLogDirectory();
String name=dir.getAbsolutePath()+File.separator+"kaha.db"; String name=dir.getAbsolutePath()+File.separator+"kaha.db";
store=StoreFactory.open(name,"rw"); store=StoreFactory.open(name,"rw");
checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
public boolean iterate() { public boolean iterate(){
return doCheckpoint(); return doCheckpoint();
} }
}, "ActiveMQ Checkpoint Worker"); },"ActiveMQ Checkpoint Worker");
} }
public Set getDestinations() { public Set getDestinations(){
Set rc=new HashSet(); Set rc=new HashSet();
try { try{
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next(); Object obj=i.next();
if(obj instanceof ActiveMQDestination){ if(obj instanceof ActiveMQDestination){
rc.add(obj); rc.add(obj);
}
} }
}
}catch(IOException e){ }catch(IOException e){
log.error("Failed to get destinations " ,e); log.error("Failed to get destinations ",e);
} }
return rc; return rc;
} }
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
if (destination.isQueue()) { if(destination.isQueue()){
return createQueueMessageStore((ActiveMQQueue) destination); return createQueueMessageStore((ActiveMQQueue)destination);
} }else{
else { return createTopicMessageStore((ActiveMQTopic)destination);
return createTopicMessageStore((ActiveMQTopic) destination);
} }
} }
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
RapidMessageStore store = (RapidMessageStore) queues.get(destination); RapidMessageStore store=(RapidMessageStore)queues.get(destination);
if (store == null) { if(store==null){
MapContainer messageContainer=getMapContainer(destination,"topic-data"); ListContainer messageContainer=getListContainer(destination,"topic-data");
store = new RapidMessageStore(this, destination, messageContainer); store=new RapidMessageStore(this,destination,messageContainer,maximumDestinationCacheSize);
queues.put(destination, store); queues.put(destination,store);
} }
return store; return store;
} }
@ -189,257 +176,241 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
return container; return container;
} }
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { protected ListContainer getListContainer(Object id,String containerName) throws IOException{
RapidTopicMessageStore store = (RapidTopicMessageStore) topics.get(destination); Store store=getStore();
if (store == null) { ListContainer container=store.getListContainer(id,containerName);
container.setMaximumCacheSize(0);
MapContainer messageContainer=getMapContainer(destination,"topic-data"); container.setMarshaller(new RapidMessageReferenceMarshaller());
MapContainer subsContainer=getMapContainer(destination.toString()+"-subscriptions","topic-subs"); container.load();
MapContainer ackContainer=this.store.getMapContainer(destination.toString(),"topic-acks"); return container;
ackContainer.setKeyMarshaller(new StringMarshaller());
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
store = new RapidTopicMessageStore(this, destination, messageContainer, subsContainer, ackContainer);
topics.put(destination, store);
}
return store;
} }
public TransactionStore createTransactionStore() throws IOException { public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
TopicMessageStore rc=(TopicMessageStore)topics.get(destination);
if(rc==null){
Store store=getStore();
ListContainer messageContainer=getListContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new RapidTopicMessageStore(this,store,messageContainer,ackContainer,subsContainer,destination,
maximumDestinationCacheSize);
topics.put(destination,rc);
}
return rc;
}
public TransactionStore createTransactionStore() throws IOException{
return transactionStore; return transactionStore;
} }
public long getLastMessageBrokerSequenceId() throws IOException { public long getLastMessageBrokerSequenceId() throws IOException{
// TODO: implement this. // TODO: implement this.
return 0; return 0;
} }
public void beginTransaction(ConnectionContext context) throws IOException { public void beginTransaction(ConnectionContext context) throws IOException{
} }
public void commitTransaction(ConnectionContext context) throws IOException { public void commitTransaction(ConnectionContext context) throws IOException{
} }
public void rollbackTransaction(ConnectionContext context) throws IOException { public void rollbackTransaction(ConnectionContext context) throws IOException{
} }
public synchronized void start() throws Exception { public synchronized void start() throws Exception{
if( !started.compareAndSet(false, true) ) if(!started.compareAndSet(false,true))
return; return;
checkpointExecutor=new ThreadPoolExecutor(maxCheckpointWorkers,maxCheckpointWorkers,30,TimeUnit.SECONDS,
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { new LinkedBlockingQueue(),new ThreadFactory(){
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker"); public Thread newThread(Runnable runable){
t.setPriority(7); Thread t=new Thread(runable,"Journal checkpoint worker");
return t; t.setPriority(7);
} return t;
}); }
//checkpointExecutor.allowCoreThreadTimeOut(true); });
// checkpointExecutor.allowCoreThreadTimeOut(true);
createTransactionStore(); createTransactionStore();
recover(); recover();
// Do a checkpoint periodically. // Do a checkpoint periodically.
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10); Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval/10);
} }
public void stop() throws Exception { public void stop() throws Exception{
if(!started.compareAndSet(true,false))
if( !started.compareAndSet(true, false) )
return; return;
Scheduler.cancel(periodicCheckpointTask); Scheduler.cancel(periodicCheckpointTask);
// Take one final checkpoint and stop checkpoint processing. // Take one final checkpoint and stop checkpoint processing.
checkpoint(false, true); checkpoint(false,true);
checkpointTask.shutdown(); checkpointTask.shutdown();
checkpointExecutor.shutdown(); checkpointExecutor.shutdown();
queues.clear(); queues.clear();
topics.clear(); topics.clear();
IOException firstException=null;
IOException firstException = null; try{
try {
journal.close(); journal.close();
} catch (Exception e) { }catch(Exception e){
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
} }
store.close(); store.close();
if(firstException!=null){
if (firstException != null) {
throw firstException; throw firstException;
} }
} }
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/** /**
* @return Returns the wireFormat. * @return Returns the wireFormat.
*/ */
public WireFormat getWireFormat() { public WireFormat getWireFormat(){
return wireFormat; return wireFormat;
} }
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/** /**
* The Journal give us a call back so that we can move old data out of the * The Journal give us a call back so that we can move old data out of the journal. Taking a checkpoint does this
* journal. Taking a checkpoint does this for us. * for us.
* *
* @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
*/ */
public void overflowNotification(RecordLocation safeLocation) { public void overflowNotification(RecordLocation safeLocation){
checkpoint(false, true); checkpoint(false,true);
} }
/** /**
* When we checkpoint we move all the journalled data to long term storage. * When we checkpoint we move all the journalled data to long term storage.
* @param stopping *
* @param stopping
* *
* @param b * @param b
*/ */
public void checkpoint(boolean sync, boolean fullCheckpoint) { public void checkpoint(boolean sync,boolean fullCheckpoint){
try { try{
if (journal == null ) if(journal==null)
throw new IllegalStateException("Journal is closed."); throw new IllegalStateException("Journal is closed.");
long now=System.currentTimeMillis();
long now = System.currentTimeMillis(); CountDownLatch latch=null;
CountDownLatch latch = null; synchronized(this){
synchronized(this) { latch=nextCheckpointCountDownLatch;
latch = nextCheckpointCountDownLatch; lastCheckpointRequest=now;
lastCheckpointRequest = now; if(fullCheckpoint){
if( fullCheckpoint ) { this.fullCheckPoint=true;
this.fullCheckPoint = true;
} }
} }
checkpointTask.wakeup(); checkpointTask.wakeup();
if(sync){
if (sync) {
log.debug("Waking for checkpoint to complete."); log.debug("Waking for checkpoint to complete.");
latch.await(); latch.await();
} }
} }catch(InterruptedException e){
catch (InterruptedException e) { log.warn("Request to start checkpoint failed: "+e,e);
log.warn("Request to start checkpoint failed: " + e, e);
} }
} }
/** /**
* This does the actual checkpoint. * This does the actual checkpoint.
* @return *
* @return
*/ */
public boolean doCheckpoint() { public boolean doCheckpoint(){
CountDownLatch latch = null; CountDownLatch latch=null;
boolean fullCheckpoint; boolean fullCheckpoint;
synchronized(this) { synchronized(this){
latch = nextCheckpointCountDownLatch; latch=nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1); nextCheckpointCountDownLatch=new CountDownLatch(1);
fullCheckpoint = this.fullCheckPoint; fullCheckpoint=this.fullCheckPoint;
this.fullCheckPoint=false; this.fullCheckPoint=false;
} }
try { try{
log.debug("Checkpoint started."); log.debug("Checkpoint started.");
RecordLocation newMark = null; RecordLocation newMark=null;
ArrayList futureTasks=new ArrayList(queues.size()+topics.size());
ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
// //
// We do many partial checkpoints (fullCheckpoint==false) to move topic messages // We do many partial checkpoints (fullCheckpoint==false) to move topic messages
// to long term store as soon as possible. // to long term store as soon as possible.
// //
// We want to avoid doing that for queue messages since removes the come in the same // We want to avoid doing that for queue messages since removes the come in the same
// checkpoint cycle will nullify the previous message add. Therefore, we only // checkpoint cycle will nullify the previous message add. Therefore, we only
// checkpoint queues on the fullCheckpoint cycles. // checkpoint queues on the fullCheckpoint cycles.
// //
if( fullCheckpoint ) { if(fullCheckpoint){
Iterator iterator = queues.values().iterator(); Iterator iterator=queues.values().iterator();
while (iterator.hasNext()) { while(iterator.hasNext()){
try { try{
final RapidMessageStore ms = (RapidMessageStore) iterator.next(); final RapidMessageStore ms=(RapidMessageStore)iterator.next();
FutureTask task = new FutureTask(new Callable() { FutureTask task=new FutureTask(new Callable(){
public Object call() throws Exception {
public Object call() throws Exception{
return ms.checkpoint(); return ms.checkpoint();
}}); }
});
futureTasks.add(task); futureTasks.add(task);
checkpointExecutor.execute(task); checkpointExecutor.execute(task);
} }catch(Exception e){
catch (Exception e) { log.error("Failed to checkpoint a message store: "+e,e);
log.error("Failed to checkpoint a message store: " + e, e);
} }
} }
} }
Iterator iterator=topics.values().iterator();
while(iterator.hasNext()){
try{
final RapidTopicMessageStore ms=(RapidTopicMessageStore)iterator.next();
FutureTask task=new FutureTask(new Callable(){
Iterator iterator = topics.values().iterator(); public Object call() throws Exception{
while (iterator.hasNext()) {
try {
final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next();
FutureTask task = new FutureTask(new Callable() {
public Object call() throws Exception {
return ms.checkpoint(); return ms.checkpoint();
}}); }
});
futureTasks.add(task); futureTasks.add(task);
checkpointExecutor.execute(task); checkpointExecutor.execute(task);
} }catch(Exception e){
catch (Exception e) { log.error("Failed to checkpoint a message store: "+e,e);
log.error("Failed to checkpoint a message store: " + e, e);
} }
} }
try{
try { for(Iterator iter=futureTasks.iterator();iter.hasNext();){
for (Iterator iter = futureTasks.iterator(); iter.hasNext();) { FutureTask ft=(FutureTask)iter.next();
FutureTask ft = (FutureTask) iter.next(); RecordLocation mark=(RecordLocation)ft.get();
RecordLocation mark = (RecordLocation) ft.get();
// We only set a newMark on full checkpoints. // We only set a newMark on full checkpoints.
if( fullCheckpoint ) { if(fullCheckpoint){
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
newMark = mark; newMark=mark;
} }
} }
} }
} catch (Throwable e) { }catch(Throwable e){
log.error("Failed to checkpoint a message store: " + e, e); log.error("Failed to checkpoint a message store: "+e,e);
} }
if(fullCheckpoint){
try{
if( fullCheckpoint ) { if(newMark!=null){
try { log.debug("Marking journal at: "+newMark);
if (newMark != null) { journal.setMark(newMark,true);
log.debug("Marking journal at: " + newMark);
journal.setMark(newMark, true);
} }
}catch(Exception e){
log.error("Failed to mark the Journal: "+e,e);
} }
catch (Exception e) { // TODO: do we need to implement a periodic clean up?
log.error("Failed to mark the Journal: " + e, e); // if (longTermPersistence instanceof JDBCPersistenceAdapter) {
} // // We may be check pointing more often than the checkpointInterval if under high use
// // But we don't want to clean up the db that often.
// TODO: do we need to implement a periodic clean up? // long now = System.currentTimeMillis();
// if( now > lastCleanup+checkpointInterval ) {
// if (longTermPersistence instanceof JDBCPersistenceAdapter) { // lastCleanup = now;
// // We may be check pointing more often than the checkpointInterval if under high use // ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
// // But we don't want to clean up the db that often. // }
// long now = System.currentTimeMillis(); // }
// if( now > lastCleanup+checkpointInterval ) {
// lastCleanup = now;
// ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
// }
// }
} }
log.debug("Checkpoint done."); log.debug("Checkpoint done.");
} }finally{
finally {
latch.countDown(); latch.countDown();
} }
synchronized(this) { synchronized(this){
return this.fullCheckPoint; return this.fullCheckPoint;
} }
} }
/** /**
@ -447,108 +418,95 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
* @return * @return
* @throws IOException * @throws IOException
*/ */
public DataStructure readCommand(RecordLocation location) throws IOException { public DataStructure readCommand(RecordLocation location) throws IOException{
try { try{
Packet data = journal.read(location); Packet data=journal.read(location);
return (DataStructure) wireFormat.unmarshal(toByteSequence(data)); return (DataStructure)wireFormat.unmarshal(toByteSequence(data));
} }catch(InvalidRecordLocationException e){
catch (InvalidRecordLocationException e) { throw createReadException(location,e);
throw createReadException(location, e); }catch(IOException e){
} throw createReadException(location,e);
catch (IOException e) {
throw createReadException(location, e);
} }
} }
/** /**
* Move all the messages that were in the journal into long term storage. We * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
* just replay and do a checkpoint.
* *
* @throws IOException * @throws IOException
* @throws IOException * @throws IOException
* @throws InvalidRecordLocationException * @throws InvalidRecordLocationException
* @throws IllegalStateException * @throws IllegalStateException
*/ */
private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { private void recover() throws IllegalStateException,InvalidRecordLocationException,IOException,IOException{
Location pos=null;
Location pos = null; int transactionCounter=0;
int transactionCounter = 0;
log.info("Journal Recovery Started."); log.info("Journal Recovery Started.");
ConnectionContext context = new ConnectionContext(); ConnectionContext context=new ConnectionContext();
// While we have records in the journal. // While we have records in the journal.
while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) { while((pos=(Location)journal.getNextRecordLocation(pos))!=null){
Packet data = journal.read(pos); Packet data=journal.read(pos);
DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data)); DataStructure c=(DataStructure)wireFormat.unmarshal(toByteSequence(data));
if(c instanceof Message){
if (c instanceof Message ) { Message message=(Message)c;
Message message = (Message) c; RapidMessageStore store=(RapidMessageStore)createMessageStore(message.getDestination());
RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination()); if(message.isInTransaction()){
if ( message.isInTransaction()) { transactionStore.addMessage(store,message,pos);
transactionStore.addMessage(store, message, pos); }else{
} store.replayAddMessage(context,message,pos);
else {
store.replayAddMessage(context, message, pos);
transactionCounter++; transactionCounter++;
} }
} else { }else{
switch (c.getDataStructureType()) { switch(c.getDataStructureType()){
case JournalQueueAck.DATA_STRUCTURE_TYPE: case JournalQueueAck.DATA_STRUCTURE_TYPE: {
{ JournalQueueAck command=(JournalQueueAck)c;
JournalQueueAck command = (JournalQueueAck) c; RapidMessageStore store=(RapidMessageStore)createMessageStore(command.getDestination());
RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination()); if(command.getMessageAck().isInTransaction()){
if (command.getMessageAck().isInTransaction()) { transactionStore.removeMessage(store,command.getMessageAck(),pos);
transactionStore.removeMessage(store, command.getMessageAck(), pos); }else{
} store.replayRemoveMessage(context,command.getMessageAck());
else {
store.replayRemoveMessage(context, command.getMessageAck());
transactionCounter++; transactionCounter++;
} }
} }
break; break;
case JournalTopicAck.DATA_STRUCTURE_TYPE: case JournalTopicAck.DATA_STRUCTURE_TYPE: {
{ JournalTopicAck command=(JournalTopicAck)c;
JournalTopicAck command = (JournalTopicAck) c; RapidTopicMessageStore store=(RapidTopicMessageStore)createMessageStore(command.getDestination());
RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination()); if(command.getTransactionId()!=null){
if (command.getTransactionId() != null) { transactionStore.acknowledge(store,command,pos);
transactionStore.acknowledge(store, command, pos); }else{
} store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
else { .getMessageId());
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
transactionCounter++; transactionCounter++;
} }
} }
break; break;
case JournalTransaction.DATA_STRUCTURE_TYPE: case JournalTransaction.DATA_STRUCTURE_TYPE: {
{ JournalTransaction command=(JournalTransaction)c;
JournalTransaction command = (JournalTransaction) c; try{
try {
// Try to replay the packet. // Try to replay the packet.
switch (command.getType()) { switch(command.getType()){
case JournalTransaction.XA_PREPARE: case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId()); transactionStore.replayPrepare(command.getTransactionId());
break; break;
case JournalTransaction.XA_COMMIT: case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT: case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
if (tx == null) if(tx==null)
break; // We may be trying to replay a commit that break; // We may be trying to replay a commit that
// was already committed. // was already committed.
// Replay the committed operations. // Replay the committed operations.
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
TxOperation op = (TxOperation) iter.next(); TxOperation op=(TxOperation)iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
op.store.replayAddMessage(context, (Message) op.data, op.location); op.store.replayAddMessage(context,(Message)op.data,op.location);
} }
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
op.store.replayRemoveMessage(context, (MessageAck) op.data); op.store.replayRemoveMessage(context,(MessageAck)op.data);
} }
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
JournalTopicAck ack = (JournalTopicAck) op.data; JournalTopicAck ack=(JournalTopicAck)op.data;
((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack ((RapidTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
.getMessageId()); .getSubscritionName(),ack.getMessageId());
} }
} }
transactionCounter++; transactionCounter++;
@ -558,42 +516,39 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
transactionStore.replayRollback(command.getTransactionId()); transactionStore.replayRollback(command.getTransactionId());
break; break;
} }
} }catch(IOException e){
catch (IOException e) { log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
} }
} }
break; break;
case JournalTrace.DATA_STRUCTURE_TYPE: case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace) c; JournalTrace trace=(JournalTrace)c;
log.debug("TRACE Entry: " + trace.getMessage()); log.debug("TRACE Entry: "+trace.getMessage());
break; break;
default: default:
log.error("Unknown type of record in transaction log which will be discarded: " + c); log.error("Unknown type of record in transaction log which will be discarded: "+c);
} }
} }
} }
RecordLocation location=writeTraceMessage("RECOVERED",true);
RecordLocation location = writeTraceMessage("RECOVERED", true); journal.setMark(location,true);
journal.setMark(location, true); log.info("Journal Recovered: "+transactionCounter+" message(s) in transactions recovered.");
log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
} }
private IOException createReadException(RecordLocation location, Exception e) { private IOException createReadException(RecordLocation location,Exception e){
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
} }
protected IOException createWriteException(DataStructure packet, Exception e) { protected IOException createWriteException(DataStructure packet,Exception e){
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
} }
protected IOException createWriteException(String command, Exception e) { protected IOException createWriteException(String command,Exception e){
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
} }
protected IOException createRecoveryFailedException(Exception e) { protected IOException createRecoveryFailedException(Exception e){
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
} }
/** /**
@ -603,85 +558,102 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent
* @return * @return
* @throws IOException * @throws IOException
*/ */
public Location writeCommand(DataStructure command, boolean sync) throws IOException { public Location writeCommand(DataStructure command,boolean sync) throws IOException{
if( started.get() ) if(started.get())
return (Location) journal.write(toPacket(wireFormat.marshal(command)), sync); return (Location)journal.write(toPacket(wireFormat.marshal(command)),sync);
throw new IOException("closed"); throw new IOException("closed");
} }
private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { private RecordLocation writeTraceMessage(String message,boolean sync) throws IOException{
JournalTrace trace = new JournalTrace(); JournalTrace trace=new JournalTrace();
trace.setMessage(message); trace.setMessage(message);
return writeCommand(trace, sync); return writeCommand(trace,sync);
} }
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) { if(newPercentUsage>80&&oldPercentUsage<newPercentUsage){
checkpoint(false, true); checkpoint(false,true);
} }
} }
public RapidTransactionStore getTransactionStore() { public RapidTransactionStore getTransactionStore(){
return transactionStore; return transactionStore;
} }
public void deleteAllMessages() throws IOException { public void deleteAllMessages() throws IOException{
try { try{
JournalTrace trace = new JournalTrace(); JournalTrace trace=new JournalTrace();
trace.setMessage("DELETED"); trace.setMessage("DELETED");
RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false); RecordLocation location=journal.write(toPacket(wireFormat.marshal(trace)),false);
journal.setMark(location, true); journal.setMark(location,true);
log.info("Journal deleted: "); log.info("Journal deleted: ");
} catch (IOException e) { }catch(IOException e){
throw e; throw e;
} catch (Throwable e) { }catch(Throwable e){
throw IOExceptionSupport.create(e); throw IOExceptionSupport.create(e);
} }
if(store!=null){ if(store!=null){
store.delete(); if(store.isInitialized()){
store.clear();
}else{
store.delete();
}
} }
} }
public int getMaxCheckpointMessageAddSize() { public int getMaxCheckpointMessageAddSize(){
return maxCheckpointMessageAddSize; return maxCheckpointMessageAddSize;
} }
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
} }
public int getMaxCheckpointWorkers() { public int getMaxCheckpointWorkers(){
return maxCheckpointWorkers; return maxCheckpointWorkers;
} }
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
this.maxCheckpointWorkers = maxCheckpointWorkers; this.maxCheckpointWorkers=maxCheckpointWorkers;
} }
public boolean isUseExternalMessageReferences() { public boolean isUseExternalMessageReferences(){
return false; return false;
} }
public void setUseExternalMessageReferences(boolean enable) { public void setUseExternalMessageReferences(boolean enable){
if( enable ) if(enable)
throw new IllegalArgumentException("The journal does not support message references."); throw new IllegalArgumentException("The journal does not support message references.");
} }
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager){
} }
public Store getStore() { public Store getStore(){
return store; return store;
} }
public Packet toPacket(ByteSequence sequence) { /**
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); * @return the maximumDestinationCacheSize
} */
public int getMaximumDestinationCacheSize(){
public ByteSequence toByteSequence(Packet packet) { return this.maximumDestinationCacheSize;
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
} }
/**
* @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
*/
public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
this.maximumDestinationCacheSize=maximumDestinationCacheSize;
}
public Packet toPacket(ByteSequence sequence){
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data,sequence.offset,
sequence.length));
}
public ByteSequence toByteSequence(Packet packet){
org.apache.activeio.packet.ByteSequence sequence=packet.asByteSequence();
return new ByteSequence(sequence.getData(),sequence.getOffset(),sequence.getLength());
}
} }

View File

@ -15,32 +15,27 @@
package org.apache.activemq.store.rapid; package org.apache.activemq.store.rapid;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activeio.journal.active.Location; import org.apache.activeio.journal.active.Location;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StringMarshaller; import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.store.kahadaptor.ConsumerMessageRef;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.store.kahadaptor.ConsumerMessageRefMarshaller;
import org.apache.commons.logging.Log; import org.apache.activemq.store.kahadaptor.TopicSubAck;
import org.apache.commons.logging.LogFactory; import org.apache.activemq.store.kahadaptor.TopicSubContainer;
/** /**
@ -50,78 +45,61 @@ import org.apache.commons.logging.LogFactory;
*/ */
public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
private static final Log log=LogFactory.getLog(RapidTopicMessageStore.class); private ListContainer ackContainer;
private HashMap ackedLastAckLocations=new HashMap(); private Map subscriberContainer;
private final MapContainer subscriberContainer; private Store store;
private final MapContainer ackContainer; private Map subscriberMessages=new ConcurrentHashMap();
private final Store store;
private Map subscriberAcks=new ConcurrentHashMap();
public RapidTopicMessageStore(RapidPersistenceAdapter adapter,ActiveMQTopic destination, public RapidTopicMessageStore(RapidPersistenceAdapter adapter, Store store,ListContainer messageContainer,ListContainer ackContainer,
MapContainer messageContainer,MapContainer subsContainer,MapContainer ackContainer) throws IOException{ MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
super(adapter,destination,messageContainer); super(adapter,destination,messageContainer,maximumCacheSize);
this.subscriberContainer=subsContainer; this.store=store;
this.ackContainer=ackContainer; this.ackContainer=ackContainer;
this.store=adapter.getStore(); subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){ for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next(); Object key=i.next();
addSubscriberAckContainer(key); addSubscriberMessageContainer(key);
} }
} }
public void recoverSubscription(String clientId,String subscriptionName,final MessageRecoveryListener listener) public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
throws Exception{ int subscriberCount=subscriberMessages.size();
String key=getSubscriptionKey(clientId,subscriptionName); if(subscriberCount>0){
ListContainer list=(ListContainer)subscriberAcks.get(key); final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if(list!=null){ final RapidMessageReference md = new RapidMessageReference(message, location);
for(Iterator i=list.iterator();i.hasNext();){ StoreEntry messageEntry=messageContainer.placeLast(md);
Object msg=messageContainer.get(i.next()); TopicSubAck tsa=new TopicSubAck();
if(msg!=null){ tsa.setCount(subscriberCount);
if(msg.getClass()==String.class){ tsa.setMessageEntry(messageEntry);
listener.recoverMessageReference((String)msg); StoreEntry ackEntry=ackContainer.placeLast(tsa);
}else{ for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
listener.recoverMessage((Message)msg); TopicSubContainer container=(TopicSubContainer)i.next();
} ConsumerMessageRef ref=new ConsumerMessageRef();
} ref.setAckEntry(ackEntry);
listener.finished(); ref.setMessageEntry(messageEntry);
container.getListContainer().add(ref);
} }
}else{
listener.finished();
} }
} }
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned, public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageRecoveryListener listener) throws Exception{ MessageId messageId) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName); String subcriberId=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer)subscriberAcks.get(key); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(list!=null){ if(container!=null){
boolean startFound=false; ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
int count=0; if(ref!=null){
for(Iterator i=list.iterator();i.hasNext()&&count<maxReturned;){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
Object msg=messageContainer.get(i.next()); if(tsa!=null){
if(msg!=null){ if(tsa.decrementCount()<=0){
if(msg.getClass()==String.class){ ackContainer.remove(ref.getAckEntry());
String ref=msg.toString(); messageContainer.remove(tsa.getMessageEntry());
if(startFound||lastMessageId==null){
listener.recoverMessageReference(ref);
count++;
}else if(!startFound||ref.equals(lastMessageId.toString())){
startFound=true;
}
}else{ }else{
Message message=(Message)msg; ackContainer.update(ref.getAckEntry(),tsa);
if(startFound||lastMessageId==null){
listener.recoverMessage(message);
count++;
}else if(!startFound&&message.getMessageId().equals(lastMessageId)){
startFound=true;
}
} }
} }
listener.finished();
} }
}else{
listener.finished();
} }
} }
@ -129,7 +107,7 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
} }
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{ throws IOException{
SubscriptionInfo info=new SubscriptionInfo(); SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination); info.setDestination(destination);
@ -142,197 +120,186 @@ public class RapidTopicMessageStore extends RapidMessageStore implements TopicMe
if(!subscriberContainer.containsKey(key)){ if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info); subscriberContainer.put(key,info);
} }
addSubscriberAckContainer(key); addSubscriberMessageContainer(key);
} }
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ public synchronized void deleteSubscription(String clientId,String subscriptionName){
int subscriberCount=subscriberAcks.size(); String key=getSubscriptionKey(clientId,subscriptionName);
if(subscriberCount>0){ subscriberContainer.remove(key);
String id=message.getMessageId().toString(); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
ackContainer.put(id,new AtomicInteger(subscriberCount)); for(Iterator i=container.getListContainer().iterator();i.hasNext();){
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object key=i.next(); if(ref!=null){
ListContainer container=store.getListContainer(key,"durable-subs"); TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
container.add(id); if(tsa!=null){
} if(tsa.decrementCount()<=0){
super.addMessage(context,message); ackContainer.remove(ref.getAckEntry());
} messageContainer.remove(tsa.getMessageEntry());
}
/**
*/
public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
throws IOException{
final boolean debug=log.isDebugEnabled();
JournalTopicAck ack=new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
final Location location=peristenceAdapter.writeCommand(ack,false);
final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
if(!context.isInTransaction()){
if(debug)
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
acknowledge(messageId,location,key);
}else{
if(debug)
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
synchronized(this){
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this,ack,location);
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
if(debug)
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
synchronized(RapidTopicMessageStore.this){
inFlightTxLocations.remove(location);
acknowledge(messageId,location,key);
}
}
public void afterRollback() throws Exception{
if(debug)
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
synchronized(RapidTopicMessageStore.this){
inFlightTxLocations.remove(location);
}
}
});
}
}
public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){
try{
synchronized(this){
String subcriberId=getSubscriptionKey(clientId,subscritionName);
String id=messageId.toString();
ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
// container.remove(id);
container.removeFirst();
AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so remove it
messageContainer.remove(messageId.toString());
}
}
}
}
}catch(Throwable e){
log.debug("Could not replay acknowledge for message '"+messageId
+"'. Message may have already been acknowledged. reason: "+e);
}
}
/**
* @param messageId
* @param location
* @param key
*/
private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){
synchronized(this){
lastLocation=location;
ackedLastAckLocations.put(key,messageId);
String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
String id=messageId.toString();
ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
// container.remove(id);
container.removeFirst();
AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
}else{ }else{
// no more references to message messageContainer so remove it ackContainer.update(ref.getAckEntry(),tsa);
messageContainer.remove(messageId.toString());
} }
} }
} }
} }
} }
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
for(Iterator i=container.getListContainer().iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
.getMessageEntry());
if(messageReference!=null){
Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
listener.recoverMessage(m);
}
}
}
listener.finished();
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
entry=container.getListContainer().getFirst();
}else{
entry=container.getListContainer().refresh(entry);
entry=container.getListContainer().getNext(entry);
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
.getMessageEntry());
if(messageReference!=null){
Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
listener.recoverMessage(m);
count++;
}
container.setBatchEntry(entry);
entry=container.getListContainer().getNext(entry);
}while(entry!=null&&count<maxReturned);
}
}
listener.finished();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected String getSubscriptionKey(String clientId,String subscriberName){ protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":"; String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET"; result+=subscriberName!=null?subscriberName:"NOT_SET";
return result; return result;
} }
public Location checkpoint() throws IOException{ protected void addSubscriberMessageContainer(Object key) throws IOException{
ArrayList cpAckedLastAckLocations; ListContainer container=store.getListContainer(key,"topic-subs");
// swap out the hash maps.. Marshaller marshaller=new ConsumerMessageRefMarshaller();
synchronized(this){ container.setMarshaller(marshaller);
cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values()); TopicSubContainer tsc=new TopicSubContainer(container);
this.ackedLastAckLocations=new HashMap(); subscriberMessages.put(key,tsc);
}
Location rc=super.checkpoint();
if(!cpAckedLastAckLocations.isEmpty()){
Collections.sort(cpAckedLastAckLocations);
Location t=(Location)cpAckedLastAckLocations.get(0);
if(rc==null||t.compareTo(rc)<0){
rc=t;
}
}
return rc;
} }
public void deleteSubscription(String clientId,String subscriptionName) throws IOException{ public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.getListContainer().size();
}
/**
* @param context
* @param messageId
* @param expirationTime
* @param messageRef
* @throws IOException
* @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext,
* org.apache.activemq.command.MessageId, long, java.lang.String)
*/
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
throw new IOException("Not supported");
}
/**
* @param identity
* @return String
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageReference(org.apache.activemq.command.MessageId)
*/
public String getMessageReference(MessageId identity) throws IOException{
return null;
}
/**
* @param context
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
*/
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
ackContainer.clear();
for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
TopicSubContainer container=(TopicSubContainer)i.next();
container.getListContainer().clear();
}
}
public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName); String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key); TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
ListContainer list=(ListContainer)subscriberAcks.get(key); if(topicSubContainer!=null){
for(Iterator i=list.iterator();i.hasNext();){ topicSubContainer.reset();
String id=i.next().toString(); }
AtomicInteger count=(AtomicInteger)ackContainer.remove(id); }
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count); public Location checkpoint() throws IOException{
}else{ return null;
// no more references to message messageContainer so remove it }
messageContainer.remove(id);
public synchronized void replayAcknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId){
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
} }
} }
} }
} }
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected void addSubscriberAckContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"durable-subs");
Marshaller marshaller=new StringMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
ListContainer list=(ListContainer)subscriberAcks.get(key);
return list.size();
}
public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
}
public void resetBatching(String clientId,String subscriptionName){
}
}

View File

@ -0,0 +1,45 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.perf;
import java.io.File;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.rapid.RapidPersistenceAdapter;
/**
* @version $Revision: 1.3 $
*/
public class RapidStoreQueueTest extends SimpleQueueTest{
protected void configureBroker(BrokerService answer) throws Exception{
File dataFileDir = new File("activemq-data/perfTest");
File journalDir = new File(dataFileDir, "journal").getCanonicalFile();
JournalImpl journal = new JournalImpl(journalDir, 3, 1024*1024*20);
RapidPersistenceAdapter adaptor = new RapidPersistenceAdapter(journal,answer.getTaskRunnerFactory());
answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
}