https://issues.apache.org/jira/browse/AMQ-5077 - reduce reader thread work when client uses async send; async store updates can now queue up to the destination memory limit b/c they don't block the send thread. Pending store writes are now tracked in memory usage. This allows a client to quickly provide a burst of messages to fill the destination cache bounded only by network bandwidth

This commit is contained in:
gtully 2014-04-30 16:10:18 +01:00
parent 8498136f5e
commit ad1f751a41
20 changed files with 334 additions and 89 deletions

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.MemoryUsage;
public class PendingMarshalUsageTracker implements Runnable {
final MemoryUsage usage;
int messageSize;
public PendingMarshalUsageTracker(final Message message) {
usage = message.getMemoryUsage();
if (usage != null) {
messageSize = message.getSize();
usage.increaseUsage(messageSize);
}
}
@Override
public void run() {
if (usage != null) {
usage.decreaseUsage(messageSize);
}
}
}

View File

@ -81,6 +81,7 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
@ -637,8 +638,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG.info("Usage Manager Memory Limit ({}) reached on {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName());
LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
@ -895,7 +896,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
ListenableFuture<Object> result = null;
boolean needsOrderingWithTransactions = context.isInTransaction();
producerExchange.incrementSend();
@ -907,6 +908,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
result.addListener(new PendingMarshalUsageTracker(message));
} else {
store.addMessage(context, message);
}
@ -942,7 +944,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (!needsOrderingWithTransactions) {
messageSent(context, message);
}
if (result != null && !result.isCancelled()) {
if (result != null && message.isResponseRequired() && !result.isCancelled()) {
try {
result.get();
} catch (CancellationException e) {

View File

@ -17,9 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -29,7 +26,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore {
public static final FutureTask<Object> FUTURE;
public static final ListenableFuture<Object> FUTURE;
protected final ActiveMQDestination destination;
protected boolean prioritizedMessages;
@ -89,27 +86,27 @@ abstract public class AbstractMessageStore implements MessageStore {
}
@Override
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
@Override
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
addMessage(context, message, canOptimizeHint);
return FUTURE;
}
@Override
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
addMessage(context, message, canOptimizeHint);
return FUTURE;
}
@Override
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
return new InlineListenableFuture();
}
@Override
@ -121,14 +118,7 @@ abstract public class AbstractMessageStore implements MessageStore {
throw new IOException("update is not supported by: " + this);
}
static class CallableImplementation implements Callable<Object> {
public Object call() throws Exception {
return null;
}
}
static {
FUTURE = new FutureTask<Object>(new CallableImplementation());
FUTURE.run();
FUTURE = new InlineListenableFuture();
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class InlineListenableFuture implements ListenableFuture<Object> {
public Object call() throws Exception {
return null;
}
@Override
public void addListener(Runnable listener) {
listener.run();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.util.concurrent.Future;
public interface ListenableFuture<T> extends Future<T> {
/**
* register a listener to be run on completion or immediately if complete
* any exceptions will be caught and logged
* @param listener
*/
void addListener(Runnable listener);
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@ -62,7 +61,7 @@ public interface MessageStore extends Service {
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
@ -74,18 +73,18 @@ public interface MessageStore extends Service {
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
* @return a Future to track when this is complete
* @return a ListenableFuture to track when this is complete
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
@ -93,11 +92,11 @@ public interface MessageStore extends Service {
* @param context context
* @param message
* @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
* @return a Future to track when this is complete
* @return a ListenableFuture to track when this is complete
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Looks up a message using either the String messageID or the

View File

@ -122,22 +122,22 @@ public class ProxyMessageStore implements MessageStore {
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return asyncAddTopicMessage(context,message,canOptimizeHint);
}

View File

@ -171,22 +171,22 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddTopicMessage(context,message, canOptimizeHint);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message, canOptimizeHint);
}

View File

@ -22,7 +22,8 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.InlineListenableFuture;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
@ -38,7 +39,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* Provides a TransactionStore implementation that can create transaction aware
@ -149,15 +149,15 @@ public class MemoryTransactionStore implements TransactionStore {
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
return new InlineListenableFuture();
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
return new InlineListenableFuture();
}
@Override
@ -190,15 +190,15 @@ public class MemoryTransactionStore implements TransactionStore {
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
return new InlineListenableFuture();
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
return new InlineListenableFuture();
}
@Override

View File

@ -94,6 +94,7 @@ import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@ -696,7 +697,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// know that the connection is being shutdown.
RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
try {
doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
} catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) {
// expected
} else {
throw e;
}
}
doAsyncSendPacket(new ShutdownInfo());
}

View File

@ -30,7 +30,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
@ -60,7 +59,6 @@ import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
@ -370,7 +368,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
@ -712,7 +710,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
@ -1238,7 +1236,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.future = new InnerFutureTask(this);
}
public Future<Object> getFuture() {
public ListenableFuture<Object> getFuture() {
return this.future;
}
@ -1295,8 +1293,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return this.message;
}
private class InnerFutureTask extends FutureTask<Object> {
private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {
private Runnable listener;
public InnerFutureTask(Runnable runnable) {
super(runnable, null);
@ -1309,6 +1308,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void complete() {
super.set(null);
}
@Override
public void done() {
fireListener();
}
@Override
public void addListener(Runnable listener) {
this.listener = listener;
if (isDone()) {
fireListener();
}
}
private void fireListener() {
if (listener != null) {
try {
listener.run();
} catch (Exception ignored) {
LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
}
}
}
}
}

View File

@ -33,9 +33,9 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
@ -166,12 +166,12 @@ public class KahaDBTransactionStore implements TransactionStore {
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@ -200,12 +200,12 @@ public class KahaDBTransactionStore implements TransactionStore {
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
}
@ -389,7 +389,7 @@ public class KahaDBTransactionStore implements TransactionStore {
}
}
Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
@ -416,7 +416,7 @@ public class KahaDBTransactionStore implements TransactionStore {
}
}
Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {

View File

@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -71,12 +70,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@ -105,12 +104,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
}
@Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
}
@ -384,7 +383,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
destination.addMessage(context, message);
}
Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
getTx(message.getTransactionId()).trackStore(transactionStore);
@ -395,7 +394,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
}
Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {

View File

@ -22,7 +22,7 @@ import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent._
import atomic._
import org.fusesource.hawtbuf.Buffer
import org.apache.activemq.store.MessageRecoveryListener
import org.apache.activemq.store.{ListenableFuture, MessageRecoveryListener}
import java.lang.ref.WeakReference
import scala.Option._
import org.fusesource.hawtbuf.Buffer._
@ -97,12 +97,13 @@ object UowCompleted extends UowState {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
private val latch:CountDownLatch=new CountDownLatch(1)
@volatile
var value:T = _
var error:Throwable = _
var listener:Runnable = _
def cancel(mayInterruptIfRunning: Boolean) = false
def isCancelled = false
@ -115,10 +116,12 @@ class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
def set(v:T) = {
value = v
latch.countDown()
fireListener
}
def failed(v:Throwable) = {
error = v
latch.countDown()
fireListener
}
def get() = {
@ -141,6 +144,25 @@ class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
}
def isDone = latch.await(0, TimeUnit.SECONDS);
def fireListener = {
if (listener != null) {
try {
listener.run()
} catch {
case e : Throwable => {
LevelDBStore.warn(e, "unexpected exception on future listener " +listener)
}
}
}
}
def addListener(l: Runnable) = {
listener = l
if (isDone) {
fireListener
}
}
}
object UowManagerConstants {

View File

@ -47,8 +47,7 @@ object LevelDBStore extends Log {
}
})
val DONE = new CountDownFuture[AnyRef]();
DONE.set(null)
val DONE = new InlineListenableFuture;
def toIOException(e: Throwable): IOException = {
if (e.isInstanceOf[ExecutionException]) {
@ -681,7 +680,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): ListenableFuture[AnyRef] = {
check_running
message.getMessageId.setEntryLocator(null)
if( message.getTransactionId!=null ) {
@ -800,7 +799,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): ListenableFuture[AnyRef] = {
super.asyncAddQueueMessage(context, message, false)
}

View File

@ -312,6 +312,12 @@ public class JmsMultipleClientsTestSupport {
return currentTestName;
}
public void assertDestinationMemoryUsageGoesToZero() throws Exception {
assertEquals("destination memory is back to 0", 0,
TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
}
/*
* This is copied from AutoFailTestSupport. We may want to move it to someplace where more

View File

@ -56,6 +56,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@ -69,6 +70,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@ -82,6 +84,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 2 * 60 * 1000)
@ -95,6 +98,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@ -108,6 +112,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@ -121,6 +126,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@ -134,6 +140,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 2 * 60 * 1000)
@ -147,6 +154,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
assertDestinationMemoryUsageGoesToZero();
}
protected void configurePrefetchOfOne() {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ThreadTracker;
@ -155,10 +156,5 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
assertDestinationMemoryUsageGoesToZero();
}
private void assertDestinationMemoryUsageGoesToZero() throws Exception {
assertEquals("destination memory is back to 0", 0,
TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
}
}

View File

@ -20,12 +20,18 @@ package org.apache.activemq.broker.virtual;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -33,6 +39,7 @@ import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@ -43,13 +50,72 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class VirtualDestPerfTest {
private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
public int messageSize = 5*1024;
public int messageCount = 10000;
ActiveMQTopic target = new ActiveMQTopic("target");
BrokerService brokerService;
ActiveMQConnectionFactory connectionFactory;
@Test
@Ignore("comparison test - 'new' no wait on future with async send broker side is always on")
public void testAsyncSendBurstToFillCache() throws Exception {
startBroker(4, true, true);
connectionFactory.setUseAsyncSend(true);
// a burst of messages to fill the cache
messageCount = 22000;
messageSize = 10*1024;
LinkedHashMap<Integer, Long> results = new LinkedHashMap<Integer, Long>();
final ActiveMQQueue queue = new ActiveMQQueue("targetQ");
for (Integer numThreads : new Integer[]{1, 2}) {
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
final AtomicLong numMessagesToSend = new AtomicLong(messageCount);
purge();
long startTime = System.currentTimeMillis();
for (int i=0;i<numThreads;i++) {
executor.execute(new Runnable(){
@Override
public void run() {
try {
produceMessages(numMessagesToSend, queue);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
long endTime = System.currentTimeMillis();
long seconds = (endTime - startTime) / 1000;
LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds);
results.put(numThreads, seconds);
LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount());
}
brokerService.stop();
brokerService.waitUntilStopped();
LOG.info("Results: {}", results);
}
private void purge() throws Exception {
ObjectName[] queues = brokerService.getAdminView().getQueues();
if (queues.length == 1) {
QueueViewMBean queueViewMBean = (QueueViewMBean)
brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false);
queueViewMBean.purge();
}
}
@Test
@Ignore("comparison test - takes too long and really needs a peek at the graph")
public void testPerf() throws Exception {
@ -58,10 +124,10 @@ public class VirtualDestPerfTest {
for (int i=2;i<11;i++) {
for (Boolean concurrent : new Boolean[]{true, false}) {
startBroker(i, concurrent);
startBroker(i, concurrent, false);
long startTime = System.currentTimeMillis();
produceMessages();
produceMessages(new AtomicLong(messageCount), target);
long endTime = System.currentTimeMillis();
long seconds = (endTime - startTime) / 1000;
LOG.info("For routes {} duration {}", i, seconds);
@ -89,20 +155,20 @@ public class VirtualDestPerfTest {
return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", "");
}
protected void produceMessages() throws Exception {
protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception {
final ByteSequence payLoad = new ByteSequence(new byte[messageSize]);
Connection connection = connectionFactory.createConnection();
MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(target);
MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
message.setContent(new ByteSequence(new byte[5*1024]));
for (int i=0; i<10000; i++) {
message.setContent(payLoad);
while (messageCount.decrementAndGet() >= 0) {
messageProducer.send(message);
}
connection.close();
}
private void startBroker(int fanoutCount, boolean concurrentSend) throws Exception {
private void startBroker(int fanoutCount, boolean concurrentSend, boolean concurrentStoreAndDispatchQueues) throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseVirtualTopics(true);
@ -111,6 +177,8 @@ public class VirtualDestPerfTest {
PolicyMap destPolicyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(0);
defaultEntry.setOptimizedDispatch(true);
defaultEntry.setCursorMemoryHighWaterMark(110);
destPolicyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(destPolicyMap);
@ -129,13 +197,13 @@ public class VirtualDestPerfTest {
brokerService.start();
connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
connectionFactory.setUseAsyncSend(false);
connectionFactory.setWatchTopicAdvisories(false);
if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
//with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default
// will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce
// order issues
((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues);
}
}
}

View File

@ -180,7 +180,7 @@ public class QueueBrowsingTest {
@Test
public void testMemoryLimit() throws Exception {
broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024);
broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
int messageToSend = 370;