Fix for https://issues.apache.org/jira/browse/AMQ-3750 - add hint to storage of messages to enable concurrent store and dispatch

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1295662 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-03-01 16:37:19 +00:00
parent 5dda6e6fee
commit f4d4c3b4ce
10 changed files with 167 additions and 5 deletions

View File

@ -98,6 +98,7 @@ public abstract class BaseDestination implements Destination {
private boolean reduceMemoryFootprint = false;
protected final Scheduler scheduler;
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
/**
* @param brokerService
@ -714,6 +715,15 @@ public abstract class BaseDestination implements Destination {
return this.reduceMemoryFootprint;
}
public boolean isDoOptimzeMessageStorage() {
return doOptimzeMessageStorage;
}
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
}
public abstract List<Subscription> getConsumers();
protected boolean hasRegularConsumers(List<Subscription> consumers) {

View File

@ -230,4 +230,7 @@ public interface Destination extends Service, Task {
boolean isPrioritizedMessages();
SlowConsumerStrategy getSlowConsumerStrategy();
boolean isDoOptimzeMessageStorage();
void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
}

View File

@ -302,4 +302,12 @@ public class DestinationFilter implements Destination {
return next.getSlowConsumerStrategy();
}
public boolean isDoOptimzeMessageStorage() {
return next.isDoOptimzeMessageStorage();
}
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
}
}

View File

@ -721,7 +721,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (store != null && message.isPersistent()) {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message);
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
} else {
store.addMessage(context, message);
}
@ -2137,4 +2137,33 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected Logger getLog() {
return LOG;
}
protected boolean isOptimizeStorage(){
boolean result = false;
if (isDoOptimzeMessageStorage()){
consumersLock.readLock().lock();
try{
if (consumers.isEmpty()==false){
result = true;
for (Subscription s : consumers) {
if (s.getPrefetchSize()==0){
result = false;
break;
}
if (s.isSlowConsumer()){
result = false;
break;
}
if (s.getInFlightUsage() > 10){
result = false;
break;
}
}
}
}finally {
consumersLock.readLock().unlock();
}
}
return result;
}
}

View File

@ -428,7 +428,7 @@ public class Topic extends BaseDestination implements Task {
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message);
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
}
message.incrementReferenceCount();
@ -688,4 +688,31 @@ public class Topic extends BaseDestination implements Task {
protected Logger getLog() {
return LOG;
}
}
protected boolean isOptimizeStorage(){
boolean result = false;
if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
result = true;
for (DurableTopicSubscription s : durableSubcribers.values()) {
if (s.isActive()== false){
result = false;
break;
}
if (s.getPrefetchSize()==0){
result = false;
break;
}
if (s.isSlowConsumer()){
result = false;
break;
}
if (s.getInFlightUsage() > 10){
result = false;
break;
}
}
}
return result;
}
}

View File

@ -96,6 +96,7 @@ public class PolicyEntry extends DestinationMapEntry {
private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean reduceMemoryFootprint;
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
private boolean doOptimzeMessageStorage = true;
public void configure(Broker broker,Queue queue) {
@ -171,6 +172,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@ -832,4 +834,12 @@ public class PolicyEntry extends DestinationMapEntry {
public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
return networkBridgeFilterFactory;
}
public boolean isDoOptimzeMessageStorage() {
return doOptimzeMessageStorage;
}
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
}
}

View File

@ -73,12 +73,28 @@ abstract public class AbstractMessageStore implements MessageStore {
return this.prioritizedMessages;
}
public void addMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException{
addMessage(context,message);
}
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
addMessage(context, message,canOptimizeHint);
return FUTURE;
}
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
addMessage(context, message,canOptimizeHint);
return FUTURE;
}
public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;

View File

@ -41,6 +41,16 @@ public interface MessageStore extends Service {
* @throws IOException
*/
void addMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
* @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
* @throws IOException
*/
void addMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
@ -52,6 +62,18 @@ public interface MessageStore extends Service {
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
* @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
@ -64,6 +86,19 @@ public interface MessageStore extends Service {
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
* @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws IOException;
/**
* Looks up a message using either the String messageID or the
* messageNumber. Implementations are encouraged to fill in the missing key

View File

@ -44,6 +44,10 @@ public class ProxyMessageStore implements MessageStore {
delegate.addMessage(context, message);
}
public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
delegate.addMessage(context,message,canOptimizeHint);
}
public Message getMessage(MessageId identity) throws IOException {
return delegate.getMessage(identity);
}
@ -105,11 +109,19 @@ public class ProxyMessageStore implements MessageStore {
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
}
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return asyncAddTopicMessage(context,message,canOptimizeHint);
}
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}

View File

@ -45,6 +45,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
delegate.addMessage(context, message);
}
public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
delegate.addMessage(context,message,canOptimizeHint);
}
public Message getMessage(MessageId identity) throws IOException {
return delegate.getMessage(identity);
}
@ -146,10 +150,18 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate.asyncAddTopicMessage(context, message);
}
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddTopicMessage(context,message,canOptimizeHint);
}
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
}
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}