Transactions dispatch and commit to the store asynchronously, though the commit only returns to the producer when they both complete for KahaDB

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@955039 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-06-15 20:33:41 +00:00
parent ad06a5f9c3
commit 27262c8463
16 changed files with 1000 additions and 331 deletions

View File

@ -93,7 +93,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a subscription
// Messages that are paged in but have not yet been targeted at a
// subscription
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners;
@ -101,7 +102,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object();
private ExecutorService executor;
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
@ -112,7 +114,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch;
private final AtomicLong pendingWakeups = new AtomicLong();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
asyncWakeup();
@ -123,46 +125,47 @@ public class Queue extends BaseDestination implements Task, UsageListener {
expireMessages();
}
};
private final Object iteratingMutex = new Object() {};
private final Object iteratingMutex = new Object() {
};
private final Scheduler scheduler;
class TimeoutMessage implements Delayed {
Message message;
ConnectionContext context;
long trigger;
public TimeoutMessage(Message message, ConnectionContext context, long delay) {
this.message = message;
this.context = context;
this.trigger = System.currentTimeMillis() + delay;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.currentTimeMillis();
return unit.convert(n, TimeUnit.MILLISECONDS);
}
public int compareTo(Delayed delayed) {
long other = ((TimeoutMessage)delayed).trigger;
long other = ((TimeoutMessage) delayed).trigger;
int returnValue;
if (this.trigger < other) {
returnValue = -1;
returnValue = -1;
} else if (this.trigger > other) {
returnValue = 1;
returnValue = 1;
} else {
returnValue = 0;
returnValue = 0;
}
return returnValue;
}
}
DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
class FlowControlTimeoutTask extends Thread {
@Override
public void run() {
TimeoutMessage timeout;
@ -172,8 +175,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (timeout != null) {
synchronized (messagesWaitingForSpace) {
if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
ExceptionResponse response = new ExceptionResponse(
new ResourceAllocationException(
"Usage Manager Memory Limit reached. Stopping producer ("
+ timeout.message.getProducerId()
+ ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName()
+ "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info"));
response.setCorrelationId(timeout.message.getCommandId());
timeout.context.getConnection().dispatchAsync(response);
}
@ -187,19 +196,19 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
};
private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
public int compare(Subscription s1, Subscription s2) {
//We want the list sorted in descending order
// We want the list sorted in descending order
return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
}
};
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.taskFactory = taskFactory;
this.dispatchSelector = new QueueDispatchSelector(destination);
@ -212,7 +221,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
// make the queue easily visible in the debugger from its task runner threads
// make the queue easily visible in the debugger from its task runner
// threads
final class QueueThread extends Thread {
final Queue queue;
@ -231,9 +241,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
this.messages = new StoreQueueCursor(broker, this);
}
}
// If a VMPendingMessageCursor don't use the default Producer System Usage
// since it turns into a shared blocking queue which can lead to a network deadlock.
// If we are cursoring to disk..it's not and issue because it does not block due
// If a VMPendingMessageCursor don't use the default Producer System
// Usage
// since it turns into a shared blocking queue which can lead to a
// network deadlock.
// If we are cursoring to disk..it's not and issue because it does not
// block due
// to large disk sizes.
if (messages instanceof VMPendingMessageCursor) {
this.systemUsage = brokerService.getSystemUsage();
@ -260,7 +273,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (message.isExpired()) {
if (broker.isExpired(message)) {
messageExpired(createConnectionContext(), createMessageReference(message));
// drop message will decrement so counter balance here
// drop message will decrement so counter
// balance here
destinationStatistics.getMessages().increment();
}
return true;
@ -300,8 +314,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
/*
* Holder for subscription that needs attention on next iterate
* browser needs access to existing messages in the queue that have already been dispatched
* Holder for subscription that needs attention on next iterate browser
* needs access to existing messages in the queue that have already been
* dispatched
*/
class BrowserDispatch {
QueueBrowserSubscription browser;
@ -370,26 +385,30 @@ public class Queue extends BaseDestination implements Task, UsageListener {
browserDispatches.addLast(browserDispatch);
}
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
throws Exception {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
synchronized (dispatchMutex) {
if (LOG.isDebugEnabled()) {
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount());
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+ getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+ getDestinationStatistics().getInflight().getCount());
}
synchronized (consumers) {
removeFromConsumerList(sub);
@ -398,7 +417,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (exclusiveConsumer == sub) {
exclusiveConsumer = null;
for (Subscription s : consumers) {
if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) {
if (s.getConsumerInfo().isExclusive()
&& (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
.getConsumerInfo().getPriority())) {
exclusiveConsumer = s;
}
@ -410,13 +431,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages
for (MessageReference ref : sub.remove(context, this)) {
QueueMessageReference qmr = (QueueMessageReference) ref;
if (qmr.getLockOwner() == sub) {
qmr.unlock();
// only increment redelivery if it was delivered or we have no delivery information
if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
// only increment redelivery if it was delivered or we
// have no delivery information
if (lastDeiveredSequenceId == 0
|| qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
qmr.incrementRedeliveryCounter();
}
}
@ -432,7 +455,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}
@ -443,9 +467,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// destination.. it may have expired.
message.setRegionDestination(this);
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
if (message.isExpired()) {
//message not stored - or added to stats yet - so chuck here
// message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
@ -459,20 +484,28 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG.info("Usage Manager Memory Limit ("+ memoryUsage.getLimit() + ") reached on " + getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
LOG
.info("Usage Manager Memory Limit ("
+ memoryUsage.getLimit()
+ ") reached on "
+ getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
// We can avoid blocking due to low usage if the producer is sending
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
// copy the exchange state since the context will be modified while we are waiting
// copy the exchange state since the context will be
// modified while we are waiting
// for space.
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) {
@ -491,7 +524,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
@ -510,9 +544,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
});
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
.getSendFailIfNoSpaceAfterTimeout()));
}
registerCallbackForNotFullNotification();
@ -523,8 +558,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} else {
if (memoryUsage.isFull()) {
waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
+ message.getProducerId() + ") stopped to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
// The usage manager could have delayed us by the time
@ -555,14 +592,18 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
synchronized (sendLock) {
if (store != null && message.isPersistent()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
@ -572,11 +613,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (context.isInTransaction()) {
store.addMessage(context, message);
}else {
result = store.asyncAddQueueMessage(context, message);
}
result = store.asyncAddQueueMessage(context, message);
}
}
if (context.isInTransaction()) {
@ -613,10 +650,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (result != null && !result.isCancelled()) {
try {
result.get();
}catch(CancellationException e) {
//ignore - the task has been cancelled if the message
// has already been deleted
result.get();
} catch (CancellationException e) {
// ignore - the task has been cancelled if the message
// has already been deleted
}
}
}
@ -652,10 +689,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void gc() {
}
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
throws IOException {
messageConsumed(context, node);
if (store != null && node.isPersistent()) {
// the original ack may be a ranged ack, but we are trying to delete a specific
// the original ack may be a ranged ack, but we are trying to delete
// a specific
// message store here so we need to convert to a non ranged ack.
if (ack.getMessageCount() > 0) {
// Dup the ack
@ -692,7 +731,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
synchronized (messages) {
size = messages.size();
}
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
+ ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+ messageGroupOwners;
}
@ -705,15 +745,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
// Start flow control timeout task
// Prevent trying to start it multiple times
if (!flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.start();
}
doPageIn(false);
}
@ -726,7 +766,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
scheduler.cancel(expireMessagesTask);
if (flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.interrupt();
}
@ -897,7 +937,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize,
List<MessageReference> toExpire) throws Exception {
for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
QueueMessageReference ref = i.next();
if (ref.isExpired()) {
@ -962,10 +1003,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} catch (IOException e) {
}
}
// don't spin/hang if stats are out and there is nothing left in the store
// don't spin/hang if stats are out and there is nothing left in the
// store
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
if (this.destinationStatistics.getMessages().getCount() > 0) {
LOG.warn(getActiveMQDestination().getQualifiedName() + " after purge complete, message count stats report: " + this.destinationStatistics.getMessages().getCount());
LOG.warn(getActiveMQDestination().getQualifiedName()
+ " after purge complete, message count stats report: "
+ this.destinationStatistics.getMessages().getCount());
}
gc();
this.destinationStatistics.getMessages().setCount(0);
@ -1032,7 +1076,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
/**
* Copies the message matching the given messageId
*/
public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
throws Exception {
return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
}
@ -1041,7 +1086,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
*
* @return the number of messages copied
*/
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
throws Exception {
return copyMatchingMessagesTo(context, selector, dest, -1);
}
@ -1051,7 +1097,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
*
* @return the number of messages copied
*/
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
int maximumMessages) throws Exception {
return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
}
@ -1061,7 +1108,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
*
* @return the number of messages copied
*/
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
@ -1098,9 +1146,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
/**
* Move a message
*
* @param context connection context
* @param m message
* @param dest ActiveMQDestination
* @param context
* connection context
* @param m
* message
* @param dest
* ActiveMQDestination
* @throws Exception
*/
public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
@ -1116,7 +1167,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
/**
* Moves the message matching the given messageId
*/
public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
throws Exception {
return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
}
@ -1125,7 +1177,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
*
* @return the number of messages removed
*/
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
throws Exception {
return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
}
@ -1133,7 +1186,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* Moves the messages matching the given selector up to the maximum number
* of matched messages
*/
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
int maximumMessages) throws Exception {
return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
}
@ -1141,7 +1195,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* Moves the messages matching the given filter up to the maximum number of
* matched messages
*/
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
@ -1180,7 +1235,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
boolean pageInMoreMessages = false;
boolean pageInMoreMessages = false;
synchronized (iteratingMutex) {
// do early to allow dispatch of these waiting messages
@ -1202,7 +1257,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
firstConsumer = false;
try {
if (consumersBeforeDispatchStarts > 0) {
int timeout = 1000; // wait one second by default if consumer count isn't reached
int timeout = 1000; // wait one second by default if
// consumer count isn't reached
if (timeBeforeDispatchStarts > 0) {
timeout = timeBeforeDispatchStarts;
}
@ -1212,7 +1268,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
LOG.debug(timeout + " ms elapsed and " + consumers.size()
+ " consumers subscribed. Starting dispatch.");
}
}
}
@ -1226,21 +1283,24 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.error(e);
}
}
BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
synchronized (messages) {
pageInMoreMessages |= !messages.isEmpty();
}
// Kinda ugly.. but I think dispatchLock is the only mutex protecting the
// pagedInPendingDispatch variable.
// Kinda ugly.. but I think dispatchLock is the only mutex
// protecting the
// pagedInPendingDispatch variable.
synchronized (dispatchMutex) {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
}
// Perhaps we should page always into the pagedInPendingDispatch list if
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
// Perhaps we should page always into the pagedInPendingDispatch
// list if
// !messages.isEmpty(), and then if
// !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
try {
@ -1250,7 +1310,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.error("Failed to page in more queue messages ", e);
}
}
if (pendingBrowserDispatch != null) {
ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
synchronized (pagedInMessages) {
@ -1264,7 +1324,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
for (QueueMessageReference node : alreadyDispatchedMessages) {
if (!node.isAcked()) {
@ -1278,10 +1338,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} catch (Exception e) {
LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
}
} while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
}
if (pendingWakeups.get() > 0) {
pendingWakeups.decrementAndGet();
}
@ -1336,7 +1396,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
removeMessage(c, subs, r, ack);
}
protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException {
protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
MessageAck ack) throws IOException {
reference.setAcked(true);
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
@ -1408,7 +1469,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
if (!msg.isPersistent() && messages.getSystemUsage() != null) {
if (systemUsage.getTempUsage().isFull()) {
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException(logMessage);
@ -1460,12 +1522,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
synchronized (dispatchMutex) {
int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+ destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
}
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
// Only page in the minimum number of messages which can be
// dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
@ -1478,6 +1542,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
messages.remove();
QueueMessageReference ref = createMessageReference(node.getMessage());
if (ref.isExpired()) {
if (broker.isExpired(ref)) {
@ -1494,7 +1559,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messages.release();
}
}
// Only add new messages, not already pagedIn to avoid multiple dispatch attempts
// Only add new messages, not already pagedIn to avoid multiple
// dispatch attempts
synchronized (pagedInMessages) {
resultList = new ArrayList<QueueMessageReference>(result.size());
for (QueueMessageReference ref : result) {
@ -1520,7 +1586,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
synchronized (pagedInPendingDispatch) {
if (!redeliveredWaitingDispatch.isEmpty()) {
// Try first to dispatch redelivered messages to keep an proper order
// Try first to dispatch redelivered messages to keep an
// proper order
redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
}
if (!pagedInPendingDispatch.isEmpty()) {
@ -1528,7 +1595,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
// and now see if we can dispatch the new stuff.. and append to the pending
// and now see if we can dispatch the new stuff.. and append to
// the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
@ -1581,29 +1649,34 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (!s.isFull()) {
// Dispatch it.
s.add(node);
target = s;
target = s;
break;
} else {
// no further dispatch of list to a full consumer to avoid out of order message receipt
// no further dispatch of list to a full consumer to
// avoid out of order message receipt
fullConsumers.add(s);
}
}
interestCount++;
} else {
// makes sure it gets dispatched again
if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
if (!node.isDropped() && !((QueueMessageReference) node).isAcked()
&& (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
interestCount++;
}
}
}
if ((target == null && interestCount > 0) || consumers.size() == 0) {
// This means all subs were full or that there are no consumers...
// This means all subs were full or that there are no
// consumers...
rc.add((QueueMessageReference) node);
}
// If it got dispatched, rotate the consumer list to get round robin distribution.
if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) {
// If it got dispatched, rotate the consumer list to get round robin
// distribution.
if (target != null && !strictOrderDispatch && consumers.size() > 1
&& !dispatchSelector.isExclusiveConsumer(target)) {
synchronized (this.consumers) {
if (removeFromConsumerList(target)) {
addToConsumerList(target);
@ -1654,7 +1727,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* dispatch process is non deterministic between master and slave. On a
* notification, the actual dispatch to the subscription (as chosen by the
* master) is completed. (non-Javadoc)
*
* @see
* org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
* (org.apache.activemq.command.MessageDispatchNotification)
@ -1670,7 +1742,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
throws Exception {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
@ -1719,8 +1792,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (message == null) {
throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination()
+ " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId());
throw new JMSException("Slave broker out of sync with master - Message: "
+ messageDispatchNotification.getMessageId() + " on "
+ messageDispatchNotification.getDestination() + " does not exist among pending("
+ pagedInPendingDispatch.size() + ") for subscription: "
+ messageDispatchNotification.getConsumerId());
}
return message;
}
@ -1732,7 +1808,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* @return sub or null if the subscription has been removed before dispatch
* @throws JMSException
*/
private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) throws JMSException {
private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
throws JMSException {
Subscription sub = null;
synchronized (consumers) {
for (Subscription s : consumers) {

View File

@ -163,10 +163,10 @@ public class Topic extends BaseDestination implements Task {
DurableTopicSubscription removed = durableSubcribers.remove(key);
if (removed != null) {
destinationStatistics.getConsumers().decrement();
}
// deactivate and remove
removed.deactivate(false);
consumers.remove(removed);
// deactivate and remove
removed.deactivate(false);
consumers.remove(removed);
}
}
}
@ -418,12 +418,8 @@ public class Topic extends BaseDestination implements Task {
}
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
if (context.isInTransaction()) {
topicStore.addMessage(context, message);
}else {
result = topicStore.asyncAddTopicMessage(context, message);
}
topicStore.asyncAddTopicMessage(context, message);
}
message.incrementReferenceCount();

View File

@ -602,6 +602,8 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
if (rc == 1 && getMemoryUsage() != null) {
getMemoryUsage().increaseUsage(size);
//System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
}
//System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
@ -618,7 +620,10 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
if (rc == 0 && getMemoryUsage() != null) {
getMemoryUsage().decreaseUsage(size);
//Thread.dumpStack();
//System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
}
//System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
return rc;

View File

@ -17,8 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.FutureTask;
import org.apache.activemq.Service;
import org.apache.activemq.command.TransactionId;
@ -32,10 +30,9 @@ public interface TransactionStore extends Service {
void prepare(TransactionId txid) throws IOException;
void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException;
void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException;
void rollback(TransactionId txid) throws IOException;
void recover(TransactionRecoveryListener listener) throws IOException;
}

View File

@ -21,9 +21,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
@ -99,7 +97,10 @@ public class AMQTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
}
AMQTx tx;
if (wasPrepared) {
synchronized (preparedTransactions) {
@ -111,7 +112,9 @@ public class AMQTransactionStore implements TransactionStore {
}
}
if (tx == null) {
done.run();
if (postCommit != null) {
postCommit.run();
}
return;
}
if (txid.isXATransaction()) {
@ -119,7 +122,9 @@ public class AMQTransactionStore implements TransactionStore {
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
}
done.run();
if (postCommit != null) {
postCommit.run();
}
}
/**

View File

@ -22,9 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
@ -40,8 +38,8 @@ import org.apache.activemq.store.TransactionStore;
public class JournalTransactionStore implements TransactionStore {
private final JournalPersistenceAdapter peristenceAdapter;
private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private boolean doingRecover;
public static class TxOperation {
@ -70,7 +68,7 @@ public class JournalTransactionStore implements TransactionStore {
public static class Tx {
private final RecordLocation location;
private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(RecordLocation location) {
this.location = location;
@ -176,8 +174,11 @@ public class JournalTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
Tx tx;
if (preCommit != null) {
preCommit.run();
}
if (wasPrepared) {
synchronized (preparedTransactions) {
tx = preparedTransactions.remove(txid);
@ -188,7 +189,9 @@ public class JournalTransactionStore implements TransactionStore {
}
}
if (tx == null) {
done.run();
if (postCommit != null) {
postCommit.run();
}
return;
}
if (txid.isXATransaction()) {
@ -198,7 +201,9 @@ public class JournalTransactionStore implements TransactionStore {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
wasPrepared), true);
}
done.run();
if (postCommit != null) {
postCommit.run();
}
}
/**

View File

@ -21,9 +21,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@ -38,7 +36,6 @@ import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -51,9 +48,9 @@ import org.apache.commons.logging.LogFactory;
public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
private Map transactions = new ConcurrentHashMap();
private Map prepared;
private KahaPersistenceAdapter adaptor;
private final Map transactions = new ConcurrentHashMap();
private final Map prepared;
private final KahaPersistenceAdapter adaptor;
private BrokerService brokerService;
@ -64,10 +61,12 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
KahaTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
@ -76,10 +75,12 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
KahaTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
@ -101,13 +102,18 @@ public class KahaTransactionStore implements TransactionStore, BrokerServiceAwar
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
if(before != null) {
before.run();
}
KahaTransaction tx = getTx(txid);
if (tx != null) {
tx.commit(this);
removeTx(txid);
}
done.run();
if (after != null) {
after.run();
}
}
/**

View File

@ -37,9 +37,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -63,17 +60,13 @@ import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@ -88,26 +81,28 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
protected final Map<AsyncJobKey, StoreTopicTask> asyncTopicMap = new HashMap<AsyncJobKey, StoreTopicTask>();
private final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
private Semaphore queueSemaphore;
private Semaphore topicSemaphore;
Semaphore globalQueueSemaphore;
Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
public KahaDBStore() {
this.transactionStore = new KahaDBTransactionStore(this);
}
public void setBrokerName(String brokerName) {
}
@ -166,8 +161,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void doStart() throws Exception {
super.doStart();
this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
@ -190,11 +185,23 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void doStop(ServiceStopper stopper) throws Exception {
if (this.queueSemaphore != null) {
this.queueSemaphore.drainPermits();
synchronized (this.asyncQueueMap) {
for (StoreQueueTask task : this.asyncQueueMap.values()) {
task.cancel();
}
this.asyncQueueMap.clear();
}
if (this.topicSemaphore != null) {
this.topicSemaphore.drainPermits();
synchronized (this.asyncTopicMap) {
for (StoreTopicTask task : this.asyncTopicMap.values()) {
task.cancel();
}
this.asyncTopicMap.clear();
}
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.drainPermits();
}
if (this.globalTopicSemaphore != null) {
this.globalTopicSemaphore.drainPermits();
}
if (this.queueExecutor != null) {
this.queueExecutor.shutdownNow();
@ -205,99 +212,50 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
super.doStop(stopper);
}
protected StoreQueueTask removeQueueTask(ActiveMQDestination activeMQDestination, MessageId id) {
StoreQueueTask task = this.asyncQueueMap.remove(new AsyncJobKey(id, activeMQDestination));
if (task != null) {
task.getMessage().decrementReferenceCount();
this.queueSemaphore.release();
protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
StoreQueueTask task = null;
synchronized (this.asyncQueueMap) {
task = this.asyncQueueMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addQueueTask(ActiveMQDestination activeMQDestination, StoreQueueTask task) throws IOException {
try {
this.queueSemaphore.acquire();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
synchronized (this.asyncQueueMap) {
this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), activeMQDestination), task);
task.getMessage().incrementReferenceCount();
this.queueExecutor.execute(task);
}
protected StoreTopicTask removeTopicTask(MessageId id) {
StoreTopicTask task = this.asyncTopicMap.remove(id);
if (task != null) {
task.getMessage().decrementReferenceCount();
this.topicSemaphore.release();
protected StoreTopicTask removeTopicTask(KahaDBMessageStore store, MessageId id) {
StoreTopicTask task = null;
synchronized (this.asyncTopicMap) {
task = this.asyncTopicMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addTopicTask(StoreTopicTask task) throws IOException {
try {
this.topicSemaphore.acquire();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
protected void addTopicTask(KahaDBMessageStore store, StoreTopicTask task) throws IOException {
synchronized (this.asyncTopicMap) {
this.asyncTopicMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.asyncTopicMap.put(task.getMessage().getMessageId(), task);
task.getMessage().incrementReferenceCount();
this.topicExecutor.execute(task);
}
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore() {
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true, done);
}
public void prepare(TransactionId txid) throws IOException {
store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true, null);
}
public void rollback(TransactionId txid) throws IOException {
store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false, null);
}
public void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
for (Operation op : entry.getValue()) {
if (op.getClass() == AddOpperation.class) {
AddOpperation addOp = (AddOpperation) op;
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand()
.getMessage().newInput()));
messageList.add(msg);
} else {
RemoveOpperation rmOp = (RemoveOpperation) op;
Buffer ackb = rmOp.getCommand().getAck();
MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
ackList.add(ack);
}
}
Message[] addedMessages = new Message[messageList.size()];
MessageAck[] acks = new MessageAck[ackList.size()];
messageList.toArray(addedMessages);
ackList.toArray(acks);
listener.recover(xid, addedMessages, acks);
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
};
return this.transactionStore;
}
public class KahaDBMessageStore extends AbstractMessageStore {
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
this.dest = convert(destination);
this.maxAsyncJobs = getMaxAsyncJobs();
this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
}
@Override
@ -310,7 +268,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
addQueueTask(destination, result);
result.aquireLocks();
addQueueTask(this, result);
return result.getFuture();
} else {
return super.asyncAddQueueMessage(context, message);
@ -320,10 +279,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask task = removeQueueTask(destination, ack.getLastMessageId());
AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
StoreQueueTask task = null;
synchronized (asyncQueueMap) {
task = asyncQueueMap.get(key);
}
if (task != null) {
if (!task.cancel()) {
try {
task.future.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
@ -331,6 +295,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
}
removeMessage(context, ack);
} else {
synchronized (asyncQueueMap) {
asyncQueueMap.remove(key);
}
}
} else {
removeMessage(context, ack);
@ -348,7 +316,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@ -359,13 +327,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null);
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
store(command, true, null);
store(command, true, null, null);
}
public Message getMessage(MessageId identity) throws IOException {
@ -395,21 +363,27 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
public int getMessageCount() throws IOException {
synchronized (indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of
// messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
.hasNext();) {
iterator.next();
rc++;
try {
lockAsyncJobQueue();
synchronized (indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
// of
// messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
.hasNext();) {
iterator.next();
rc++;
}
return rc;
}
return rc;
}
});
});
}
} finally {
unlockAsyncJobQueue();
}
}
@ -452,11 +426,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
.hasNext();) {
.hasNext()
&& listener.hasSpace();) {
entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
counter++;
if (counter >= maxReturned) {
if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
@ -474,22 +449,27 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void setBatch(MessageId identity) throws IOException {
final String key = identity.toString();
try {
final String key = identity.toString();
lockAsyncJobQueue();
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
Long location;
synchronized (indexMutex) {
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
}
});
}
if (location != null) {
cursorPos = location + 1;
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
Long location;
synchronized (indexMutex) {
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
}
});
}
if (location != null) {
cursorPos = location + 1;
}
} finally {
unlockAsyncJobQueue();
}
}
@ -506,6 +486,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
super.stop();
}
protected void lockAsyncJobQueue() {
try {
this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Failed to lock async jobs for " + this.destination, e);
}
}
protected void unlockAsyncJobQueue() {
this.localDestinationSemaphore.release(this.maxAsyncJobs);
}
protected void acquireLocalAsyncLock() {
try {
this.localDestinationSemaphore.acquire();
} catch (InterruptedException e) {
LOG.error("Failed to aquire async lock for " + this.destination, e);
}
}
protected void releaseLocalAsyncLock() {
this.localDestinationSemaphore.release();
}
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
@ -520,7 +524,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
throws IOException {
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
addTopicTask(result);
result.aquireLocks();
addTopicTask(this, result);
return result.getFuture();
} else {
return super.asyncAddTopicMessage(context, message);
@ -531,11 +536,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName);
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask task = asyncTopicMap.get(messageId);
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
StoreTopicTask task = null;
synchronized (asyncTopicMap) {
task = asyncTopicMap.get(key);
}
if (task != null) {
if (task.addSubscriptionKey(subscriptionKey)) {
removeTopicTask(messageId);
task.cancel();
removeTopicTask(this, messageId);
if (task.cancel()) {
synchronized (asyncTopicMap) {
asyncTopicMap.remove(key);
}
}
}
} else {
doAcknowledge(context, subscriptionKey, messageId);
@ -551,7 +564,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
store(command, false, null);
store(command, false, null, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@ -563,7 +576,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && true, null);
store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.incrementAndGet();
}
@ -571,7 +584,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
store(command, isEnableJournalDiskSyncs() && true, null);
store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet();
}
@ -698,7 +711,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
counter++;
if (counter >= maxReturned) {
if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
@ -732,11 +745,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return new KahaDBMessageStore(destination);
return this.transactionStore.proxy(new KahaDBMessageStore(destination));
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
return new KahaDBTopicMessageStore(destination);
return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
}
/**
@ -928,20 +941,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static class AsyncJobKey {
MessageId id;
ActiveMQDestination destination;
AsyncJobKey(MessageId id, ActiveMQDestination destination) {
this.id = id;
this.destination = destination;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
return obj instanceof AsyncJobKey &&
id.equals(((AsyncJobKey)obj).id) &&
destination.equals(((AsyncJobKey)obj).destination);
return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
&& destination.equals(((AsyncJobKey) obj).destination);
}
@Override
@ -949,24 +961,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return id.hashCode() + destination.hashCode();
}
@Override
public String toString() {
return destination.getPhysicalName() + "-" + id;
return destination.getPhysicalName() + "-" + id;
}
}
class StoreQueueTask implements Runnable {
protected final Message message;
protected final ConnectionContext context;
protected final MessageStore store;
protected final KahaDBMessageStore store;
protected final InnerFutureTask future;
protected final AtomicBoolean done = new AtomicBoolean();
protected final AtomicBoolean locked = new AtomicBoolean();
public StoreQueueTask(MessageStore store, ConnectionContext context, Message message) {
public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
this.store = store;
this.context = context;
this.message = message;
this.future = new InnerFutureTask(this);
}
public Future<Object> getFuture() {
@ -974,21 +987,45 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
public boolean cancel() {
releaseLocks();
if (this.done.compareAndSet(false, true)) {
return this.future.cancel(false);
}
return false;
}
void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalQueueSemaphore.acquire();
store.acquireLocalAsyncLock();
message.incrementReferenceCount();
} catch (InterruptedException e) {
LOG.warn("Failed to aquire lock", e);
}
}
}
void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
store.releaseLocalAsyncLock();
globalQueueSemaphore.release();
message.decrementReferenceCount();
}
}
public void run() {
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
removeQueueTask(this.store.getDestination(), this.message.getMessageId());
removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
}
} catch (Exception e) {
this.future.setException(e);
} finally {
releaseLocks();
}
}
@ -1025,6 +1062,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalTopicSemaphore.acquire();
store.acquireLocalAsyncLock();
message.incrementReferenceCount();
} catch (InterruptedException e) {
LOG.warn("Failed to aquire lock", e);
}
}
}
@Override
void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
message.decrementReferenceCount();
store.releaseLocalAsyncLock();
globalTopicSemaphore.release();
}
}
/**
* add a key
*
@ -1047,13 +1107,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
synchronized (this.subscriptionKeys) {
for (String key : this.subscriptionKeys) {
this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
}
}
removeTopicTask(this.message.getMessageId());
removeTopicTask(this.topicStore, this.message.getMessageId());
this.future.complete();
}
} catch (Exception e) {
this.future.setException(e);
} finally {
releaseLocks();
}
}
}

View File

@ -0,0 +1,460 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
*
* @version $Revision: 1.4 $
*/
public class KahaDBTransactionStore implements TransactionStore {
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final WireFormat wireFormat = new OpenWireFormat();
private final KahaDBStore theStore;
public KahaDBTransactionStore(KahaDBStore theStore) {
this.theStore = theStore;
}
public class Tx {
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
public void add(AddMessageCommand msg) {
messages.add(msg);
}
public void add(RemoveMessageCommand ack) {
acks.add(ack);
}
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
int count = 0;
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessage();
}
return rc;
}
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
int count = 0;
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessageAck();
}
return rc;
}
/**
* @return true if something to commit
* @throws IOException
*/
public List<Future<Object>> commit() throws IOException {
List<Future<Object>> results = new ArrayList<Future<Object>>();
// Do all the message adds.
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
results.add(cmd.run());
}
// And removes..
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
cmd.run();
results.add(cmd.run());
}
return results;
}
}
public abstract class AddMessageCommand {
private final ConnectionContext ctx;
AddMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
}
abstract Message getMessage();
Future<Object> run() throws IOException {
return run(this.ctx);
}
abstract Future<Object> run(ConnectionContext ctx) throws IOException;
}
public abstract class RemoveMessageCommand {
private final ConnectionContext ctx;
RemoveMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
}
abstract MessageAck getMessageAck();
Future<Object> run() throws IOException {
return run(this.ctx);
}
abstract Future<Object> run(ConnectionContext context) throws IOException;
}
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
}
};
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
}
};
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException {
inflightTransactions.remove(txid);
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
}
public Tx getTx(Object txid) {
Tx tx = inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx();
inflightTransactions.put(txid, tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
if (txid != null) {
if (!txid.isXATransaction()) {
if (preCommit != null) {
preCommit.run();
}
Tx tx = inflightTransactions.remove(txid);
if (tx != null) {
List<Future<Object>> results = tx.commit();
boolean doneSomething = false;
for (Future<Object> result : results) {
try {
result.get();
} catch (InterruptedException e) {
theStore.brokerService.handleIOException(new IOException(e));
} catch (ExecutionException e) {
theStore.brokerService.handleIOException(new IOException(e));
}catch(CancellationException e) {
}
if (!result.isCancelled()) {
doneSomething = true;
}
}
if (postCommit != null) {
postCommit.run();
}
if (doneSomething) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
}
}else {
//The Tx will be null for failed over clients - lets run their post commits
if (postCommit != null) {
postCommit.run();
}
}
} else {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction()) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
} else {
Object result = inflightTransactions.remove(txid);
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
// inflightTransactions.clear();
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
for (Operation op : entry.getValue()) {
if (op.getClass() == AddOpperation.class) {
AddOpperation addOp = (AddOpperation) op;
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
.newInput()));
messageList.add(msg);
} else {
RemoveOpperation rmOp = (RemoveOpperation) op;
Buffer ackb = rmOp.getCommand().getAck();
MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
ackList.add(ack);
}
}
Message[] addedMessages = new Message[messageList.size()];
MessageAck[] acks = new MessageAck[ackList.size()];
messageList.toArray(addedMessages);
ackList.toArray(acks);
listener.recover(xid, addedMessages, acks);
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction()) {
destination.addMessage(context, message);
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@Override
public Message getMessage() {
return message;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.addMessage(context, message);
}
}
Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction()) {
return destination.asyncAddQueueMessage(context, message);
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@Override
public Message getMessage() {
return message;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
return destination.asyncAddQueueMessage(ctx, message);
}
});
return AbstractMessageStore.FUTURE;
}
} else {
return destination.asyncAddQueueMessage(context, message);
}
}
Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
if (message.getTransactionId().isXATransaction()) {
return destination.asyncAddTopicMessage(context, message);
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@Override
public Message getMessage() {
return message;
}
@Override
public Future run(ConnectionContext ctx) throws IOException {
return destination.asyncAddTopicMessage(ctx, message);
}
});
return AbstractMessageStore.FUTURE;
}
} else {
return destination.asyncAddTopicMessage(context, message);
}
}
/**
* @param ack
* @throws IOException
*/
final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException {
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction()) {
destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.removeMessage(context, ack);
}
}
final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException {
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction()) {
destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
@Override
public MessageAck getMessageAck() {
return ack;
}
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
return AbstractMessageStore.FUTURE;
}
});
}
} else {
destination.removeAsyncMessage(context, ack);
}
}
private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
return theStore.createTransactionInfo(txid);
}
}

