ARTEMIS-191 Refactor RemoveDestinationTest
-Using core api to inspect queue status -Catch command visit() exceptions in order to pass it back to client. -Correct destination add/remove handlings
This commit is contained in:
parent
40f318f083
commit
be9959e0bc
|
@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||||
|
@ -174,7 +173,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
||||||
|
|
||||||
private ConnectionState state;
|
private ConnectionState state;
|
||||||
|
|
||||||
private final Set<String> tempQueues = new ConcurrentHashSet<String>();
|
private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<ActiveMQDestination>();
|
||||||
|
|
||||||
private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
|
private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
|
||||||
|
|
||||||
|
@ -227,7 +226,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
||||||
response = new ExceptionResponse(this.stopError);
|
response = new ExceptionResponse(this.stopError);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
response = ((Command) command).visit(this);
|
try {
|
||||||
|
response = ((Command) command).visit(this);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
if (responseRequired) {
|
||||||
|
response = new ExceptionResponse(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (response instanceof ExceptionResponse) {
|
if (response instanceof ExceptionResponse) {
|
||||||
if (!responseRequired) {
|
if (!responseRequired) {
|
||||||
|
@ -409,10 +415,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteTempQueues() throws Exception {
|
private void deleteTempQueues() throws Exception {
|
||||||
Iterator<String> queueNames = tempQueues.iterator();
|
Iterator<ActiveMQDestination> tmpQs = tempQueues.iterator();
|
||||||
while (queueNames.hasNext()) {
|
while (tmpQs.hasNext()) {
|
||||||
String q = queueNames.next();
|
ActiveMQDestination q = tmpQs.next();
|
||||||
protocolManager.deleteQueue(q);
|
protocolManager.removeDestination(this, q);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1230,10 +1236,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
||||||
@Override
|
@Override
|
||||||
public Response processRemoveDestination(DestinationInfo info) throws Exception {
|
public Response processRemoveDestination(DestinationInfo info) throws Exception {
|
||||||
ActiveMQDestination dest = info.getDestination();
|
ActiveMQDestination dest = info.getDestination();
|
||||||
if (dest.isQueue()) {
|
protocolManager.removeDestination(this, dest);
|
||||||
String qName = "jms.queue." + dest.getPhysicalName();
|
|
||||||
protocolManager.deleteQueue(qName);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1320,8 +1323,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
||||||
return this.wireFormat;
|
return this.wireFormat;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerTempQueue(SimpleString qName) {
|
public void registerTempQueue(ActiveMQDestination queue) {
|
||||||
tempQueues.add(qName.toString());
|
tempQueues.add(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
|
||||||
|
@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
|
||||||
import org.apache.activemq.artemis.core.security.CheckType;
|
import org.apache.activemq.artemis.core.security.CheckType;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||||
|
@ -136,7 +140,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduledPool;
|
private final ScheduledExecutorService scheduledPool;
|
||||||
|
|
||||||
|
|
||||||
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
|
||||||
this.factory = factory;
|
this.factory = factory;
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
@ -429,18 +432,25 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
||||||
}
|
}
|
||||||
// Avoid replaying dup commands
|
// Avoid replaying dup commands
|
||||||
if (!ss.getProducerIds().contains(info.getProducerId())) {
|
if (!ss.getProducerIds().contains(info.getProducerId())) {
|
||||||
ActiveMQDestination destination = info.getDestination();
|
|
||||||
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
|
|
||||||
if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
|
|
||||||
throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
AMQSession amqSession = sessions.get(sessionId);
|
AMQSession amqSession = sessions.get(sessionId);
|
||||||
if (amqSession == null) {
|
if (amqSession == null) {
|
||||||
throw new IllegalStateException("Session not exist! : " + sessionId);
|
throw new IllegalStateException("Session not exist! : " + sessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ActiveMQDestination destination = info.getDestination();
|
||||||
|
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||||
|
if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
|
||||||
|
throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
|
||||||
|
}
|
||||||
|
if (destination.isQueue()) {
|
||||||
|
OpenWireUtil.validateDestination(destination, amqSession);
|
||||||
|
}
|
||||||
|
DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
|
||||||
|
this.addDestination(theConn, destInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
amqSession.createProducer(info);
|
amqSession.createProducer(info);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -539,10 +549,40 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
||||||
return sessions.get(sessionId);
|
return sessions.get(sessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
|
||||||
|
if (dest.isQueue()) {
|
||||||
|
SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
|
||||||
|
this.server.destroyQueue(qName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
|
||||||
|
Iterator<Binding> iterator = bindings.getBindings().iterator();
|
||||||
|
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Queue b = (Queue) iterator.next().getBindable();
|
||||||
|
if (b.getConsumerCount() > 0) {
|
||||||
|
throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
|
||||||
|
}
|
||||||
|
if (b.isDurable()) {
|
||||||
|
throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
|
||||||
|
}
|
||||||
|
b.deleteQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
|
||||||
|
AMQConnectionContext context = connection.getConext();
|
||||||
|
DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
|
||||||
|
|
||||||
|
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
|
||||||
|
fireAdvisory(context, topic, advInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
|
public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
|
||||||
ActiveMQDestination dest = info.getDestination();
|
ActiveMQDestination dest = info.getDestination();
|
||||||
if (dest.isQueue()) {
|
if (dest.isQueue()) {
|
||||||
SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
|
SimpleString qName = OpenWireUtil.toCoreAddress(dest);
|
||||||
ConnectionState state = connection.getState();
|
ConnectionState state = connection.getState();
|
||||||
ConnectionInfo connInfo = state.getInfo();
|
ConnectionInfo connInfo = state.getInfo();
|
||||||
if (connInfo != null) {
|
if (connInfo != null) {
|
||||||
|
@ -555,9 +595,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
||||||
|
|
||||||
((ActiveMQServerImpl) server).checkQueueCreationLimit(user);
|
((ActiveMQServerImpl) server).checkQueueCreationLimit(user);
|
||||||
}
|
}
|
||||||
this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, true);
|
|
||||||
|
QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
|
||||||
|
if (binding == null) {
|
||||||
|
this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
|
||||||
|
}
|
||||||
if (dest.isTemporary()) {
|
if (dest.isTemporary()) {
|
||||||
connection.registerTempQueue(qName);
|
connection.registerTempQueue(dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -570,10 +614,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteQueue(String q) throws Exception {
|
|
||||||
server.destroyQueue(new SimpleString(q));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void endTransaction(TransactionInfo info) throws Exception {
|
public void endTransaction(TransactionInfo info) throws Exception {
|
||||||
AMQSession txSession = transactions.get(info.getTransactionId());
|
AMQSession txSession = transactions.get(info.getTransactionId());
|
||||||
|
|
||||||
|
|
|
@ -614,4 +614,9 @@ public abstract class ArtemisBrokerBase implements Broker {
|
||||||
return directory.delete();
|
return directory.delete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ActiveMQServer getServer()
|
||||||
|
{
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.artemiswrapper;
|
package org.apache.activemq.broker.artemiswrapper;
|
||||||
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
|
||||||
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
|
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
|
||||||
|
@ -82,6 +80,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
|
||||||
}
|
}
|
||||||
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
|
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
|
||||||
commonSettings.setDeadLetterAddress(dla);
|
commonSettings.setDeadLetterAddress(dla);
|
||||||
|
commonSettings.setAutoCreateJmsQueues(true);
|
||||||
|
|
||||||
serverConfig.getAcceptorConfigurations().add(transportConfiguration);
|
serverConfig.getAcceptorConfigurations().add(transportConfiguration);
|
||||||
if (this.bservice.enableSsl()) {
|
if (this.bservice.enableSsl()) {
|
||||||
|
|
|
@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
@ -31,12 +33,13 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
import javax.management.ObjectName;
|
|
||||||
|
|
||||||
import org.apache.activemq.advisory.DestinationSource;
|
import org.apache.activemq.advisory.DestinationSource;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -45,8 +48,8 @@ import org.junit.Test;
|
||||||
|
|
||||||
public class RemoveDestinationTest {
|
public class RemoveDestinationTest {
|
||||||
|
|
||||||
private static final String VM_BROKER_URL = "vm://localhost?create=false";
|
private static final String TCP_BROKER_URL = "tcp://localhost:61616?create=false";
|
||||||
private static final String BROKER_URL = "broker:vm://localhost?broker.persistent=false&broker.useJmx=true";
|
private static final String BROKER_URL = "broker:tcp://localhost:61616?broker.persistent=false&broker.useJmx=true";
|
||||||
|
|
||||||
BrokerService broker;
|
BrokerService broker;
|
||||||
|
|
||||||
|
@ -65,7 +68,7 @@ public class RemoveDestinationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Connection createConnection(final boolean start) throws JMSException {
|
private Connection createConnection(final boolean start) throws JMSException {
|
||||||
ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
|
ConnectionFactory cf = new ActiveMQConnectionFactory(TCP_BROKER_URL);
|
||||||
Connection conn = cf.createConnection();
|
Connection conn = cf.createConnection();
|
||||||
if (start) {
|
if (start) {
|
||||||
conn.start();
|
conn.start();
|
||||||
|
@ -118,7 +121,7 @@ public class RemoveDestinationTest {
|
||||||
|
|
||||||
ActiveMQTopic amqTopic = (ActiveMQTopic) topic;
|
ActiveMQTopic amqTopic = (ActiveMQTopic) topic;
|
||||||
|
|
||||||
assertTrue(destinationPresentInAdminView(broker, amqTopic));
|
assertTrue(destinationPresentInAdminView(amqTopic));
|
||||||
assertTrue(destinationSource.getTopics().contains(amqTopic));
|
assertTrue(destinationSource.getTopics().contains(amqTopic));
|
||||||
|
|
||||||
// This line generates a broker error since the consumer is still active.
|
// This line generates a broker error since the consumer is still active.
|
||||||
|
@ -133,7 +136,7 @@ public class RemoveDestinationTest {
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
|
|
||||||
assertTrue(destinationSource.getTopics().contains(amqTopic));
|
assertTrue(destinationSource.getTopics().contains(amqTopic));
|
||||||
assertTrue(destinationPresentInAdminView(broker, amqTopic));
|
assertTrue(destinationPresentInAdminView(amqTopic));
|
||||||
|
|
||||||
consumer.close();
|
consumer.close();
|
||||||
producer.close();
|
producer.close();
|
||||||
|
@ -146,16 +149,18 @@ public class RemoveDestinationTest {
|
||||||
amqConnection.destroyDestination(amqTopic);
|
amqConnection.destroyDestination(amqTopic);
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
assertFalse(destinationSource.getTopics().contains(amqTopic));
|
assertFalse(destinationSource.getTopics().contains(amqTopic));
|
||||||
assertFalse(destinationPresentInAdminView(broker, amqTopic));
|
assertFalse(destinationPresentInAdminView(amqTopic));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean destinationPresentInAdminView(BrokerService broker2, ActiveMQTopic amqTopic) throws Exception {
|
private boolean destinationPresentInAdminView(ActiveMQTopic amqTopic) throws Exception {
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
for (ObjectName name : broker.getAdminView().getTopics()) {
|
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
|
||||||
|
PostOffice po = wrapper.getServer().getPostOffice();
|
||||||
DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
|
Set<SimpleString> addressSet = po.getAddresses();
|
||||||
|
Iterator<SimpleString> iter = addressSet.iterator();
|
||||||
if (proxy.getName().equals(amqTopic.getPhysicalName())) {
|
String addressToFind = "jms.topic." + amqTopic.getPhysicalName();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
if (addressToFind.equals(iter.next().toString())) {
|
||||||
found = true;
|
found = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue