minor refactor to allow pluggable MessageGroupMap implementations so that we don't have to keep around every single GroupID in RAM if we don't want to (which could very easily become a RAM leak) and can use a provider which uses hashbuckets instead.

added an initial implementation of a hashbucket based MessageGroupMap which should be fine; am waiting on a unit test run to complete before I dare enable it by default :)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366204 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-01-05 15:36:07 +00:00
parent 71bd33d3ec
commit e1d4780221
13 changed files with 878 additions and 543 deletions

View File

@ -16,13 +16,12 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.group.SimpleMessageGroupMap;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
@ -42,8 +41,11 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@ -63,7 +65,7 @@ public class Queue implements Destination {
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private Subscription exclusiveOwner;
private final ConcurrentHashMap messageGroupOwners = new ConcurrentHashMap();
private final MessageGroupMap messageGroupOwners = new SimpleMessageGroupMap();
protected long garbageSize = 0;
protected long garbageSizeBeforeCollection = 1000;
@ -183,16 +185,8 @@ public class Queue implements Destination {
wasExclusiveOwner = true;
}
HashSet ownedGroups = new HashSet();
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
for (Iterator iter = messageGroupOwners.keySet().iterator(); iter.hasNext();) {
String group = (String) iter.next();
ConsumerId owner = (ConsumerId) messageGroupOwners.get(group);
if (owner.equals(consumerId)) {
ownedGroups.add(group);
iter.remove();
}
}
MessageGroupSet ownedGroups = messageGroupOwners.removeConsumer(consumerId);
synchronized (messages) {
if (!sub.getConsumerInfo().isBrowser()) {
@ -305,7 +299,7 @@ public class Queue implements Destination {
public String toString() {
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage()
+ "%, size=" + messages.size() + ", in flight groups=" + messageGroupOwners.size();
+ "%, size=" + messages.size() + ", in flight groups=" + messageGroupOwners;
}
public void start() throws Exception {
@ -328,7 +322,7 @@ public class Queue implements Destination {
return destinationStatistics;
}
public ConcurrentHashMap getMessageGroupOwners() {
public MessageGroupMap getMessageGroupOwners() {
return messageGroupOwners;
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@ -77,7 +78,7 @@ public class QueueSubscription extends PrefetchSubscription {
int sequence = node.getGroupSequence();
if( groupId!=null ) {
ConcurrentHashMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
// If we can own the first, then no-one else should own the rest.
if( sequence==1 ) {
@ -93,7 +94,7 @@ public class QueueSubscription extends PrefetchSubscription {
// need to become the new owner.
ConsumerId groupOwner;
synchronized(node) {
groupOwner = (ConsumerId) messageGroupOwners.get(groupId);
groupOwner = messageGroupOwners.get(groupId);
if( groupOwner==null ) {
if( node.lock(this) ) {
messageGroupOwners.put(groupId, info.getConsumerId());
@ -107,7 +108,7 @@ public class QueueSubscription extends PrefetchSubscription {
if( groupOwner.equals(info.getConsumerId()) ) {
// A group sequence < 1 is an end of group signal.
if ( sequence < 1 ) {
messageGroupOwners.remove(groupId);
messageGroupOwners.removeGroup(groupId);
}
return true;
}

View File

@ -0,0 +1,32 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
/**
* Represents an empty {@link MessageGroupSet}
*
* @version $Revision$
*/
public class EmptyMessageGroupSet implements MessageGroupSet {
public static final MessageGroupSet INSTANCE = new EmptyMessageGroupSet();
public boolean contains(String groupID) {
return false;
}
}

View File

@ -0,0 +1,107 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
import org.apache.activemq.command.ConsumerId;
/**
* Uses hash-code buckets to associate consumers with sets of message group IDs.
*
* @version $Revision$
*/
public class MessageGroupHashBucket implements MessageGroupMap {
private final int bucketCount;
private final ConsumerId[] consumers;
public MessageGroupHashBucket(int bucketCount) {
this.bucketCount = bucketCount;
this.consumers = new ConsumerId[bucketCount];
}
public void put(String groupId, ConsumerId consumerId) {
int bucket = getBucketNumber(groupId);
consumers[bucket] = consumerId;
}
public ConsumerId get(String groupId) {
int bucket = getBucketNumber(groupId);
return consumers[bucket];
}
public ConsumerId removeGroup(String groupId) {
int bucket = getBucketNumber(groupId);
ConsumerId answer = consumers[bucket];
consumers[bucket] = null;
return answer;
}
public MessageGroupSet removeConsumer(ConsumerId consumerId) {
MessageGroupSet answer = null;
for (int i = 0; i < consumers.length; i++) {
ConsumerId owner = consumers[i];
if (owner != null && owner.equals(consumerId)) {
answer = createMessageGroupSet(i, answer);
consumers[i] = null;
}
}
if (answer == null) {
// make an empty set
answer = EmptyMessageGroupSet.INSTANCE;
}
return answer;
}
public String toString() {
int count = 0;
for (int i = 0; i < consumers.length; i++) {
if (consumers[i] != null) {
count++;
}
}
return "active message group buckets: " + count;
}
protected MessageGroupSet createMessageGroupSet(int bucketNumber, final MessageGroupSet parent) {
final MessageGroupSet answer = createMessageGroupSet(bucketNumber);
if (parent == null) {
return answer;
}
else {
// union the two sets together
return new MessageGroupSet() {
public boolean contains(String groupID) {
return parent.contains(groupID) || answer.contains(groupID);
}
};
}
}
protected MessageGroupSet createMessageGroupSet(final int bucketNumber) {
return new MessageGroupSet() {
public boolean contains(String groupID) {
int bucket = getBucketNumber(groupID);
return bucket == bucketNumber;
}
};
}
protected int getBucketNumber(String groupId) {
return groupId.hashCode() % bucketCount;
}
}

View File

@ -0,0 +1,37 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
import org.apache.activemq.command.ConsumerId;
/**
* Represents a map of JMSXGroupID values to consumer IDs
*
* @version $Revision$
*/
public interface MessageGroupMap {
void put(String groupId, ConsumerId consumerId);
ConsumerId get(String groupId);
ConsumerId removeGroup(String groupId);
MessageGroupSet removeConsumer(ConsumerId consumerId);
}

View File

@ -0,0 +1,29 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
/**
* Represents a set of Message Group IDs
*
* @version $Revision$
*/
public interface MessageGroupSet {
boolean contains(String groupID);
}

View File

@ -0,0 +1,66 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.ConsumerId;
import java.util.Iterator;
import java.util.Map;
/**
* A simple implementation which tracks every individual GroupID value but
* which can become a memory leak if clients die before they complete a message
* group.
*
* @version $Revision$
*/
public class SimpleMessageGroupMap implements MessageGroupMap {
private Map map = new ConcurrentHashMap();
public void put(String groupId, ConsumerId consumerId) {
map.put(groupId, consumerId);
}
public ConsumerId get(String groupId) {
return (ConsumerId) map.get(groupId);
}
public ConsumerId removeGroup(String groupId) {
return (ConsumerId) map.remove(groupId);
}
public MessageGroupSet removeConsumer(ConsumerId consumerId) {
SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
for (Iterator iter = map.keySet().iterator(); iter.hasNext();) {
String group = (String) iter.next();
ConsumerId owner = (ConsumerId) map.get(group);
if (owner.equals(consumerId)) {
ownedGroups.add(group);
iter.remove();
}
}
return ownedGroups;
}
public String toString() {
return "message groups: " + map.size();
}
}

View File

@ -0,0 +1,40 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
import java.util.HashSet;
import java.util.Set;
/**
* A simple implementation which just uses a {@link Set}
*
* @version $Revision$
*/
public class SimpleMessageGroupSet implements MessageGroupSet {
private Set set = new HashSet();
public boolean contains(String groupID) {
return set.contains(groupID);
}
public void add(String group) {
set.add(group);
}
}

View File

@ -0,0 +1,9 @@
<html>
<head>
</head>
<body>
Classes to implement the <a href="http://activemq.org/Message+Groups">Message Groups</a>a> feature.
</body>
</html>

View File

@ -904,7 +904,7 @@ static final long[] jjtoSkip = {
static final long[] jjtoSpecial = {
0x3eL,
};
private SimpleCharStream input_stream;
protected SimpleCharStream input_stream;
private final int[] jjrounds = new int[43];
private final int[] jjstateSet = new int[86];
protected char curChar;
@ -946,7 +946,7 @@ public void SwitchTo(int lexState)
curLexState = lexState;
}
private final Token jjFillToken()
protected Token jjFillToken()
{
Token t = Token.newToken(jjmatchedKind);
t.kind = jjmatchedKind;
@ -966,7 +966,7 @@ int jjround;
int jjmatchedPos;
int jjmatchedKind;
public final Token getNextToken()
public Token getNextToken()
{
int kind;
Token specialToken = null;

View File

@ -0,0 +1,31 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
/**
*
* @version $Revision$
*/
public class MessageGroupHashBucketTest extends MessageGroupMapTest {
protected MessageGroupMap createMessageGroupMap() {
return new MessageGroupHashBucket(1024);
}
}

View File

@ -0,0 +1,106 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.group;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.SessionId;
import junit.framework.TestCase;
/**
*
* @version $Revision$
*/
public class MessageGroupMapTest extends TestCase {
protected MessageGroupMap map;
private ConsumerId consumer1;
private ConsumerId consumer2;
private ConsumerId consumer3;
private long idCounter;
public void testSingleConsumerForManyBucks() throws Exception {
assertGet("1", null);
map.put("1", consumer1);
assertGet("1", consumer1);
map.put("2", consumer1);
assertGet("2", consumer1);
map.put("3", consumer1);
assertGet("3", consumer1);
MessageGroupSet set = map.removeConsumer(consumer1);
assertContains(set, "1");
assertContains(set, "2");
assertContains(set, "3");
assertGet("1", null);
assertGet("2", null);
assertGet("3", null);
}
public void testManyConsumers() throws Exception {
assertGet("1", null);
map.put("1", consumer1);
assertGet("1", consumer1);
map.put("2", consumer2);
assertGet("2", consumer2);
map.put("3", consumer3);
assertGet("3", consumer3);
MessageGroupSet set = map.removeConsumer(consumer1);
assertContains(set, "1");
assertGet("1", null);
map.put("1", consumer2);
assertGet("1", consumer2);
set = map.removeConsumer(consumer2);
assertContains(set, "1");
assertContains(set, "2");
}
protected void setUp() throws Exception {
super.setUp();
map = createMessageGroupMap();
consumer1 = createConsumerId();
consumer2 = createConsumerId();
consumer3 = createConsumerId();
}
protected MessageGroupMap createMessageGroupMap() {
return new SimpleMessageGroupMap();
}
protected ConsumerId createConsumerId() {
ConnectionId connectionId = new ConnectionId("" + ++idCounter);
SessionId sessionId = new SessionId(connectionId, ++idCounter);
ConsumerId answer = new ConsumerId(sessionId, ++idCounter);
return answer;
}
protected void assertGet(String groupdId, ConsumerId expected) {
ConsumerId actual = map.get(groupdId);
assertEquals("Entry for groupId: " + groupdId, expected, actual);
}
protected void assertContains(MessageGroupSet set, String groupID) {
assertTrue("MessageGroup set: " + set + " does not contain groupID: " + groupID, set.contains(groupID));
}
}