From be9959e0bc5d4f46a058c0b02dbbdb1060546301 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Fri, 28 Aug 2015 20:33:38 +0800 Subject: [PATCH] ARTEMIS-191 Refactor RemoveDestinationTest -Using core api to inspect queue status -Catch command visit() exceptions in order to pass it back to client. -Correct destination add/remove handlings --- .../protocol/openwire/OpenWireConnection.java | 29 ++++---- .../openwire/OpenWireProtocolManager.java | 68 +++++++++++++++---- .../artemiswrapper/ArtemisBrokerBase.java | 5 ++ .../artemiswrapper/ArtemisBrokerWrapper.java | 3 +- .../activemq/RemoveDestinationTest.java | 33 +++++---- 5 files changed, 95 insertions(+), 43 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index c5644dd04c..3155794d81 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; @@ -174,7 +173,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { private ConnectionState state; - private final Set tempQueues = new ConcurrentHashSet(); + private final Set tempQueues = new ConcurrentHashSet(); private Map txMap = new ConcurrentHashMap(); @@ -227,7 +226,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { response = new ExceptionResponse(this.stopError); } else { - response = ((Command) command).visit(this); + try { + response = ((Command) command).visit(this); + } + catch (Exception e) { + if (responseRequired) { + response = new ExceptionResponse(e); + } + } if (response instanceof ExceptionResponse) { if (!responseRequired) { @@ -409,10 +415,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { } private void deleteTempQueues() throws Exception { - Iterator queueNames = tempQueues.iterator(); - while (queueNames.hasNext()) { - String q = queueNames.next(); - protocolManager.deleteQueue(q); + Iterator tmpQs = tempQueues.iterator(); + while (tmpQs.hasNext()) { + ActiveMQDestination q = tmpQs.next(); + protocolManager.removeDestination(this, q); } } @@ -1230,10 +1236,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { @Override public Response processRemoveDestination(DestinationInfo info) throws Exception { ActiveMQDestination dest = info.getDestination(); - if (dest.isQueue()) { - String qName = "jms.queue." + dest.getPhysicalName(); - protocolManager.deleteQueue(qName); - } + protocolManager.removeDestination(this, dest); return null; } @@ -1320,8 +1323,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { return this.wireFormat; } - public void registerTempQueue(SimpleString qName) { - tempQueues.add(qName.toString()); + public void registerTempQueue(ActiveMQDestination queue) { + tempQueues.add(queue); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 2554ce11dc..98b41abc0f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter; @@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; @@ -136,7 +140,6 @@ public class OpenWireProtocolManager implements ProtocolManager, No private final ScheduledExecutorService scheduledPool; - public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -429,18 +432,25 @@ public class OpenWireProtocolManager implements ProtocolManager, No } // Avoid replaying dup commands if (!ss.getProducerIds().contains(info.getProducerId())) { - ActiveMQDestination destination = info.getDestination(); - if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { - if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) { - throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection()); - } - } AMQSession amqSession = sessions.get(sessionId); if (amqSession == null) { throw new IllegalStateException("Session not exist! : " + sessionId); } + ActiveMQDestination destination = info.getDestination(); + if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { + if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) { + throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection()); + } + if (destination.isQueue()) { + OpenWireUtil.validateDestination(destination, amqSession); + } + DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); + this.addDestination(theConn, destInfo); + } + + amqSession.createProducer(info); try { @@ -539,10 +549,40 @@ public class OpenWireProtocolManager implements ProtocolManager, No return sessions.get(sessionId); } + public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception { + if (dest.isQueue()) { + SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); + this.server.destroyQueue(qName); + } + else { + Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName())); + Iterator iterator = bindings.getBindings().iterator(); + + while (iterator.hasNext()) { + Queue b = (Queue) iterator.next().getBindable(); + if (b.getConsumerCount() > 0) { + throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName()); + } + if (b.isDurable()) { + throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName()); + } + b.deleteQueue(); + } + } + + if (!AdvisorySupport.isAdvisoryTopic(dest)) { + AMQConnectionContext context = connection.getConext(); + DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest); + + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest); + fireAdvisory(context, topic, advInfo); + } + } + public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception { ActiveMQDestination dest = info.getDestination(); if (dest.isQueue()) { - SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName()); + SimpleString qName = OpenWireUtil.toCoreAddress(dest); ConnectionState state = connection.getState(); ConnectionInfo connInfo = state.getInfo(); if (connInfo != null) { @@ -555,9 +595,13 @@ public class OpenWireProtocolManager implements ProtocolManager, No ((ActiveMQServerImpl) server).checkQueueCreationLimit(user); } - this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, true); + + QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); + if (binding == null) { + this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary()); + } if (dest.isTemporary()) { - connection.registerTempQueue(qName); + connection.registerTempQueue(dest); } } @@ -570,10 +614,6 @@ public class OpenWireProtocolManager implements ProtocolManager, No } } - public void deleteQueue(String q) throws Exception { - server.destroyQueue(new SimpleString(q)); - } - public void endTransaction(TransactionInfo info) throws Exception { AMQSession txSession = transactions.get(info.getTransactionId()); diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java index 6f2fff6546..5c052a67dd 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java @@ -614,4 +614,9 @@ public abstract class ArtemisBrokerBase implements Broker { return directory.delete(); } + public ActiveMQServer getServer() + { + return server; + } + } diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 723529f109..14b93c6d79 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.artemiswrapper; -import java.net.URI; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; @@ -82,6 +80,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { } SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ"); commonSettings.setDeadLetterAddress(dla); + commonSettings.setAutoCreateJmsQueues(true); serverConfig.getAcceptorConfigurations().add(transportConfiguration); if (this.bservice.enableSsl()) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java index 542972392d..894abe3127 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.net.URI; +import java.util.Iterator; +import java.util.Set; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -31,12 +33,13 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import javax.management.ObjectName; import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.junit.After; @@ -45,8 +48,8 @@ import org.junit.Test; public class RemoveDestinationTest { - private static final String VM_BROKER_URL = "vm://localhost?create=false"; - private static final String BROKER_URL = "broker:vm://localhost?broker.persistent=false&broker.useJmx=true"; + private static final String TCP_BROKER_URL = "tcp://localhost:61616?create=false"; + private static final String BROKER_URL = "broker:tcp://localhost:61616?broker.persistent=false&broker.useJmx=true"; BrokerService broker; @@ -65,7 +68,7 @@ public class RemoveDestinationTest { } private Connection createConnection(final boolean start) throws JMSException { - ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + ConnectionFactory cf = new ActiveMQConnectionFactory(TCP_BROKER_URL); Connection conn = cf.createConnection(); if (start) { conn.start(); @@ -118,7 +121,7 @@ public class RemoveDestinationTest { ActiveMQTopic amqTopic = (ActiveMQTopic) topic; - assertTrue(destinationPresentInAdminView(broker, amqTopic)); + assertTrue(destinationPresentInAdminView(amqTopic)); assertTrue(destinationSource.getTopics().contains(amqTopic)); // This line generates a broker error since the consumer is still active. @@ -133,7 +136,7 @@ public class RemoveDestinationTest { Thread.sleep(3000); assertTrue(destinationSource.getTopics().contains(amqTopic)); - assertTrue(destinationPresentInAdminView(broker, amqTopic)); + assertTrue(destinationPresentInAdminView(amqTopic)); consumer.close(); producer.close(); @@ -146,16 +149,18 @@ public class RemoveDestinationTest { amqConnection.destroyDestination(amqTopic); Thread.sleep(3000); assertFalse(destinationSource.getTopics().contains(amqTopic)); - assertFalse(destinationPresentInAdminView(broker, amqTopic)); + assertFalse(destinationPresentInAdminView(amqTopic)); } - private boolean destinationPresentInAdminView(BrokerService broker2, ActiveMQTopic amqTopic) throws Exception { + private boolean destinationPresentInAdminView(ActiveMQTopic amqTopic) throws Exception { boolean found = false; - for (ObjectName name : broker.getAdminView().getTopics()) { - - DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); - - if (proxy.getName().equals(amqTopic.getPhysicalName())) { + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + PostOffice po = wrapper.getServer().getPostOffice(); + Set addressSet = po.getAddresses(); + Iterator iter = addressSet.iterator(); + String addressToFind = "jms.topic." + amqTopic.getPhysicalName(); + while (iter.hasNext()) { + if (addressToFind.equals(iter.next().toString())) { found = true; break; }