https://issues.apache.org/jira/browse/AMQ-3092 - Deleting a Queue from the console results in lost messages

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1050059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-12-16 17:07:24 +00:00
parent 6348481c18
commit 44d6be4e33
2 changed files with 36 additions and 5 deletions

View File

@ -492,7 +492,8 @@ public class RegionBroker extends EmptyBroker {
@Override @Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
message.setBrokerInTime(System.currentTimeMillis()); message.setBrokerInTime(System.currentTimeMillis());
if (producerExchange.isMutable() || producerExchange.getRegion() == null) { if (producerExchange.isMutable() || producerExchange.getRegion() == null
|| (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) {
ActiveMQDestination destination = message.getDestination(); ActiveMQDestination destination = message.getDestination();
// ensure the destination is registered with the RegionBroker // ensure the destination is registered with the RegionBroker
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false); producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false);
@ -514,6 +515,7 @@ public class RegionBroker extends EmptyBroker {
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
} }
producerExchange.setRegion(region); producerExchange.setRegion(region);
producerExchange.setRegionDestination(null);
} }
producerExchange.getRegion().send(producerExchange, message); producerExchange.getRegion().send(producerExchange, message);
} }

View File

@ -16,10 +16,7 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import javax.jms.Connection; import javax.jms.*;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler; import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
@ -111,6 +108,38 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new JDBCPersistenceAdapter()}); addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new AMQPersistenceAdapter(), new JDBCPersistenceAdapter()});
} }
public void testDeleteSameProducer() throws Exception {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination();
MessageProducer producer = session.createProducer(destination);
Message message = session.createTextMessage("Test Message");
producer.send(message);
MessageConsumer consumer = session.createConsumer(destination);
Message received = consumer.receive(1000);
assertEquals(message, received);
ObjectName brokerViewMBeanName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
brokerProxy.removeQueue(getDestinationString());
producer.send(message);
received = consumer.receive(1000);
assertNotNull("Message not received", received);
assertEquals(message, received);
}
public void testDelete() throws Exception { public void testDelete() throws Exception {
// Send some messages // Send some messages
connection = connectionFactory.createConnection(); connection = connectionFactory.createConnection();