From 878b9645ce9b0538816c0dca715843c87496f4ec Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 21 Nov 2006 08:11:03 +0000 Subject: [PATCH] Updated Rapid Persistence Adaptor to do batching for cursors git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@477567 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/kahadaptor/KahaMessageStore.java | 7 +- .../kahadaptor/KahaTopicMessageStore.java | 87 +-- .../store/kahadaptor/TopicSubContainer.java | 14 +- .../store/rapid/RapidMessageReference.java | 4 + .../RapidMessageReferenceMarshaller.java | 46 ++ .../store/rapid/RapidMessageStore.java | 159 ++-- .../store/rapid/RapidPersistenceAdapter.java | 694 +++++++++--------- .../store/rapid/RapidTopicMessageStore.java | 441 +++++------ .../activemq/perf/RapidStoreQueueTest.java | 45 ++ 9 files changed, 764 insertions(+), 733 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java index f5c34c8574..b4cfb02cc1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java @@ -91,7 +91,7 @@ public class KahaMessageStore implements MessageStore, UsageListener{ Message msg=(Message)messageContainer.get(entry); if(msg.getMessageId().equals(identity)){ result=msg; - cache.put(identity,msg); + cache.put(identity,entry); break; } } @@ -186,7 +186,7 @@ public class KahaMessageStore implements MessageStore, UsageListener{ * @throws Exception * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener) */ - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ StoreEntry entry = batchEntry; if (entry == null) { entry= messageContainer.getFirst(); @@ -239,10 +239,9 @@ public class KahaMessageStore implements MessageStore, UsageListener{ * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int) */ public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ - if (newPercentUsage == 100) { + if(newPercentUsage==100){ cache.clear(); } - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index 0ba3a33d66..252aafcd5f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -32,6 +32,7 @@ import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.rapid.RapidMessageReference; /** * @version $Revision: 1.5 $ @@ -149,11 +150,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess listener.recoverMessage((Message)msg); } } - listener.finished(); } - }else{ - listener.finished(); } + listener.finished(); } public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, @@ -236,31 +235,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess messageContainer.add(messageRef); } - /** - * @return the destination - * @see org.apache.activemq.store.MessageStore#getDestination() - */ - public ActiveMQDestination getDestination(){ - return destination; - } - - /** - * @param identity - * @return the Message - * @throws IOException - * @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId) - */ - public Message getMessage(MessageId identity) throws IOException{ - Message result=null; - for(Iterator i=messageContainer.iterator();i.hasNext();){ - Message msg=(Message)i.next(); - if(msg.getMessageId().equals(identity)){ - result=msg; - break; - } - } - return result; - } + /** * @param identity @@ -272,22 +247,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess return null; } - /** - * @throws Exception - * @see org.apache.activemq.store.MessageStore#recover(org.apache.activemq.store.MessageRecoveryListener) - */ - public void recover(MessageRecoveryListener listener) throws Exception{ - for(Iterator iter=messageContainer.iterator();iter.hasNext();){ - Object msg=iter.next(); - if(msg.getClass()==String.class){ - listener.recoverMessageReference((String)msg); - }else{ - listener.recoverMessage((Message)msg); - } - } - listener.finished(); - } - + /** * @param context * @throws IOException @@ -302,23 +262,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess } } - /** - * @param context - * @param ack - * @throws IOException - * @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext, - * org.apache.activemq.command.MessageAck) - */ - public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{ - for(Iterator i=messageContainer.iterator();i.hasNext();){ - Message msg=(Message)i.next(); - if(msg.getMessageId().equals(ack.getLastMessageId())){ - i.remove(); - break; - } - } - } - + public synchronized void resetBatching(String clientId,String subscriptionName){ String key=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key); @@ -326,25 +270,4 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess topicSubContainer.reset(); } } - - - public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName) throws IOException{ - // TODO Auto-generated method stub - return null; - } - - /** - * @param clientId - * @param subscriptionName - * @param id - * @return previous messageId - * @throws IOException - * @see org.apache.activemq.store.TopicMessageStore#getPreviousMessageIdToDeliver(java.lang.String, - * java.lang.String, org.apache.activemq.command.MessageId) - */ - public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) - throws IOException{ - // TODO Auto-generated method stub - return null; - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java index b8b68167df..cb732603d1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java @@ -22,43 +22,43 @@ import org.apache.activemq.kaha.StoreEntry; * * @version $Revision: 1.10 $ */ - class TopicSubContainer{ + public class TopicSubContainer{ private ListContainer listContainer; private StoreEntry batchEntry; - TopicSubContainer(ListContainer container){ + public TopicSubContainer(ListContainer container){ this.listContainer = container; } /** * @return the batchEntry */ - StoreEntry getBatchEntry(){ + public StoreEntry getBatchEntry(){ return this.batchEntry; } /** * @param batchEntry the batchEntry to set */ - void setBatchEntry(StoreEntry batchEntry){ + public void setBatchEntry(StoreEntry batchEntry){ this.batchEntry=batchEntry; } /** * @return the listContainer */ - ListContainer getListContainer(){ + public ListContainer getListContainer(){ return this.listContainer; } /** * @param listContainer the listContainer to set */ - void setListContainer(ListContainer container){ + public void setListContainer(ListContainer container){ this.listContainer=container; } - void reset() { + public void reset() { batchEntry = null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java index c57b95e942..530dffb9b5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReference.java @@ -26,6 +26,10 @@ public class RapidMessageReference { public final MessageId messageId; public final Location location; + public RapidMessageReference(MessageId messageId, Location location) { + this.messageId = messageId; + this.location=location; + } public RapidMessageReference(Message message, Location location) { this.messageId = message.getMessageId(); this.location=location; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java new file mode 100644 index 0000000000..320aaee266 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageReferenceMarshaller.java @@ -0,0 +1,46 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.rapid; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.activeio.journal.active.Location; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.kaha.Marshaller; + +public class RapidMessageReferenceMarshaller implements Marshaller{ + + + + public Object readPayload(DataInput dataIn) throws IOException{ + MessageId mid = new MessageId(dataIn.readUTF()); + Location loc = new Location(dataIn.readInt(),dataIn.readInt()); + RapidMessageReference rmr = new RapidMessageReference(mid,loc); + return rmr; + } + + public void writePayload(Object object,DataOutput dataOut) throws IOException{ + RapidMessageReference rmr = (RapidMessageReference)object; + dataOut.writeUTF(rmr.getMessageId().toString()); + dataOut.writeInt(rmr.getLocation().getLogFileId()); + dataOut.writeInt(rmr.getLocation().getLogFileOffset()); + + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java index 9b251554b6..cf88e75ff9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java @@ -30,11 +30,15 @@ import org.apache.activemq.command.JournalQueueAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.StoreEntry; +import org.apache.activemq.memory.UsageListener; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.TransactionTemplate; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,31 +48,42 @@ import org.apache.commons.logging.LogFactory; * * @version $Revision: 1.14 $ */ -public class RapidMessageStore implements MessageStore { +public class RapidMessageStore implements MessageStore, UsageListener { private static final Log log = LogFactory.getLog(RapidMessageStore.class); protected final RapidPersistenceAdapter peristenceAdapter; protected final RapidTransactionStore transactionStore; - protected final MapContainer messageContainer; + protected final ListContainer messageContainer; protected final ActiveMQDestination destination; protected final TransactionTemplate transactionTemplate; - -// private LinkedHashMap messages = new LinkedHashMap(); -// private ArrayList messageAcks = new ArrayList(); - -// /** A MessageStore that we can use to retrieve messages quickly. */ -// private LinkedHashMap cpAddedMessageIds; + protected final LRUCache cache; + protected UsageManager usageManager; + protected StoreEntry batchEntry = null; + + protected Location lastLocation; protected HashSet inFlightTxLocations = new HashSet(); - public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, MapContainer container) { + public RapidMessageStore(RapidPersistenceAdapter adapter, ActiveMQDestination destination, ListContainer container, int maximumCacheSize) { this.peristenceAdapter = adapter; this.transactionStore = adapter.getTransactionStore(); this.messageContainer = container; this.destination = destination; this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); + this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false); +// populate the cache + StoreEntry entry=messageContainer.getFirst(); + int count = 0; + if(entry!=null){ + do{ + RapidMessageReference msg = (RapidMessageReference)messageContainer.get(entry); + cache.put(msg.getMessageId(),entry); + entry = messageContainer.getNext(entry); + count++; + }while(entry!=null && count < maximumCacheSize); + } } @@ -76,7 +91,7 @@ public class RapidMessageStore implements MessageStore { * Not synchronized since the Journal has better throughput if you increase * the number of concurrent writes that it is doing. */ - public void addMessage(ConnectionContext context, final Message message) throws IOException { + public synchronized void addMessage(ConnectionContext context, final Message message) throws IOException { final MessageId id = message.getMessageId(); @@ -118,12 +133,9 @@ public class RapidMessageStore implements MessageStore { } } - private void addMessage(final RapidMessageReference messageReference) { - synchronized (this) { - lastLocation = messageReference.getLocation(); - MessageId id = messageReference.getMessageId(); - messageContainer.put(id.toString(), messageReference); - } + private synchronized void addMessage(final RapidMessageReference messageReference){ + StoreEntry item=messageContainer.placeLast(messageReference); + cache.put(messageReference.getMessageId(),item); } static protected String toString(Location location) { @@ -141,7 +153,7 @@ public class RapidMessageStore implements MessageStore { public void replayAddMessage(ConnectionContext context, Message message, Location location) { try { RapidMessageReference messageReference = new RapidMessageReference(message, location); - messageContainer.put(message.getMessageId().toString(), messageReference); + addMessage(messageReference); } catch (Throwable e) { log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); @@ -160,7 +172,7 @@ public class RapidMessageStore implements MessageStore { if( !context.isInTransaction() ) { if( debug ) log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location); - removeMessage(ack, location); + removeMessage(ack.getLastMessageId()); } else { if( debug ) log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location); @@ -174,7 +186,7 @@ public class RapidMessageStore implements MessageStore { log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location); synchronized( RapidMessageStore.this ) { inFlightTxLocations.remove(location); - removeMessage(ack, location); + removeMessage(ack.getLastMessageId()); } } public void afterRollback() throws Exception { @@ -189,32 +201,53 @@ public class RapidMessageStore implements MessageStore { } } - private void removeMessage(final MessageAck ack, final Location location) { - synchronized (this) { - lastLocation = location; - MessageId id = ack.getLastMessageId(); - messageContainer.remove(id.toString()); + + public synchronized void removeMessage(MessageId msgId) throws IOException{ + StoreEntry entry=(StoreEntry)cache.remove(msgId); + if(entry!=null){ + entry = messageContainer.refresh(entry); + messageContainer.remove(entry); + }else{ + for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { + RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry); + if(msg.getMessageId().equals(msgId)){ + messageContainer.remove(entry); + break; + } + } } } public void replayRemoveMessage(ConnectionContext context, MessageAck ack) { try { MessageId id = ack.getLastMessageId(); - messageContainer.remove(id.toString()); + removeMessage(id); } catch (Throwable e) { log.warn("Could not replay acknowledge for message '" + ack.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); } } - /** - * - */ - public Message getMessage(MessageId id) throws IOException { - RapidMessageReference messageReference = (RapidMessageReference) messageContainer.get(id.toString()); - if (messageReference == null ) + + public synchronized Message getMessage(MessageId identity) throws IOException{ + RapidMessageReference result=null; + StoreEntry entry=(StoreEntry)cache.get(identity); + if(entry!=null){ + entry = messageContainer.refresh(entry); + result = (RapidMessageReference)messageContainer.get(entry); + }else{ + for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) { + RapidMessageReference msg=(RapidMessageReference)messageContainer.get(entry); + if(msg.getMessageId().equals(identity)){ + result=msg; + cache.put(identity,entry); + break; + } + } + } + if (result == null ) return null; - return (Message) peristenceAdapter.readCommand(messageReference.getLocation()); + return (Message) peristenceAdapter.readCommand(result.getLocation()); } /** @@ -225,28 +258,32 @@ public class RapidMessageStore implements MessageStore { * @param listener * @throws Exception */ - public void recover(final MessageRecoveryListener listener) throws Exception { - for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){ + public synchronized void recover(MessageRecoveryListener listener) throws Exception{ + for(Iterator iter=messageContainer.iterator();iter.hasNext();){ RapidMessageReference messageReference=(RapidMessageReference) iter.next(); Message m = (Message) peristenceAdapter.readCommand(messageReference.getLocation()); listener.recoverMessage(m); } listener.finished(); - } - public void start() throws Exception { + public void start() { + if( this.usageManager != null ) + this.usageManager.addUsageListener(this); } - public void stop() throws Exception { + public void stop() { + if( this.usageManager != null ) + this.usageManager.removeUsageListener(this); } /** * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) */ - public void removeAllMessages(ConnectionContext context) throws IOException { + public synchronized void removeAllMessages(ConnectionContext context) throws IOException { messageContainer.clear(); + cache.clear(); } public ActiveMQDestination getDestination() { @@ -254,15 +291,16 @@ public class RapidMessageStore implements MessageStore { } public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { - throw new IOException("The journal does not support message references."); + throw new IOException("Does not support message references."); } public String getMessageReference(MessageId identity) throws IOException { - throw new IOException("The journal does not support message references."); + throw new IOException("Does not support message references."); } public void setUsageManager(UsageManager usageManager) { + this.usageManager = usageManager; } /** @@ -289,13 +327,50 @@ public class RapidMessageStore implements MessageStore { public int getMessageCount(){ - return 0; + return messageContainer.size(); } - public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + StoreEntry entry=batchEntry; + if(entry==null){ + entry=messageContainer.getFirst(); + }else{ + entry=messageContainer.refresh(entry); + entry=messageContainer.getNext(entry); + } + if(entry!=null){ + int count=0; + do{ + RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(entry); + Message msg=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); + if(msg!=null){ + Message message=(Message)msg; + listener.recoverMessage(message); + count++; + } + batchEntry=entry; + entry=messageContainer.getNext(entry); + }while(entry!=null&&countlastCheckpointRequest+checkpointInterval ) { - checkpoint(false, true); - } - } - }; + public void run(){ + if(System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval){ + checkpoint(false,true); + } + } + }; } - - public RapidPersistenceAdapter(Journal journal, TaskRunnerFactory taskRunnerFactory) throws IOException { - this.journal = journal; + public RapidPersistenceAdapter(Journal journal,TaskRunnerFactory taskRunnerFactory) throws IOException{ + this.journal=journal; journal.setJournalEventListener(this); - - File dir = ((JournalImpl)journal).getLogDirectory(); + File dir=((JournalImpl)journal).getLogDirectory(); String name=dir.getAbsolutePath()+File.separator+"kaha.db"; store=StoreFactory.open(name,"rw"); - - checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){ - public boolean iterate() { + checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){ + + public boolean iterate(){ return doCheckpoint(); } - }, "ActiveMQ Checkpoint Worker"); - + },"ActiveMQ Checkpoint Worker"); } - public Set getDestinations() { + public Set getDestinations(){ Set rc=new HashSet(); - try { - for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ - Object obj=i.next(); - if(obj instanceof ActiveMQDestination){ - rc.add(obj); + try{ + for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ + Object obj=i.next(); + if(obj instanceof ActiveMQDestination){ + rc.add(obj); + } } - } }catch(IOException e){ - log.error("Failed to get destinations " ,e); + log.error("Failed to get destinations ",e); } return rc; } - private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { - if (destination.isQueue()) { - return createQueueMessageStore((ActiveMQQueue) destination); - } - else { - return createTopicMessageStore((ActiveMQTopic) destination); + private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{ + if(destination.isQueue()){ + return createQueueMessageStore((ActiveMQQueue)destination); + }else{ + return createTopicMessageStore((ActiveMQTopic)destination); } } - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - RapidMessageStore store = (RapidMessageStore) queues.get(destination); - if (store == null) { - MapContainer messageContainer=getMapContainer(destination,"topic-data"); - store = new RapidMessageStore(this, destination, messageContainer); - queues.put(destination, store); + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ + RapidMessageStore store=(RapidMessageStore)queues.get(destination); + if(store==null){ + ListContainer messageContainer=getListContainer(destination,"topic-data"); + store=new RapidMessageStore(this,destination,messageContainer,maximumDestinationCacheSize); + queues.put(destination,store); } return store; } @@ -189,257 +176,241 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent return container; } - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - RapidTopicMessageStore store = (RapidTopicMessageStore) topics.get(destination); - if (store == null) { - - MapContainer messageContainer=getMapContainer(destination,"topic-data"); - MapContainer subsContainer=getMapContainer(destination.toString()+"-subscriptions","topic-subs"); - MapContainer ackContainer=this.store.getMapContainer(destination.toString(),"topic-acks"); - - ackContainer.setKeyMarshaller(new StringMarshaller()); - ackContainer.setValueMarshaller(new AtomicIntegerMarshaller()); - - store = new RapidTopicMessageStore(this, destination, messageContainer, subsContainer, ackContainer); - topics.put(destination, store); - } - return store; + protected ListContainer getListContainer(Object id,String containerName) throws IOException{ + Store store=getStore(); + ListContainer container=store.getListContainer(id,containerName); + container.setMaximumCacheSize(0); + container.setMarshaller(new RapidMessageReferenceMarshaller()); + container.load(); + return container; } - public TransactionStore createTransactionStore() throws IOException { + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ + TopicMessageStore rc=(TopicMessageStore)topics.get(destination); + if(rc==null){ + Store store=getStore(); + ListContainer messageContainer=getListContainer(destination,"topic-data"); + MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs"); + ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks"); + ackContainer.setMarshaller(new TopicSubAckMarshaller()); + rc=new RapidTopicMessageStore(this,store,messageContainer,ackContainer,subsContainer,destination, + maximumDestinationCacheSize); + topics.put(destination,rc); + } + return rc; + } + + public TransactionStore createTransactionStore() throws IOException{ return transactionStore; } - public long getLastMessageBrokerSequenceId() throws IOException { + public long getLastMessageBrokerSequenceId() throws IOException{ // TODO: implement this. return 0; } - public void beginTransaction(ConnectionContext context) throws IOException { + public void beginTransaction(ConnectionContext context) throws IOException{ } - public void commitTransaction(ConnectionContext context) throws IOException { + public void commitTransaction(ConnectionContext context) throws IOException{ } - public void rollbackTransaction(ConnectionContext context) throws IOException { + public void rollbackTransaction(ConnectionContext context) throws IOException{ } - public synchronized void start() throws Exception { - if( !started.compareAndSet(false, true) ) + public synchronized void start() throws Exception{ + if(!started.compareAndSet(false,true)) return; - - checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { - public Thread newThread(Runnable runable) { - Thread t = new Thread(runable, "Journal checkpoint worker"); - t.setPriority(7); - return t; - } - }); - //checkpointExecutor.allowCoreThreadTimeOut(true); - + checkpointExecutor=new ThreadPoolExecutor(maxCheckpointWorkers,maxCheckpointWorkers,30,TimeUnit.SECONDS, + new LinkedBlockingQueue(),new ThreadFactory(){ + + public Thread newThread(Runnable runable){ + Thread t=new Thread(runable,"Journal checkpoint worker"); + t.setPriority(7); + return t; + } + }); + // checkpointExecutor.allowCoreThreadTimeOut(true); createTransactionStore(); recover(); - // Do a checkpoint periodically. - Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10); - + Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval/10); } - public void stop() throws Exception { - - if( !started.compareAndSet(true, false) ) + public void stop() throws Exception{ + if(!started.compareAndSet(true,false)) return; - Scheduler.cancel(periodicCheckpointTask); - // Take one final checkpoint and stop checkpoint processing. - checkpoint(false, true); - checkpointTask.shutdown(); + checkpoint(false,true); + checkpointTask.shutdown(); checkpointExecutor.shutdown(); - queues.clear(); topics.clear(); - - IOException firstException = null; - try { + IOException firstException=null; + try{ journal.close(); - } catch (Exception e) { - firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); + }catch(Exception e){ + firstException=IOExceptionSupport.create("Failed to close journals: "+e,e); } store.close(); - - if (firstException != null) { + if(firstException!=null){ throw firstException; } } // Properties // ------------------------------------------------------------------------- - /** * @return Returns the wireFormat. */ - public WireFormat getWireFormat() { + public WireFormat getWireFormat(){ return wireFormat; } // Implementation methods // ------------------------------------------------------------------------- - /** - * The Journal give us a call back so that we can move old data out of the - * journal. Taking a checkpoint does this for us. + * The Journal give us a call back so that we can move old data out of the journal. Taking a checkpoint does this + * for us. * * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) */ - public void overflowNotification(RecordLocation safeLocation) { - checkpoint(false, true); + public void overflowNotification(RecordLocation safeLocation){ + checkpoint(false,true); } /** * When we checkpoint we move all the journalled data to long term storage. - * @param stopping + * + * @param stopping * * @param b */ - public void checkpoint(boolean sync, boolean fullCheckpoint) { - try { - if (journal == null ) + public void checkpoint(boolean sync,boolean fullCheckpoint){ + try{ + if(journal==null) throw new IllegalStateException("Journal is closed."); - - long now = System.currentTimeMillis(); - CountDownLatch latch = null; - synchronized(this) { - latch = nextCheckpointCountDownLatch; - lastCheckpointRequest = now; - if( fullCheckpoint ) { - this.fullCheckPoint = true; + long now=System.currentTimeMillis(); + CountDownLatch latch=null; + synchronized(this){ + latch=nextCheckpointCountDownLatch; + lastCheckpointRequest=now; + if(fullCheckpoint){ + this.fullCheckPoint=true; } } - checkpointTask.wakeup(); - - if (sync) { + if(sync){ log.debug("Waking for checkpoint to complete."); latch.await(); } - } - catch (InterruptedException e) { - log.warn("Request to start checkpoint failed: " + e, e); + }catch(InterruptedException e){ + log.warn("Request to start checkpoint failed: "+e,e); } } - + /** * This does the actual checkpoint. - * @return + * + * @return */ - public boolean doCheckpoint() { - CountDownLatch latch = null; + public boolean doCheckpoint(){ + CountDownLatch latch=null; boolean fullCheckpoint; - synchronized(this) { - latch = nextCheckpointCountDownLatch; - nextCheckpointCountDownLatch = new CountDownLatch(1); - fullCheckpoint = this.fullCheckPoint; - this.fullCheckPoint=false; - } - try { - + synchronized(this){ + latch=nextCheckpointCountDownLatch; + nextCheckpointCountDownLatch=new CountDownLatch(1); + fullCheckpoint=this.fullCheckPoint; + this.fullCheckPoint=false; + } + try{ log.debug("Checkpoint started."); - RecordLocation newMark = null; - - ArrayList futureTasks = new ArrayList(queues.size()+topics.size()); - + RecordLocation newMark=null; + ArrayList futureTasks=new ArrayList(queues.size()+topics.size()); // // We do many partial checkpoints (fullCheckpoint==false) to move topic messages - // to long term store as soon as possible. + // to long term store as soon as possible. // // We want to avoid doing that for queue messages since removes the come in the same - // checkpoint cycle will nullify the previous message add. Therefore, we only + // checkpoint cycle will nullify the previous message add. Therefore, we only // checkpoint queues on the fullCheckpoint cycles. // - if( fullCheckpoint ) { - Iterator iterator = queues.values().iterator(); - while (iterator.hasNext()) { - try { - final RapidMessageStore ms = (RapidMessageStore) iterator.next(); - FutureTask task = new FutureTask(new Callable() { - public Object call() throws Exception { + if(fullCheckpoint){ + Iterator iterator=queues.values().iterator(); + while(iterator.hasNext()){ + try{ + final RapidMessageStore ms=(RapidMessageStore)iterator.next(); + FutureTask task=new FutureTask(new Callable(){ + + public Object call() throws Exception{ return ms.checkpoint(); - }}); + } + }); futureTasks.add(task); - checkpointExecutor.execute(task); - } - catch (Exception e) { - log.error("Failed to checkpoint a message store: " + e, e); + checkpointExecutor.execute(task); + }catch(Exception e){ + log.error("Failed to checkpoint a message store: "+e,e); } } } + Iterator iterator=topics.values().iterator(); + while(iterator.hasNext()){ + try{ + final RapidTopicMessageStore ms=(RapidTopicMessageStore)iterator.next(); + FutureTask task=new FutureTask(new Callable(){ - Iterator iterator = topics.values().iterator(); - while (iterator.hasNext()) { - try { - final RapidTopicMessageStore ms = (RapidTopicMessageStore) iterator.next(); - FutureTask task = new FutureTask(new Callable() { - public Object call() throws Exception { + public Object call() throws Exception{ return ms.checkpoint(); - }}); + } + }); futureTasks.add(task); - checkpointExecutor.execute(task); - } - catch (Exception e) { - log.error("Failed to checkpoint a message store: " + e, e); + checkpointExecutor.execute(task); + }catch(Exception e){ + log.error("Failed to checkpoint a message store: "+e,e); } } - - try { - for (Iterator iter = futureTasks.iterator(); iter.hasNext();) { - FutureTask ft = (FutureTask) iter.next(); - RecordLocation mark = (RecordLocation) ft.get(); + try{ + for(Iterator iter=futureTasks.iterator();iter.hasNext();){ + FutureTask ft=(FutureTask)iter.next(); + RecordLocation mark=(RecordLocation)ft.get(); // We only set a newMark on full checkpoints. - if( fullCheckpoint ) { - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; + if(fullCheckpoint){ + if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){ + newMark=mark; } } } - } catch (Throwable e) { - log.error("Failed to checkpoint a message store: " + e, e); + }catch(Throwable e){ + log.error("Failed to checkpoint a message store: "+e,e); } - - - if( fullCheckpoint ) { - try { - if (newMark != null) { - log.debug("Marking journal at: " + newMark); - journal.setMark(newMark, true); + if(fullCheckpoint){ + try{ + if(newMark!=null){ + log.debug("Marking journal at: "+newMark); + journal.setMark(newMark,true); } + }catch(Exception e){ + log.error("Failed to mark the Journal: "+e,e); } - catch (Exception e) { - log.error("Failed to mark the Journal: " + e, e); - } - -// TODO: do we need to implement a periodic clean up? - -// if (longTermPersistence instanceof JDBCPersistenceAdapter) { -// // We may be check pointing more often than the checkpointInterval if under high use -// // But we don't want to clean up the db that often. -// long now = System.currentTimeMillis(); -// if( now > lastCleanup+checkpointInterval ) { -// lastCleanup = now; -// ((JDBCPersistenceAdapter) longTermPersistence).cleanup(); -// } -// } + // TODO: do we need to implement a periodic clean up? + // if (longTermPersistence instanceof JDBCPersistenceAdapter) { + // // We may be check pointing more often than the checkpointInterval if under high use + // // But we don't want to clean up the db that often. + // long now = System.currentTimeMillis(); + // if( now > lastCleanup+checkpointInterval ) { + // lastCleanup = now; + // ((JDBCPersistenceAdapter) longTermPersistence).cleanup(); + // } + // } } - log.debug("Checkpoint done."); - } - finally { + }finally{ latch.countDown(); } - synchronized(this) { + synchronized(this){ return this.fullCheckPoint; - } - + } } /** @@ -447,108 +418,95 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent * @return * @throws IOException */ - public DataStructure readCommand(RecordLocation location) throws IOException { - try { - Packet data = journal.read(location); - return (DataStructure) wireFormat.unmarshal(toByteSequence(data)); - } - catch (InvalidRecordLocationException e) { - throw createReadException(location, e); - } - catch (IOException e) { - throw createReadException(location, e); + public DataStructure readCommand(RecordLocation location) throws IOException{ + try{ + Packet data=journal.read(location); + return (DataStructure)wireFormat.unmarshal(toByteSequence(data)); + }catch(InvalidRecordLocationException e){ + throw createReadException(location,e); + }catch(IOException e){ + throw createReadException(location,e); } } /** - * Move all the messages that were in the journal into long term storage. We - * just replay and do a checkpoint. + * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint. * * @throws IOException * @throws IOException * @throws InvalidRecordLocationException * @throws IllegalStateException */ - private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { - - Location pos = null; - int transactionCounter = 0; - + private void recover() throws IllegalStateException,InvalidRecordLocationException,IOException,IOException{ + Location pos=null; + int transactionCounter=0; log.info("Journal Recovery Started."); - ConnectionContext context = new ConnectionContext(); - + ConnectionContext context=new ConnectionContext(); // While we have records in the journal. - while ((pos = (Location) journal.getNextRecordLocation(pos)) != null) { - Packet data = journal.read(pos); - DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data)); - - if (c instanceof Message ) { - Message message = (Message) c; - RapidMessageStore store = (RapidMessageStore) createMessageStore(message.getDestination()); - if ( message.isInTransaction()) { - transactionStore.addMessage(store, message, pos); - } - else { - store.replayAddMessage(context, message, pos); + while((pos=(Location)journal.getNextRecordLocation(pos))!=null){ + Packet data=journal.read(pos); + DataStructure c=(DataStructure)wireFormat.unmarshal(toByteSequence(data)); + if(c instanceof Message){ + Message message=(Message)c; + RapidMessageStore store=(RapidMessageStore)createMessageStore(message.getDestination()); + if(message.isInTransaction()){ + transactionStore.addMessage(store,message,pos); + }else{ + store.replayAddMessage(context,message,pos); transactionCounter++; } - } else { - switch (c.getDataStructureType()) { - case JournalQueueAck.DATA_STRUCTURE_TYPE: - { - JournalQueueAck command = (JournalQueueAck) c; - RapidMessageStore store = (RapidMessageStore) createMessageStore(command.getDestination()); - if (command.getMessageAck().isInTransaction()) { - transactionStore.removeMessage(store, command.getMessageAck(), pos); - } - else { - store.replayRemoveMessage(context, command.getMessageAck()); + }else{ + switch(c.getDataStructureType()){ + case JournalQueueAck.DATA_STRUCTURE_TYPE: { + JournalQueueAck command=(JournalQueueAck)c; + RapidMessageStore store=(RapidMessageStore)createMessageStore(command.getDestination()); + if(command.getMessageAck().isInTransaction()){ + transactionStore.removeMessage(store,command.getMessageAck(),pos); + }else{ + store.replayRemoveMessage(context,command.getMessageAck()); transactionCounter++; } } - break; - case JournalTopicAck.DATA_STRUCTURE_TYPE: - { - JournalTopicAck command = (JournalTopicAck) c; - RapidTopicMessageStore store = (RapidTopicMessageStore) createMessageStore(command.getDestination()); - if (command.getTransactionId() != null) { - transactionStore.acknowledge(store, command, pos); - } - else { - store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); + break; + case JournalTopicAck.DATA_STRUCTURE_TYPE: { + JournalTopicAck command=(JournalTopicAck)c; + RapidTopicMessageStore store=(RapidTopicMessageStore)createMessageStore(command.getDestination()); + if(command.getTransactionId()!=null){ + transactionStore.acknowledge(store,command,pos); + }else{ + store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command + .getMessageId()); transactionCounter++; } } - break; - case JournalTransaction.DATA_STRUCTURE_TYPE: - { - JournalTransaction command = (JournalTransaction) c; - try { + break; + case JournalTransaction.DATA_STRUCTURE_TYPE: { + JournalTransaction command=(JournalTransaction)c; + try{ // Try to replay the packet. - switch (command.getType()) { + switch(command.getType()){ case JournalTransaction.XA_PREPARE: transactionStore.replayPrepare(command.getTransactionId()); break; case JournalTransaction.XA_COMMIT: case JournalTransaction.LOCAL_COMMIT: - Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); - if (tx == null) + Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared()); + if(tx==null) break; // We may be trying to replay a commit that - // was already committed. - + // was already committed. // Replay the committed operations. - for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { - TxOperation op = (TxOperation) iter.next(); - if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - op.store.replayAddMessage(context, (Message) op.data, op.location); + for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){ + TxOperation op=(TxOperation)iter.next(); + if(op.operationType==TxOperation.ADD_OPERATION_TYPE){ + op.store.replayAddMessage(context,(Message)op.data,op.location); } - if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { - op.store.replayRemoveMessage(context, (MessageAck) op.data); + if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){ + op.store.replayRemoveMessage(context,(MessageAck)op.data); } - if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { - JournalTopicAck ack = (JournalTopicAck) op.data; - ((RapidTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack - .getMessageId()); + if(op.operationType==TxOperation.ACK_OPERATION_TYPE){ + JournalTopicAck ack=(JournalTopicAck)op.data; + ((RapidTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack + .getSubscritionName(),ack.getMessageId()); } } transactionCounter++; @@ -558,42 +516,39 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent transactionStore.replayRollback(command.getTransactionId()); break; } - } - catch (IOException e) { - log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); + }catch(IOException e){ + log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e); } } - break; + break; case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace = (JournalTrace) c; - log.debug("TRACE Entry: " + trace.getMessage()); + JournalTrace trace=(JournalTrace)c; + log.debug("TRACE Entry: "+trace.getMessage()); break; default: - log.error("Unknown type of record in transaction log which will be discarded: " + c); + log.error("Unknown type of record in transaction log which will be discarded: "+c); } } } - - RecordLocation location = writeTraceMessage("RECOVERED", true); - journal.setMark(location, true); - - log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); + RecordLocation location=writeTraceMessage("RECOVERED",true); + journal.setMark(location,true); + log.info("Journal Recovered: "+transactionCounter+" message(s) in transactions recovered."); } - private IOException createReadException(RecordLocation location, Exception e) { - return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); + private IOException createReadException(RecordLocation location,Exception e){ + return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e); } - protected IOException createWriteException(DataStructure packet, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); + protected IOException createWriteException(DataStructure packet,Exception e){ + return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e); } - protected IOException createWriteException(String command, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); + protected IOException createWriteException(String command,Exception e){ + return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e); } - protected IOException createRecoveryFailedException(Exception e) { - return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); + protected IOException createRecoveryFailedException(Exception e){ + return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e); } /** @@ -603,85 +558,102 @@ public class RapidPersistenceAdapter implements PersistenceAdapter, JournalEvent * @return * @throws IOException */ - public Location writeCommand(DataStructure command, boolean sync) throws IOException { - if( started.get() ) - return (Location) journal.write(toPacket(wireFormat.marshal(command)), sync); + public Location writeCommand(DataStructure command,boolean sync) throws IOException{ + if(started.get()) + return (Location)journal.write(toPacket(wireFormat.marshal(command)),sync); throw new IOException("closed"); } - private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { - JournalTrace trace = new JournalTrace(); + private RecordLocation writeTraceMessage(String message,boolean sync) throws IOException{ + JournalTrace trace=new JournalTrace(); trace.setMessage(message); - return writeCommand(trace, sync); + return writeCommand(trace,sync); } - public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) { - if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) { - checkpoint(false, true); + public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ + if(newPercentUsage>80&&oldPercentUsage0){ + final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); + final RapidMessageReference md = new RapidMessageReference(message, location); + StoreEntry messageEntry=messageContainer.placeLast(md); + TopicSubAck tsa=new TopicSubAck(); + tsa.setCount(subscriberCount); + tsa.setMessageEntry(messageEntry); + StoreEntry ackEntry=ackContainer.placeLast(tsa); + for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){ + TopicSubContainer container=(TopicSubContainer)i.next(); + ConsumerMessageRef ref=new ConsumerMessageRef(); + ref.setAckEntry(ackEntry); + ref.setMessageEntry(messageEntry); + container.getListContainer().add(ref); } - }else{ - listener.finished(); } } - public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned, - MessageRecoveryListener listener) throws Exception{ - String key=getSubscriptionKey(clientId,subscriptionName); - ListContainer list=(ListContainer)subscriberAcks.get(key); - if(list!=null){ - boolean startFound=false; - int count=0; - for(Iterator i=list.iterator();i.hasNext()&&count0){ - String id=message.getMessageId().toString(); - ackContainer.put(id,new AtomicInteger(subscriberCount)); - for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){ - Object key=i.next(); - ListContainer container=store.getListContainer(key,"durable-subs"); - container.add(id); - } - super.addMessage(context,message); - } - } - - /** - */ - public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId) - throws IOException{ - final boolean debug=log.isDebugEnabled(); - JournalTopicAck ack=new JournalTopicAck(); - ack.setDestination(destination); - ack.setMessageId(messageId); - ack.setMessageSequenceId(messageId.getBrokerSequenceId()); - ack.setSubscritionName(subscriptionName); - ack.setClientId(clientId); - ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null); - final Location location=peristenceAdapter.writeCommand(ack,false); - final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName); - if(!context.isInTransaction()){ - if(debug) - log.debug("Journalled acknowledge for: "+messageId+", at: "+location); - acknowledge(messageId,location,key); - }else{ - if(debug) - log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location); - synchronized(this){ - inFlightTxLocations.add(location); - } - transactionStore.acknowledge(this,ack,location); - context.getTransaction().addSynchronization(new Synchronization(){ - - public void afterCommit() throws Exception{ - if(debug) - log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location); - synchronized(RapidTopicMessageStore.this){ - inFlightTxLocations.remove(location); - acknowledge(messageId,location,key); - } - } - - public void afterRollback() throws Exception{ - if(debug) - log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location); - synchronized(RapidTopicMessageStore.this){ - inFlightTxLocations.remove(location); - } - } - }); - } - } - - public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){ - try{ - synchronized(this){ - String subcriberId=getSubscriptionKey(clientId,subscritionName); - String id=messageId.toString(); - ListContainer container=(ListContainer)subscriberAcks.get(subcriberId); - if(container!=null){ - // container.remove(id); - container.removeFirst(); - AtomicInteger count=(AtomicInteger)ackContainer.remove(id); - if(count!=null){ - if(count.decrementAndGet()>0){ - ackContainer.put(id,count); - }else{ - // no more references to message messageContainer so remove it - messageContainer.remove(messageId.toString()); - } - } - } - } - }catch(Throwable e){ - log.debug("Could not replay acknowledge for message '"+messageId - +"'. Message may have already been acknowledged. reason: "+e); - } - } - - /** - * @param messageId - * @param location - * @param key - */ - private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){ - synchronized(this){ - lastLocation=location; - ackedLastAckLocations.put(key,messageId); - String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName()); - String id=messageId.toString(); - ListContainer container=(ListContainer)subscriberAcks.get(subcriberId); - if(container!=null){ - // container.remove(id); - container.removeFirst(); - AtomicInteger count=(AtomicInteger)ackContainer.remove(id); - if(count!=null){ - if(count.decrementAndGet()>0){ - ackContainer.put(id,count); + public synchronized void deleteSubscription(String clientId,String subscriptionName){ + String key=getSubscriptionKey(clientId,subscriptionName); + subscriberContainer.remove(key); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); + for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); + if(ref!=null){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); + if(tsa!=null){ + if(tsa.decrementCount()<=0){ + ackContainer.remove(ref.getAckEntry()); + messageContainer.remove(tsa.getMessageEntry()); }else{ - // no more references to message messageContainer so remove it - messageContainer.remove(messageId.toString()); + ackContainer.update(ref.getAckEntry(),tsa); } } } } } + public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener) + throws Exception{ + String key=getSubscriptionKey(clientId,subscriptionName); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); + if(container!=null){ + for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); + RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref + .getMessageEntry()); + if(messageReference!=null){ + Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); + listener.recoverMessage(m); + } + } + } + listener.finished(); + } + + public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, + MessageRecoveryListener listener) throws Exception{ + String key=getSubscriptionKey(clientId,subscriptionName); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); + if(container!=null){ + int count=0; + StoreEntry entry=container.getBatchEntry(); + if(entry==null){ + entry=container.getListContainer().getFirst(); + }else{ + entry=container.getListContainer().refresh(entry); + entry=container.getListContainer().getNext(entry); + } + if(entry!=null){ + do{ + ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry); + RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef + .getMessageEntry()); + if(messageReference!=null){ + Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation()); + listener.recoverMessage(m); + count++; + } + container.setBatchEntry(entry); + entry=container.getListContainer().getNext(entry); + }while(entry!=null&&count0){ - ackContainer.put(id,count); - }else{ - // no more references to message messageContainer so remove it - messageContainer.remove(id); + TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key); + if(topicSubContainer!=null){ + topicSubContainer.reset(); + } + } + + + public Location checkpoint() throws IOException{ + return null; + } + + + public synchronized void replayAcknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId){ + String subcriberId=getSubscriptionKey(clientId,subscriptionName); + TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); + if(container!=null){ + ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); + if(ref!=null){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); + if(tsa!=null){ + if(tsa.decrementCount()<=0){ + ackContainer.remove(ref.getAckEntry()); + messageContainer.remove(tsa.getMessageEntry()); + }else{ + ackContainer.update(ref.getAckEntry(),tsa); + } } } } } - - public SubscriptionInfo[] getAllSubscriptions() throws IOException{ - return (SubscriptionInfo[])subscriberContainer.values().toArray( - new SubscriptionInfo[subscriberContainer.size()]); - } - - protected void addSubscriberAckContainer(Object key) throws IOException{ - ListContainer container=store.getListContainer(key,"durable-subs"); - Marshaller marshaller=new StringMarshaller(); - container.setMarshaller(marshaller); - subscriberAcks.put(key,container); - } +} + + + + - public int getMessageCount(String clientId,String subscriberName) throws IOException{ - String key=getSubscriptionKey(clientId,subscriberName); - ListContainer list=(ListContainer)subscriberAcks.get(key); - return list.size(); - } - public void resetBatching(String clientId,String subscriptionName,MessageId nextId){ - } - - public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{ - - - } - - public void resetBatching(String clientId,String subscriptionName){ - - - } -} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java new file mode 100644 index 0000000000..17d9dafd54 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.perf; + +import java.io.File; +import org.apache.activeio.journal.active.JournalImpl; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.rapid.RapidPersistenceAdapter; +/** + * @version $Revision: 1.3 $ + */ +public class RapidStoreQueueTest extends SimpleQueueTest{ + + + protected void configureBroker(BrokerService answer) throws Exception{ + + File dataFileDir = new File("activemq-data/perfTest"); + File journalDir = new File(dataFileDir, "journal").getCanonicalFile(); + JournalImpl journal = new JournalImpl(journalDir, 3, 1024*1024*20); + + RapidPersistenceAdapter adaptor = new RapidPersistenceAdapter(journal,answer.getTaskRunnerFactory()); + + + answer.setPersistenceAdapter(adaptor); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + + } + +}