added support for concurrent dispatch and store of persistent messages in KahaDB

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@945102 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-05-17 11:53:28 +00:00
parent 7046dc7f24
commit ea843782b3
10 changed files with 674 additions and 180 deletions

View File

@ -29,18 +29,18 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.ResourceAllocationException; import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
@ -105,13 +105,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private final Object dispatchMutex = new Object(); private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true; private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false; private boolean strictOrderDispatch = false;
private QueueDispatchSelector dispatchSelector; private final QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch = false; private boolean optimizedDispatch = false;
private boolean firstConsumer = false; private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0; private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch; private CountDownLatch consumersBeforeStartsLatch;
private AtomicLong pendingWakeups = new AtomicLong(); private final AtomicLong pendingWakeups = new AtomicLong();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() { public void run() {
@ -163,6 +163,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
class FlowControlTimeoutTask extends Thread { class FlowControlTimeoutTask extends Thread {
@Override
public void run() { public void run() {
TimeoutMessage timeout; TimeoutMessage timeout;
try { try {
@ -220,6 +221,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
@Override
public void initialize() throws Exception { public void initialize() throws Exception {
if (this.messages == null) { if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) { if (destination.isTemporary() || broker == null || store == null) {
@ -554,6 +556,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
synchronized (sendLock) { synchronized (sendLock) {
if (store != null && message.isPersistent()) { if (store != null && message.isPersistent()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@ -568,8 +571,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
} }
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
store.addMessage(context, message); if (context.isInTransaction()) {
store.addMessage(context, message);
}else {
result = store.asyncAddQueueMessage(context, message);
}
} }
} }
if (context.isInTransaction()) { if (context.isInTransaction()) {
@ -578,6 +584,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// our memory. This increment is decremented once the tx finishes.. // our memory. This increment is decremented once the tx finishes..
message.incrementReferenceCount(); message.incrementReferenceCount();
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
try { try {
// It could take while before we receive the commit // It could take while before we receive the commit
@ -603,6 +610,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// usage manager. // usage manager.
sendMessage(context, message); sendMessage(context, message);
} }
if (result != null && !result.isCancelled()) {
try {
result.get();
}catch(CancellationException e) {
//ignore - the task has been cancelled if the message
// has already been deleted
}
}
} }
private void expireMessages() { private void expireMessages() {
@ -651,7 +666,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
ack.setLastMessageId(node.getMessageId()); ack.setLastMessageId(node.getMessageId());
ack.setMessageCount(1); ack.setMessageCount(1);
} }
store.removeMessage(context, ack); store.removeAsyncMessage(context, ack);
} }
} }
@ -666,6 +681,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return msg; return msg;
} }
@Override
public String toString() { public String toString() {
int size = 0; int size = 0;
synchronized (messages) { synchronized (messages) {
@ -725,6 +741,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@Override
public ActiveMQDestination getActiveMQDestination() { public ActiveMQDestination getActiveMQDestination() {
return destination; return destination;
} }
@ -936,7 +953,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
for (MessageReference ref : list) { for (MessageReference ref : list) {
try { try {
QueueMessageReference r = (QueueMessageReference) ref; QueueMessageReference r = (QueueMessageReference) ref;
removeMessage(c, (IndirectMessageReference) r); removeMessage(c, r);
} catch (IOException e) { } catch (IOException e) {
} }
} }
@ -1273,6 +1290,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return messageId.equals(r.getMessageId().toString()); return messageId.equals(r.getMessageId().toString());
} }
@Override
public String toString() { public String toString() {
return "MessageIdFilter: " + messageId; return "MessageIdFilter: " + messageId;
} }
@ -1326,12 +1344,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally { } finally {
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getDequeues().increment();
dropMessage(reference); dropMessage(reference);
wakeup(); wakeup();
} }
@Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
reference.setAcked(false); reference.setAcked(false);
} }
@ -1634,6 +1654,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* org.apache.activemq.broker.region.BaseDestination#processDispatchNotification * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
* (org.apache.activemq.command.MessageDispatchNotification) * (org.apache.activemq.command.MessageDispatchNotification)
*/ */
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
// do dispatch // do dispatch
Subscription sub = getMatchingSubscription(messageDispatchNotification); Subscription sub = getMatchingSubscription(messageDispatchNotification);

View File

@ -21,10 +21,11 @@ import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
@ -87,6 +88,7 @@ public class Topic extends BaseDestination implements Task {
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
} }
@Override
public void initialize() throws Exception { public void initialize() throws Exception {
super.initialize(); super.initialize();
if (store != null) { if (store != null) {
@ -402,6 +404,7 @@ public class Topic extends BaseDestination implements Task {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this); message.setRegionDestination(this);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
Future<Object> result = null;
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@ -413,13 +416,18 @@ public class Topic extends BaseDestination implements Task {
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
} }
topicStore.addMessage(context, message); if (context.isInTransaction()) {
topicStore.addMessage(context, message);
}else {
result = topicStore.asyncAddTopicMessage(context, message);
}
} }
message.incrementReferenceCount(); message.incrementReferenceCount();
if (context.isInTransaction()) { if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
// It could take while before we receive the commit // It could take while before we receive the commit
// operration.. by that time the message could have // operration.. by that time the message could have
@ -445,6 +453,14 @@ public class Topic extends BaseDestination implements Task {
message.decrementReferenceCount(); message.decrementReferenceCount();
} }
} }
if (result != null && !result.isCancelled()) {
try {
result.get();
}catch(CancellationException e) {
//ignore - the task has been cancelled if the message
// has already been deleted
}
}
} }
@ -452,6 +468,7 @@ public class Topic extends BaseDestination implements Task {
return durableSubcribers.size() == 0; return durableSubcribers.size() == 0;
} }
@Override
public String toString() { public String toString() {
return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
} }

