diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index df35115dd9..a1c58309d6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1223,7 +1223,9 @@ public class AMQPMessage extends RefCountMessage { @Override public Object removeProperty(String key) { - return getApplicationPropertiesMap(false).remove(key); + Object removed = getApplicationPropertiesMap(false).remove(key); + messageChanged(); + return removed; } @Override @@ -1395,60 +1397,70 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) { getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) { getApplicationPropertiesMap(true).put(key, Byte.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) { getApplicationPropertiesMap(true).put(key, value); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) { getApplicationPropertiesMap(true).put(key, Short.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) { getApplicationPropertiesMap(true).put(key, Character.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) { getApplicationPropertiesMap(true).put(key, Integer.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) { getApplicationPropertiesMap(true).put(key, Long.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) { getApplicationPropertiesMap(true).put(key, Float.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) { getApplicationPropertiesMap(true).put(key, Double.valueOf(value)); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) { getApplicationPropertiesMap(true).put(key.toString(), Boolean.valueOf(value)); + messageChanged(); return this; } @@ -1495,12 +1507,14 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) { getApplicationPropertiesMap(true).put(key, value); + messageChanged(); return this; } @Override public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException { getApplicationPropertiesMap(true).put(key, value); + messageChanged(); return this; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java new file mode 100644 index 0000000000..886d65e548 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.config.TransformerConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; +import org.junit.Before; +import org.junit.Test; + +public class AmqpBridgeApplicationProperties extends AmqpClientTestSupport { + + private ActiveMQServer server0; + private ActiveMQServer server1; + + private SimpleString customNotificationQueue; + private SimpleString frameworkNotificationsQueue; + private SimpleString bridgeNotificationsQueue; + private SimpleString notificationsQueue; + + private String getServer0URL() { + return "tcp://localhost:61616"; + } + + private String getServer1URL() { + return "tcp://localhost:61617"; + } + + @Override + public URI getBrokerAmqpConnectionURI() { + try { + return new URI(getServer0URL()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server0 = createServer(false, createBasicConfig()); + server1 = createServer(false, createBasicConfig()); + + server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL()); + server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL()); + server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL()); + + DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true).setTransformerConfiguration(new TransformerConfiguration(DivertApplicationPropertiesTransformer.class.getCanonicalName())); + DivertConfiguration frameworkNotificationsDivert = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true); + + server0.getConfiguration().addDivertConfiguration(customNotificationsDivert); + server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivert); + + customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification"); + frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications"); + bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications"); + notificationsQueue = SimpleString.toSimpleString("Notifications"); + + server0.start(); + server1.start(); + + server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false); + server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false); + server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false); + server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false); + + server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")).setTransformerConfiguration(new TransformerConfiguration(BridgeApplicationPropertiesTransformer.class.getCanonicalName()))); + } + + @Test + public void testApplicationPropertiesFromTransformerForwardBridge() throws Exception { + Map applicationProperties = new HashMap<>(); + applicationProperties.put(DivertApplicationPropertiesTransformer.TRX_ID, "100"); + + sendMessages("uswest.Provider.AMC.Agent.f261d0fa-51bd-44bd-abe0-ce22d2a387cd.CustomNotification", 1, RoutingType.ANYCAST, true); + + try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) { + + session.start(); + + ClientMessage message = consumer.receive(5000); + assertNotNull(message); + + assertEquals("1", message.getStringProperty("A")); + assertEquals("2", message.getStringProperty("B")); + assertEquals("3", message.getStringProperty("C")); + assertEquals("4", message.getStringProperty("D")); + } + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 5d8c68100c..ca34c23ce5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -16,15 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import javax.management.MBeanServer; -import javax.management.MBeanServerFactory; - import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -316,7 +316,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport { sendMessages(destinationName, count, routingType, false); } - protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception { + protected void sendMessages(String destinationName, + int count, + RoutingType routingType, + boolean durable) throws Exception { + sendMessages(destinationName, count, routingType, durable, Collections.emptyMap()); + } + + protected void sendMessages(String destinationName, + int count, + RoutingType routingType, + boolean durable, + Map applicationProperties) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); try { @@ -325,6 +336,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport { for (int i = 0; i < count; ++i) { AmqpMessage message = new AmqpMessage(); + for (Map.Entry entry : applicationProperties.entrySet()) { + message.setApplicationProperty(entry.getKey(), entry.getValue()); + } message.setMessageId("MessageID:" + i); message.setDurable(durable); if (routingType != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java new file mode 100644 index 0000000000..9efb2af346 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.transformer.Transformer; + +public class BridgeApplicationPropertiesTransformer implements Transformer { + + @Override + public Message transform(final Message message) { + + message.putStringProperty("C", "3"); + message.putStringProperty("D", "4"); + + return message; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java new file mode 100644 index 0000000000..6620dc4ee5 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.transformer.Transformer; + +public class DivertApplicationPropertiesTransformer implements Transformer { + + public static final String TRX_ID = "trxId"; + + @Override + public Message transform(final Message message) { + + message.putStringProperty("A", "1"); + message.putStringProperty("B", "2"); + + return message; + } +}