ARTEMIS-1619 - Add plugin support for address lifecyle

Adding callbacks to the plugin API for address creation, update and
removals
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-01-18 10:10:43 -05:00 committed by Justin Bertram
parent 10f1e1223b
commit bc38d7ce32
9 changed files with 295 additions and 25 deletions

View File

@ -433,6 +433,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception {
synchronized (addressLock) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeAddAddress(addressInfo, reload) : null);
boolean result;
if (reload) {
result = addressManager.reloadAddressInfo(addressInfo);
@ -445,6 +446,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (!addressInfo.isInternal()) {
managementService.registerAddress(addressInfo);
}
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterAddAddress(addressInfo, reload) : null);
} catch (Exception e) {
e.printStackTrace();
}
@ -517,23 +519,28 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
EnumSet<RoutingType> routingTypes) throws Exception {
synchronized (addressLock) {
return addressManager.updateAddressInfo(addressName, routingTypes);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeUpdateAddress(addressName, routingTypes) : null);
final AddressInfo address = addressManager.updateAddressInfo(addressName, routingTypes);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterUpdateAddress(address) : null);
return address;
}
}
@Override
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
synchronized (addressLock) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeRemoveAddress(address) : null);
Bindings bindingsForAddress = getBindingsForAddress(address);
if (bindingsForAddress.getBindings().size() > 0) {
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
}
managementService.unregisterAddress(address);
return addressManager.removeAddressInfo(address);
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterRemoveAddress(address, addressInfo) : null);
return addressInfo;
}
}

View File