View File

@ -17,18 +17,24 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore { abstract public class AbstractMessageStore implements MessageStore {
static final FutureTask<Object> FUTURE;
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
public AbstractMessageStore(ActiveMQDestination destination) { public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination; this.destination = destination;
} }
public void dispose(ConnectionContext context) { public void dispose(ConnectionContext context) {
} }
@ -44,16 +50,43 @@ abstract public class AbstractMessageStore implements MessageStore {
public void setMemoryUsage(MemoryUsage memoryUsage) { public void setMemoryUsage(MemoryUsage memoryUsage) {
} }
public void setBatch(MessageId messageId) throws IOException, Exception { public void setBatch(MessageId messageId) throws IOException, Exception {
} }
/** /**
* flag to indicate if the store is empty * flag to indicate if the store is empty
*
* @return true if the message count is 0 * @return true if the message count is 0
* @throws Exception * @throws Exception
*/ */
public boolean isEmpty() throws Exception{ public boolean isEmpty() throws Exception {
return getMessageCount()==0; return getMessageCount() == 0;
} }
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(context, ack);
}
static class CallableImplementation implements Callable<Object> {
public Object call() throws Exception {
return null;
}
}
static {
FUTURE = new FutureTask<Object>(new CallableImplementation());
FUTURE.run();
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -40,6 +41,30 @@ public interface MessageStore extends Service {
* @throws IOException * @throws IOException
*/ */
void addMessage(ConnectionContext context, Message message) throws IOException; void addMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
* @param l
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
* @param l
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
/** /**
* Looks up a message using either the String messageID or the * Looks up a message using either the String messageID or the
@ -62,6 +87,8 @@ public interface MessageStore extends Service {
* @throws IOException * @throws IOException
*/ */
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException; void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException;
/** /**
* Removes all the messages from the message store. * Removes all the messages from the message store.

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -100,4 +101,16 @@ public class ProxyMessageStore implements MessageStore {
public boolean isEmpty() throws Exception { public boolean isEmpty() throws Exception {
return delegate.isEmpty(); return delegate.isEmpty();
} }
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -140,4 +141,16 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public boolean isEmpty() throws Exception { public boolean isEmpty() throws Exception {
return delegate.isEmpty(); return delegate.isEmpty();
} }
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}
} }

View File

@ -17,9 +17,7 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
@ -42,7 +40,7 @@ public interface TopicMessageStore extends MessageStore {
* @throws IOException * @throws IOException
*/ */
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException; void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
/** /**
* @param clientId * @param clientId
* @param subscriptionName * @param subscriptionName

View File

@ -386,4 +386,35 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
public void setDirectoryArchive(File directoryArchive) { public void setDirectoryArchive(File directoryArchive) {
letter.setDirectoryArchive(directoryArchive); letter.setDirectoryArchive(directoryArchive);
} }
public boolean isConcurrentStoreAndDispatchQueues() {
return letter.isConcurrentStoreAndDispatchQueues();
}
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
}
public boolean isConcurrentStoreAndDispatchTopics() {
return letter.isConcurrentStoreAndDispatchTopics();
}
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
}
public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs();
}
/**
* @param maxAsyncJobs the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
letter.setMaxAsyncJobs(maxAsyncJobs);
}
@Override
public String toString() {
return "KahaDBPersistenceAdapter";
}
} }

View File

@ -57,6 +57,8 @@ import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.util.Callback; import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex; import org.apache.kahadb.index.BTreeIndex;
@ -78,7 +80,7 @@ import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller; import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller; import org.apache.kahadb.util.VariableMarshaller;
public class MessageDatabase implements BrokerServiceAware { public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
private BrokerService brokerService; private BrokerService brokerService;
@ -171,7 +173,7 @@ public class MessageDatabase implements BrokerServiceAware {
protected AtomicBoolean opened = new AtomicBoolean(); protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile; private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false; private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 100; private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false; private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false; private boolean checksumJournalFiles = false;
@ -179,16 +181,14 @@ public class MessageDatabase implements BrokerServiceAware {
public MessageDatabase() { public MessageDatabase() {
} }
public void start() throws Exception { @Override
if (started.compareAndSet(false, true)) { public void doStart() throws Exception {
load(); load();
}
} }
public void stop() throws Exception { @Override
if (started.compareAndSet(true, false)) { public void doStop(ServiceStopper stopper) throws Exception {
unload(); unload();
}
} }
private void loadPageFile() throws IOException { private void loadPageFile() throws IOException {