diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java index ff514ad8d4..686400ef7a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java @@ -39,7 +39,15 @@ public enum CoreNotificationType implements NotificationType { PROPOSAL(18), PROPOSAL_RESPONSE(19), UNPROPOSAL(20), - CONSUMER_SLOW(21); + CONSUMER_SLOW(21), + ADDRESS_ADDED(22), + ADDRESS_REMOVED(23), + CONNECTION_CREATED(24), + CONNECTION_DESTROYED(25), + SESSION_CREATED(26), + SESSION_CLOSED(27), + MESSAGE_DELIVERED(28), + MESSAGE_EXPIRED(29); private final int value; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index 66180f26bd..a682eba9b7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -48,6 +48,8 @@ public final class ManagementHelper { public static final SimpleString HDR_ADDRESS = new SimpleString("_AMQ_Address"); + public static final SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_Routing_Type"); + public static final SimpleString HDR_BINDING_ID = new SimpleString("_AMQ_Binding_ID"); public static final SimpleString HDR_BINDING_TYPE = new SimpleString("_AMQ_Binding_Type"); @@ -76,6 +78,8 @@ public final class ManagementHelper { public static final SimpleString HDR_CONNECTION_NAME = new SimpleString("_AMQ_ConnectionName"); + public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID"); + // Attributes ---------------------------------------------------- // Static -------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 1b4b0d1e81..96076cf4cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1959,16 +1959,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public void registerBrokerPlugins(final List plugins) { configuration.registerBrokerPlugins(plugins); + plugins.forEach(plugin -> plugin.registered(this)); } @Override public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) { configuration.registerBrokerPlugin(plugin); + plugin.registered(this); } @Override public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) { configuration.unRegisterBrokerPlugin(plugin); + plugin.unregistered(this); } @Override @@ -2340,6 +2343,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader()); } + if (hasBrokerPlugins()) { + callBrokerPlugins(plugin -> plugin.registered(this)); + } + return true; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index 25e759fc89..ed00ab05c6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; @@ -49,6 +50,23 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponent; */ public interface ActiveMQServerPlugin { + /** + * The plugin has been registered with the server + * + * @param server The ActiveMQServer the plugin has been registered to + */ + default void registered(ActiveMQServer server) { + + } + + /** + * The plugin has been unregistered with the server + * + * @param server The ActiveMQServer the plugin has been unregistered to + */ + default void unregistered(ActiveMQServer server) { + + } /** * A connection has been created. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java new file mode 100644 index 0000000000..abaa27f40e --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.plugin.impl; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.management.Notification; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.collections.TypedProperties; +import org.jboss.logging.Logger; + +/** + * + */ +public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin { + + private static final Logger logger = Logger.getLogger(NotificationActiveMQServerPlugin.class); + + public static final String SEND_CONNECTION_NOTIFICATIONS = "SEND_CONNECTION_NOTIFICATIONS"; + public static final String SEND_SESSION_NOTIFICATIONS = "SEND_SESSION_NOTIFICATIONS"; + public static final String SEND_ADDRESS_NOTIFICATIONS = "SEND_ADDRESS_NOTIFICATIONS"; + public static final String SEND_DELIVERED_NOTIFICATIONS = "SEND_DELIVERED_NOTIFICATIONS"; + public static final String SEND_EXPIRED_NOTIFICATIONS = "SEND_EXPIRED_NOTIFICATIONS"; + + private boolean sendConnectionNotifications; + private boolean sendSessionNotifications; + private boolean sendAddressNotifications; + private boolean sendDeliveredNotifications; + private boolean sendExpiredNotifications; + + + private final AtomicReference managementService = new AtomicReference<>(); + + /** + * used to pass configured properties to Plugin + * + * @param properties + */ + @Override + public void init(Map properties) { + sendConnectionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS, + Boolean.FALSE.toString())); + sendSessionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS, + Boolean.FALSE.toString())); + sendAddressNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS, + Boolean.FALSE.toString())); + sendDeliveredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS, + Boolean.FALSE.toString())); + sendExpiredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_EXPIRED_NOTIFICATIONS, + Boolean.FALSE.toString())); + } + + @Override + public void registered(ActiveMQServer server) { + managementService.set(server.getManagementService()); + } + + @Override + public void unregistered(ActiveMQServer server) { + managementService.set(null); + } + + @Override + public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException { + sendConnectionNotification(connection, CoreNotificationType.CONNECTION_CREATED); + } + + @Override + public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException { + sendConnectionNotification(connection, CoreNotificationType.CONNECTION_DESTROYED); + } + + @Override + public void afterCreateSession(ServerSession session) throws ActiveMQException { + sendSessionNotification(session, CoreNotificationType.SESSION_CREATED); + } + + @Override + public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException { + sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED); + } + + @Override + public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException { + sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED); + } + + @Override + public void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException { + sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_REMOVED); + } + + @Override + public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException { + final ManagementService managementService = getManagementService(); + + if (managementService != null && sendDeliveredNotifications) { + try { + if (!reference.getQueue().getAddress().equals(managementService.getManagementNotificationAddress())) { + final TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, consumer.getQueueAddress()); + props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, consumer.getQueueType().getType()); + props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, consumer.getQueueName()); + props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, consumer.getID()); + props.putLongProperty(ManagementHelper.HDR_MESSAGE_ID, reference.getMessageID()); + + managementService.sendNotification(new Notification(null, CoreNotificationType.MESSAGE_DELIVERED, props)); + } + } catch (Exception e) { + logger.warn("Error sending notification: " + CoreNotificationType.MESSAGE_DELIVERED, e.getMessage(), e); + } + } + } + + @Override + public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { + final ManagementService managementService = getManagementService(); + + if (managementService != null && sendExpiredNotifications) { + try { + if (!message.getQueue().getAddress().equals(managementService.getManagementNotificationAddress())) { + final TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, message.getQueue().getAddress()); + props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, message.getQueue().getRoutingType().getType()); + props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, message.getQueue().getName()); + props.putLongProperty(ManagementHelper.HDR_MESSAGE_ID, message.getMessageID()); + if (message.hasConsumerId()) { + props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, message.getConsumerId()); + } + + managementService.sendNotification(new Notification(null, CoreNotificationType.MESSAGE_EXPIRED, props)); + } + } catch (Exception e) { + logger.warn("Error sending notification: " + CoreNotificationType.MESSAGE_EXPIRED, e.getMessage(), e); + } + } + } + + private void sendAddressNotification(AddressInfo addressInfo, final CoreNotificationType type) { + final ManagementService managementService = getManagementService(); + + if (managementService != null && sendAddressNotifications) { + try { + final TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, addressInfo.getName()); + props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, addressInfo.getRoutingType().getType()); + + managementService.sendNotification(new Notification(null, type, props)); + } catch (Exception e) { + logger.warn("Error sending notification: " + type, e.getMessage(), e); + } + } + } + + private void sendConnectionNotification(final RemotingConnection connection, final CoreNotificationType type) { + final ManagementService managementService = getManagementService(); + + if (managementService != null && sendConnectionNotifications) { + try { + final TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString())); + props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress())); + + managementService.sendNotification(new Notification(null, type, props)); + } catch (Exception e) { + logger.warn("Error sending notification: " + type, e.getMessage(), e); + } + } + } + + private void sendSessionNotification(final ServerSession session, final CoreNotificationType type) { + final ManagementService managementService = getManagementService(); + + if (managementService != null && sendSessionNotifications) { + try { + final TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(session.getConnectionID().toString())); + props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(session.getUsername())); + props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName())); + + managementService.sendNotification(new Notification(null, type, props)); + } catch (Exception e) { + logger.warn("Error sending notification: " + type, e.getMessage(), e); + } + } + } + + /** + * @return the sendConnectionNotifications + */ + public boolean isSendConnectionNotifications() { + return sendConnectionNotifications; + } + + /** + * @param sendConnectionNotifications the sendConnectionNotifications to set + */ + public void setSendConnectionNotifications(boolean sendConnectionNotifications) { + this.sendConnectionNotifications = sendConnectionNotifications; + } + + /** + * @return the sendSessionNotifications + */ + public boolean isSendSessionNotifications() { + return sendSessionNotifications; + } + + /** + * @param sendSessionNotifications the sendSessionNotifications to set + */ + public void setSendSessionNotifications(boolean sendSessionNotifications) { + this.sendSessionNotifications = sendSessionNotifications; + } + + /** + * @return the sendDeliveredNotifications + */ + public boolean isSendDeliveredNotifications() { + return sendDeliveredNotifications; + } + + /** + * @param sendDeliveredNotifications the sendDeliveredNotifications to set + */ + public void setSendDeliveredNotifications(boolean sendDeliveredNotifications) { + this.sendDeliveredNotifications = sendDeliveredNotifications; + } + + /** + * @return the sendExpiredNotifications + */ + public boolean isSendExpiredNotifications() { + return sendExpiredNotifications; + } + + /** + * @param sendExpiredNotifications the sendExpiredNotifications to set + */ + public void setSendExpiredNotifications(boolean sendExpiredNotifications) { + this.sendExpiredNotifications = sendExpiredNotifications; + } + + /** + * @return the sendAddressNotifications + */ + public boolean isSendAddressNotifications() { + return sendAddressNotifications; + } + + /** + * @param sendAddressNotifications the sendAddressNotifications to set + */ + public void setSendAddressNotifications(boolean sendAddressNotifications) { + this.sendAddressNotifications = sendAddressNotifications; + } + + private ManagementService getManagementService() { + return this.managementService.get(); + } +} diff --git a/docs/user-manual/en/broker-plugins.md b/docs/user-manual/en/broker-plugins.md index d0a704042d..6fd03a810b 100644 --- a/docs/user-manual/en/broker-plugins.md +++ b/docs/user-manual/en/broker-plugins.md @@ -131,3 +131,66 @@ Most events in the LoggingActiveMQServerPlugin follow a `beforeX` and `afterX` n At Log Level `INFO`, the LoggingActiveMQServerPlugin logs an entry when an `afterX` notification occurs. By setting the Logger "org.apache.activemq.artemis.core.server.plugin.impl" to `DEBUG` Level, log entries are generated for both `beforeX` and `afterX` notifications. Log Level `DEBUG` will also log more information for a notification when available. + +## Using the NotificationActiveMQServerPlugin + +The NotificationActiveMQServerPlugin can be configured to send extra notifications for specific broker events. + +You can select which notifications are sent by setting the following configuration properties to `true`. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
PropertyProperty Description
SEND_CONNECTION_NOTIFICATIONSSends a notification when a Connection is created/destroy. Default `false`.
SEND_SESSION_NOTIFICATIONSSends a notification when a Session is created/closed. Default `false`.
SEND_ADDRESS_NOTIFICATIONSSends a notification when an Address is added/removed. Default `false`.
SEND_DELIVERED_NOTIFICATIONSSends a notification when message is delivered to a consumer. Default `false`
SEND_EXPIRED_NOTIFICATIONSSends a notification when message has been expired by the broker. Default `false`
+ +By default the NotificationActiveMQServerPlugin will not send any notifications. The plugin is activated by setting one (or a selection) +of the above configuration properties to `true`. + +To configure the plugin, you can add the following configuration to the broker. In the example below both SEND_CONNECTION_NOTIFICATIONS +and SEND_SESSION_NOTIFICATIONS will be sent by the broker. + +```xml + + +... + + + + + + +... + + +``` + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java index e0e6f4cfb9..09aec0e0f7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java @@ -18,9 +18,11 @@ package org.apache.activemq.artemis.tests.integration.management; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; @@ -28,16 +30,25 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.plugin.impl.NotificationActiveMQServerPlugin; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONNECTION_CREATED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONNECTION_DESTROYED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.SESSION_CREATED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.SESSION_CLOSED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.ADDRESS_ADDED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.ADDRESS_REMOVED; import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.BINDING_ADDED; import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.BINDING_REMOVED; import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CLOSED; import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CREATED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.MESSAGE_DELIVERED; +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.MESSAGE_EXPIRED; public class NotificationTest extends ActiveMQTestBase { @@ -70,10 +81,11 @@ public class NotificationTest extends ActiveMQTestBase { session.createQueue(address, queue, durable); - ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer); - Assert.assertEquals(BINDING_ADDED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); - Assert.assertEquals(queue.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString()); - Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); + //the first message received will be for the address creation + ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer); + Assert.assertEquals(BINDING_ADDED.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertEquals(queue.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString()); + Assert.assertEquals(address.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); session.deleteQueue(queue); } @@ -110,8 +122,7 @@ public class NotificationTest extends ActiveMQTestBase { System.out.println(queue); notifConsumer.close(); notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_ROUTING_NAME + " <> '" + - queue + - "'"); + queue + "' AND " + ManagementHelper.HDR_ADDRESS + " <> '" + address + "'"); NotificationTest.flush(notifConsumer); session.createQueue(address, queue, durable); @@ -133,7 +144,8 @@ public class NotificationTest extends ActiveMQTestBase { session.deleteQueue(queue); - ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer); + //There will be 2 notifications, first is for binding removal, second is for address removal + ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer); Assert.assertEquals(BINDING_REMOVED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); Assert.assertEquals(queue.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString()); Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); @@ -201,6 +213,140 @@ public class NotificationTest extends ActiveMQTestBase { session.deleteQueue(queue); } + @Test + public void testAddressAdded() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + + NotificationTest.flush(notifConsumer); + + session.createAddress(address, RoutingType.ANYCAST, true); + + ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer); + Assert.assertEquals(ADDRESS_ADDED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertEquals(RoutingType.ANYCAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE)); + Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); + + } + + @Test + public void testAddressRemoved() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + session.createAddress(address, RoutingType.ANYCAST, true); + NotificationTest.flush(notifConsumer); + + server.getPostOffice().removeAddressInfo(address); + + ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer); + Assert.assertEquals(ADDRESS_REMOVED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertEquals(RoutingType.ANYCAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE)); + Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString()); + } + + @Test + public void testConnectionCreatedAndDestroyed() throws Exception { + NotificationTest.flush(notifConsumer); + + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize()); + mySession.start(); + + ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer); + Assert.assertEquals(CONNECTION_CREATED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME)); + final String connectionId = notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME).toString(); + + Assert.assertEquals(SESSION_CREATED.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME)); + Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_SESSION_NAME)); + Assert.assertEquals(SimpleString.toSimpleString("myUser"), notifications[1].getObjectProperty(ManagementHelper.HDR_USER)); + + NotificationTest.flush(notifConsumer); + mySession.close(); + sf.close(); + + notifications = NotificationTest.consumeMessages(2, notifConsumer); + + Assert.assertEquals(SESSION_CLOSED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME)); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_SESSION_NAME)); + Assert.assertEquals(SimpleString.toSimpleString("myUser"), notifications[0].getObjectProperty(ManagementHelper.HDR_USER)); + + Assert.assertEquals(CONNECTION_DESTROYED.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME)); + Assert.assertEquals(connectionId, notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME).toString()); + } + + @Test + public void testMessageDelivered() throws Exception { + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize()); + + mySession.start(); + + SimpleString queue = RandomUtil.randomSimpleString(); + SimpleString address = RandomUtil.randomSimpleString(); + boolean durable = RandomUtil.randomBoolean(); + + session.createQueue(address, queue, durable); + + ClientConsumer consumer = mySession.createConsumer(queue); + ClientProducer producer = mySession.createProducer(address); + + NotificationTest.flush(notifConsumer); + + ClientMessage msg = session.createMessage(false); + msg.putStringProperty("someKey", "someValue"); + producer.send(msg); + consumer.receive(1000); + + ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer); + Assert.assertEquals(MESSAGE_DELIVERED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID)); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONSUMER_NAME)); + Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS)); + Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME)); + Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE)); + + consumer.close(); + session.deleteQueue(queue); + } + + @Test + public void testMessageExpired() throws Exception { + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize()); + + mySession.start(); + + SimpleString queue = RandomUtil.randomSimpleString(); + SimpleString address = RandomUtil.randomSimpleString(); + boolean durable = RandomUtil.randomBoolean(); + + session.createQueue(address, queue, durable); + + ClientConsumer consumer = mySession.createConsumer(queue); + ClientProducer producer = mySession.createProducer(address); + + NotificationTest.flush(notifConsumer); + + ClientMessage msg = session.createMessage(false); + msg.putStringProperty("someKey", "someValue"); + msg.setExpiration(1); + producer.send(msg); + Thread.sleep(500); + consumer.receive(500); + + ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer); + Assert.assertEquals(MESSAGE_EXPIRED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString()); + Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID)); + Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS)); + Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME)); + Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE)); + + consumer.close(); + session.deleteQueue(queue); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -211,6 +357,14 @@ public class NotificationTest extends ActiveMQTestBase { super.setUp(); server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false)); + NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin(); + notificationPlugin.setSendAddressNotifications(true); + notificationPlugin.setSendConnectionNotifications(true); + notificationPlugin.setSendSessionNotifications(true); + notificationPlugin.setSendDeliveredNotifications(true); + notificationPlugin.setSendExpiredNotifications(true); + + server.registerBrokerPlugin(notificationPlugin); server.start(); locator = createInVMNonHALocator();