mirror of https://github.com/apache/activemq.git
resolve AMQ-1979, slave connection did not know about temp destinations so it would not delete on close, only on an explicit delete command
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@705592 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4842547a5b
commit
d81ce2e276
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.broker.ft;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.broker.Connection;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ConsumerBrokerExchange;
|
||||
|
@ -211,14 +212,24 @@ public class MasterBroker extends InsertableMutableBrokerFilter {
|
|||
super.removeSubscription(context, info);
|
||||
sendAsyncToSlave(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDestinationInfo(ConnectionContext context,
|
||||
DestinationInfo info) throws Exception {
|
||||
super.addDestinationInfo(context, info);
|
||||
if (info.getDestination().isTemporary()) {
|
||||
sendAsyncToSlave(info);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
||||
super.removeDestinationInfo(context, info);
|
||||
if (info.getDestination().isTemporary()) {
|
||||
sendAsyncToSlave(info);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* begin a transaction
|
||||
*
|
||||
|
|
|
@ -75,6 +75,10 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
|||
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
|
||||
AdvisoryBroker.class);
|
||||
|
||||
if (!deleteTempQueue) {
|
||||
// give temp destination removes a chance to perculate on connection.close
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
|
||||
|
||||
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
|
||||
|
@ -101,7 +105,7 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
|||
// master does not always reach expected total, should be assertEquals.., why?
|
||||
assertTrue("dispatched to slave is as good as master, master="
|
||||
+ masterRb.getDestinationStatistics().getDispatched().getCount(),
|
||||
rb.getDestinationStatistics().getDispatched().getCount() + 2*COUNT >=
|
||||
rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >=
|
||||
masterRb.getDestinationStatistics().getDispatched().getCount());
|
||||
}
|
||||
|
||||
|
@ -121,7 +125,6 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
|||
try {
|
||||
latch.await(30L, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
@ -147,4 +150,11 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
|||
assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
|
||||
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
|
||||
}
|
||||
|
||||
public void testLoadRequestReplyWithNoTempQueueDelete() throws Exception {
|
||||
deleteTempQueue = false;
|
||||
messagesToSend = 10;
|
||||
testLoadRequestReply();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,7 +37,8 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
|
|||
protected Connection clientConnection;
|
||||
protected Session clientSession;
|
||||
protected Destination serverDestination;
|
||||
protected static final int COUNT = 2000;
|
||||
protected int messagesToSend = 2000;
|
||||
protected boolean deleteTempQueue = true;
|
||||
|
||||
public void testLoadRequestReply() throws Exception {
|
||||
MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
|
||||
|
@ -56,7 +57,7 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
|
|||
});
|
||||
|
||||
MessageProducer producer = clientSession.createProducer(serverDestination);
|
||||
for (int i =0; i< COUNT; i++) {
|
||||
for (int i =0; i< messagesToSend; i++) {
|
||||
TemporaryQueue replyTo = clientSession.createTemporaryQueue();
|
||||
MessageConsumer consumer = clientSession.createConsumer(replyTo);
|
||||
Message msg = clientSession.createMessage();
|
||||
|
@ -64,7 +65,11 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
|
|||
producer.send(msg);
|
||||
Message reply = consumer.receive();
|
||||
consumer.close();
|
||||
replyTo.delete();
|
||||
if (deleteTempQueue) {
|
||||
replyTo.delete();
|
||||
} else {
|
||||
// temp queue will be cleaned up on clientConnection.close
|
||||
}
|
||||
}
|
||||
|
||||
clientSession.close();
|
||||
|
|
Loading…
Reference in New Issue