View File

@ -631,7 +631,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// Methods call by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
public Location store(JournalCommand data) throws IOException {
return store(data, false, null);
return store(data, false, null,null);
}
/**
@ -641,8 +641,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* during a recovery process.
* @param done
*/
public Location store(JournalCommand data, boolean sync, Runnable done) throws IOException {
try {
public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException {
if (before != null) {
before.run();
}
try {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
@ -664,8 +667,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
LOG.info("KahaDB: Recovering checkpoint thread after exception");
startCheckpoint();
}
if (done != null) {
done.run();
if (after != null) {
after.run();
}
return location;
} catch (IOException ioe) {

View File

@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -62,7 +61,7 @@ import org.apache.kahadb.page.Transaction;
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
private WireFormat wireFormat = new OpenWireFormat();
private final WireFormat wireFormat = new OpenWireFormat();
public void setBrokerName(String brokerName) {
}
@ -72,9 +71,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
}
processCommit(txid);
done.run();
if (postCommit != null) {
postCommit.run();
}
}
public void prepare(TransactionId txid) throws IOException {
processPrepare(txid);
@ -122,6 +126,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
this.dest = convert( destination );
}
@Override
public ActiveMQDestination getDestination() {
return destination;
}
@ -254,10 +259,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
@Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}

View File

@ -21,9 +21,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -53,9 +51,9 @@ public class MemoryTransactionStore implements TransactionStore {
private boolean doingRecover;
public class Tx {
private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
public void add(AddMessageCommand msg) {
messages.add(msg);
@ -130,19 +128,23 @@ public class MemoryTransactionStore implements TransactionStore {
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@ -151,19 +153,23 @@ public class MemoryTransactionStore implements TransactionStore {
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
@Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@ -194,8 +200,10 @@ public class MemoryTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
}
Tx tx;
if (wasPrepared) {
tx = preparedTransactions.remove(txid);
@ -204,11 +212,15 @@ public class MemoryTransactionStore implements TransactionStore {
}
if (tx == null) {
done.run();
if (postCommit != null) {
postCommit.run();
}
return;
}
tx.commit();
done.run();
if (postCommit != null) {
postCommit.run();
}
}

View File

@ -17,9 +17,7 @@
package org.apache.activemq.transaction;
import java.io.IOException;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.TransactionId;
@ -44,6 +42,7 @@ public class LocalTransaction extends Transaction {
this.context = context;
}
@Override
public void commit(boolean onePhase) throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("commit: " + xid
@ -69,10 +68,11 @@ public class LocalTransaction extends Transaction {
context.getTransactions().remove(xid);
// Sync on transaction store to avoid out of order messages in the cursor
// https://issues.apache.org/activemq/browse/AMQ-2594
transactionStore.commit(getTransactionId(), false, postCommitTask);
transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
this.waitPostCommitDone(postCommitTask);
}
@Override
public void rollback() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
@ -98,12 +98,14 @@ public class LocalTransaction extends Transaction {
}
}
@Override
public int prepare() throws XAException {
XAException xae = new XAException("Prepare not implemented on Local Transactions.");
xae.errorCode = XAException.XAER_RMERR;
throw xae;
}
@Override
public TransactionId getTransactionId() {
return xid;
}

View File

@ -23,6 +23,9 @@ public class Synchronization {
public void beforeEnd() throws Exception {
}
public void beforeCommit() throws Exception {
}
public void afterCommit() throws Exception {
}

View File

@ -24,9 +24,7 @@ import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.TransactionId;
import org.apache.commons.logging.Log;
@ -36,16 +34,27 @@ import org.apache.commons.logging.Log;
*
* @version $Revision: 1.5 $
*/
public abstract class Transaction implements Callable {
public abstract class Transaction {
public static final byte START_STATE = 0; // can go to: 1,2,3
public static final byte IN_USE_STATE = 1; // can go to: 2,3
public static final byte PREPARED_STATE = 2; // can go to: 3
public static final byte FINISHED_STATE = 3;
private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
protected FutureTask<?> postCommitTask = new FutureTask(this);
protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception {
doPreCommit();
return null;
}
});
protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception {
doPostCommit();
return null;
}
});
public byte getState() {
return state;
@ -86,6 +95,13 @@ public abstract class Transaction implements Callable {
// r.execute();
// }
}
protected void fireBeforeCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
Synchronization s = iter.next();
s.beforeCommit();
}
}
protected void fireAfterCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
@ -102,6 +118,7 @@ public abstract class Transaction implements Callable {
}
}
@Override
public String toString() {
return super.toString() + "[synchronizations=" + synchronizations + "]";
}
@ -140,6 +157,20 @@ public abstract class Transaction implements Callable {
}
}
}
protected void doPreCommit() throws XAException {
try {
fireBeforeCommit();
} catch (Throwable e) {
// I guess this could happen. Post commit task failed
// to execute properly.
getLog().warn("PRE COMMIT FAILED: ", e);
XAException xae = new XAException("PRE COMMIT FAILED");
xae.errorCode = XAException.XAER_RMERR;
xae.initCause(e);
throw xae;
}
}
protected void doPostCommit() throws XAException {
try {
@ -154,10 +185,4 @@ public abstract class Transaction implements Callable {
throw xae;
}
}
public Object call() throws Exception {
doPostCommit();
return null;
}
}

