This commit is contained in:
Clebert Suconic 2017-06-07 16:27:48 -04:00
commit db5a9597ab
2 changed files with 27 additions and 1 deletions

View File

@ -94,6 +94,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private final WireFormat marshaller; private final WireFormat marshaller;
public OpenWireMessageConverter(WireFormat marshaller) { public OpenWireMessageConverter(WireFormat marshaller) {
@ -774,7 +776,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
if (props != null) { if (props != null) {
for (SimpleString s : props) { for (SimpleString s : props) {
String keyStr = s.toString(); String keyStr = s.toString();
if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) { if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
!(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION))) {
continue; continue;
} }
Object prop = coreMessage.getObjectProperty(s); Object prop = coreMessage.getObjectProperty(s);

View File

@ -38,6 +38,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic; import javax.jms.TemporaryTopic;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher; import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
@ -62,6 +63,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert; import org.junit.Assert;
@ -1467,6 +1469,27 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
} }
} }
@Test
public void testNotificationProperties() throws Exception {
try (TopicConnection topicConnection = factory.createTopicConnection()) {
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic notificationsTopic = topicSession.createTopic("activemq.notifications");
TopicSubscriber subscriber = topicSession.createSubscriber(notificationsTopic);
List<Message> receivedMessages = new ArrayList<>();
subscriber.setMessageListener(receivedMessages::add);
topicConnection.start();
Wait.waitFor(() -> receivedMessages.size() > 0);
Assert.assertTrue(receivedMessages.size() > 0);
for (Message message : receivedMessages) {
assertNotNull(message);
assertNotNull(message.getStringProperty("_AMQ_NotifType"));
}
}
}
private void checkQueueEmpty(String qName) { private void checkQueueEmpty(String qName) {
PostOffice po = server.getPostOffice(); PostOffice po = server.getPostOffice();
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName)); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));