From bc38d7ce32d01cd0ac4c06ad4ff07274758bee4c Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Thu, 18 Jan 2018 10:10:43 -0500 Subject: [PATCH] ARTEMIS-1619 - Add plugin support for address lifecyle Adding callbacks to the plugin API for address creation, update and removals --- .../core/postoffice/impl/PostOfficeImpl.java | 17 +++-- .../core/server/impl/ActiveMQServerImpl.java | 13 ++-- .../server/plugin/ActiveMQServerPlugin.java | 67 +++++++++++++++++ .../integration/plugin/AmqpPluginTest.java | 22 +++++- .../integration/plugin/CorePluginTest.java | 73 +++++++++++++++++-- .../plugin/MethodCalledVerifier.java | 48 ++++++++++++ .../integration/plugin/MqttPluginTest.java | 25 ++++++- .../plugin/OpenwirePluginTest.java | 29 +++++++- .../integration/plugin/StompPluginTest.java | 26 ++++++- 9 files changed, 295 insertions(+), 25 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 6f731e563a..d69a782b1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -433,6 +433,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception { synchronized (addressLock) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeAddAddress(addressInfo, reload) : null); boolean result; if (reload) { result = addressManager.reloadAddressInfo(addressInfo); @@ -445,6 +446,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (!addressInfo.isInternal()) { managementService.registerAddress(addressInfo); } + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterAddAddress(addressInfo, reload) : null); } catch (Exception e) { e.printStackTrace(); } @@ -517,23 +519,28 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public AddressInfo updateAddressInfo(SimpleString addressName, EnumSet routingTypes) throws Exception { - synchronized (addressLock) { - return addressManager.updateAddressInfo(addressName, routingTypes); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeUpdateAddress(addressName, routingTypes) : null); + final AddressInfo address = addressManager.updateAddressInfo(addressName, routingTypes); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterUpdateAddress(address) : null); + + return address; } - - } @Override public AddressInfo removeAddressInfo(SimpleString address) throws Exception { synchronized (addressLock) { + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeRemoveAddress(address) : null); Bindings bindingsForAddress = getBindingsForAddress(address); if (bindingsForAddress.getBindings().size() > 0) { throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address); } managementService.unregisterAddress(address); - return addressManager.removeAddressInfo(address); + final AddressInfo addressInfo = addressManager.removeAddressInfo(address); + server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterRemoveAddress(address, addressInfo) : null); + + return addressInfo; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index d6230e36ad..047e83964f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1860,6 +1860,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { queue.deleteQueue(removeConsumers); + callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, + removeConsumers, autoDeleteAddress) : null); AddressInfo addressInfo = getAddressInfo(address); if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) { @@ -1871,9 +1873,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } callPostQueueDeletionCallbacks(address, queueName); - - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, - removeConsumers, autoDeleteAddress) : null); } @Override @@ -2777,10 +2776,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { managementService.registerQueue(queue, queue.getAddress(), storageManager); } - callPostQueueCreationCallbacks(queue.getName()); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null); + callPostQueueCreationCallbacks(queue.getName()); + return queue; } @@ -2882,10 +2881,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { managementService.registerQueue(queue, queue.getAddress(), storageManager); - callPostQueueCreationCallbacks(queue.getName()); - callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null); + callPostQueueCreationCallbacks(queue.getName()); + return queue; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index c5e22ef23a..e095e0fb10 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server.plugin; +import java.util.EnumSet; import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -213,6 +215,71 @@ public interface ActiveMQServerPlugin { } + /** + * Before an address is added tot he broker + * + * @param addressInfo The addressInfo that will be added + * @param reload If the address is being reloaded + * @throws ActiveMQException + */ + default void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + + } + + /** + * After an address has been added tot he broker + * + * @param addressInfo The newly added address + * @param reload If the address is being reloaded + * @throws ActiveMQException + */ + default void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + + } + + + /** + * Before an address is updated + * + * @param address The existing address info that is about to be updated + * @param routingTypes The new routing types that the address will be updated with + * @throws ActiveMQException + */ + default void beforeUpdateAddress(SimpleString address, EnumSet routingTypes) throws ActiveMQException { + + } + + /** + * After an address has been updated + * + * @param addressInfo The newly updated address info + * @throws ActiveMQException + */ + default void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException { + + } + + /** + * Before an address is removed + * + * @param address The address that will be removed + * @throws ActiveMQException + */ + default void beforeRemoveAddress(SimpleString address) throws ActiveMQException { + + } + + /** + * After an address has been removed + * + * @param address The address that has been removed + * @param addressInfo The address info that has been removed or null if not removed + * @throws ActiveMQException + */ + default void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException { + + } + /** * Before a queue is created * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java index d918b2766e..2990074877 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/AmqpPluginTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.plugin; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; @@ -26,7 +27,9 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; @@ -35,6 +38,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; @@ -100,7 +104,23 @@ public class AmqpPluginTest extends AmqpClientTestSupport { BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, - AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER); + AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, + BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); + } + + @Test(timeout = 60000) + public void testQueueReceiverAutoCreatedQueue() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("autoCreated"); + receiver.close(); + connection.close(); + + verifier.validatePluginMethodsAtLeast(1, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); + //should fire once for the auto created address being removed + verifier.validatePluginMethodsEquals(1, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java index 7df0ac01d2..518dd3ffb7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.plugin; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; @@ -27,8 +28,11 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_UPDATE_ADDRESS; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; @@ -38,8 +42,10 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_UPDATE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; @@ -54,7 +60,9 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -65,6 +73,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.junit.Before; @@ -116,11 +125,12 @@ public class CorePluginTest extends JMSTestBase { conn.close(); verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, - BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE); + BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS, + BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, - AFTER_MESSAGE_ROUTE); + AFTER_MESSAGE_ROUTE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION); @@ -139,7 +149,42 @@ public class CorePluginTest extends JMSTestBase { server.destroyQueue(new SimpleString(queue.getQueueName())); verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE, - AFTER_DESTROY_QUEUE); + AFTER_DESTROY_QUEUE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); + } + + @Test + public void testAutoCreateQueue() throws Exception { + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue autoCreatedQueue = sess.createQueue("autoCreatedQueue"); + sess.createConsumer(autoCreatedQueue); + conn.close(); + + verifier.validatePluginMethodsEquals(1, BEFORE_DESTROY_QUEUE, + AFTER_DESTROY_QUEUE, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); + + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_ADD_ADDRESS, + AFTER_ADD_ADDRESS); + } + + @Test + public void testAutoCreateTopic() throws Exception { + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic autoCreatedTopic = sess.createTopic("autoCreatedTopic"); + sess.createConsumer(autoCreatedTopic); + conn.close(); + + //before/add address called just once to remove autocreated destination + verifier.validatePluginMethodsEquals(1, BEFORE_DESTROY_QUEUE, + AFTER_DESTROY_QUEUE, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); + + //Before/Add address are called twice because of the autocreated destination and the + //created destination in the before method + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_ADD_ADDRESS, + AFTER_ADD_ADDRESS); } @Test @@ -162,12 +207,13 @@ public class CorePluginTest extends JMSTestBase { conn.close(); - verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, + BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, - BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED, + BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, - AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED); + AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION); @@ -194,11 +240,13 @@ public class CorePluginTest extends JMSTestBase { conn.close(); - verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, + BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, - AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED); + AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS, + AFTER_ADD_ADDRESS); verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION); @@ -257,6 +305,15 @@ public class CorePluginTest extends JMSTestBase { } + + @Test + public void testUpdateAddress() throws Exception { + server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.ANYCAST)); + server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST)); + + verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS); + } + private class ExpiredPluginVerifier implements ActiveMQServerPlugin { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index dbb659354f..f18e1be982 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -7,12 +7,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -29,6 +31,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -67,6 +70,12 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer"; public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer"; public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer"; + public static final String BEFORE_ADD_ADDRESS = "beforeAddAddress"; + public static final String AFTER_ADD_ADDRESS = "afterAddAddress"; + public static final String BEFORE_UPDATE_ADDRESS = "beforeUpdateAddress"; + public static final String AFTER_UPDATE_ADDRESS = "afterUpdateAddress"; + public static final String BEFORE_REMOVE_ADDRESS = "beforeRemoveAddress"; + public static final String AFTER_REMOVE_ADDRESS = "afterRemoveAddress"; public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue"; public static final String AFTER_CREATE_QUEUE = "afterCreateQueue"; public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue"; @@ -167,6 +176,45 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { methodCalled(AFTER_CLOSE_CONSUMER); } + @Override + public void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + Preconditions.checkNotNull(addressInfo); + methodCalled(BEFORE_ADD_ADDRESS); + } + + @Override + public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + Preconditions.checkNotNull(addressInfo); + methodCalled(AFTER_ADD_ADDRESS); + } + + @Override + public void beforeUpdateAddress(SimpleString address, EnumSet routingTypes) + throws ActiveMQException { + Preconditions.checkNotNull(address); + Preconditions.checkNotNull(routingTypes); + methodCalled(BEFORE_UPDATE_ADDRESS); + } + + @Override + public void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException { + Preconditions.checkNotNull(addressInfo); + methodCalled(AFTER_UPDATE_ADDRESS); + } + + @Override + public void beforeRemoveAddress(SimpleString address) throws ActiveMQException { + Preconditions.checkNotNull(address); + methodCalled(BEFORE_REMOVE_ADDRESS); + } + + @Override + public void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException { + Preconditions.checkNotNull(address); + Preconditions.checkNotNull(addressInfo); + methodCalled(AFTER_REMOVE_ADDRESS); + } + @Override public void beforeCreateQueue(QueueConfig queueConfig) { Preconditions.checkNotNull(queueConfig); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java index 2365ae5c43..2c68858cce 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java @@ -22,10 +22,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider; import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; import org.junit.Test; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; @@ -36,7 +38,9 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; @@ -45,6 +49,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; @@ -59,6 +64,11 @@ public class MqttPluginTest extends MQTTTestSupport { public void configureBroker() throws Exception { super.configureBroker(); server.registerBrokerPlugin(verifier); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoDeleteQueues(true).setAutoDeleteAddresses(true); + + server.getAddressSettingsRepository().addMatch("#", addressSettings); } @Test(timeout = 60 * 1000) @@ -106,7 +116,20 @@ public class MqttPluginTest extends MQTTTestSupport { AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, - AFTER_DELIVER); + AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); + } + + @Test(timeout = 60 * 1000) + public void testMQTTAutoCreate() throws Exception { + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); + + subscriptionProvider.disconnect(); + + verifier.validatePluginMethodsAtLeast(1, + BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java index afb6841d72..2440cc6da7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.plugin; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; @@ -26,8 +27,10 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; @@ -36,6 +39,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; @@ -68,6 +72,10 @@ public class OpenwirePluginTest extends BasicOpenWireTest { long maxAddressSize, Map settings) { ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings); server.registerBrokerPlugin(verifier); + + configuration.getAddressesSettings().put("autoCreated", new AddressSettings().setAutoDeleteAddresses(true) + .setAutoDeleteQueues(true).setAutoCreateQueues(true).setAutoCreateAddresses(true)); + return server; } @@ -102,7 +110,26 @@ public class OpenwirePluginTest extends BasicOpenWireTest { AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, - AFTER_DELIVER); + AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); + + } + + @Test + public void testAutoCreatedQueue() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("autoCreated"); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(queue); + session.close(); + connection.close(); + + //Happens more than once because of advisories + verifier.validatePluginMethodsAtLeast(2, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, + BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java index 4aac664a82..708e96827b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.plugin; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; @@ -27,7 +28,9 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; @@ -36,6 +39,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; @@ -127,12 +131,30 @@ public class StompPluginTest extends StompTestBase { AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, - AFTER_DELIVER); + AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS); } catch (Throwable e) { - e.printStackTrace(); + fail(e.getMessage()); } } + @Test + public void testStompAutoCreateAddress() throws Exception { + + URI uri = new URI("ws+v12.stomp://localhost:61613"); + StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri); + newConn.connect(defUser, defPass); + + subscribeQueue(newConn, "a-sub", "autoCreated"); + + // unsub + unsubscribe(newConn, "a-sub"); + newConn.disconnect(); + + verifier.validatePluginMethodsAtLeast(1, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, + BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); + + } + }