From f4d4c3b4ce524ac2df287e7c5b20432e79760e50 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 1 Mar 2012 16:37:19 +0000 Subject: [PATCH] Fix for https://issues.apache.org/jira/browse/AMQ-3750 - add hint to storage of messages to enable concurrent store and dispatch git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1295662 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/BaseDestination.java | 10 ++++++ .../activemq/broker/region/Destination.java | 3 ++ .../broker/region/DestinationFilter.java | 8 +++++ .../apache/activemq/broker/region/Queue.java | 31 +++++++++++++++- .../apache/activemq/broker/region/Topic.java | 31 ++++++++++++++-- .../broker/region/policy/PolicyEntry.java | 10 ++++++ .../activemq/store/AbstractMessageStore.java | 16 +++++++++ .../apache/activemq/store/MessageStore.java | 35 +++++++++++++++++++ .../activemq/store/ProxyMessageStore.java | 16 +++++++-- .../store/ProxyTopicMessageStore.java | 12 +++++++ 10 files changed, 167 insertions(+), 5 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 179d9d9a1d..d2eff73cb0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -98,6 +98,7 @@ public abstract class BaseDestination implements Destination { private boolean reduceMemoryFootprint = false; protected final Scheduler scheduler; private boolean disposed = false; + private boolean doOptimzeMessageStorage = true; /** * @param brokerService @@ -714,6 +715,15 @@ public abstract class BaseDestination implements Destination { return this.reduceMemoryFootprint; } + public boolean isDoOptimzeMessageStorage() { + return doOptimzeMessageStorage; + } + + public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { + this.doOptimzeMessageStorage = doOptimzeMessageStorage; + } + + public abstract List getConsumers(); protected boolean hasRegularConsumers(List consumers) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index f666fa8d20..d1f979e8d8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -230,4 +230,7 @@ public interface Destination extends Service, Task { boolean isPrioritizedMessages(); SlowConsumerStrategy getSlowConsumerStrategy(); + + boolean isDoOptimzeMessageStorage(); + void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index f6e9985d92..20fa8390c2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -302,4 +302,12 @@ public class DestinationFilter implements Destination { return next.getSlowConsumerStrategy(); } + public boolean isDoOptimzeMessageStorage() { + return next.isDoOptimzeMessageStorage(); + } + + public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { + next.setDoOptimzeMessageStorage(doOptimzeMessageStorage); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index a79275e311..fba2f20177 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -721,7 +721,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (store != null && message.isPersistent()) { message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); if (messages.isCacheEnabled()) { - result = store.asyncAddQueueMessage(context, message); + result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); } else { store.addMessage(context, message); } @@ -2137,4 +2137,33 @@ public class Queue extends BaseDestination implements Task, UsageListener { protected Logger getLog() { return LOG; } + + protected boolean isOptimizeStorage(){ + boolean result = false; + if (isDoOptimzeMessageStorage()){ + consumersLock.readLock().lock(); + try{ + if (consumers.isEmpty()==false){ + result = true; + for (Subscription s : consumers) { + if (s.getPrefetchSize()==0){ + result = false; + break; + } + if (s.isSlowConsumer()){ + result = false; + break; + } + if (s.getInFlightUsage() > 10){ + result = false; + break; + } + } + } + }finally { + consumersLock.readLock().unlock(); + } + } + return result; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 99f26ca6aa..67617ccc19 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -428,7 +428,7 @@ public class Topic extends BaseDestination implements Task { waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } - result = topicStore.asyncAddTopicMessage(context, message); + result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); } message.incrementReferenceCount(); @@ -688,4 +688,31 @@ public class Topic extends BaseDestination implements Task { protected Logger getLog() { return LOG; } -} + + protected boolean isOptimizeStorage(){ + boolean result = false; + + if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){ + result = true; + for (DurableTopicSubscription s : durableSubcribers.values()) { + if (s.isActive()== false){ + result = false; + break; + } + if (s.getPrefetchSize()==0){ + result = false; + break; + } + if (s.isSlowConsumer()){ + result = false; + break; + } + if (s.getInFlightUsage() > 10){ + result = false; + break; + } + } + } + return result; + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index f8499fafa1..57f4bfeb2b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -96,6 +96,7 @@ public class PolicyEntry extends DestinationMapEntry { private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean reduceMemoryFootprint; private NetworkBridgeFilterFactory networkBridgeFilterFactory; + private boolean doOptimzeMessageStorage = true; public void configure(Broker broker,Queue queue) { @@ -171,6 +172,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC()); destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); + destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -832,4 +834,12 @@ public class PolicyEntry extends DestinationMapEntry { public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() { return networkBridgeFilterFactory; } + + public boolean isDoOptimzeMessageStorage() { + return doOptimzeMessageStorage; + } + + public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { + this.doOptimzeMessageStorage = doOptimzeMessageStorage; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index 45b6ffbd48..c889fd3983 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -73,12 +73,28 @@ abstract public class AbstractMessageStore implements MessageStore { return this.prioritizedMessages; } + + public void addMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException{ + addMessage(context,message); + } + + public Future asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { addMessage(context, message); return FUTURE; } + public Future asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException { + addMessage(context, message,canOptimizeHint); + return FUTURE; + } + + public Future asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException { + addMessage(context, message,canOptimizeHint); + return FUTURE; + } + public Future asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { addMessage(context, message); return FUTURE; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index 38ddfeb889..24a05df98a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -41,6 +41,16 @@ public interface MessageStore extends Service { * @throws IOException */ void addMessage(ConnectionContext context, Message message) throws IOException; + + /** + * Adds a message to the message store + * + * @param context context + * @param message + * @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk + * @throws IOException + */ + void addMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException; /** * Adds a message to the message store @@ -52,6 +62,18 @@ public interface MessageStore extends Service { * @throws IOException */ Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException; + + /** + * Adds a message to the message store + * + * @param context context + * @param message + * @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk + * @return a Future to track when this is complete + * @throws IOException + * @throws IOException + */ + Future asyncAddQueueMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException; /** * Adds a message to the message store @@ -64,6 +86,19 @@ public interface MessageStore extends Service { */ Future asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException; +/** + * Adds a message to the message store + * + * @param context context + * @param message + * @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk + * @return a Future to track when this is complete + * @throws IOException + * @throws IOException + */ + Future asyncAddTopicMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException; + + /** * Looks up a message using either the String messageID or the * messageNumber. Implementations are encouraged to fill in the missing key diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 10d4723364..acae08bd0d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -44,6 +44,10 @@ public class ProxyMessageStore implements MessageStore { delegate.addMessage(context, message); } + public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { + delegate.addMessage(context,message,canOptimizeHint); + } + public Message getMessage(MessageId identity) throws IOException { return delegate.getMessage(identity); } @@ -105,11 +109,19 @@ public class ProxyMessageStore implements MessageStore { public Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { return delegate.asyncAddQueueMessage(context, message); } - + + public Future asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { + return delegate.asyncAddQueueMessage(context,message,canOptimizeHint); + } + public Future asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { return delegate.asyncAddTopicMessage(context, message); } - + + public Future asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { + return asyncAddTopicMessage(context,message,canOptimizeHint); + } + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { delegate.removeAsyncMessage(context, ack); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index fa01ec69f7..6fc58aa802 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -45,6 +45,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore { delegate.addMessage(context, message); } + public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { + delegate.addMessage(context,message,canOptimizeHint); + } + public Message getMessage(MessageId identity) throws IOException { return delegate.getMessage(identity); } @@ -146,10 +150,18 @@ public class ProxyTopicMessageStore implements TopicMessageStore { return delegate.asyncAddTopicMessage(context, message); } + public Future asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { + return delegate.asyncAddTopicMessage(context,message,canOptimizeHint); + } + public Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { return delegate.asyncAddQueueMessage(context, message); } + public Future asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { + return delegate.asyncAddQueueMessage(context,message,canOptimizeHint); + } + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { delegate.removeAsyncMessage(context, ack); }