diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java index d006783483..f3c214bf9a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java @@ -69,7 +69,7 @@ public class SelectorTranslator { } - private static String parse(final String input, final String match, final String replace) { + public static String parse(final String input, final String match, final String replace) { final char quote = '\''; boolean inQuote = false; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConstants.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConstants.java new file mode 100644 index 0000000000..a8e9292ef8 --- /dev/null +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConstants.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.openwire; + +import org.apache.activemq.artemis.api.core.SimpleString; + +public class OpenWireConstants { + private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_"); + public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause"); + public static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL"); + public static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME"); + public static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH"); + public static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER"); + public static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); + public static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); + public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); + public static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); + public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID"); + public static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO"); + public static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID"); + public static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE"); + public static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED"); + + public static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType"); + public static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID"); +} diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 92f6e01360..879573c2a6 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -81,28 +81,6 @@ import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE; public final class OpenWireMessageConverter { - private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType"); - private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID"); - private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_"); - public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause"); - - private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL"); - private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME"); - - private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH"); - private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER"); - private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); - private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); - public static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); - public static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); - public static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID"); - public static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO"); - - private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID"); - - private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE"); - private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED"); - public OpenWireMessageConverter() { } @@ -115,7 +93,7 @@ public final class OpenWireMessageConverter { final String type = messageSend.getType(); if (type != null) { - coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type)); + coreMessage.putStringProperty(OpenWireConstants.JMS_TYPE_PROPERTY, new SimpleString(type)); } coreMessage.setDurable(messageSend.isPersistent()); coreMessage.setExpiration(messageSend.getExpiration()); @@ -133,7 +111,7 @@ public final class OpenWireMessageConverter { } else if (contents != null) { final boolean messageCompressed = messageSend.isCompressed(); if (messageCompressed) { - coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, true); + coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_COMPRESSED, true); } switch (coreType) { @@ -158,8 +136,8 @@ public final class OpenWireMessageConverter { } } //amq specific - coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival()); - coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime()); + coreMessage.putLongProperty(OpenWireConstants.AMQ_MSG_ARRIVAL, messageSend.getArrival()); + coreMessage.putLongProperty(OpenWireConstants.AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime()); final BrokerId[] brokers = messageSend.getBrokerPath(); if (brokers != null) { putMsgBrokerPath(brokers, coreMessage); @@ -169,10 +147,10 @@ public final class OpenWireMessageConverter { putMsgCluster(cluster, coreMessage); } - coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); + coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); final String corrId = messageSend.getCorrelationId(); if (corrId != null) { - coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId)); + coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId)); } final DataStructure ds = messageSend.getDataStructure(); if (ds != null) { @@ -186,14 +164,15 @@ public final class OpenWireMessageConverter { final MessageId messageId = messageSend.getMessageId(); if (messageId != null) { - coreMessage.putStringProperty(AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString())); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString())); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID, SimpleString.toSimpleString(messageId.toString())); } coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID()); final ProducerId producerId = messageSend.getProducerId(); if (producerId != null) { - coreMessage.putStringProperty(AMQ_MSG_PRODUCER_ID, SimpleString.toSimpleString(producerId.toString())); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_PRODUCER_ID, SimpleString.toSimpleString(producerId.toString())); } putMsgProperties(messageSend, coreMessage); @@ -216,13 +195,13 @@ public final class OpenWireMessageConverter { final String userId = messageSend.getUserID(); if (userId != null) { - coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId)); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_USER_ID, new SimpleString(userId)); } - coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); + coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_DROPPABLE, messageSend.isDroppable()); final ActiveMQDestination origDest = messageSend.getOriginalDestination(); if (origDest != null) { - coreMessage.putStringProperty(AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName()); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName()); } final Object scheduledDelay = messageSend.getProperties().get(ScheduledMessage.AMQ_SCHEDULED_DELAY); @@ -419,7 +398,7 @@ public final class OpenWireMessageConverter { builder.append(','); //is this separator safe? } } - coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString())); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString())); } private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) { @@ -430,7 +409,7 @@ public final class OpenWireMessageConverter { builder.append(','); //is this separator safe? } } - coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString())); + coreMessage.putStringProperty(OpenWireConstants.AMQ_MSG_CLUSTER, new SimpleString(builder.toString())); } private static void putMsgDataStructure(final DataStructure ds, @@ -438,7 +417,7 @@ public final class OpenWireMessageConverter { final CoreMessage coreMessage) throws IOException { final ByteSequence dsBytes = marshaller.marshal(ds); dsBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); + coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_DATASTRUCTURE, dsBytes.data); } private static void putMsgProperties(final Message messageSend, @@ -525,7 +504,7 @@ public final class OpenWireMessageConverter { AMQConsumer consumer, UUID serverNodeUUID) throws IOException { final ActiveMQMessage amqMsg; final byte coreType = coreMessage.getType(); - final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_COMPRESSED); + final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_COMPRESSED); final boolean isCompressed = compressProp != null && compressProp; final byte[] bytes; final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); @@ -560,7 +539,7 @@ public final class OpenWireMessageConverter { throw new IllegalStateException("Unknown message type: " + coreMessage.getType()); } - final String type = getObjectProperty(coreMessage, String.class, JMS_TYPE_PROPERTY); + final String type = getObjectProperty(coreMessage, String.class, OpenWireConstants.JMS_TYPE_PROPERTY); if (type != null) { amqMsg.setJMSType(type); } @@ -569,7 +548,7 @@ public final class OpenWireMessageConverter { amqMsg.setPriority(coreMessage.getPriority()); amqMsg.setTimestamp(coreMessage.getTimestamp()); - Long brokerInTime = getObjectProperty(coreMessage, Long.class, AMQ_MSG_BROKER_IN_TIME); + Long brokerInTime = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_BROKER_IN_TIME); if (brokerInTime == null) { brokerInTime = 0L; } @@ -579,35 +558,35 @@ public final class OpenWireMessageConverter { //we need check null because messages may come from other clients //and those amq specific attribute may not be set. - Long arrival = getObjectProperty(coreMessage, Long.class, AMQ_MSG_ARRIVAL); + Long arrival = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_ARRIVAL); if (arrival == null) { //messages from other sources (like core client) may not set this prop arrival = 0L; } amqMsg.setArrival(arrival); - final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_BROKER_PATH); + final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_BROKER_PATH); if (brokerPath != null && brokerPath.length() > 0) { setAMQMsgBrokerPath(amqMsg, brokerPath.toString()); } - final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_CLUSTER); + final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_CLUSTER); if (clusterPath != null && clusterPath.length() > 0) { setAMQMsgClusterPath(amqMsg, clusterPath.toString()); } - Integer commandId = getObjectProperty(coreMessage, Integer.class, AMQ_MSG_COMMAND_ID); + Integer commandId = getObjectProperty(coreMessage, Integer.class, OpenWireConstants.AMQ_MSG_COMMAND_ID); if (commandId == null) { commandId = -1; } amqMsg.setCommandId(commandId); - final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, JMS_CORRELATION_ID_PROPERTY); + final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY); if (corrId != null) { amqMsg.setCorrelationId(corrId.toString()); } - final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, AMQ_MSG_DATASTRUCTURE); + final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE); if (dsBytes != null) { setAMQMsgDataStructure(amqMsg, marshaller, dsBytes); } @@ -622,7 +601,7 @@ public final class OpenWireMessageConverter { amqMsg.setGroupSequence(coreMessage.getGroupSequence()); - final Object messageIdValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_MESSAGE_ID); + final Object messageIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_MESSAGE_ID); final MessageId messageId; if (messageIdValue instanceof SimpleString) { messageId = new MessageId(messageIdValue.toString()); @@ -638,7 +617,7 @@ public final class OpenWireMessageConverter { amqMsg.setMessageId(messageId); - final Object origDestValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_ORIG_DESTINATION); + final Object origDestValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_ORIG_DESTINATION); if (origDestValue instanceof SimpleString) { amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestValue.toString(), QUEUE_TYPE)); } else if (origDestValue instanceof byte[]) { @@ -646,7 +625,7 @@ public final class OpenWireMessageConverter { amqMsg.setOriginalDestination(origDest); } - final Object producerIdValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_PRODUCER_ID); + final Object producerIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_PRODUCER_ID); if (producerIdValue instanceof SimpleString && ((SimpleString) producerIdValue).length() > 0) { amqMsg.setProducerId(new ProducerId(producerIdValue.toString())); } else if (producerIdValue instanceof byte[]) { @@ -656,7 +635,7 @@ public final class OpenWireMessageConverter { amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1); - final Object replyToValue = getObjectProperty(coreMessage, Object.class, AMQ_MSG_REPLY_TO); + final Object replyToValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_REPLY_TO); if (replyToValue instanceof SimpleString) { amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToValue.toString(), QUEUE_TYPE)); } else if (replyToValue instanceof byte[]) { @@ -664,17 +643,17 @@ public final class OpenWireMessageConverter { amqMsg.setReplyTo(replyTo); } - final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_USER_ID); + final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_USER_ID); if (userId != null && userId.length() > 0) { amqMsg.setUserID(userId.toString()); } - final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, AMQ_MSG_DROPPABLE); + final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_DROPPABLE); if (isDroppable != null) { amqMsg.setDroppable(isDroppable); } - final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); if (dlqCause != null) { setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index e061712a0b..a20d568b01 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.CompositeAddress; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants; import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; @@ -59,6 +60,8 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; +import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants.AMQ_MSG_MESSAGE_ID; + public class AMQConsumer { private final AMQSession session; private final org.apache.activemq.command.ActiveMQDestination openwireDestination; @@ -117,9 +120,21 @@ public class AMQConsumer { return this.rolledbackMessageRefs; } + + private static String convertOpenWireToActiveMQFilterString(final String selectorString) { + if (selectorString == null) { + return null; + } + + String filterString = SelectorTranslator.convertToActiveMQFilterString(selectorString); + filterString = SelectorTranslator.parse(filterString, "AMQUserID", AMQ_MSG_MESSAGE_ID.toString()); + + return filterString; + } + public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { - SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector())); + SimpleString selector = info.getSelector() == null ? null : new SimpleString(convertOpenWireToActiveMQFilterString(info.getSelector())); boolean preAck = false; if (info.isNoLocal()) { if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) { @@ -350,7 +365,7 @@ public class AMQConsumer { Throwable poisonCause = ack.getPoisonCause(); if (poisonCause != null) { ((QueueImpl) ref.getQueue()).decDelivering(ref); - ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString())); + ref.getMessage().putStringProperty(OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString())); ((QueueImpl) ref.getQueue()).incDelivering(ref); } ref.getQueue().sendToDeadLetterAddress(transaction, ref); diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index 756f8a6825..c8e761404d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -40,10 +40,6 @@ import org.apache.activemq.wireformat.WireFormat; import org.junit.Test; import org.mockito.Mockito; -import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_MESSAGE_ID; -import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_ORIG_DESTINATION; -import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID; -import static org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter.AMQ_MSG_REPLY_TO; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -147,7 +143,7 @@ public class OpenWireMessageConverterTest { classicMessage.setProducerId(new ProducerId(PRODUCER_ID)); classicMessage.setMessageId(new MessageId("1:1:1")); Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null); - assertEquals(PRODUCER_ID, artemisMessage.getStringProperty(OpenWireMessageConverter.AMQ_MSG_PRODUCER_ID)); + assertEquals(PRODUCER_ID, artemisMessage.getStringProperty(OpenWireConstants.AMQ_MSG_PRODUCER_ID)); MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); @@ -165,7 +161,7 @@ public class OpenWireMessageConverterTest { final ByteSequence pidBytes = openWireFormat.marshal(classicMessage.getProducerId()); pidBytes.compact(); ICoreMessage coreMessage = new CoreMessage().initBuffer(8); - coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, pidBytes.data); + coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_PRODUCER_ID, pidBytes.data); MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); @@ -180,7 +176,7 @@ public class OpenWireMessageConverterTest { ActiveMQMessage classicMessage = new ActiveMQMessage(); classicMessage.setMessageId(new MessageId(MESSAGE_ID)); Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null); - assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(AMQ_MSG_MESSAGE_ID)); + assertEquals(MESSAGE_ID, artemisMessage.getStringProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID)); MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); @@ -198,7 +194,7 @@ public class OpenWireMessageConverterTest { final ByteSequence midBytes = openWireFormat.marshal(classicMessage.getMessageId()); midBytes.compact(); ICoreMessage coreMessage = new CoreMessage().initBuffer(8); - coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); + coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_MESSAGE_ID, midBytes.data); MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); @@ -215,7 +211,7 @@ public class OpenWireMessageConverterTest { final ByteSequence destBytes = openWireFormat.marshal(classicMessage.getOriginalDestination()); destBytes.compact(); ICoreMessage coreMessage = new CoreMessage().initBuffer(8); - coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, destBytes.data); + coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_ORIG_DESTINATION, destBytes.data); MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); @@ -232,7 +228,7 @@ public class OpenWireMessageConverterTest { final ByteSequence destBytes = openWireFormat.marshal(classicMessage.getJMSReplyTo()); destBytes.compact(); ICoreMessage coreMessage = new CoreMessage().initBuffer(8); - coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, destBytes.data); + coreMessage.putBytesProperty(OpenWireConstants.AMQ_MSG_REPLY_TO, destBytes.data); MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java index 60bc068059..924d4c381d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.IdGenerator; import org.junit.Test; /** @@ -271,4 +272,20 @@ public class JMSConsumer2Test extends BasicOpenWireTest { connection.close(); } + + @Test + public void testSelectorWithJMSMessageID() throws Exception { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + sendMessages(connection, destination, 1); + /* + * The OpenWire client uses the hostname in the JMSMessageID so the test + * uses the same method for the selector so that the test will work on + * any host. + */ + MessageConsumer consumer = session.createConsumer(destination, "JMSMessageID like '%" + IdGenerator.getHostName() + "%'"); + connection.start(); + Message m = consumer.receive(500); + assertNotNull(m); + } }