diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java index c4809f6bf3..37b221c326 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java @@ -72,6 +72,11 @@ public final class FilterConstants { */ public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ"); + /** + * Proton protocol stores JMSMessageID as NATIVE_MESSAGE_ID + */ + public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID"; + private FilterConstants() { // Utility class } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index c7900e46bd..f9a94f5d1a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -31,9 +31,9 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.reader.MessageUtil; -public class ServerJMSMessage implements Message { +import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; - public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID"; +public class ServerJMSMessage implements Message { protected final MessageInternal message; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index efa1a8133b..2fa714506e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; +import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; @@ -338,7 +339,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } properties.setGroupId(value); continue; - } else if (key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) { + } else if (key.equals(NATIVE_MESSAGE_ID)) { // skip..internal use only continue; } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index b014b8608e..0a459c9500 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -28,6 +28,8 @@ import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.Filterable; import org.apache.activemq.artemis.selector.impl.SelectorParser; +import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID; + /** * This class implements an ActiveMQ Artemis filter * @@ -148,6 +150,13 @@ public class FilterImpl implements Filter { private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) { if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) { + if (msg.getUserID() == null) { + // Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string + String amqpNativeID = msg.getStringProperty(NATIVE_MESSAGE_ID); + if (amqpNativeID != null) { + return new SimpleString(amqpNativeID); + } + } // It's the stringified (hex) representation of a user id that can be used in a selector expression return new SimpleString("ID:" + msg.getUserID()); } else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 1a1021efd5..c3c893365d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -1606,6 +1606,22 @@ public class ProtonTest extends ProtonTestBase { } } + @Test + public void testFilterJMSMessageID() throws Exception { + javax.jms.Queue queue = createQueue(address); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = session.createProducer(queue); + TextMessage message = session.createTextMessage(); + p.send(message); + System.out.println("get mid: " + message.getJMSMessageID()); + connection.start(); + MessageConsumer messageConsumer = session.createConsumer(queue, "JMSMessageID = '" + message.getJMSMessageID() + "'"); + TextMessage m = (TextMessage) messageConsumer.receive(5000); + Assert.assertNotNull(m); + assertEquals(message.getJMSMessageID(), m.getJMSMessageID()); + connection.close(); + } + private javax.jms.Queue createQueue(String address) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try {