From d7f91463d6c597fe301c328c9173703590cf17a5 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 5 Jan 2006 15:58:49 +0000 Subject: [PATCH] enabled the hash bucket based implementation of MessageGroupMap by default and made the bucketCount configurable in the destination policy map git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366208 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 20 +++++++++++++++---- .../broker/region/policy/PolicyEntry.java | 16 +++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 54fa675e6a..5c2921622d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -19,9 +19,9 @@ package org.apache.activemq.broker.region; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.group.MessageGroupHashBucket; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupSet; -import org.apache.activemq.broker.region.group.SimpleMessageGroupMap; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; @@ -45,7 +45,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Set; /** * The Queue is a List of MessageEntry objects that are dispatched to matching @@ -65,7 +64,8 @@ public class Queue implements Destination { protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); private Subscription exclusiveOwner; - private final MessageGroupMap messageGroupOwners = new SimpleMessageGroupMap(); + private MessageGroupMap messageGroupOwners; + private int messageGroupHashBucketCount = 1024; protected long garbageSize = 0; protected long garbageSizeBeforeCollection = 1000; @@ -186,7 +186,7 @@ public class Queue implements Destination { } ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); - MessageGroupSet ownedGroups = messageGroupOwners.removeConsumer(consumerId); + MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId); synchronized (messages) { if (!sub.getConsumerInfo().isBrowser()) { @@ -323,6 +323,9 @@ public class Queue implements Destination { } public MessageGroupMap getMessageGroupOwners() { + if (messageGroupOwners == null) { + messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount ); + } return messageGroupOwners; } @@ -342,6 +345,15 @@ public class Queue implements Destination { this.deadLetterStrategy = deadLetterStrategy; } + public int getMessageGroupHashBucketCount() { + return messageGroupHashBucketCount; + } + + public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) { + this.messageGroupHashBucketCount = messageGroupHashBucketCount; + } + + // Implementation methods // ------------------------------------------------------------------------- private MessageReference createMessageReference(Message message) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 2b437082bf..0df1589fd9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -34,6 +34,7 @@ public class PolicyEntry extends DestinationMapEntry { private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private boolean sendAdvisoryIfNoConsumers; private DeadLetterStrategy deadLetterStrategy; + private int messageGroupHashBucketCount = 1024; public void configure(Queue queue) { if (dispatchPolicy != null) { @@ -42,6 +43,7 @@ public class PolicyEntry extends DestinationMapEntry { if (deadLetterStrategy != null) { queue.setDeadLetterStrategy(deadLetterStrategy); } + queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount); } public void configure(Topic topic) { @@ -97,6 +99,20 @@ public class PolicyEntry extends DestinationMapEntry { public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { this.deadLetterStrategy = deadLetterStrategy; } + + public int getMessageGroupHashBucketCount() { + return messageGroupHashBucketCount; + } + + /** + * Sets the number of hash buckets to use for the message group functionality. + * This is only applicable to using message groups to parallelize processing of a queue + * while preserving order across an individual JMSXGroupID header value. + * This value sets the number of hash buckets that will be used (i.e. the maximum possible concurrency). + */ + public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) { + this.messageGroupHashBucketCount = messageGroupHashBucketCount; + } }