ARTEMIS-1799 - Add a NotificationActiveMQServerPlugin

Adds a new plugin that will support sending new types of notifications
for broker events which will allow enhanced broker monitoring
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-04-10 09:19:11 -04:00
parent e0334dff0e
commit 4795f7c6d0
7 changed files with 549 additions and 8 deletions

View File

@ -39,7 +39,15 @@ public enum CoreNotificationType implements NotificationType {
PROPOSAL(18),
PROPOSAL_RESPONSE(19),
UNPROPOSAL(20),
CONSUMER_SLOW(21);
CONSUMER_SLOW(21),
ADDRESS_ADDED(22),
ADDRESS_REMOVED(23),
CONNECTION_CREATED(24),
CONNECTION_DESTROYED(25),
SESSION_CREATED(26),
SESSION_CLOSED(27),
MESSAGE_DELIVERED(28),
MESSAGE_EXPIRED(29);
private final int value;

View File

@ -48,6 +48,8 @@ public final class ManagementHelper {
public static final SimpleString HDR_ADDRESS = new SimpleString("_AMQ_Address");
public static final SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_Routing_Type");
public static final SimpleString HDR_BINDING_ID = new SimpleString("_AMQ_Binding_ID");
public static final SimpleString HDR_BINDING_TYPE = new SimpleString("_AMQ_Binding_Type");
@ -76,6 +78,8 @@ public final class ManagementHelper {
public static final SimpleString HDR_CONNECTION_NAME = new SimpleString("_AMQ_ConnectionName");
public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID");
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------

View File

@ -1959,16 +1959,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
configuration.registerBrokerPlugins(plugins);
plugins.forEach(plugin -> plugin.registered(this));
}
@Override
public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
configuration.registerBrokerPlugin(plugin);
plugin.registered(this);
}
@Override
public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
configuration.unRegisterBrokerPlugin(plugin);
plugin.unregistered(this);
}
@Override
@ -2340,6 +2343,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
}
if (hasBrokerPlugins()) {
callBrokerPlugins(plugin -> plugin.registered(this));
}
return true;
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@ -49,6 +50,23 @@ import org.apache.activemq.artemis.utils.critical.CriticalComponent;
*/
public interface ActiveMQServerPlugin {
/**
* The plugin has been registered with the server
*
* @param server The ActiveMQServer the plugin has been registered to
*/
default void registered(ActiveMQServer server) {
}
/**
* The plugin has been unregistered with the server
*
* @param server The ActiveMQServer the plugin has been unregistered to
*/
default void unregistered(ActiveMQServer server) {
}
/**
* A connection has been created.

View File

@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin.impl;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
/**
*
*/
public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
private static final Logger logger = Logger.getLogger(NotificationActiveMQServerPlugin.class);
public static final String SEND_CONNECTION_NOTIFICATIONS = "SEND_CONNECTION_NOTIFICATIONS";
public static final String SEND_SESSION_NOTIFICATIONS = "SEND_SESSION_NOTIFICATIONS";
public static final String SEND_ADDRESS_NOTIFICATIONS = "SEND_ADDRESS_NOTIFICATIONS";
public static final String SEND_DELIVERED_NOTIFICATIONS = "SEND_DELIVERED_NOTIFICATIONS";
public static final String SEND_EXPIRED_NOTIFICATIONS = "SEND_EXPIRED_NOTIFICATIONS";
private boolean sendConnectionNotifications;
private boolean sendSessionNotifications;
private boolean sendAddressNotifications;
private boolean sendDeliveredNotifications;
private boolean sendExpiredNotifications;
private final AtomicReference<ManagementService> managementService = new AtomicReference<>();
/**
* used to pass configured properties to Plugin
*
* @param properties
*/
@Override
public void init(Map<String, String> properties) {
sendConnectionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS,
Boolean.FALSE.toString()));
sendSessionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS,
Boolean.FALSE.toString()));
sendAddressNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS,
Boolean.FALSE.toString()));
sendDeliveredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS,
Boolean.FALSE.toString()));
sendExpiredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_EXPIRED_NOTIFICATIONS,
Boolean.FALSE.toString()));
}
@Override
public void registered(ActiveMQServer server) {
managementService.set(server.getManagementService());
}
@Override
public void unregistered(ActiveMQServer server) {
managementService.set(null);
}
@Override
public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
sendConnectionNotification(connection, CoreNotificationType.CONNECTION_CREATED);
}
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
sendConnectionNotification(connection, CoreNotificationType.CONNECTION_DESTROYED);
}
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
sendSessionNotification(session, CoreNotificationType.SESSION_CREATED);
}
@Override
public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED);
}
@Override
public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED);
}
@Override
public void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_REMOVED);
}
@Override
public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
final ManagementService managementService = getManagementService();
if (managementService != null && sendDeliveredNotifications) {
try {
if (!reference.getQueue().getAddress().equals(managementService.getManagementNotificationAddress())) {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, consumer.getQueueAddress());
props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, consumer.getQueueType().getType());
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, consumer.getQueueName());
props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, consumer.getID());
props.putLongProperty(ManagementHelper.HDR_MESSAGE_ID, reference.getMessageID());
managementService.sendNotification(new Notification(null, CoreNotificationType.MESSAGE_DELIVERED, props));
}
} catch (Exception e) {
logger.warn("Error sending notification: " + CoreNotificationType.MESSAGE_DELIVERED, e.getMessage(), e);
}
}
}
@Override
public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
final ManagementService managementService = getManagementService();
if (managementService != null && sendExpiredNotifications) {
try {
if (!message.getQueue().getAddress().equals(managementService.getManagementNotificationAddress())) {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, message.getQueue().getAddress());
props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, message.getQueue().getRoutingType().getType());
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, message.getQueue().getName());
props.putLongProperty(ManagementHelper.HDR_MESSAGE_ID, message.getMessageID());
if (message.hasConsumerId()) {
props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, message.getConsumerId());
}
managementService.sendNotification(new Notification(null, CoreNotificationType.MESSAGE_EXPIRED, props));
}
} catch (Exception e) {
logger.warn("Error sending notification: " + CoreNotificationType.MESSAGE_EXPIRED, e.getMessage(), e);
}
}
}
private void sendAddressNotification(AddressInfo addressInfo, final CoreNotificationType type) {
final ManagementService managementService = getManagementService();
if (managementService != null && sendAddressNotifications) {
try {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, addressInfo.getName());
props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, addressInfo.getRoutingType().getType());
managementService.sendNotification(new Notification(null, type, props));
} catch (Exception e) {
logger.warn("Error sending notification: " + type, e.getMessage(), e);
}
}
}
private void sendConnectionNotification(final RemotingConnection connection, final CoreNotificationType type) {
final ManagementService managementService = getManagementService();
if (managementService != null && sendConnectionNotifications) {
try {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString()));
props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress()));
managementService.sendNotification(new Notification(null, type, props));
} catch (Exception e) {
logger.warn("Error sending notification: " + type, e.getMessage(), e);
}
}
}
private void sendSessionNotification(final ServerSession session, final CoreNotificationType type) {
final ManagementService managementService = getManagementService();
if (managementService != null && sendSessionNotifications) {
try {
final TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(session.getConnectionID().toString()));
props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(session.getUsername()));
props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName()));
managementService.sendNotification(new Notification(null, type, props));
} catch (Exception e) {
logger.warn("Error sending notification: " + type, e.getMessage(), e);
}
}
}
/**
* @return the sendConnectionNotifications
*/
public boolean isSendConnectionNotifications() {
return sendConnectionNotifications;
}
/**
* @param sendConnectionNotifications the sendConnectionNotifications to set
*/
public void setSendConnectionNotifications(boolean sendConnectionNotifications) {
this.sendConnectionNotifications = sendConnectionNotifications;
}
/**
* @return the sendSessionNotifications
*/
public boolean isSendSessionNotifications() {
return sendSessionNotifications;
}
/**
* @param sendSessionNotifications the sendSessionNotifications to set
*/
public void setSendSessionNotifications(boolean sendSessionNotifications) {
this.sendSessionNotifications = sendSessionNotifications;
}
/**
* @return the sendDeliveredNotifications
*/
public boolean isSendDeliveredNotifications() {
return sendDeliveredNotifications;
}
/**
* @param sendDeliveredNotifications the sendDeliveredNotifications to set
*/
public void setSendDeliveredNotifications(boolean sendDeliveredNotifications) {
this.sendDeliveredNotifications = sendDeliveredNotifications;
}
/**
* @return the sendExpiredNotifications
*/
public boolean isSendExpiredNotifications() {
return sendExpiredNotifications;
}
/**
* @param sendExpiredNotifications the sendExpiredNotifications to set
*/
public void setSendExpiredNotifications(boolean sendExpiredNotifications) {
this.sendExpiredNotifications = sendExpiredNotifications;
}
/**
* @return the sendAddressNotifications
*/
public boolean isSendAddressNotifications() {
return sendAddressNotifications;
}
/**
* @param sendAddressNotifications the sendAddressNotifications to set
*/
public void setSendAddressNotifications(boolean sendAddressNotifications) {
this.sendAddressNotifications = sendAddressNotifications;
}
private ManagementService getManagementService() {
return this.managementService.get();
}
}