@ -1860,6 +1860,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
queue.deleteQueue(removeConsumers);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
AddressInfo addressInfo = getAddressInfo(address);
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) {
@ -1871,9 +1873,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
callPostQueueDeletionCallbacks(address, queueName);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
}
@Override
@ -2777,10 +2776,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
callPostQueueCreationCallbacks(queue.getName());
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
callPostQueueCreationCallbacks(queue.getName());
return queue;
}
@ -2882,10 +2881,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
callPostQueueCreationCallbacks(queue.getName());
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
callPostQueueCreationCallbacks(queue.getName());
return queue;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.plugin;
import java.util.EnumSet;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -213,6 +215,71 @@ public interface ActiveMQServerPlugin {
}
/**
* Before an address is added tot he broker
*
* @param addressInfo The addressInfo that will be added
* @param reload If the address is being reloaded
* @throws ActiveMQException
*/
default void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
}
/**
* After an address has been added tot he broker
*
* @param addressInfo The newly added address
* @param reload If the address is being reloaded
* @throws ActiveMQException
*/
default void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
}
/**
* Before an address is updated
*
* @param address The existing address info that is about to be updated
* @param routingTypes The new routing types that the address will be updated with
* @throws ActiveMQException
*/
default void beforeUpdateAddress(SimpleString address, EnumSet<RoutingType> routingTypes) throws ActiveMQException {
}
/**
* After an address has been updated
*
* @param addressInfo The newly updated address info
* @throws ActiveMQException
*/
default void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException {
}
/**
* Before an address is removed
*
* @param address The address that will be removed
* @throws ActiveMQException
*/
default void beforeRemoveAddress(SimpleString address) throws ActiveMQException {
}
/**
* After an address has been removed
*
* @param address The address that has been removed
* @param addressInfo The address info that has been removed or null if not removed
* @throws ActiveMQException
*/
default void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
}
/**
* Before a queue is created
*

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
@ -26,7 +27,9 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
@ -35,6 +38,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
@ -100,7 +104,23 @@ public class AmqpPluginTest extends AmqpClientTestSupport {
BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER,
BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
}
@Test(timeout = 60000)
public void testQueueReceiverAutoCreatedQueue() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("autoCreated");
receiver.close();
connection.close();
verifier.validatePluginMethodsAtLeast(1, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
//should fire once for the auto created address being removed
verifier.validatePluginMethodsEquals(1, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
}
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
@ -27,8 +28,11 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_UPDATE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
@ -38,8 +42,10 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_UPDATE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
@ -54,7 +60,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -65,6 +73,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
@ -116,11 +125,12 @@ public class CorePluginTest extends JMSTestBase {
conn.close();
verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE);
BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE);
AFTER_MESSAGE_ROUTE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
@ -139,7 +149,42 @@ public class CorePluginTest extends JMSTestBase {
server.destroyQueue(new SimpleString(queue.getQueueName()));
verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE);
AFTER_DESTROY_QUEUE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
}
@Test
public void testAutoCreateQueue() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue autoCreatedQueue = sess.createQueue("autoCreatedQueue");
sess.createConsumer(autoCreatedQueue);
conn.close();
verifier.validatePluginMethodsEquals(1, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_ADD_ADDRESS,
AFTER_ADD_ADDRESS);
}
@Test
public void testAutoCreateTopic() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic autoCreatedTopic = sess.createTopic("autoCreatedTopic");
sess.createConsumer(autoCreatedTopic);
conn.close();
//before/add address called just once to remove autocreated destination
verifier.validatePluginMethodsEquals(1, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
//Before/Add address are called twice because of the autocreated destination and the
//created destination in the before method
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_ADD_ADDRESS,
AFTER_ADD_ADDRESS);
}
@Test
@ -162,12 +207,13 @@ public class CorePluginTest extends JMSTestBase {
conn.close();
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED,
BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED);
AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
@ -194,11 +240,13 @@ public class CorePluginTest extends JMSTestBase {
conn.close();
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED);
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS,
AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
@ -257,6 +305,15 @@ public class CorePluginTest extends JMSTestBase {
}
@Test
public void testUpdateAddress() throws Exception {
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.ANYCAST));
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
}
private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
@Override

View File

@ -7,12 +7,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -29,6 +31,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -67,6 +70,12 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer";
public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer";
public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer";
public static final String BEFORE_ADD_ADDRESS = "beforeAddAddress";
public static final String AFTER_ADD_ADDRESS = "afterAddAddress";
public static final String BEFORE_UPDATE_ADDRESS = "beforeUpdateAddress";
public static final String AFTER_UPDATE_ADDRESS = "afterUpdateAddress";
public static final String BEFORE_REMOVE_ADDRESS = "beforeRemoveAddress";
public static final String AFTER_REMOVE_ADDRESS = "afterRemoveAddress";
public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue";
public static final String AFTER_CREATE_QUEUE = "afterCreateQueue";
public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue";
@ -167,6 +176,45 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_CLOSE_CONSUMER);
}
@Override
public void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
Preconditions.checkNotNull(addressInfo);
methodCalled(BEFORE_ADD_ADDRESS);
}
@Override
public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
Preconditions.checkNotNull(addressInfo);
methodCalled(AFTER_ADD_ADDRESS);
}
@Override
public void beforeUpdateAddress(SimpleString address, EnumSet<RoutingType> routingTypes)
throws ActiveMQException {
Preconditions.checkNotNull(address);
Preconditions.checkNotNull(routingTypes);
methodCalled(BEFORE_UPDATE_ADDRESS);
}
@Override
public void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException {
Preconditions.checkNotNull(addressInfo);
methodCalled(AFTER_UPDATE_ADDRESS);
}
@Override
public void beforeRemoveAddress(SimpleString address) throws ActiveMQException {
Preconditions.checkNotNull(address);
methodCalled(BEFORE_REMOVE_ADDRESS);
}
@Override
public void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
Preconditions.checkNotNull(address);
Preconditions.checkNotNull(addressInfo);
methodCalled(AFTER_REMOVE_ADDRESS);
}
@Override
public void beforeCreateQueue(QueueConfig queueConfig) {
Preconditions.checkNotNull(queueConfig);

View File

@ -22,10 +22,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
@ -36,7 +38,9 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
@ -45,6 +49,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
@ -59,6 +64,11 @@ public class MqttPluginTest extends MQTTTestSupport {
public void configureBroker() throws Exception {
super.configureBroker();
server.registerBrokerPlugin(verifier);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoDeleteQueues(true).setAutoDeleteAddresses(true);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}
@Test(timeout = 60 * 1000)
@ -106,7 +116,20 @@ public class MqttPluginTest extends MQTTTestSupport {
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
}
@Test(timeout = 60 * 1000)
public void testMQTTAutoCreate() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
subscriptionProvider.disconnect();
verifier.validatePluginMethodsAtLeast(1,
BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
@ -26,8 +27,10 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
@ -36,6 +39,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
@ -68,6 +72,10 @@ public class OpenwirePluginTest extends BasicOpenWireTest {
long maxAddressSize, Map<String, AddressSettings> settings) {
ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
server.registerBrokerPlugin(verifier);
configuration.getAddressesSettings().put("autoCreated", new AddressSettings().setAutoDeleteAddresses(true)
.setAutoDeleteQueues(true).setAutoCreateQueues(true).setAutoCreateAddresses(true));
return server;
}
@ -102,7 +110,26 @@ public class OpenwirePluginTest extends BasicOpenWireTest {
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
}
@Test
public void testAutoCreatedQueue() throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("autoCreated");
// Reset the session.
session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(queue);
session.close();
connection.close();
//Happens more than once because of advisories
verifier.validatePluginMethodsAtLeast(2, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
@ -27,7 +28,9 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
@ -36,6 +39,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
@ -127,12 +131,30 @@ public class StompPluginTest extends StompTestBase {
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER,
AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
AFTER_DELIVER);
AFTER_DELIVER, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
} catch (Throwable e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testStompAutoCreateAddress() throws Exception {
URI uri = new URI("ws+v12.stomp://localhost:61613");
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection(uri);
newConn.connect(defUser, defPass);
subscribeQueue(newConn, "a-sub", "autoCreated");
// unsub
unsubscribe(newConn, "a-sub");
newConn.disconnect();
verifier.validatePluginMethodsAtLeast(1, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
}
}