diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index c698bdd535..bcd2dc45bd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -492,7 +492,8 @@ public class RegionBroker extends EmptyBroker { @Override public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { message.setBrokerInTime(System.currentTimeMillis()); - if (producerExchange.isMutable() || producerExchange.getRegion() == null) { + if (producerExchange.isMutable() || producerExchange.getRegion() == null + || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) { ActiveMQDestination destination = message.getDestination(); // ensure the destination is registered with the RegionBroker producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false); @@ -514,6 +515,7 @@ public class RegionBroker extends EmptyBroker { throw createUnknownDestinationTypeException(destination); } producerExchange.setRegion(region); + producerExchange.setRegionDestination(null); } producerExchange.getRegion().send(producerExchange, message); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java index 150b19d79a..e38843583a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java @@ -16,10 +16,7 @@ */ package org.apache.activemq.broker.jmx; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; +import javax.jms.*; import javax.management.MBeanServer; import javax.management.MBeanServerInvocationHandler; import javax.management.MalformedObjectNameException; @@ -111,6 +108,38 @@ public class PurgeTest extends EmbeddedBrokerTestSupport { addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new JDBCPersistenceAdapter()}); } + public void testDeleteSameProducer() throws Exception { + connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(); + + MessageProducer producer = session.createProducer(destination); + Message message = session.createTextMessage("Test Message"); + producer.send(message); + + + MessageConsumer consumer = session.createConsumer(destination); + + Message received = consumer.receive(1000); + assertEquals(message, received); + + ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); + BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true); + + brokerProxy.removeQueue(getDestinationString()); + + + producer.send(message); + + received = consumer.receive(1000); + + assertNotNull("Message not received", received); + assertEquals(message, received); + + + } + public void testDelete() throws Exception { // Send some messages connection = connectionFactory.createConnection();