View File

@ -131,3 +131,66 @@ Most events in the LoggingActiveMQServerPlugin follow a `beforeX` and `afterX` n
At Log Level `INFO`, the LoggingActiveMQServerPlugin logs an entry when an `afterX` notification occurs. By setting the Logger
"org.apache.activemq.artemis.core.server.plugin.impl" to `DEBUG` Level, log entries are generated for both `beforeX` and `afterX` notifications.
Log Level `DEBUG` will also log more information for a notification when available.
## Using the NotificationActiveMQServerPlugin
The NotificationActiveMQServerPlugin can be configured to send extra notifications for specific broker events.
You can select which notifications are sent by setting the following configuration properties to `true`.
<table summary="NotificationActiveMQServerPlugin configuration" border="1">
<colgroup>
<col/>
<col/>
</colgroup>
<thead>
<tr>
<th>Property</th>
<th>Property Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>SEND_CONNECTION_NOTIFICATIONS</td>
<td>Sends a notification when a Connection is created/destroy. Default `false`.</td>
</tr>
<tr>
<td>SEND_SESSION_NOTIFICATIONS</td>
<td>Sends a notification when a Session is created/closed. Default `false`.</td>
</tr>
<tr>
<td>SEND_ADDRESS_NOTIFICATIONS</td>
<td>Sends a notification when an Address is added/removed. Default `false`.</td>
</tr>
<tr>
<td>SEND_DELIVERED_NOTIFICATIONS</td>
<td>Sends a notification when message is delivered to a consumer. Default `false`</td>
</tr>
<tr>
<td>SEND_EXPIRED_NOTIFICATIONS</td>
<td>Sends a notification when message has been expired by the broker. Default `false`</td>
</tr>
</tbody>
</table>
By default the NotificationActiveMQServerPlugin will not send any notifications. The plugin is activated by setting one (or a selection)
of the above configuration properties to `true`.
To configure the plugin, you can add the following configuration to the broker. In the example below both SEND_CONNECTION_NOTIFICATIONS
and SEND_SESSION_NOTIFICATIONS will be sent by the broker.
```xml
<configuration ...>
...
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.NotificationActiveMQServerPlugin">
<property key="SEND_CONNECTION_NOTIFICATIONS" value="true" />
<property key="SEND_SESSION_NOTIFICATIONS" value="true" />
</broker-plugin>
</broker-plugins>
...
</configuration>
```

