a fix for AMQ-769 to allow the MessageGroupMap implementation to be specified via a policyEntry

http://incubator.apache.org/activemq/per-destination-policies.html

so you can specify something like this...

 <broker>

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">">
           <messageGroupMapFactory>
            <simpleMessageGroupMapFactory/>
    

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@429995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-08-09 08:44:27 +00:00
parent a7b9e8ce9c
commit e7e15b5559
5 changed files with 151 additions and 33 deletions

View File

@ -17,15 +17,12 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
@ -47,7 +44,11 @@ import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@ -68,7 +69,6 @@ public class Queue implements Destination {
private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
private int messageGroupHashBucketCount = 1024;
protected long garbageSize = 0;
protected long garbageSizeBeforeCollection = 1000;
@ -76,7 +76,8 @@ public class Queue implements Destination {
protected final MessageStore store;
protected int highestSubscriptionPriority;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
this.destination = destination;
@ -364,7 +365,7 @@ public class Queue implements Destination {
public MessageGroupMap getMessageGroupOwners() {
if (messageGroupOwners == null) {
messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount);
messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
}
return messageGroupOwners;
}
@ -385,14 +386,14 @@ public class Queue implements Destination {
this.deadLetterStrategy = deadLetterStrategy;
}
public int getMessageGroupHashBucketCount() {
return messageGroupHashBucketCount;
public MessageGroupMapFactory getMessageGroupMapFactory() {
return messageGroupMapFactory;
}
public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
this.messageGroupMapFactory = messageGroupMapFactory;
}
public void resetStatistics() {
getDestinationStatistics().reset();
}

View File

@ -0,0 +1,52 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.group;
/**
* A factory to create instances of {@link SimpleMessageGroupMap} when
* implementing the <a
* href="http://incubator.apache.org/activemq/message-groups.html">Message
* Groups</a> functionality.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class MessageGroupHashBucketFactory implements MessageGroupMapFactory {
private int bucketCount = 1024;
public MessageGroupMap createMessageGroupMap() {
return new MessageGroupHashBucket(bucketCount);
}
public int getBucketCount() {
return bucketCount;
}
/**
* Sets the number of hash buckets to use for the message group
* functionality. This is only applicable to using message groups to
* parallelize processing of a queue while preserving order across an
* individual JMSXGroupID header value. This value sets the number of hash
* buckets that will be used (i.e. the maximum possible concurrency).
*/
public void setBucketCount(int bucketCount) {
this.bucketCount = bucketCount;
}
}

View File

@ -0,0 +1,29 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.group;
/**
* Represents a factory used to create new instances of {@link MessageGroupMap}
* for a destination.
*
* @version $Revision$
*/
public interface MessageGroupMapFactory {
public MessageGroupMap createMessageGroupMap();
}

View File

@ -0,0 +1,33 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.group;
/**
* A factory to create instances of {@link SimpleMessageGroupMap} when implementing the
* <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class SimpleMessageGroupMapFactory implements MessageGroupMapFactory {
public MessageGroupMap createMessageGroupMap() {
return new SimpleMessageGroupMap();
}
}

View File

@ -20,6 +20,8 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,11 +42,11 @@ public class PolicyEntry extends DestinationMapEntry {
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy;
private int messageGroupHashBucketCount = 1024;
private PendingMessageLimitStrategy pendingMessageLimitStrategy;
private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit;
private MessageGroupMapFactory messageGroupMapFactory;
public void configure(Queue queue) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
@ -52,7 +54,7 @@ public class PolicyEntry extends DestinationMapEntry {
if (deadLetterStrategy != null) {
queue.setDeadLetterStrategy(deadLetterStrategy);
}
queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if( memoryLimit>0 ) {
queue.getUsageManager().setLimit(memoryLimit);
}
@ -137,21 +139,6 @@ public class PolicyEntry extends DestinationMapEntry {
this.deadLetterStrategy = deadLetterStrategy;
}
public int getMessageGroupHashBucketCount() {
return messageGroupHashBucketCount;
}
/**
* Sets the number of hash buckets to use for the message group
* functionality. This is only applicable to using message groups to
* parallelize processing of a queue while preserving order across an
* individual JMSXGroupID header value. This value sets the number of hash
* buckets that will be used (i.e. the maximum possible concurrency).
*/
public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
}
public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
return pendingMessageLimitStrategy;
}
@ -189,4 +176,20 @@ public class PolicyEntry extends DestinationMapEntry {
this.memoryLimit = memoryLimit;
}
public MessageGroupMapFactory getMessageGroupMapFactory() {
if (messageGroupMapFactory == null) {
messageGroupMapFactory = new MessageGroupHashBucketFactory();
}
return messageGroupMapFactory;
}
/**
* Sets the factory used to create new instances of {MessageGroupMap} used to implement the
* <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
*/
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
this.messageGroupMapFactory = messageGroupMapFactory;
}
}