View File

@ -17,10 +17,8 @@
package org.apache.activemq.transaction;
import java.io.IOException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@ -48,6 +46,7 @@ public class XATransaction extends Transaction {
}
}
@Override
public void commit(boolean onePhase) throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("XA Transaction commit: " + xid);
@ -64,14 +63,14 @@ public class XATransaction extends Transaction {
checkForPreparedState(onePhase);
doPrePrepare();
setStateFinished();
transactionStore.commit(getTransactionId(), false, postCommitTask);
transactionStore.commit(getTransactionId(), false, preCommitTask,postCommitTask);
waitPostCommitDone(postCommitTask);
break;
case PREPARED_STATE:
// 2 phase commit, work done.
// We would record commit here.
setStateFinished();
transactionStore.commit(getTransactionId(), true, postCommitTask);
transactionStore.commit(getTransactionId(), true, preCommitTask,postCommitTask);
waitPostCommitDone(postCommitTask);
break;
default:
@ -108,6 +107,7 @@ public class XATransaction extends Transaction {
}
}
@Override
public void rollback() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
@ -151,6 +151,7 @@ public class XATransaction extends Transaction {
}
}
@Override
public int prepare() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("XA Transaction prepare: " + xid);
@ -178,6 +179,7 @@ public class XATransaction extends Transaction {
broker.removeTransaction(xid);
}
@Override
public TransactionId getTransactionId() {
return xid;
}