This closes #2634
This commit is contained in:
commit
7c6e5e646c
|
@ -1223,7 +1223,9 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object removeProperty(String key) {
|
public Object removeProperty(String key) {
|
||||||
return getApplicationPropertiesMap(false).remove(key);
|
Object removed = getApplicationPropertiesMap(false).remove(key);
|
||||||
|
messageChanged();
|
||||||
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1395,60 +1397,70 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
|
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
|
public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
|
public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
|
||||||
getApplicationPropertiesMap(true).put(key, value);
|
getApplicationPropertiesMap(true).put(key, value);
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
|
public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
|
public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
|
public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
|
public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
|
public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
|
public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
|
||||||
getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
|
getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
|
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
|
||||||
getApplicationPropertiesMap(true).put(key.toString(), Boolean.valueOf(value));
|
getApplicationPropertiesMap(true).put(key.toString(), Boolean.valueOf(value));
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1495,12 +1507,14 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
|
public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
|
||||||
getApplicationPropertiesMap(true).put(key, value);
|
getApplicationPropertiesMap(true).put(key, value);
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
|
public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
|
||||||
getApplicationPropertiesMap(true).put(key, value);
|
getApplicationPropertiesMap(true).put(key, value);
|
||||||
|
messageChanged();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AmqpBridgeApplicationProperties extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
private ActiveMQServer server0;
|
||||||
|
private ActiveMQServer server1;
|
||||||
|
|
||||||
|
private SimpleString customNotificationQueue;
|
||||||
|
private SimpleString frameworkNotificationsQueue;
|
||||||
|
private SimpleString bridgeNotificationsQueue;
|
||||||
|
private SimpleString notificationsQueue;
|
||||||
|
|
||||||
|
private String getServer0URL() {
|
||||||
|
return "tcp://localhost:61616";
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getServer1URL() {
|
||||||
|
return "tcp://localhost:61617";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public URI getBrokerAmqpConnectionURI() {
|
||||||
|
try {
|
||||||
|
return new URI(getServer0URL());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server0 = createServer(false, createBasicConfig());
|
||||||
|
server1 = createServer(false, createBasicConfig());
|
||||||
|
|
||||||
|
server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
|
||||||
|
server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL());
|
||||||
|
server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
|
||||||
|
|
||||||
|
DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true).setTransformerConfiguration(new TransformerConfiguration(DivertApplicationPropertiesTransformer.class.getCanonicalName()));
|
||||||
|
DivertConfiguration frameworkNotificationsDivert = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
|
||||||
|
|
||||||
|
server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);
|
||||||
|
server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivert);
|
||||||
|
|
||||||
|
customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
|
||||||
|
frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications");
|
||||||
|
bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
|
||||||
|
notificationsQueue = SimpleString.toSimpleString("Notifications");
|
||||||
|
|
||||||
|
server0.start();
|
||||||
|
server1.start();
|
||||||
|
|
||||||
|
server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false);
|
||||||
|
server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false);
|
||||||
|
server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
|
||||||
|
server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);
|
||||||
|
|
||||||
|
server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")).setTransformerConfiguration(new TransformerConfiguration(BridgeApplicationPropertiesTransformer.class.getCanonicalName())));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationPropertiesFromTransformerForwardBridge() throws Exception {
|
||||||
|
Map<String, Object> applicationProperties = new HashMap<>();
|
||||||
|
applicationProperties.put(DivertApplicationPropertiesTransformer.TRX_ID, "100");
|
||||||
|
|
||||||
|
sendMessages("uswest.Provider.AMC.Agent.f261d0fa-51bd-44bd-abe0-ce22d2a387cd.CustomNotification", 1, RoutingType.ANYCAST, true);
|
||||||
|
|
||||||
|
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {
|
||||||
|
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
ClientMessage message = consumer.receive(5000);
|
||||||
|
assertNotNull(message);
|
||||||
|
|
||||||
|
assertEquals("1", message.getStringProperty("A"));
|
||||||
|
assertEquals("2", message.getStringProperty("B"));
|
||||||
|
assertEquals("3", message.getStringProperty("C"));
|
||||||
|
assertEquals("4", message.getStringProperty("D"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,15 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.MBeanServerFactory;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.management.MBeanServer;
|
|
||||||
import javax.management.MBeanServerFactory;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
@ -316,7 +316,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
sendMessages(destinationName, count, routingType, false);
|
sendMessages(destinationName, count, routingType, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception {
|
protected void sendMessages(String destinationName,
|
||||||
|
int count,
|
||||||
|
RoutingType routingType,
|
||||||
|
boolean durable) throws Exception {
|
||||||
|
sendMessages(destinationName, count, routingType, durable, Collections.emptyMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendMessages(String destinationName,
|
||||||
|
int count,
|
||||||
|
RoutingType routingType,
|
||||||
|
boolean durable,
|
||||||
|
Map<String, Object> applicationProperties) throws Exception {
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
AmqpConnection connection = addConnection(client.connect());
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
try {
|
try {
|
||||||
|
@ -325,6 +336,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
|
|
||||||
for (int i = 0; i < count; ++i) {
|
for (int i = 0; i < count; ++i) {
|
||||||
AmqpMessage message = new AmqpMessage();
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
for (Map.Entry<String, Object> entry : applicationProperties.entrySet()) {
|
||||||
|
message.setApplicationProperty(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
message.setMessageId("MessageID:" + i);
|
message.setMessageId("MessageID:" + i);
|
||||||
message.setDurable(durable);
|
message.setDurable(durable);
|
||||||
if (routingType != null) {
|
if (routingType != null) {
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||||
|
|
||||||
|
public class BridgeApplicationPropertiesTransformer implements Transformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Message transform(final Message message) {
|
||||||
|
|
||||||
|
message.putStringProperty("C", "3");
|
||||||
|
message.putStringProperty("D", "4");
|
||||||
|
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||||
|
|
||||||
|
public class DivertApplicationPropertiesTransformer implements Transformer {
|
||||||
|
|
||||||
|
public static final String TRX_ID = "trxId";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Message transform(final Message message) {
|
||||||
|
|
||||||
|
message.putStringProperty("A", "1");
|
||||||
|
message.putStringProperty("B", "2");
|
||||||
|
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue