ARTEMIS-4275 add ID to consumer notifications

This commit is contained in:
Justin Bertram 2023-05-10 12:08:53 -05:00 committed by Clebert Suconic
parent 3a48258f7d
commit a57c48ec55
3 changed files with 41 additions and 14 deletions

View File

@ -615,6 +615,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(session.getRemotingConnection().getClientID())); props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(session.getRemotingConnection().getClientID()));
} }
props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, getID());
Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props); Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props);
managementService.sendNotification(notification); managementService.sendNotification(notification);

View File

@ -624,6 +624,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(remotingConnection.getClientID())); props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(remotingConnection.getClientID()));
} }
props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, consumer.getID());
Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {

View File

@ -39,8 +39,9 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CLIENT_ID; import static org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CLIENT_ID;
import static org.apache.activemq.artemis.api.core.management.ManagementHelper.HDR_CONSUMER_NAME;
public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSupport { public class JMSNotificationTest extends MultiprotocolJMSClientTestSupport {
private ClientConsumer notificationConsumer; private ClientConsumer notificationConsumer;
private String clientID; private String clientID;
@ -76,20 +77,20 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo
@Test(timeout = 30000) @Test(timeout = 30000)
public void testConsumerNotificationAMQP() throws Exception { public void testConsumerNotificationAMQP() throws Exception {
testConsumerNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true)); testConsumerNotifications(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true));
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testConsumerNotificationCore() throws Exception { public void testConsumerNotificationCore() throws Exception {
testConsumerNotification(createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, clientID, true)); testConsumerNotifications(createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, clientID, true));
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testConsumerNotificationOpenWire() throws Exception { public void testConsumerNotificationOpenWire() throws Exception {
testConsumerNotification(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, clientID, true)); testConsumerNotifications(createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, clientID, true));
} }
private void testConsumerNotification(Connection connection) throws Exception { private void testConsumerNotifications(Connection connection) throws Exception {
final String subscriptionName = "mySub"; final String subscriptionName = "mySub";
try { try {
flush(); flush();
@ -97,16 +98,32 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo
Topic topic = session.createTopic(getTopicName()); Topic topic = session.createTopic(getTopicName());
flush(); flush();
MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName); MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName);
notificationConsumer.receiveImmediate(); // clear the BINDING_ADDED notification for the subscription queue Message m = receiveNotification(CoreNotificationType.CONSUMER_CREATED, notificationConsumer);
validateClientIdOnNotification(CoreNotificationType.CONSUMER_CREATED); validateClientIdOnNotification(m, CoreNotificationType.CONSUMER_CREATED);
String consumerID = validatePropertyOnNotification(m, CoreNotificationType.CONSUMER_CREATED, HDR_CONSUMER_NAME, null, false);
consumer.close(); consumer.close();
validateClientIdOnNotification(CoreNotificationType.CONSUMER_CLOSED); m = receiveNotification(CoreNotificationType.CONSUMER_CLOSED, notificationConsumer);
validateClientIdOnNotification(m, CoreNotificationType.CONSUMER_CLOSED);
validatePropertyOnNotification(m, CoreNotificationType.CONSUMER_CLOSED, HDR_CONSUMER_NAME, consumerID, true);
session.unsubscribe(subscriptionName); session.unsubscribe(subscriptionName);
} finally { } finally {
connection.close(); connection.close();
} }
} }
ClientMessage receiveNotification(CoreNotificationType notificationType, ClientConsumer consumer) throws Exception {
for (;;) {
ClientMessage message = consumer.receive(1000);
if (message == null) {
return null;
}
String receivedType = message.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
if (String.valueOf(receivedType).equals(notificationType.toString())) {
return message;
}
}
}
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSessionNotificationAMQP() throws Exception { public void testSessionNotificationAMQP() throws Exception {
testSessionNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true)); testSessionNotification(createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientID, true));
@ -126,19 +143,25 @@ public class JMSClientIdNotificationTest extends MultiprotocolJMSClientTestSuppo
try { try {
flush(); flush();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
validateClientIdOnNotification(CoreNotificationType.SESSION_CREATED); validateClientIdOnNotification(notificationConsumer.receive(1000), CoreNotificationType.SESSION_CREATED);
session.close(); session.close();
validateClientIdOnNotification(CoreNotificationType.SESSION_CLOSED); validateClientIdOnNotification(notificationConsumer.receive(1000), CoreNotificationType.SESSION_CLOSED);
} finally { } finally {
connection.close(); connection.close();
} }
} }
private void validateClientIdOnNotification(CoreNotificationType notificationType) throws ActiveMQException { private void validateClientIdOnNotification(Message m, CoreNotificationType notificationType) {
Message m = notificationConsumer.receive(1000); validatePropertyOnNotification(m, notificationType, HDR_CLIENT_ID, clientID, true);
}
private String validatePropertyOnNotification(Message m, CoreNotificationType notificationType, SimpleString propertyName, String propertyValue, boolean checkValue) {
assertNotNull(m); assertNotNull(m);
assertEquals(notificationType.toString(), m.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)); assertEquals(notificationType.toString(), m.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE));
assertTrue(m.getPropertyNames().contains(HDR_CLIENT_ID)); assertTrue(m.getPropertyNames().contains(propertyName));
assertEquals(clientID, m.getStringProperty(HDR_CLIENT_ID)); if (checkValue) {
assertEquals(propertyValue, m.getStringProperty(propertyName));
}
return m.getStringProperty(propertyName);
} }
} }