This commit is contained in:
Clebert Suconic 2022-08-02 09:29:46 -04:00
commit 9444fc0bc8
6 changed files with 113 additions and 65 deletions

View File

@ -69,7 +69,7 @@ public class SelectorTranslator {
}
private static String parse(final String input, final String match, final String replace) {
public static String parse(final String input, final String match, final String replace) {
final char quote = '\'';
boolean inQuote = false;

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.api.core.SimpleString;
public class OpenWireConstants {
private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
public static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
public static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
public static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
public static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
public static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
public static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
public static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
public static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
public static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
public static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
public static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
public static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
public static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
}

View File

@ -81,28 +81,6 @@ import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
public final class OpenWireMessageConverter {
private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
public static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
public static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
public OpenWireMessageConverter() {
}
@ -115,7 +93,7 @@ public final class OpenWireMessageConverter {
final String type = messageSend.getType();
if (type != null) {
coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type));
coreMessage.putStringProperty(OpenWireConstants.JMS_TYPE_PROPERTY, new SimpleString(type));
}
coreMessage.setDurable(messageSend.isPersistent());
coreMessage.setExpiration(messageSend.getExpiration());
@ -133,7 +111,7 @@ public final class OpenWireMessageConverter {
} else if (contents != null) {
final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, true);
coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_COMPRESSED, true);
}
switch (coreType) {
@ -158,8 +136,8 @@ public final class OpenWireMessageConverter {
}
}
//amq specific
coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
coreMessage.putLongProperty(OpenWireConstants.AMQ_MSG_ARRIVAL, messageSend.getArrival());
coreMessage.putLongProperty(OpenWireConstants.AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
final BrokerId[] brokers = messageSend.getBrokerPath();
if (brokers != null) {
putMsgBrokerPath(brokers, coreMessage);
@ -169,10 +147,10 @@ public final class OpenWireMessageConverter {
putMsgCluster(cluster, coreMessage);
}
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
}
final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
@ -186,14 +164,15 @@ public final class OpenWireMessageConverter {
final MessageId messageId = messageSend.getMessageId();
if (messageId != null) {
coreMessage.putStringProperty(AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString()));
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString()));
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString()));
}
coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID());
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
coreMessage.putStringProperty(AMQ_MSG_PRODUCER_ID, SimpleString.toSimpleString(producerId.toString()));
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_PRODUCER_ID, SimpleString.toSimpleString(producerId.toString()));
}
putMsgProperties(messageSend, coreMessage);
@ -216,13 +195,13 @@ public final class OpenWireMessageConverter {
final String userId = messageSend.getUserID();
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId));
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_USER_ID, new SimpleString(userId));
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_DROPPABLE, messageSend.isDroppable());
final ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
coreMessage.putStringProperty(AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName());
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName());
}
final Object scheduledDelay = messageSend.getProperties().get(ScheduledMessage.AMQ_SCHEDULED_DELAY);
@ -419,7 +398,7 @@ public final class OpenWireMessageConverter {
builder.append(','); //is this separator safe?
}
}
coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
}
private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
@ -430,7 +409,7 @@ public final class OpenWireMessageConverter {
builder.append(','); //is this separator safe?
}
}
coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
}
private static void putMsgDataStructure(final DataStructure ds,
@ -438,7 +417,7 @@ public final class OpenWireMessageConverter {
final CoreMessage coreMessage) throws IOException {
final ByteSequence dsBytes = marshaller.marshal(ds);
dsBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_DATASTRUCTURE, dsBytes.data);
}
private static void putMsgProperties(final Message messageSend,
@ -525,7 +504,7 @@ public final class OpenWireMessageConverter {
AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
final ActiveMQMessage amqMsg;
final byte coreType = coreMessage.getType();
final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_COMPRESSED);
final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp != null && compressProp;
final byte[] bytes;
final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
@ -560,7 +539,7 @@ public final class OpenWireMessageConverter {
throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
}
final String type = getObjectProperty(coreMessage, String.class, JMS_TYPE_PROPERTY);
final String type = getObjectProperty(coreMessage, String.class, OpenWireConstants.JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
@ -569,7 +548,7 @@ public final class OpenWireMessageConverter {
amqMsg.setPriority(coreMessage.getPriority());
amqMsg.setTimestamp(coreMessage.getTimestamp());
Long brokerInTime = getObjectProperty(coreMessage, Long.class, AMQ_MSG_BROKER_IN_TIME);
Long brokerInTime = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_BROKER_IN_TIME);
if (brokerInTime == null) {
brokerInTime = 0L;
}
@ -579,35 +558,35 @@ public final class OpenWireMessageConverter {
//we need check null because messages may come from other clients
//and those amq specific attribute may not be set.
Long arrival = getObjectProperty(coreMessage, Long.class, AMQ_MSG_ARRIVAL);
Long arrival = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_ARRIVAL);
if (arrival == null) {
//messages from other sources (like core client) may not set this prop
arrival = 0L;
}
amqMsg.setArrival(arrival);
final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_BROKER_PATH);
final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_BROKER_PATH);
if (brokerPath != null && brokerPath.length() > 0) {
setAMQMsgBrokerPath(amqMsg, brokerPath.toString());
}
final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_CLUSTER);
final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_CLUSTER);
if (clusterPath != null && clusterPath.length() > 0) {
setAMQMsgClusterPath(amqMsg, clusterPath.toString());
}
Integer commandId = getObjectProperty(coreMessage, Integer.class, AMQ_MSG_COMMAND_ID);
Integer commandId = getObjectProperty(coreMessage, Integer.class, OpenWireConstants.AMQ_MSG_COMMAND_ID);
if (commandId == null) {
commandId = -1;
}
amqMsg.setCommandId(commandId);
final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, JMS_CORRELATION_ID_PROPERTY);
final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
}
final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_DATASTRUCTURE);
final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
if (dsBytes != null) {
setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
}
@ -622,7 +601,7 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final Object messageIdValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_MESSAGE_ID);
final Object messageIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_MESSAGE_ID);
final MessageId messageId;
if (messageIdValue instanceof SimpleString) {
messageId = new MessageId(messageIdValue.toString());
@ -638,7 +617,7 @@ public final class OpenWireMessageConverter {
amqMsg.setMessageId(messageId);
final Object origDestValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_ORIG_DESTINATION);
final Object origDestValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_ORIG_DESTINATION);
if (origDestValue instanceof SimpleString) {
amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestValue.toString(), QUEUE_TYPE));
} else if (origDestValue instanceof byte[]) {
@ -646,7 +625,7 @@ public final class OpenWireMessageConverter {
amqMsg.setOriginalDestination(origDest);
}
final Object producerIdValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_PRODUCER_ID);
final Object producerIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_PRODUCER_ID);
if (producerIdValue instanceof SimpleString && ((SimpleString) producerIdValue).length() > 0) {
amqMsg.setProducerId(new ProducerId(producerIdValue.toString()));
} else if (producerIdValue instanceof byte[]) {
@ -656,7 +635,7 @@ public final class OpenWireMessageConverter {
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
final Object replyToValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_REPLY_TO);
final Object replyToValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_REPLY_TO);
if (replyToValue instanceof SimpleString) {
amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToValue.toString(), QUEUE_TYPE));
} else if (replyToValue instanceof byte[]) {
@ -664,17 +643,17 @@ public final class OpenWireMessageConverter {
amqMsg.setReplyTo(replyTo);
}
final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID);
final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_USER_ID);
if (userId != null && userId.length() > 0) {
amqMsg.setUserID(userId.toString());
}
final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_DROPPABLE);
final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_DROPPABLE);
if (isDroppable != null) {
amqMsg.setDroppable(isDroppable);
}
final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (dlqCause != null) {
setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
}

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
@ -59,6 +60,8 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants.AMQ_MSG_MESSAGE_ID;
public class AMQConsumer {
private final AMQSession session;
private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
@ -117,9 +120,21 @@ public class AMQConsumer {
return this.rolledbackMessageRefs;
}
private static String convertOpenWireToActiveMQFilterString(final String selectorString) {
if (selectorString == null) {
return null;
}
String filterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
filterString = SelectorTranslator.parse(filterString, "AMQUserID", AMQ_MSG_MESSAGE_ID.toString());
return filterString;
}
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector()));
SimpleString selector = info.getSelector() == null ? null : new SimpleString(convertOpenWireToActiveMQFilterString(info.getSelector()));
boolean preAck = false;
if (info.isNoLocal()) {
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
@ -350,7 +365,7 @@ public class AMQConsumer {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
((QueueImpl) ref.getQueue()).decDelivering(ref);
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
ref.getMessage().putStringProperty(OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
((QueueImpl) ref.getQueue()).incDelivering(ref);
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);

View File

@ -40,10 +40,6 @@ import org.apache.activemq.wireformat.WireFormat;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_MESSAGE_ID;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_ORIG_DESTINATION;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID;
import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_REPLY_TO;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -147,7 +143,7 @@ public class OpenWireMessageConverterTest {
classicMessage.setProducerId(new ProducerId(PRODUCER_ID));
classicMessage.setMessageId(new MessageId("1:1:1"));
Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
assertEquals(PRODUCER_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID));
assertEquals(PRODUCER_ID, artemisMessage.getStringProperty(OpenWireConstants.AMQ_MSG_PRODUCER_ID));
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
@ -165,7 +161,7 @@ public class OpenWireMessageConverterTest {
final ByteSequence pidBytes = openWireFormat.marshal(classicMessage.getProducerId());
pidBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, pidBytes.data);
coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_PRODUCER_ID, pidBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
@ -180,7 +176,7 @@ public class OpenWireMessageConverterTest {
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setMessageId(new MessageId(MESSAGE_ID));
Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(AMQ_MSG_MESSAGE_ID));
assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID));
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
@ -198,7 +194,7 @@ public class OpenWireMessageConverterTest {
final ByteSequence midBytes = openWireFormat.marshal(classicMessage.getMessageId());
midBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID, midBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
@ -215,7 +211,7 @@ public class OpenWireMessageConverterTest {
final ByteSequence destBytes = openWireFormat.marshal(classicMessage.getOriginalDestination());
destBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, destBytes.data);
coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_ORIG_DESTINATION, destBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
@ -232,7 +228,7 @@ public class OpenWireMessageConverterTest {
final ByteSequence destBytes = openWireFormat.marshal(classicMessage.getJMSReplyTo());
destBytes.compact();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, destBytes.data);
coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_REPLY_TO, destBytes.data);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.IdGenerator;
import org.junit.Test;
/**
@ -271,4 +272,20 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
connection.close();
}
@Test
public void testSelectorWithJMSMessageID() throws Exception {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
sendMessages(connection, destination, 1);
/*
* The OpenWire client uses the hostname in the JMSMessageID so the test
* uses the same method for the selector so that the test will work on
* any host.
*/
MessageConsumer consumer = session.createConsumer(destination, "JMSMessageID like '%" + IdGenerator.getHostName() + "%'");
connection.start();
Message m = consumer.receive(500);
assertNotNull(m);
}
}