ARTEMIS-842 JMSMessageID doesn't appear to work in selector
This commit is contained in:
parent
df4c3c8584
commit
4691cbe882
|
@ -72,6 +72,11 @@ public final class FilterConstants {
|
||||||
*/
|
*/
|
||||||
public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ");
|
public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Proton protocol stores JMSMessageID as NATIVE_MESSAGE_ID
|
||||||
|
*/
|
||||||
|
public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";
|
||||||
|
|
||||||
private FilterConstants() {
|
private FilterConstants() {
|
||||||
// Utility class
|
// Utility class
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,9 +31,9 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||||
|
|
||||||
public class ServerJMSMessage implements Message {
|
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
|
||||||
|
|
||||||
public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";
|
public class ServerJMSMessage implements Message {
|
||||||
|
|
||||||
protected final MessageInternal message;
|
protected final MessageInternal message;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.protocol.amqp.converter.message;
|
package org.apache.activemq.artemis.protocol.amqp.converter.message;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
|
||||||
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
|
||||||
|
@ -338,7 +339,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
||||||
}
|
}
|
||||||
properties.setGroupId(value);
|
properties.setGroupId(value);
|
||||||
continue;
|
continue;
|
||||||
} else if (key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) {
|
} else if (key.equals(NATIVE_MESSAGE_ID)) {
|
||||||
// skip..internal use only
|
// skip..internal use only
|
||||||
continue;
|
continue;
|
||||||
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
|
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||||
import org.apache.activemq.artemis.selector.filter.Filterable;
|
import org.apache.activemq.artemis.selector.filter.Filterable;
|
||||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class implements an ActiveMQ Artemis filter
|
* This class implements an ActiveMQ Artemis filter
|
||||||
*
|
*
|
||||||
|
@ -148,6 +150,13 @@ public class FilterImpl implements Filter {
|
||||||
|
|
||||||
private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) {
|
private static Object getHeaderFieldValue(final ServerMessage msg, final SimpleString fieldName) {
|
||||||
if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) {
|
if (FilterConstants.ACTIVEMQ_USERID.equals(fieldName)) {
|
||||||
|
if (msg.getUserID() == null) {
|
||||||
|
// Proton stores JMSMessageID as NATIVE_MESSAGE_ID that is an arbitrary string
|
||||||
|
String amqpNativeID = msg.getStringProperty(NATIVE_MESSAGE_ID);
|
||||||
|
if (amqpNativeID != null) {
|
||||||
|
return new SimpleString(amqpNativeID);
|
||||||
|
}
|
||||||
|
}
|
||||||
// It's the stringified (hex) representation of a user id that can be used in a selector expression
|
// It's the stringified (hex) representation of a user id that can be used in a selector expression
|
||||||
return new SimpleString("ID:" + msg.getUserID());
|
return new SimpleString("ID:" + msg.getUserID());
|
||||||
} else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) {
|
} else if (FilterConstants.ACTIVEMQ_PRIORITY.equals(fieldName)) {
|
||||||
|
|
|
@ -1606,6 +1606,22 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterJMSMessageID() throws Exception {
|
||||||
|
javax.jms.Queue queue = createQueue(address);
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer p = session.createProducer(queue);
|
||||||
|
TextMessage message = session.createTextMessage();
|
||||||
|
p.send(message);
|
||||||
|
System.out.println("get mid: " + message.getJMSMessageID());
|
||||||
|
connection.start();
|
||||||
|
MessageConsumer messageConsumer = session.createConsumer(queue, "JMSMessageID = '" + message.getJMSMessageID() + "'");
|
||||||
|
TextMessage m = (TextMessage) messageConsumer.receive(5000);
|
||||||
|
Assert.assertNotNull(m);
|
||||||
|
assertEquals(message.getJMSMessageID(), m.getJMSMessageID());
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
private javax.jms.Queue createQueue(String address) throws Exception {
|
private javax.jms.Queue createQueue(String address) throws Exception {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue