applied patch from John Heitmann to fix AMQ-889 to avoid duplicate consumers (such as on failover) leaking resources

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@436835 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-08-25 15:39:16 +00:00
parent 1ad52f4209
commit e8c8abcd5b
4 changed files with 209 additions and 57 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
@ -33,6 +35,7 @@ public class BrokerView implements BrokerViewMBean {
final ManagedRegionBroker broker; final ManagedRegionBroker broker;
private final BrokerService brokerService; private final BrokerService brokerService;
private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
this.brokerService = brokerService; this.brokerService = brokerService;
@ -156,7 +159,7 @@ public class BrokerView implements BrokerViewMBean {
ConsumerInfo info = new ConsumerInfo(); ConsumerInfo info = new ConsumerInfo();
ConsumerId consumerId = new ConsumerId(); ConsumerId consumerId = new ConsumerId();
consumerId.setConnectionId(clientId); consumerId.setConnectionId(clientId);
consumerId.setSessionId(0); consumerId.setSessionId(sessionIdCounter.incrementAndGet());
consumerId.setValue(0); consumerId.setValue(0);
info.setConsumerId(consumerId); info.setConsumerId(consumerId);
info.setDestination(new ActiveMQTopic(topicName)); info.setDestination(new ActiveMQTopic(topicName));

View File

@ -60,6 +60,7 @@ abstract public class AbstractRegion implements Region {
protected boolean autoCreateDestinations=true; protected boolean autoCreateDestinations=true;
protected final TaskRunnerFactory taskRunnerFactory; protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object(); protected final Object destinationsMutex = new Object();
protected final Map consumerChangeMutexMap = new HashMap();
public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.broker = broker; this.broker = broker;
@ -178,6 +179,21 @@ abstract public class AbstractRegion implements Region {
lookup(context, destination); lookup(context, destination);
} }
Object addGuard;
synchronized(consumerChangeMutexMap) {
addGuard = consumerChangeMutexMap.get(info.getConsumerId());
if (addGuard == null) {
addGuard = new Object();
consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
}
}
synchronized (addGuard) {
Object o = subscriptions.get(info.getConsumerId());
if (o != null) {
log.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
return (Subscription)o;
}
Subscription sub = createSubscription(context, info); Subscription sub = createSubscription(context, info);
// We may need to add some destinations that are in persistent store but not active // We may need to add some destinations that are in persistent store but not active
@ -201,6 +217,13 @@ abstract public class AbstractRegion implements Region {
subscriptions.put(info.getConsumerId(), sub); subscriptions.put(info.getConsumerId(), sub);
// At this point we're done directly manipulating subscriptions,
// but we need to retain the synchronized block here. Consider
// otherwise what would happen if at this point a second
// thread added, then removed, as would be allowed with
// no mutex held. Remove is only essentially run once
// so everything after this point would be leaked.
// Add the subscription to all the matching queues. // Add the subscription to all the matching queues.
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next(); Destination dest = (Destination) iter.next();
@ -213,6 +236,7 @@ abstract public class AbstractRegion implements Region {
return sub; return sub;
} }
}
/** /**
* Get all the Destinations that are in storage * Get all the Destinations that are in storage
@ -247,6 +271,9 @@ abstract public class AbstractRegion implements Region {
destroySubscription(sub); destroySubscription(sub);
synchronized (consumerChangeMutexMap) {
consumerChangeMutexMap.remove(info.getConsumerId());
}
} }
protected void destroySubscription(Subscription sub) { protected void destroySubscription(Subscription sub) {

View File

@ -105,6 +105,10 @@ public class TopicRegion extends AbstractRegion {
else { else {
super.addConsumer(context, info); super.addConsumer(context, info);
sub = (DurableTopicSubscription) durableSubscriptions.get(key); sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub == null) {
throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: "
+ key.getClientId() + " subscriberName: " + key.getSubscriptionName());
}
} }
sub.activate(context, info); sub.activate(context, info);

View File

@ -0,0 +1,118 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.network.NetworkTestSupport;
/**
* Pretend to be an abusive client that sends multiple
* identical ConsumerInfo commands and make sure the
* broker doesn't stall because of it.
*/
public class DoubleSubscriptionTest extends NetworkTestSupport {
public ActiveMQDestination destination;
public int deliveryMode;
private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
public static Test suite() {
return suite(DoubleSubscriptionTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
public void initCombosForTestDoubleSubscription() {
addCombinationValues("destination", new Object[] { new ActiveMQQueue("TEST"), new ActiveMQQueue("TEST"), });
}
public void testDoubleSubscription() throws Exception {
// Start a normal consumer on the remote broker
StubConnection connection1 = createRemoteConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.request(consumerInfo1);
// Start a normal producer on a remote broker
StubConnection connection2 = createRemoteConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.request(producerInfo2);
// Send a message to make sure the basics are working
connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
Message m1 = receiveMessage(connection1);
assertNotNull(m1);
assertNoMessagesLeft(connection1);
connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
// Send a message to sit on the broker while we mess with it
connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
// Now we're going to resend the same consumer commands again and see if the broker
// can handle it.
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.request(consumerInfo1);
// After this there should be 2 messages on the broker...
connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
// ... let's start a fresh consumer...
connection1.stop();
StubConnection connection3 = createRemoteConnection();
ConnectionInfo connectionInfo3 = createConnectionInfo();
SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, destination);
connection3.send(connectionInfo3);
connection3.send(sessionInfo3);
connection3.request(consumerInfo3);
// ... and then grab the 2 that should be there.
assertNotNull(receiveMessage(connection3));
assertNotNull(receiveMessage(connection3));
assertNoMessagesLeft(connection3);
}
protected String getRemoteURI() {
return remoteURI;
}
}