mirror of https://github.com/apache/activemq.git
enabled the hash bucket based implementation of MessageGroupMap by default and made the bucketCount configurable in the destination policy map
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e1d4780221
commit
d7f91463d6
|
@ -19,9 +19,9 @@ package org.apache.activemq.broker.region;
|
|||
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
|
||||
import org.apache.activemq.broker.region.group.MessageGroupMap;
|
||||
import org.apache.activemq.broker.region.group.MessageGroupSet;
|
||||
import org.apache.activemq.broker.region.group.SimpleMessageGroupMap;
|
||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||
import org.apache.activemq.broker.region.policy.DispatchPolicy;
|
||||
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
|
||||
|
@ -45,7 +45,6 @@ import java.io.IOException;
|
|||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The Queue is a List of MessageEntry objects that are dispatched to matching
|
||||
|
@ -65,7 +64,8 @@ public class Queue implements Destination {
|
|||
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
|
||||
private Subscription exclusiveOwner;
|
||||
private final MessageGroupMap messageGroupOwners = new SimpleMessageGroupMap();
|
||||
private MessageGroupMap messageGroupOwners;
|
||||
private int messageGroupHashBucketCount = 1024;
|
||||
|
||||
protected long garbageSize = 0;
|
||||
protected long garbageSizeBeforeCollection = 1000;
|
||||
|
@ -186,7 +186,7 @@ public class Queue implements Destination {
|
|||
}
|
||||
|
||||
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
|
||||
MessageGroupSet ownedGroups = messageGroupOwners.removeConsumer(consumerId);
|
||||
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
|
||||
|
||||
synchronized (messages) {
|
||||
if (!sub.getConsumerInfo().isBrowser()) {
|
||||
|
@ -323,6 +323,9 @@ public class Queue implements Destination {
|
|||
}
|
||||
|
||||
public MessageGroupMap getMessageGroupOwners() {
|
||||
if (messageGroupOwners == null) {
|
||||
messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount );
|
||||
}
|
||||
return messageGroupOwners;
|
||||
}
|
||||
|
||||
|
@ -342,6 +345,15 @@ public class Queue implements Destination {
|
|||
this.deadLetterStrategy = deadLetterStrategy;
|
||||
}
|
||||
|
||||
public int getMessageGroupHashBucketCount() {
|
||||
return messageGroupHashBucketCount;
|
||||
}
|
||||
|
||||
public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
|
||||
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
|
||||
}
|
||||
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
private MessageReference createMessageReference(Message message) {
|
||||
|
|
|
@ -34,6 +34,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
|
||||
private boolean sendAdvisoryIfNoConsumers;
|
||||
private DeadLetterStrategy deadLetterStrategy;
|
||||
private int messageGroupHashBucketCount = 1024;
|
||||
|
||||
public void configure(Queue queue) {
|
||||
if (dispatchPolicy != null) {
|
||||
|
@ -42,6 +43,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
if (deadLetterStrategy != null) {
|
||||
queue.setDeadLetterStrategy(deadLetterStrategy);
|
||||
}
|
||||
queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
|
||||
}
|
||||
|
||||
public void configure(Topic topic) {
|
||||
|
@ -97,6 +99,20 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
|
||||
this.deadLetterStrategy = deadLetterStrategy;
|
||||
}
|
||||
|
||||
public int getMessageGroupHashBucketCount() {
|
||||
return messageGroupHashBucketCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of hash buckets to use for the message group functionality.
|
||||
* This is only applicable to using message groups to parallelize processing of a queue
|
||||
* while preserving order across an individual JMSXGroupID header value.
|
||||
* This value sets the number of hash buckets that will be used (i.e. the maximum possible concurrency).
|
||||
*/
|
||||
public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
|
||||
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue