[ARTEMIS-1209] JMS OpenWire client cannot read notifications from activemq.notifications topic

Issue: https://issues.apache.org/jira/browse/ARTEMIS-1209
This commit is contained in:
Ingo Weiss 2017-06-06 16:46:58 +01:00 committed by Clebert Suconic
parent 25e9fd78d3
commit 45321c65bd
2 changed files with 26 additions and 1 deletions

View File

@ -94,6 +94,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private final WireFormat marshaller; private final WireFormat marshaller;
public OpenWireMessageConverter(WireFormat marshaller) { public OpenWireMessageConverter(WireFormat marshaller) {
@ -774,7 +776,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
if (props != null) { if (props != null) {
for (SimpleString s : props) { for (SimpleString s : props) {
String keyStr = s.toString(); String keyStr = s.toString();
if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) { if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
!(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION))) {
continue; continue;
} }
Object prop = coreMessage.getObjectProperty(s); Object prop = coreMessage.getObjectProperty(s);

View File

@ -38,6 +38,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic; import javax.jms.TemporaryTopic;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
@ -1467,6 +1468,27 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
} }
} }
@Test
public void testNotificationProperties() throws Exception {
try (TopicConnection topicConnection = factory.createTopicConnection()) {
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic notificationsTopic = topicSession.createTopic("activemq.notifications");
TopicSubscriber subscriber = topicSession.createSubscriber(notificationsTopic);
List<Message> receivedMessages = new ArrayList<>();
subscriber.setMessageListener(receivedMessages::add);
topicConnection.start();
while (receivedMessages.size() == 0) {
Thread.sleep(1000);
}
for (Message message : receivedMessages) {
assertNotNull(message);
assertNotNull(message.getStringProperty("_AMQ_NotifType"));
}
}
}
private void checkQueueEmpty(String qName) { private void checkQueueEmpty(String qName) {
PostOffice po = server.getPostOffice(); PostOffice po = server.getPostOffice();
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName)); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));