When destinations are removed via JMX or by auto remove of inactive destinations an existing producer registration can be left with a reference to a resource that no longer exists but appears valid.  The destination should be marked as disposed so that a producer send can check to see if it should discard its current reference to the destination and either recreate it or update its cache to point to the currect destination resource.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1140770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-06-28 18:15:29 +00:00
parent 007bd4cc56
commit b5a19163bc
5 changed files with 273 additions and 100 deletions

View File

@ -97,6 +97,7 @@ public abstract class BaseDestination implements Destination {
private long lastActiveTime=0l; private long lastActiveTime=0l;
private boolean reduceMemoryFootprint = false; private boolean reduceMemoryFootprint = false;
protected final Scheduler scheduler; protected final Scheduler scheduler;
private boolean disposed = false;
/** /**
* @param brokerService * @param brokerService
@ -518,6 +519,11 @@ public abstract class BaseDestination implements Destination {
} }
this.destinationStatistics.setParent(null); this.destinationStatistics.setParent(null);
this.memoryUsage.stop(); this.memoryUsage.stop();
this.disposed = true;
}
public boolean isDisposed() {
return this.disposed;
} }
/** /**

View File

@ -62,6 +62,8 @@ public interface Destination extends Service, Task {
void dispose(ConnectionContext context) throws IOException; void dispose(ConnectionContext context) throws IOException;
boolean isDisposed();
DestinationStatistics getDestinationStatistics(); DestinationStatistics getDestinationStatistics();
DeadLetterStrategy getDeadLetterStrategy(); DeadLetterStrategy getDeadLetterStrategy();

View File

@ -61,6 +61,10 @@ public class DestinationFilter implements Destination {
next.dispose(context); next.dispose(context);
} }
public boolean isDisposed() {
return next.isDisposed();
}
public void gc() { public void gc() {
next.gc(); next.gc();
} }

View File

@ -232,17 +232,17 @@ public class RegionBroker extends EmptyBroker {
synchronized (clientIdSet) { synchronized (clientIdSet) {
ConnectionContext oldContext = clientIdSet.get(clientId); ConnectionContext oldContext = clientIdSet.get(clientId);
if (oldContext != null) { if (oldContext != null) {
if (context.isFaultTolerant() || context.isNetworkConnection()){ if (context.isFaultTolerant() || context.isNetworkConnection()){
//remove the old connection //remove the old connection
try{ try{
removeConnection(oldContext, info, new Exception("remove stale client")); removeConnection(oldContext, info, new Exception("remove stale client"));
}catch(Exception e){ }catch(Exception e){
LOG.warn("Failed to remove stale connection ",e); LOG.warn("Failed to remove stale connection ",e);
} }
}else{ }else{
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress()); + oldContext.getConnection().getRemoteAddress());
} }
} else { } else {
clientIdSet.put(clientId, context); clientIdSet.put(clientId, context);
} }
@ -496,7 +496,8 @@ public class RegionBroker extends EmptyBroker {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
message.setBrokerInTime(System.currentTimeMillis()); message.setBrokerInTime(System.currentTimeMillis());
if (producerExchange.isMutable() || producerExchange.getRegion() == null if (producerExchange.isMutable() || producerExchange.getRegion() == null
|| (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) { || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)
|| (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
ActiveMQDestination destination = message.getDestination(); ActiveMQDestination destination = message.getDestination();
// ensure the destination is registered with the RegionBroker // ensure the destination is registered with the RegionBroker
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend()); producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
@ -520,6 +521,7 @@ public class RegionBroker extends EmptyBroker {
producerExchange.setRegion(region); producerExchange.setRegion(region);
producerExchange.setRegionDestination(null); producerExchange.setRegionDestination(null);
} }
producerExchange.getRegion().send(producerExchange, message); producerExchange.getRegion().send(producerExchange, message);
} }
@ -815,48 +817,48 @@ public class RegionBroker extends EmptyBroker {
@Override @Override
public void sendToDeadLetterQueue(ConnectionContext context, public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node, Subscription subscription){ MessageReference node, Subscription subscription){
try{ try{
if(node!=null){ if(node!=null){
Message message=node.getMessage(); Message message=node.getMessage();
if(message!=null && node.getRegionDestination()!=null){ if(message!=null && node.getRegionDestination()!=null){
DeadLetterStrategy deadLetterStrategy=node DeadLetterStrategy deadLetterStrategy=node
.getRegionDestination().getDeadLetterStrategy(); .getRegionDestination().getDeadLetterStrategy();
if(deadLetterStrategy!=null){ if(deadLetterStrategy!=null){
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
// message may be inflight to other subscriptions so do not modify // message may be inflight to other subscriptions so do not modify
message = message.copy(); message = message.copy();
stampAsExpired(message); stampAsExpired(message);
message.setExpiration(0); message.setExpiration(0);
if(!message.isPersistent()){ if(!message.isPersistent()){
message.setPersistent(true); message.setPersistent(true);
message.setProperty("originalDeliveryMode", message.setProperty("originalDeliveryMode",
"NON_PERSISTENT"); "NON_PERSISTENT");
} }
// The original destination and transaction id do // The original destination and transaction id do
// not get filled when the message is first sent, // not get filled when the message is first sent,
// it is only populated if the message is routed to // it is only populated if the message is routed to
// another destination like the DLQ // another destination like the DLQ
ActiveMQDestination deadLetterDestination=deadLetterStrategy ActiveMQDestination deadLetterDestination=deadLetterStrategy
.getDeadLetterQueueFor(message, subscription); .getDeadLetterQueueFor(message, subscription);
if (context.getBroker()==null) { if (context.getBroker()==null) {
context.setBroker(getRoot()); context.setBroker(getRoot());
} }
BrokerSupport.resendNoCopy(context,message, BrokerSupport.resendNoCopy(context,message,
deadLetterDestination); deadLetterDestination);
} }
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Dead Letter message with no DLQ strategy in place, message id: " LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
+ message.getMessageId() + ", destination: " + message.getDestination()); + message.getMessageId() + ", destination: " + message.getDestination());
} }
} }
} }
} }
}catch(Exception e){ }catch(Exception e){
LOG.warn("Caught an exception sending to DLQ: "+node,e); LOG.warn("Caught an exception sending to DLQ: "+node,e);
} }
} }
@Override @Override
public Broker getRoot() { public Broker getRoot() {

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertEquals;
import java.util.List;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMXRemoveQueueThenSendIgnoredTest {
private static final Logger LOG = LoggerFactory.getLogger(JMXRemoveQueueThenSendIgnoredTest.class);
private BrokerService brokerService;
private MessageProducer producer;
private QueueSession session;
private QueueConnection connection;
private Queue queue;
private int count = 1;
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setBrokerName("dev");
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:0");
brokerService.start();
final String brokerUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUri);
connection = activeMQConnectionFactory.createQueueConnection();
session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE/*SESSION_TRANSACTED*/);
queue = session.createQueue("myqueue");
producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
}
@Test
public void testRemoveQueueAndProduceAfterNewConsumerAdded() throws Exception {
MessageConsumer firstConsumer = registerConsumer();
produceMessage();
Message message = firstConsumer.receive(5000);
LOG.debug("Received message " + message);
assertEquals(1, numberOfMessages());
firstConsumer.close();
session.commit();
Thread.sleep(1000);
removeQueue();
Thread.sleep(1000);
MessageConsumer secondConsumer = registerConsumer();
produceMessage();
message = secondConsumer.receive(5000);
LOG.debug("Received message " + message);
assertEquals(1, numberOfMessages());
secondConsumer.close();
}
@Test
public void testRemoveQueueAndProduceBeforeNewConsumerAdded() throws Exception {
MessageConsumer firstConsumer = registerConsumer();
produceMessage();
Message message = firstConsumer.receive(5000);
LOG.debug("Received message " + message);
assertEquals(1, numberOfMessages());
firstConsumer.close();
session.commit();
Thread.sleep(1000);
removeQueue();
Thread.sleep(1000);
produceMessage();
MessageConsumer secondConsumer = registerConsumer();
message = secondConsumer.receive(5000);
LOG.debug("Received message " + message);
assertEquals(1, numberOfMessages());
secondConsumer.close();
}
private MessageConsumer registerConsumer() throws JMSException {
MessageConsumer consumer = session.createConsumer(queue);
return consumer;
}
private int numberOfMessages() throws Exception {
JMXConnector jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"));
MBeanServerConnection mbeanServerConnection = jmxConnector.getMBeanServerConnection();
String beanId = "org.apache.activemq:BrokerName=dev,Type=Queue,Destination=myqueue";
List<?> object = (List<?>) mbeanServerConnection.invoke(new ObjectName(beanId), "browseMessages", null, null);
jmxConnector.close();
return object.size();
}
private void removeQueue() throws Exception {
LOG.debug("Removing Destination: myqueue");
brokerService.getAdminView().removeQueue("myqueue");
}
private void produceMessage() throws JMSException {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Sending message: " + count++);
LOG.debug("Sending message: " + textMessage);
producer.send(textMessage);
session.commit();
}
@After
public void tearDown() throws Exception {
connection.close();
brokerService.stop();
}
}