From ea843782b387e39aa363f5bb45607b61c99f588f Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 17 May 2010 11:53:28 +0000 Subject: [PATCH] added support for concurrent dispatch and store of persistent messages in KahaDB git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@945102 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 37 +- .../apache/activemq/broker/region/Topic.java | 21 +- .../activemq/store/AbstractMessageStore.java | 47 +- .../apache/activemq/store/MessageStore.java | 27 + .../activemq/store/ProxyMessageStore.java | 13 + .../store/ProxyTopicMessageStore.java | 13 + .../activemq/store/TopicMessageStore.java | 4 +- .../kahadb/KahaDBPersistenceAdapter.java | 31 + .../activemq/store/kahadb/KahaDBStore.java | 641 ++++++++++++++---- .../store/kahadb/MessageDatabase.java | 20 +- 10 files changed, 674 insertions(+), 180 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index ae510d31c3..f6f152d1b3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -29,18 +29,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.ResourceAllocationException; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -105,13 +105,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { private final Object dispatchMutex = new Object(); private boolean useConsumerPriority = true; private boolean strictOrderDispatch = false; - private QueueDispatchSelector dispatchSelector; + private final QueueDispatchSelector dispatchSelector; private boolean optimizedDispatch = false; private boolean firstConsumer = false; private int timeBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0; private CountDownLatch consumersBeforeStartsLatch; - private AtomicLong pendingWakeups = new AtomicLong(); + private final AtomicLong pendingWakeups = new AtomicLong(); private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { @@ -163,6 +163,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { class FlowControlTimeoutTask extends Thread { + @Override public void run() { TimeoutMessage timeout; try { @@ -220,6 +221,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } + @Override public void initialize() throws Exception { if (this.messages == null) { if (destination.isTemporary() || broker == null || store == null) { @@ -554,6 +556,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); + Future result = null; synchronized (sendLock) { if (store != null && message.isPersistent()) { if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { @@ -568,8 +571,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); - store.addMessage(context, message); - + if (context.isInTransaction()) { + store.addMessage(context, message); + }else { + result = store.asyncAddQueueMessage(context, message); + } } } if (context.isInTransaction()) { @@ -578,6 +584,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // our memory. This increment is decremented once the tx finishes.. message.incrementReferenceCount(); context.getTransaction().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { try { // It could take while before we receive the commit @@ -603,6 +610,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { // usage manager. sendMessage(context, message); } + if (result != null && !result.isCancelled()) { + try { + result.get(); + }catch(CancellationException e) { + //ignore - the task has been cancelled if the message + // has already been deleted + } + } } private void expireMessages() { @@ -651,7 +666,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { ack.setLastMessageId(node.getMessageId()); ack.setMessageCount(1); } - store.removeMessage(context, ack); + store.removeAsyncMessage(context, ack); } } @@ -666,6 +681,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { return msg; } + @Override public String toString() { int size = 0; synchronized (messages) { @@ -725,6 +741,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Properties // ------------------------------------------------------------------------- + @Override public ActiveMQDestination getActiveMQDestination() { return destination; } @@ -936,7 +953,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { for (MessageReference ref : list) { try { QueueMessageReference r = (QueueMessageReference) ref; - removeMessage(c, (IndirectMessageReference) r); + removeMessage(c, r); } catch (IOException e) { } } @@ -1273,6 +1290,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { return messageId.equals(r.getMessageId().toString()); } + @Override public String toString() { return "MessageIdFilter: " + messageId; } @@ -1326,12 +1344,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { context.getTransaction().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { getDestinationStatistics().getDequeues().increment(); dropMessage(reference); wakeup(); } + @Override public void afterRollback() throws Exception { reference.setAcked(false); } @@ -1634,6 +1654,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification * (org.apache.activemq.command.MessageDispatchNotification) */ + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { // do dispatch Subscription sub = getMatchingSubscription(messageDispatchNotification); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 38be904d91..d6f24b47ef 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -21,10 +21,11 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; - +import java.util.concurrent.Future; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -87,6 +88,7 @@ public class Topic extends BaseDestination implements Task { this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); } + @Override public void initialize() throws Exception { super.initialize(); if (store != null) { @@ -402,6 +404,7 @@ public class Topic extends BaseDestination implements Task { final ConnectionContext context = producerExchange.getConnectionContext(); message.setRegionDestination(this); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); + Future result = null; if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { @@ -413,13 +416,18 @@ public class Topic extends BaseDestination implements Task { waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } - topicStore.addMessage(context, message); + if (context.isInTransaction()) { + topicStore.addMessage(context, message); + }else { + result = topicStore.asyncAddTopicMessage(context, message); + } } message.incrementReferenceCount(); if (context.isInTransaction()) { context.getTransaction().addSynchronization(new Synchronization() { + @Override public void afterCommit() throws Exception { // It could take while before we receive the commit // operration.. by that time the message could have @@ -445,6 +453,14 @@ public class Topic extends BaseDestination implements Task { message.decrementReferenceCount(); } } + if (result != null && !result.isCancelled()) { + try { + result.get(); + }catch(CancellationException e) { + //ignore - the task has been cancelled if the message + // has already been deleted + } + } } @@ -452,6 +468,7 @@ public class Topic extends BaseDestination implements Task { return durableSubcribers.size() == 0; } + @Override public String toString() { return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index f0e58a0959..cceae96b5c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -17,18 +17,24 @@ package org.apache.activemq.store; import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.MemoryUsage; abstract public class AbstractMessageStore implements MessageStore { + static final FutureTask FUTURE; protected final ActiveMQDestination destination; public AbstractMessageStore(ActiveMQDestination destination) { this.destination = destination; } - + public void dispose(ConnectionContext context) { } @@ -44,16 +50,43 @@ abstract public class AbstractMessageStore implements MessageStore { public void setMemoryUsage(MemoryUsage memoryUsage) { } - + public void setBatch(MessageId messageId) throws IOException, Exception { } - + /** * flag to indicate if the store is empty + * * @return true if the message count is 0 - * @throws Exception + * @throws Exception */ - public boolean isEmpty() throws Exception{ - return getMessageCount()==0; - } + public boolean isEmpty() throws Exception { + return getMessageCount() == 0; + } + + public Future asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { + addMessage(context, message); + return FUTURE; + } + + + public Future asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { + addMessage(context, message); + return FUTURE; + } + + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { + removeMessage(context, ack); + } + + static class CallableImplementation implements Callable { + public Object call() throws Exception { + return null; + } + } + + static { + FUTURE = new FutureTask(new CallableImplementation()); + FUTURE.run(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index 493b80fb74..a4bcd1a04b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -17,6 +17,7 @@ package org.apache.activemq.store; import java.io.IOException; +import java.util.concurrent.Future; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -40,6 +41,30 @@ public interface MessageStore extends Service { * @throws IOException */ void addMessage(ConnectionContext context, Message message) throws IOException; + + /** + * Adds a message to the message store + * + * @param context context + * @param message + * @param l + * @return a Future to track when this is complete + * @throws IOException + * @throws IOException + */ + Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException; + + /** + * Adds a message to the message store + * + * @param context context + * @param message + * @param l + * @return a Future to track when this is complete + * @throws IOException + * @throws IOException + */ + Future asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException; /** * Looks up a message using either the String messageID or the @@ -62,6 +87,8 @@ public interface MessageStore extends Service { * @throws IOException */ void removeMessage(ConnectionContext context, MessageAck ack) throws IOException; + + void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException; /** * Removes all the messages from the message store. diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 27c89aaebb..729396bdb3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -17,6 +17,7 @@ package org.apache.activemq.store; import java.io.IOException; +import java.util.concurrent.Future; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -100,4 +101,16 @@ public class ProxyMessageStore implements MessageStore { public boolean isEmpty() throws Exception { return delegate.isEmpty(); } + + public Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { + return delegate.asyncAddQueueMessage(context, message); + } + + public Future asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { + return delegate.asyncAddTopicMessage(context, message); + } + + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { + delegate.removeAsyncMessage(context, ack); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 0d4d0d257e..7ea7880c9e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -17,6 +17,7 @@ package org.apache.activemq.store; import java.io.IOException; +import java.util.concurrent.Future; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -140,4 +141,16 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public boolean isEmpty() throws Exception { return delegate.isEmpty(); } + + public Future asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { + return delegate.asyncAddTopicMessage(context, message); + } + + public Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { + return delegate.asyncAddQueueMessage(context, message); + } + + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { + delegate.removeAsyncMessage(context, ack); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java index 44838f985b..c8f03924d2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java @@ -17,9 +17,7 @@ package org.apache.activemq.store; import java.io.IOException; - import javax.jms.JMSException; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; @@ -42,7 +40,7 @@ public interface TopicMessageStore extends MessageStore { * @throws IOException */ void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException; - + /** * @param clientId * @param subscriptionName diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 57b3e82de4..6116f621fe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -386,4 +386,35 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi public void setDirectoryArchive(File directoryArchive) { letter.setDirectoryArchive(directoryArchive); } + + public boolean isConcurrentStoreAndDispatchQueues() { + return letter.isConcurrentStoreAndDispatchQueues(); + } + + public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { + letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); + } + + public boolean isConcurrentStoreAndDispatchTopics() { + return letter.isConcurrentStoreAndDispatchTopics(); + } + + public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { + letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); + } + + public int getMaxAsyncJobs() { + return letter.getMaxAsyncJobs(); + } + /** + * @param maxAsyncJobs the maxAsyncJobs to set + */ + public void setMaxAsyncJobs(int maxAsyncJobs) { + letter.setMaxAsyncJobs(maxAsyncJobs); + } + + @Override + public String toString() { + return "KahaDBPersistenceAdapter"; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 4e1e4756b0..c920d0a8a8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -18,12 +18,25 @@ package org.apache.activemq.store.kahadb; import java.io.DataInputStream; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -46,7 +59,6 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; @@ -62,23 +74,167 @@ import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Transaction; - public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { - + private static final int MAX_ASYNC_JOBS = 10000; + protected ExecutorService queueExecutor; + protected ExecutorService topicExecutor; + protected final Map asyncQueueMap = new HashMap(); + protected final Map asyncTopicMap = new HashMap(); private final WireFormat wireFormat = new OpenWireFormat(); + private SystemUsage usageManager; + private LinkedBlockingQueue asyncQueueJobQueue; + private LinkedBlockingQueue asyncTopicJobQueue; + private Semaphore queueSemaphore; + private Semaphore topicSemaphore; + private boolean concurrentStoreAndDispatchQueues = true; + private boolean concurrentStoreAndDispatchTopics = true; + private int maxAsyncJobs = MAX_ASYNC_JOBS; + public KahaDBStore() { + + } public void setBrokerName(String brokerName) { } + public void setUsageManager(SystemUsage usageManager) { + this.usageManager = usageManager; + } + + public SystemUsage getUsageManager() { + return this.usageManager; + } + + /** + * @return the concurrentStoreAndDispatch + */ + public boolean isConcurrentStoreAndDispatchQueues() { + return this.concurrentStoreAndDispatchQueues; + } + + /** + * @param concurrentStoreAndDispatch + * the concurrentStoreAndDispatch to set + */ + public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { + this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; + } + + /** + * @return the concurrentStoreAndDispatch + */ + public boolean isConcurrentStoreAndDispatchTopics() { + return this.concurrentStoreAndDispatchTopics; + } + + /** + * @param concurrentStoreAndDispatch + * the concurrentStoreAndDispatch to set + */ + public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { + this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; + } + + /** + * @return the maxAsyncJobs + */ + public int getMaxAsyncJobs() { + return this.maxAsyncJobs; + } + /** + * @param maxAsyncJobs + * the maxAsyncJobs to set + */ + public void setMaxAsyncJobs(int maxAsyncJobs) { + this.maxAsyncJobs = maxAsyncJobs; + } + + @Override + public void doStart() throws Exception { + this.queueSemaphore = new Semaphore(getMaxAsyncJobs()); + this.topicSemaphore = new Semaphore(getMaxAsyncJobs()); + this.asyncQueueJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); + this.asyncTopicJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); + this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue, + new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); + thread.setDaemon(true); + return thread; + } + }); + this.topicExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue, + new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); + thread.setDaemon(true); + return thread; + } + }); + super.doStart(); + + } + + @Override + public void doStop(ServiceStopper stopper) throws Exception { + this.queueSemaphore.drainPermits(); + this.topicSemaphore.drainPermits(); + if (this.queueExecutor != null) { + this.queueExecutor.shutdownNow(); + } + if (this.topicExecutor != null) { + this.topicExecutor.shutdownNow(); + } + super.doStop(stopper); + } + + protected StoreQueueTask removeQueueTask(MessageId id) { + StoreQueueTask task = this.asyncQueueMap.remove(id); + if (task != null) { + task.getMessage().decrementReferenceCount(); + this.queueSemaphore.release(); + } + return task; + } + + protected void addQueueTask(StoreQueueTask task) throws IOException { + try { + this.queueSemaphore.acquire(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + this.asyncQueueMap.put(task.getMessage().getMessageId(), task); + task.getMessage().incrementReferenceCount(); + this.queueExecutor.execute(task); + } + + protected StoreTopicTask removeTopicTask(MessageId id) { + StoreTopicTask task = this.asyncTopicMap.remove(id); + if (task != null) { + task.getMessage().decrementReferenceCount(); + this.topicSemaphore.release(); + } + return task; + } + + protected void addTopicTask(StoreTopicTask task) throws IOException { + try { + this.topicSemaphore.acquire(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + this.asyncTopicMap.put(task.getMessage().getMessageId(), task); + task.getMessage().incrementReferenceCount(); + this.topicExecutor.execute(task); } public TransactionStore createTransactionStore() throws IOException { - return new TransactionStore(){ - + return new TransactionStore() { + public void commit(TransactionId txid, boolean wasPrepared) throws IOException { store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true); } @@ -90,22 +246,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } public void recover(TransactionRecoveryListener listener) throws IOException { for (Map.Entry> entry : preparedTransactions.entrySet()) { - XATransactionId xid = (XATransactionId)entry.getKey(); + XATransactionId xid = (XATransactionId) entry.getKey(); ArrayList messageList = new ArrayList(); ArrayList ackList = new ArrayList(); - + for (Operation op : entry.getValue()) { - if( op.getClass() == AddOpperation.class ) { - AddOpperation addOp = (AddOpperation)op; - Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) ); + if (op.getClass() == AddOpperation.class) { + AddOpperation addOp = (AddOpperation) op; + Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand() + .getMessage().newInput())); messageList.add(msg); } else { - RemoveOpperation rmOp = (RemoveOpperation)op; - MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) ); + RemoveOpperation rmOp = (RemoveOpperation) op; + MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(rmOp.getCommand() + .getAck().newInput())); ackList.add(ack); } } - + Message[] addedMessages = new Message[messageList.size()]; MessageAck[] acks = new MessageAck[ackList.size()]; messageList.toArray(addedMessages); @@ -125,7 +283,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public KahaDBMessageStore(ActiveMQDestination destination) { super(destination); - this.dest = convert( destination ); + this.dest = convert(destination); } @Override @@ -133,24 +291,52 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return destination; } + @Override + public Future asyncAddQueueMessage(final ConnectionContext context, final Message message) + throws IOException { + if (isConcurrentStoreAndDispatchQueues()) { + StoreQueueTask result = new StoreQueueTask(this, context, message); + addQueueTask(result); + return result.getFuture(); + } else { + return super.asyncAddQueueMessage(context, message); + } + } + + @Override + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { + if (isConcurrentStoreAndDispatchQueues()) { + StoreQueueTask task = removeQueueTask(ack.getLastMessageId()); + if (task != null) { + if (!task.cancel()) { + removeMessage(context, ack); + } + } else { + removeMessage(context, ack); + } + } else { + removeMessage(context, ack); + } + } + public void addMessage(ConnectionContext context, Message message) throws IOException { KahaAddMessageCommand command = new KahaAddMessageCommand(); command.setDestination(dest); command.setMessageId(message.getMessageId().toString()); - command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) ); + command.setTransactionInfo(createTransactionInfo(message.getTransactionId())); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired()); - + } - + public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); command.setMessageId(ack.getLastMessageId().toString()); - command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) ); + command.setTransactionInfo(createTransactionInfo(ack.getTransactionId())); store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired()); } @@ -162,37 +348,40 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public Message getMessage(MessageId identity) throws IOException { final String key = identity.toString(); - - // Hopefully one day the page file supports concurrent read operations... but for now we must + + // Hopefully one day the page file supports concurrent read + // operations... but for now we must // externally synchronize... Location location; - synchronized(indexMutex) { - location = pageFile.tx().execute(new Transaction.CallableClosure(){ + synchronized (indexMutex) { + location = pageFile.tx().execute(new Transaction.CallableClosure() { public Location execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long sequence = sd.messageIdIndex.get(tx, key); - if( sequence ==null ) { + if (sequence == null) { return null; } return sd.orderIndex.get(tx, sequence).location; } }); } - if( location == null ) { + if (location == null) { return null; } - + return loadMessage(location); } - + public int getMessageCount() throws IOException { - synchronized(indexMutex) { - return pageFile.tx().execute(new Transaction.CallableClosure(){ + synchronized (indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure() { public Integer execute(Transaction tx) throws IOException { - // Iterate through all index entries to get a count of messages in the destination. + // Iterate through all index entries to get a count of + // messages in the destination. StoredDestination sd = getStoredDestination(dest, tx); - int rc=0; - for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { + int rc = 0; + for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator + .hasNext();) { iterator.next(); rc++; } @@ -201,12 +390,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { }); } } - + + @Override public boolean isEmpty() throws IOException { - synchronized(indexMutex) { - return pageFile.tx().execute(new Transaction.CallableClosure(){ + synchronized (indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure() { public Boolean execute(Transaction tx) throws IOException { - // Iterate through all index entries to get a count of messages in the destination. + // Iterate through all index entries to get a count of + // messages in the destination. StoredDestination sd = getStoredDestination(dest, tx); return sd.locationIndex.isEmpty(tx); } @@ -214,40 +405,41 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } - public void recover(final MessageRecoveryListener listener) throws Exception { - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator + .hasNext();) { Entry entry = iterator.next(); - listener.recoverMessage( loadMessage(entry.getValue().location) ); + listener.recoverMessage(loadMessage(entry.getValue().location)); } } }); } } - long cursorPos=0; - + long cursorPos = 0; + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); - Entry entry=null; + Entry entry = null; int counter = 0; - for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator + .hasNext();) { entry = iterator.next(); - listener.recoverMessage( loadMessage(entry.getValue().location ) ); + listener.recoverMessage(loadMessage(entry.getValue().location)); counter++; - if( counter >= maxReturned ) { + if (counter >= maxReturned) { break; } } - if( entry!=null ) { - cursorPos = entry.getKey()+1; + if (entry != null) { + cursorPos = entry.getKey() + 1; } } }); @@ -255,29 +447,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } public void resetBatching() { - cursorPos=0; + cursorPos = 0; } - @Override public void setBatch(MessageId identity) throws IOException { final String key = identity.toString(); - - // Hopefully one day the page file supports concurrent read operations... but for now we must + + // Hopefully one day the page file supports concurrent read + // operations... but for now we must // externally synchronize... Long location; - synchronized(indexMutex) { - location = pageFile.tx().execute(new Transaction.CallableClosure(){ + synchronized (indexMutex) { + location = pageFile.tx().execute(new Transaction.CallableClosure() { public Long execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); return sd.messageIdIndex.get(tx, key); } }); } - if( location!=null ) { - cursorPos=location+1; + if (location != null) { + cursorPos = location + 1; } - + } @Override @@ -285,32 +477,65 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } @Override public void start() throws Exception { + super.start(); } @Override public void stop() throws Exception { + super.stop(); } - + } - + class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { - public KahaDBTopicMessageStore(ActiveMQTopic destination) { + private final AtomicInteger subscriptionCount = new AtomicInteger(); + public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { super(destination); + this.subscriptionCount.set(getAllSubscriptions().length); } - - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { + + @Override + public Future asyncAddTopicMessage(final ConnectionContext context, final Message message) + throws IOException { + if (isConcurrentStoreAndDispatchTopics()) { + StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); + addTopicTask(result); + return result.getFuture(); + } else { + return super.asyncAddTopicMessage(context, message); + } + } + + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) + throws IOException { + String subscriptionKey = subscriptionKey(clientId, subscriptionName); + if (isConcurrentStoreAndDispatchTopics()) { + StoreTopicTask task = asyncTopicMap.get(messageId); + if (task != null) { + + if (task.addSubscriptionKey(subscriptionKey)) { + removeTopicTask(messageId); + task.cancel(); + } + } else { + doAcknowledge(context, subscriptionKey, messageId); + } + } else { + doAcknowledge(context, subscriptionKey, messageId); + } + } + + protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId) + throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); + command.setSubscriptionKey(subscriptionKey); command.setMessageId(messageId.toString()); - // We are not passed a transaction info.. so we can't participate in a transaction. - // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack - // to pass back to the XA recover method. - // command.setTransactionInfo(); store(command, false); } public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { - String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); + String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo + .getSubscriptionName()); KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); command.setSubscriptionKey(subscriptionKey); @@ -318,6 +543,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && true); + this.subscriptionCount.incrementAndGet(); } public void deleteSubscription(String clientId, String subscriptionName) throws IOException { @@ -325,111 +551,120 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { command.setDestination(dest); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); store(command, isEnableJournalDiskSyncs() && true); + this.subscriptionCount.decrementAndGet(); } public SubscriptionInfo[] getAllSubscriptions() throws IOException { - + final ArrayList subscriptions = new ArrayList(); - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); - for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { + for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator + .hasNext();) { Entry entry = iterator.next(); - SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) ); + SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry + .getValue().getSubscriptionInfo().newInput())); subscriptions.add(info); } } }); } - - SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; + + SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; subscriptions.toArray(rc); return rc; } public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - synchronized(indexMutex) { - return pageFile.tx().execute(new Transaction.CallableClosure(){ + synchronized (indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure() { public SubscriptionInfo execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); - if( command ==null ) { + if (command == null) { return null; } - return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) ); + return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command + .getSubscriptionInfo().newInput())); } }); } } - + public int getMessageCount(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - synchronized(indexMutex) { - return pageFile.tx().execute(new Transaction.CallableClosure(){ + synchronized (indexMutex) { + return pageFile.tx().execute(new Transaction.CallableClosure() { public Integer execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); - if ( cursorPos==null ) { + if (cursorPos == null) { // The subscription might not exist. return 0; } cursorPos += 1; - + int counter = 0; - for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator + .hasNext();) { iterator.next(); counter++; } return counter; } }); - } + } } - public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { + public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) + throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); cursorPos += 1; - - for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator + .hasNext();) { Entry entry = iterator.next(); - listener.recoverMessage( loadMessage(entry.getValue().location ) ); + listener.recoverMessage(loadMessage(entry.getValue().location)); } } }); } } - public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, + final MessageRecoveryListener listener) throws Exception { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); - if( cursorPos == null ) { + if (cursorPos == null) { cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); cursorPos += 1; } - - Entry entry=null; + + Entry entry = null; int counter = 0; - for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { + for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator + .hasNext();) { entry = iterator.next(); - listener.recoverMessage( loadMessage(entry.getValue().location ) ); + listener.recoverMessage(loadMessage(entry.getValue().location)); counter++; - if( counter >= maxReturned ) { + if (counter >= maxReturned) { break; } } - if( entry!=null ) { + if (entry != null) { sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); } } @@ -440,8 +675,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void resetBatching(String clientId, String subscriptionName) { try { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); sd.subscriptionCursors.remove(subscriptionKey); @@ -454,10 +689,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } - String subscriptionKey(String clientId, String subscriptionName){ - return clientId+":"+subscriptionName; + String subscriptionKey(String clientId, String subscriptionName) { + return clientId + ":" + subscriptionName; } - + public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { return new KahaDBMessageStore(destination); } @@ -469,8 +704,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * Cleanup method to remove any state associated with the given destination. * This method does not stop the message store (it might not be cached). - * - * @param destination Destination to forget + * + * @param destination + * Destination to forget */ public void removeQueueMessageStore(ActiveMQQueue destination) { } @@ -478,24 +714,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { /** * Cleanup method to remove any state associated with the given destination * This method does not stop the message store (it might not be cached). - * - * @param destination Destination to forget + * + * @param destination + * Destination to forget */ public void removeTopicMessageStore(ActiveMQTopic destination) { } public void deleteAllMessages() throws IOException { - deleteAllMessages=true; + deleteAllMessages = true; } - - + public Set getDestinations() { try { final HashSet rc = new HashSet(); - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { Entry entry = iterator.next(); if (!isEmptyTopic(entry, tx)) { rc.add(convert(entry.getKey())); @@ -503,7 +740,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } - private boolean isEmptyTopic(Entry entry, Transaction tx) throws IOException { + private boolean isEmptyTopic(Entry entry, Transaction tx) + throws IOException { boolean isEmptyTopic = false; ActiveMQDestination dest = convert(entry.getKey()); if (dest.isTopic()) { @@ -521,13 +759,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { throw new RuntimeException(e); } } - + public long getLastMessageBrokerSequenceId() throws IOException { return 0; } - + public long size() { - if ( !started.get() ) { + if (!started.get()) { return 0; } try { @@ -546,15 +784,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public void rollbackTransaction(ConnectionContext context) throws IOException { throw new IOException("Not yet implemented."); } - + public void checkpoint(boolean sync) throws IOException { super.checkpointCleanup(false); } - - - /////////////////////////////////////////////////////////////////// + + // ///////////////////////////////////////////////////////////////// // Internal helper methods. - /////////////////////////////////////////////////////////////////// + // ///////////////////////////////////////////////////////////////// /** * @param location @@ -562,35 +799,35 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { * @throws IOException */ Message loadMessage(Location location) throws IOException { - KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location); - Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) ); + KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location); + Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); return msg; } - /////////////////////////////////////////////////////////////////// + // ///////////////////////////////////////////////////////////////// // Internal conversion methods. - /////////////////////////////////////////////////////////////////// - + // ///////////////////////////////////////////////////////////////// + KahaTransactionInfo createTransactionInfo(TransactionId txid) { - if( txid ==null ) { + if (txid == null) { return null; } KahaTransactionInfo rc = new KahaTransactionInfo(); - + // Link it up to the previous record that was part of the transaction. ArrayList tx = inflightTransactions.get(txid); - if( tx!=null ) { - rc.setPreviousEntry(convert(tx.get(tx.size()-1).location)); + if (tx != null) { + rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location)); } - - if( txid.isLocalTransaction() ) { - LocalTransactionId t = (LocalTransactionId)txid; + + if (txid.isLocalTransaction()) { + LocalTransactionId t = (LocalTransactionId) txid; KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); kahaTxId.setConnectionId(t.getConnectionId().getValue()); kahaTxId.setTransacitonId(t.getValue()); rc.setLocalTransacitonId(kahaTxId); } else { - XATransactionId t = (XATransactionId)txid; + XATransactionId t = (XATransactionId) txid; KahaXATransactionId kahaTxId = new KahaXATransactionId(); kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); @@ -599,18 +836,18 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } return rc; } - + KahaLocation convert(Location location) { KahaLocation rc = new KahaLocation(); rc.setLogId(location.getDataFileId()); rc.setOffset(location.getOffset()); return rc; } - + KahaDestination convert(ActiveMQDestination dest) { KahaDestination rc = new KahaDestination(); rc.setName(dest.getPhysicalName()); - switch( dest.getDestinationType() ) { + switch (dest.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: rc.setType(DestinationType.QUEUE); return rc; @@ -630,13 +867,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { ActiveMQDestination convert(String dest) { int p = dest.indexOf(":"); - if( p<0 ) { + if (p < 0) { throw new IllegalArgumentException("Not in the valid destination format"); } int type = Integer.parseInt(dest.substring(0, p)); - String name = dest.substring(p+1); - - switch( KahaDestination.DestinationType.valueOf(type) ) { + String name = dest.substring(p + 1); + + switch (KahaDestination.DestinationType.valueOf(type)) { case QUEUE: return new ActiveMQQueue(name); case TOPIC: @@ -645,9 +882,113 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return new ActiveMQTempQueue(name); case TEMP_TOPIC: return new ActiveMQTempTopic(name); - default: + default: throw new IllegalArgumentException("Not in the valid destination format"); } } - + + class StoreQueueTask implements Runnable { + protected final Message message; + protected final ConnectionContext context; + protected final MessageStore store; + protected final InnerFutureTask future; + protected final AtomicBoolean done = new AtomicBoolean(); + + public StoreQueueTask(MessageStore store, ConnectionContext context, Message message) { + this.store = store; + this.context = context; + this.message = message; + this.future = new InnerFutureTask(this); + + } + + public Future getFuture() { + return this.future; + } + + public boolean cancel() { + if (this.done.compareAndSet(false, true)) { + this.future.cancel(false); + return true; + } + return false; + } + + public void run() { + try { + if (this.done.compareAndSet(false, true)) { + this.store.addMessage(context, message); + removeQueueTask(this.message.getMessageId()); + this.future.complete(); + } + } catch (Exception e) { + this.future.setException(e); + } + } + + protected Message getMessage() { + return this.message; + } + + private class InnerFutureTask extends FutureTask { + + public InnerFutureTask(Runnable runnable) { + super(runnable, null); + + } + + public void setException(final Exception e) { + super.setException(e); + } + + public void complete() { + super.set(null); + } + } + } + + class StoreTopicTask extends StoreQueueTask { + private final int subscriptionCount; + private final List subscriptionKeys = new ArrayList(1); + private final KahaDBTopicMessageStore topicStore; + public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, + int subscriptionCount) { + super(store, context, message); + this.topicStore = store; + this.subscriptionCount = subscriptionCount; + + } + + /** + * add a key + * + * @param key + * @return true if all acknowledgements received + */ + public boolean addSubscriptionKey(String key) { + synchronized (this.subscriptionKeys) { + this.subscriptionKeys.add(key); + } + return this.subscriptionKeys.size() >= this.subscriptionCount; + } + + @Override + public void run() { + try { + if (this.done.compareAndSet(false, true)) { + this.topicStore.addMessage(context, message); + // apply any acks we have + synchronized (this.subscriptionKeys) { + for (String key : this.subscriptionKeys) { + this.topicStore.doAcknowledge(context, key, this.message.getMessageId()); + } + } + removeQueueTask(this.message.getMessageId()); + this.future.complete(); + } + } catch (Exception e) { + this.future.setException(e); + } + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 7eb1fb4561..1425e0be9f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -57,6 +57,8 @@ import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.util.Callback; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kahadb.index.BTreeIndex; @@ -78,7 +80,7 @@ import org.apache.kahadb.util.SequenceSet; import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.VariableMarshaller; -public class MessageDatabase implements BrokerServiceAware { +public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { private BrokerService brokerService; @@ -171,7 +173,7 @@ public class MessageDatabase implements BrokerServiceAware { protected AtomicBoolean opened = new AtomicBoolean(); private LockFile lockFile; private boolean ignoreMissingJournalfiles = false; - private int indexCacheSize = 100; + private int indexCacheSize = 10000; private boolean checkForCorruptJournalFiles = false; private boolean checksumJournalFiles = false; @@ -179,16 +181,14 @@ public class MessageDatabase implements BrokerServiceAware { public MessageDatabase() { } - public void start() throws Exception { - if (started.compareAndSet(false, true)) { - load(); - } + @Override + public void doStart() throws Exception { + load(); } - public void stop() throws Exception { - if (started.compareAndSet(true, false)) { - unload(); - } + @Override + public void doStop(ServiceStopper stopper) throws Exception { + unload(); } private void loadPageFile() throws IOException {