mirror of
https://github.com/apache/activemq.git
synced 2025-02-16 23:16:52 +00:00
Fixed the JmsTempDestinationTest test. Async send was causing the temp destination to get recreated after the temp connection was closed.
Fixed - Was sending duplicate temp destination created advisories. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@641938 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
14b8957713
commit
b5df4babc0
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.activemq.advisory;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -64,7 +65,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||
protected final ProducerId advisoryProducerId = new ProducerId();
|
||||
|
||||
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
|
||||
|
||||
|
||||
public AdvisoryBroker(Broker next) {
|
||||
super(next);
|
||||
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
|
||||
@ -145,10 +146,12 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||
Destination answer = next.addDestination(context, destination);
|
||||
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
||||
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
|
||||
fireAdvisory(context, topic, info);
|
||||
destinations.put(destination, info);
|
||||
DestinationInfo previous = destinations.putIfAbsent(destination, info);
|
||||
if( previous==null ) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
||||
fireAdvisory(context, topic, info);
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
@ -158,9 +161,11 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||
next.addDestinationInfo(context, info);
|
||||
|
||||
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
||||
fireAdvisory(context, topic, info);
|
||||
destinations.put(destination, info);
|
||||
DestinationInfo previous = destinations.putIfAbsent(destination, info);
|
||||
if( previous==null ) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
|
||||
fireAdvisory(context, topic, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,7 +192,6 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
|
||||
next.removeDestinationInfo(context, destInfo);
|
||||
DestinationInfo info = destinations.remove(destInfo.getDestination());
|
||||
|
||||
if (info != null) {
|
||||
info.setDestination(destInfo.getDestination());
|
||||
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
|
||||
|
@ -50,7 +50,7 @@ public class JmsTempDestinationTest extends TestCase {
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
|
||||
factory.setUseAsyncSend(false);
|
||||
factory.setAlwaysSyncSend(true);
|
||||
connection = factory.createConnection();
|
||||
connections.add(connection);
|
||||
}
|
||||
@ -68,7 +68,7 @@ public class JmsTempDestinationTest extends TestCase {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Make sure Temp destination can only be consumed by local connection
|
||||
*
|
||||
|
Loading…
x
Reference in New Issue
Block a user