transaction performance enhancements

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@958009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-06-25 15:45:47 +00:00
parent 39c536395a
commit ddb2c91c20
3 changed files with 61 additions and 38 deletions

View File

@ -102,7 +102,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Lock sendLock = new ReentrantLock();
final Lock sendLock = new ReentrantLock();
private ExecutorService executor;
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
@ -616,9 +616,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
@Override
public void beforeCommit() throws Exception {
sendLock.lockInterruptibly();
}
@Override
public void afterCommit() throws Exception {
try {
// It could take while before we receive the commit
// op, by that time the message could have expired..

View File

@ -80,10 +80,11 @@ public class Topic extends BaseDestination implements Task {
};
};
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.topicStore = store;
//set default subscription recovery policy
// set default subscription recovery policy
subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
@ -92,7 +93,8 @@ public class Topic extends BaseDestination implements Task {
public void initialize() throws Exception {
super.initialize();
if (store != null) {
// AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// int messageCount = store.getMessageCount();
// destinationStatistics.getMessages().setCount(messageCount);
}
@ -147,7 +149,8 @@ public class Topic extends BaseDestination implements Task {
}
}
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
destinationStatistics.getConsumers().decrement();
synchronized (consumers) {
@ -166,7 +169,7 @@ public class Topic extends BaseDestination implements Task {
// deactivate and remove
removed.deactivate(false);
consumers.remove(removed);
}
}
}
}
@ -267,7 +270,8 @@ public class Topic extends BaseDestination implements Task {
final ConnectionContext context = producerExchange.getConnectionContext();
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
@ -289,17 +293,24 @@ public class Topic extends BaseDestination implements Task {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
LOG
.info("Usage Manager memory limit ("
+ memoryUsage.getLimit()
+ ") reached for "
+ getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
// We can avoid blocking due to low usage if the producer is sending
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
@ -319,7 +330,8 @@ public class Topic extends BaseDestination implements Task {
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
@ -338,7 +350,8 @@ public class Topic extends BaseDestination implements Task {
}
});
// If the user manager is not full, then the task will not
// If the user manager is not full, then the task will
// not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
@ -349,10 +362,11 @@ public class Topic extends BaseDestination implements Task {
}
} else {
// Producer flow control cannot be used, so we have do the flow
// Producer flow control cannot be used, so we have do the
// flow
// control at the broker
// by blocking this thread until there is space available.
if (memoryUsage.isFull()) {
if (context.isInTransaction()) {
@ -364,12 +378,20 @@ public class Topic extends BaseDestination implements Task {
if (count > 2 && context.isInTransaction()) {
count = 0;
int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
LOG.warn("Waiting for space to send transacted message - transaction elements = "
+ size + " need more space to commit. Message = " + message);
}
}
} else {
waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
waitForSpace(
context,
memoryUsage,
"Usage Manager memory limit reached. Stopping producer ("
+ message.getProducerId()
+ ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName()
+ "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
}
@ -403,7 +425,8 @@ public class Topic extends BaseDestination implements Task {
* @throws IOException
* @throws Exception
*/
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
@ -411,15 +434,17 @@ public class Topic extends BaseDestination implements Task {
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message);
}
result = topicStore.asyncAddTopicMessage(context, message);
}
message.incrementReferenceCount();
@ -427,7 +452,7 @@ public class Topic extends BaseDestination implements Task {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
public void beforeCommit() throws Exception {
// It could take while before we receive the commit
// operration.. by that time the message could have
// expired..
@ -454,10 +479,10 @@ public class Topic extends BaseDestination implements Task {
}
if (result != null && !result.isCancelled()) {
try {
result.get();
}catch(CancellationException e) {
//ignore - the task has been cancelled if the message
// has already been deleted
result.get();
} catch (CancellationException e) {
// ignore - the task has been cancelled if the message
// has already been deleted
}
}
@ -472,7 +497,8 @@ public class Topic extends BaseDestination implements Task {
return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
}
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
@ -580,7 +606,8 @@ public class Topic extends BaseDestination implements Task {
}
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
// AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
@ -612,7 +639,8 @@ public class Topic extends BaseDestination implements Task {
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference);
// AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// destinationStatistics.getMessages().decrement();
destinationStatistics.getEnqueues().decrement();
destinationStatistics.getExpired().increment();
@ -626,11 +654,10 @@ public class Topic extends BaseDestination implements Task {
LOG.error("Failed to remove expired Message from the store ", e);
}
}
@Override
protected Log getLog() {
return LOG;
}
}

View File

@ -244,7 +244,6 @@ public class KahaDBTransactionStore implements TransactionStore {
doneSomething = true;
}
}
if (postCommit != null) {
postCommit.run();
}