diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index a03ff10508..06a589442f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -966,10 +966,18 @@ public class AMQPMessage extends RefCountMessage { @Override public void reencode() { + parseHeaders(); + getApplicationProperties(); + if (_header != null) getProtonMessage().setHeader(_header); if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations); if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations); if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties); - if (_properties != null) getProtonMessage().setProperties(this._properties); + if (_properties != null) { + if (address != null) { + _properties.setTo(address); + } + getProtonMessage().setProperties(this._properties); + } bufferValid = false; checkBuffer(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java index 9dcf7c9c07..bff43a848e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.nio.charset.StandardCharsets; import java.util.Date; +import java.util.HashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -34,7 +35,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.utils.RandomUtil; -import org.apache.commons.collections.map.HashedMap; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Header; @@ -58,7 +58,7 @@ public class AMQPMessageTest { protonMessage.setProperties(properties); protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); protonMessage.getHeader().setDurable(Boolean.TRUE); - protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap())); + protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap())); AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); @@ -67,6 +67,39 @@ public class AMQPMessageTest { assertEquals("someNiceLocal", decoded.getAddress()); } + @Test + public void testApplicationPropertiesReencode() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader( new Header()); + Properties properties = new Properties(); + properties.setTo("someNiceLocal"); + protonMessage.setProperties(properties); + protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); + protonMessage.getHeader().setDurable(Boolean.TRUE); + HashMap map = new HashMap(); + map.put("key", "string1"); + protonMessage.setApplicationProperties(new ApplicationProperties(map)); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + assertEquals("someNiceLocal", decoded.getAddress()); + + decoded.setAddress("newAddress"); + + decoded.reencode(); + assertEquals(7, decoded.getHeader().getDeliveryCount().intValue()); + assertEquals(true, decoded.getHeader().getDurable()); + assertEquals("newAddress", decoded.getAddress()); + assertEquals("string1", decoded.getObjectProperty("key")); + + // validate if the message will be the same after delivery + AMQPMessage newDecoded = encodeDelivery(decoded, 3); + assertEquals(2, decoded.getHeader().getDeliveryCount().intValue()); + assertEquals(true, newDecoded.getHeader().getDurable()); + assertEquals("newAddress", newDecoded.getAddress()); + assertEquals("string1", newDecoded.getObjectProperty("key")); + + } + @Test public void testGetAddressFromMessage() { final String ADDRESS = "myQueue"; @@ -260,4 +293,15 @@ public class AMQPMessageTest { return new AMQPMessage(0, bytes); } + + private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) { + ByteBuf nettyBuffer = Unpooled.buffer(1500); + + message.sendBuffer(nettyBuffer, deliveryCount); + + byte[] bytes = new byte[nettyBuffer.writerIndex()]; + nettyBuffer.readBytes(bytes); + + return new AMQPMessage(0, bytes); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java new file mode 100644 index 0000000000..560a88b0c7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertTopicToQueueTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Test; + +public class DivertTopicToQueueTest extends JMSClientTestSupport { + + @Test + public void divertTopicToQueueWithSelectorTest() throws Exception { + final String address1 = "bss.order.workorderchanges.v1.topic"; + final String address2 = "bss.order.Consumer.cbma.workorderchanges.v1.queue"; + final String address3 = "bss.order.Consumer.pinpoint.workorderchanges.v1.queue"; + + DivertConfiguration dc1 = new DivertConfiguration().setName("WorkOrderChangesCBMA-Divert").setRoutingName("WorkOrderChangesCBMA-Divert").setAddress(address1).setForwardingAddress(address2).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST); + DivertConfiguration dc2 = new DivertConfiguration().setName("WorkOrderChangesPinpoint-Divert").setRoutingName("WorkOrderChangesPinpoint-Divert").setAddress(address1).setForwardingAddress(address3).setExclusive(false).setRoutingType(DivertConfigurationRoutingType.ANYCAST); + + server.deployDivert(dc1); + server.deployDivert(dc2); + + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + + Connection connection = factory.createConnection(null, null); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topicSource = session.createTopic(address1); + javax.jms.Queue queueTarget = session.createQueue(address2); + javax.jms.Queue queueTarget2 = session.createQueue(address3); + + final MessageProducer producer = session.createProducer(topicSource); + final TextMessage message = session.createTextMessage("Hello"); + message.setStringProperty("filename", "BILHANDLE"); + + connection.start(); + + String selector = "filename='BILHANDLE'"; + + final MessageConsumer consumer = session.createConsumer(queueTarget, selector); + final MessageConsumer consumer2 = session.createConsumer(queueTarget2, selector); + producer.send(message); + + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + TextMessage receivedMessage2 = (TextMessage) consumer2.receive(1000); + + Assert.assertNotNull(receivedMessage); + Assert.assertNotNull(receivedMessage2); + + connection.close(); + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + // do not create unnecessary queues + } +}