View File

@ -18,9 +18,11 @@ package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
@ -28,16 +30,25 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.plugin.impl.NotificationActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONNECTION_CREATED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONNECTION_DESTROYED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.SESSION_CREATED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.SESSION_CLOSED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.ADDRESS_ADDED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.ADDRESS_REMOVED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.BINDING_ADDED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.BINDING_REMOVED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CLOSED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CREATED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.MESSAGE_DELIVERED;
import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.MESSAGE_EXPIRED;
public class NotificationTest extends ActiveMQTestBase {
@ -70,10 +81,11 @@ public class NotificationTest extends ActiveMQTestBase {
session.createQueue(address, queue, durable);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer);
Assert.assertEquals(BINDING_ADDED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertEquals(queue.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
//the first message received will be for the address creation
ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer);
Assert.assertEquals(BINDING_ADDED.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertEquals(queue.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
Assert.assertEquals(address.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
session.deleteQueue(queue);
}
@ -110,8 +122,7 @@ public class NotificationTest extends ActiveMQTestBase {
System.out.println(queue);
notifConsumer.close();
notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_ROUTING_NAME + " <> '" +
queue +
"'");
queue + "' AND " + ManagementHelper.HDR_ADDRESS + " <> '" + address + "'");
NotificationTest.flush(notifConsumer);
session.createQueue(address, queue, durable);
@ -133,7 +144,8 @@ public class NotificationTest extends ActiveMQTestBase {
session.deleteQueue(queue);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer);
//There will be 2 notifications, first is for binding removal, second is for address removal
ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer);
Assert.assertEquals(BINDING_REMOVED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertEquals(queue.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
@ -201,6 +213,140 @@ public class NotificationTest extends ActiveMQTestBase {
session.deleteQueue(queue);
}
@Test
public void testAddressAdded() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
NotificationTest.flush(notifConsumer);
session.createAddress(address, RoutingType.ANYCAST, true);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer);
Assert.assertEquals(ADDRESS_ADDED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertEquals(RoutingType.ANYCAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
}
@Test
public void testAddressRemoved() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, true);
NotificationTest.flush(notifConsumer);
server.getPostOffice().removeAddressInfo(address);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer);
Assert.assertEquals(ADDRESS_REMOVED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertEquals(RoutingType.ANYCAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
Assert.assertEquals(address.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
}
@Test
public void testConnectionCreatedAndDestroyed() throws Exception {
NotificationTest.flush(notifConsumer);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
mySession.start();
ClientMessage[] notifications = NotificationTest.consumeMessages(2, notifConsumer);
Assert.assertEquals(CONNECTION_CREATED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
final String connectionId = notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME).toString();
Assert.assertEquals(SESSION_CREATED.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_SESSION_NAME));
Assert.assertEquals(SimpleString.toSimpleString("myUser"), notifications[1].getObjectProperty(ManagementHelper.HDR_USER));
NotificationTest.flush(notifConsumer);
mySession.close();
sf.close();
notifications = NotificationTest.consumeMessages(2, notifConsumer);
Assert.assertEquals(SESSION_CLOSED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_SESSION_NAME));
Assert.assertEquals(SimpleString.toSimpleString("myUser"), notifications[0].getObjectProperty(ManagementHelper.HDR_USER));
Assert.assertEquals(CONNECTION_DESTROYED.toString(), notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
Assert.assertEquals(connectionId, notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME).toString());
}
@Test
public void testMessageDelivered() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
mySession.start();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
boolean durable = RandomUtil.randomBoolean();
session.createQueue(address, queue, durable);
ClientConsumer consumer = mySession.createConsumer(queue);
ClientProducer producer = mySession.createProducer(address);
NotificationTest.flush(notifConsumer);
ClientMessage msg = session.createMessage(false);
msg.putStringProperty("someKey", "someValue");
producer.send(msg);
consumer.receive(1000);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer);
Assert.assertEquals(MESSAGE_DELIVERED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID));
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONSUMER_NAME));
Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS));
Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME));
Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testMessageExpired() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
mySession.start();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
boolean durable = RandomUtil.randomBoolean();
session.createQueue(address, queue, durable);
ClientConsumer consumer = mySession.createConsumer(queue);
ClientProducer producer = mySession.createProducer(address);
NotificationTest.flush(notifConsumer);
ClientMessage msg = session.createMessage(false);
msg.putStringProperty("someKey", "someValue");
msg.setExpiration(1);
producer.send(msg);
Thread.sleep(500);
consumer.receive(500);
ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer);
Assert.assertEquals(MESSAGE_EXPIRED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID));
Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS));
Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME));
Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
consumer.close();
session.deleteQueue(queue);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -211,6 +357,14 @@ public class NotificationTest extends ActiveMQTestBase {
super.setUp();
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin();
notificationPlugin.setSendAddressNotifications(true);
notificationPlugin.setSendConnectionNotifications(true);
notificationPlugin.setSendSessionNotifications(true);
notificationPlugin.setSendDeliveredNotifications(true);
notificationPlugin.setSendExpiredNotifications(true);
server.registerBrokerPlugin(notificationPlugin);
server.start();
locator = createInVMNonHALocator();