This commit is contained in:
rajdavies 2013-10-10 20:29:40 +01:00
parent e000471cf4
commit 468e697651
14 changed files with 365 additions and 33 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.jmx;
import java.util.Map;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.jms.JMSException;
@ -188,4 +190,40 @@ public class QueueView extends DestinationView implements QueueViewMBean {
}
return false;
}
/**
* @return a Map of groupNames and ConsumerIds
*/
@Override
public Map<String, String> getMessageGroups() {
Queue queue = (Queue) destination;
return queue.getMessageGroupOwners().getGroups();
}
/**
* @return the message group type implementation (simple,bucket,cached)
*/
@Override
public String getMessageGroupType() {
Queue queue = (Queue) destination;
return queue.getMessageGroupOwners().getType();
}
/**
* remove a message group = has the effect of rebalancing group
*/
@Override
public void removeMessageGroup(@MBeanInfo("groupName") String groupName) {
Queue queue = (Queue) destination;
queue.getMessageGroupOwners().removeGroup(groupName);
}
/**
* remove all the message groups - will rebalance all message groups across consumers
*/
@Override
public void removeAllMessageGroups() {
Queue queue = (Queue) destination;
queue.getMessageGroupOwners().removeAll();
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.jmx;
import java.util.Map;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
@ -180,4 +182,31 @@ public interface QueueViewMBean extends DestinationViewMBean {
*/
@MBeanInfo("Caching is enabled")
boolean isCacheEnabled();
/**
* @return a Map of groupNames and ConsumerIds
*/
@MBeanInfo("Map of groupNames and ConsumerIds")
Map<String,String> getMessageGroups();
/**
* @return the message group type implementation (simple,bucket,cached)
*/
@MBeanInfo("group implementation (simple,bucket,cached)")
String getMessageGroupType();
/**
* remove a message group = has the effect of rebalancing group
* @param groupName
*/
@MBeanInfo("remove a message group by its groupName")
void removeMessageGroup(@MBeanInfo("groupName")String groupName);
/**
* remove all the message groups - will rebalance all message groups across consumers
*/
@MBeanInfo("emove all the message groups - will rebalance all message groups across consumers")
void removeAllMessageGroups();
}

View File

@ -17,18 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -46,7 +35,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -56,24 +44,14 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.*;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@ -114,7 +92,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
final Lock sendLock = new ReentrantLock();
private ExecutorService executor;
private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.group;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.memory.LRUMap;
/**
* A simple implementation which tracks every individual GroupID value in a LRUCache
*
*
*/
public class CachedMessageGroupMap implements MessageGroupMap {
private LRUMap<String, ConsumerId> cache = new LRUMap<String, ConsumerId>(1024);
public synchronized void put(String groupId, ConsumerId consumerId) {
cache.put(groupId, consumerId);
}
public synchronized ConsumerId get(String groupId) {
return cache.get(groupId);
}
public synchronized ConsumerId removeGroup(String groupId) {
return cache.remove(groupId);
}
public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
Map<String,ConsumerId> map = new HashMap<String, ConsumerId>();
map.putAll(cache);
for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
String group = iter.next();
ConsumerId owner = map.get(group);
if (owner.equals(consumerId)) {
ownedGroups.add(group);
}
}
for (String group:ownedGroups.getUnderlyingSet()){
cache.remove(group);
}
return ownedGroups;
}
@Override
public synchronized void removeAll(){
cache.clear();
}
@Override
public synchronized Map<String, String> getGroups() {
Map<String,String> result = new HashMap<String,String>();
for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){
result.put(entry.getKey(),entry.getValue().toString());
}
return result;
}
@Override
public String getType() {
return "cached";
}
public String toString() {
return "message groups: " + cache.size();
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.group;
/**
* A factory to create instances of {@link org.apache.activemq.broker.region.group.SimpleMessageGroupMap} when implementing the
* <a href="http://activemq.apache.org/message-groups.html">Message Groups</a> functionality.
*
* @org.apache.xbean.XBean
*
*
*/
public class CachedMessageGroupMapFactory implements MessageGroupMapFactory {
public MessageGroupMap createMessageGroupMap() {
return new CachedMessageGroupMap();
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.group;
import java.io.IOException;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
public class GroupFactoryFinder {
private static final FactoryFinder GROUP_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/groups/");
private GroupFactoryFinder() {
}
public static MessageGroupMapFactory createMessageGroupMapFactory(String type) throws IOException {
try {
return (MessageGroupMapFactory)GROUP_FACTORY_FINDER.newInstance(type);
} catch (Throwable e) {
throw IOExceptionSupport.create("Could not load " + type + " factory:" + e, e);
}
}
}

View File

@ -16,7 +16,10 @@
*/
package org.apache.activemq.broker.region.group;
import java.util.Map;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.memory.LRUMap;
/**
* Uses hash-code buckets to associate consumers with sets of message group IDs.
@ -27,30 +30,37 @@ public class MessageGroupHashBucket implements MessageGroupMap {
private final int bucketCount;
private final ConsumerId[] consumers;
private LRUMap<String,String>cache=new LRUMap<String,String>(64);
public MessageGroupHashBucket(int bucketCount) {
this.bucketCount = bucketCount;
this.consumers = new ConsumerId[bucketCount];
}
public void put(String groupId, ConsumerId consumerId) {
public synchronized void put(String groupId, ConsumerId consumerId) {
int bucket = getBucketNumber(groupId);
consumers[bucket] = consumerId;
if (consumerId != null){
cache.put(groupId,consumerId.toString());
}
}
public ConsumerId get(String groupId) {
public synchronized ConsumerId get(String groupId) {
int bucket = getBucketNumber(groupId);
//excersise cache
cache.get(groupId);
return consumers[bucket];
}
public ConsumerId removeGroup(String groupId) {
public synchronized ConsumerId removeGroup(String groupId) {
int bucket = getBucketNumber(groupId);
ConsumerId answer = consumers[bucket];
consumers[bucket] = null;
cache.remove(groupId);
return answer;
}
public MessageGroupSet removeConsumer(ConsumerId consumerId) {
public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
MessageGroupSet answer = null;
for (int i = 0; i < consumers.length; i++) {
ConsumerId owner = consumers[i];
@ -66,6 +76,23 @@ public class MessageGroupHashBucket implements MessageGroupMap {
return answer;
}
public synchronized void removeAll(){
for (int i =0; i < consumers.length; i++){
consumers[i] = null;
}
}
@Override
public Map<String, String> getGroups() {
return cache;
}
@Override
public String getType() {
return "bucket";
}
public String toString() {
int count = 0;
for (int i = 0; i < consumers.length; i++) {

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.region.group;
import java.util.Map;
import org.apache.activemq.command.ConsumerId;
/**
@ -33,4 +35,13 @@ public interface MessageGroupMap {
MessageGroupSet removeConsumer(ConsumerId consumerId);
void removeAll();
/**
* @return a map of group names and associated consumer Id
*/
Map<String,String> getGroups();
String getType();
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.group;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -57,6 +58,26 @@ public class SimpleMessageGroupMap implements MessageGroupMap {
return ownedGroups;
}
@Override
public void removeAll(){
map.clear();
}
@Override
public Map<String, String> getGroups() {
Map<String,String> result = new HashMap<String,String>();
for (Map.Entry<String,ConsumerId>entry:map.entrySet()){
result.put(entry.getKey(),entry.getValue().toString());
}
return result;
}
@Override
public String getType() {
return "simple";
}
public String toString() {
return "message groups: " + map.size();
}

View File

@ -36,4 +36,8 @@ public class SimpleMessageGroupSet implements MessageGroupSet {
set.add(group);
}
protected Set<String> getUnderlyingSet(){
return set;
}
}

View File

@ -24,12 +24,11 @@ import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueBrowserSubscription;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.GroupFactoryFinder;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.network.NetworkBridgeFilterFactory;
@ -54,6 +53,7 @@ public class PolicyEntry extends DestinationMapEntry {
private PendingMessageLimitStrategy pendingMessageLimitStrategy;
private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit;
private String messageGroupMapFactoryType = "cached";
private MessageGroupMapFactory messageGroupMapFactory;
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
@ -395,7 +395,11 @@ public class PolicyEntry extends DestinationMapEntry {
public MessageGroupMapFactory getMessageGroupMapFactory() {
if (messageGroupMapFactory == null) {
messageGroupMapFactory = new MessageGroupHashBucketFactory();
try {
messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType());
}catch(Exception e){
LOG.error("Failed to create message group Factory ",e);
}
}
return messageGroupMapFactory;
}
@ -410,6 +414,16 @@ public class PolicyEntry extends DestinationMapEntry {
this.messageGroupMapFactory = messageGroupMapFactory;
}
public String getMessageGroupMapFactoryType() {
return messageGroupMapFactoryType;
}
public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) {
this.messageGroupMapFactoryType = messageGroupMapFactoryType;
}
/**
* @return the pendingDurableSubscriberPolicy
*/

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.region.group.SimpleMessageGroupMapFactory