ARTEMIS-2316 AMQPMessage missing ApplicationProperties from custom transformer
This commit is contained in:
parent
28919c5ebc
commit
bd1162d9b8
|
@ -1223,7 +1223,9 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
@Override
|
||||
public Object removeProperty(String key) {
|
||||
return getApplicationPropertiesMap(false).remove(key);
|
||||
Object removed = getApplicationPropertiesMap(false).remove(key);
|
||||
messageChanged();
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1395,60 +1397,70 @@ public class AMQPMessage extends RefCountMessage {
|
|||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
|
||||
getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) {
|
||||
getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) {
|
||||
getApplicationPropertiesMap(true).put(key, value);
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) {
|
||||
getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) {
|
||||
getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) {
|
||||
getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) {
|
||||
getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) {
|
||||
getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) {
|
||||
getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) {
|
||||
getApplicationPropertiesMap(true).put(key.toString(), Boolean.valueOf(value));
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -1495,12 +1507,14 @@ public class AMQPMessage extends RefCountMessage {
|
|||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) {
|
||||
getApplicationPropertiesMap(true).put(key, value);
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
|
||||
getApplicationPropertiesMap(true).put(key, value);
|
||||
messageChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AmqpBridgeApplicationProperties extends AmqpClientTestSupport {
|
||||
|
||||
private ActiveMQServer server0;
|
||||
private ActiveMQServer server1;
|
||||
|
||||
private SimpleString customNotificationQueue;
|
||||
private SimpleString frameworkNotificationsQueue;
|
||||
private SimpleString bridgeNotificationsQueue;
|
||||
private SimpleString notificationsQueue;
|
||||
|
||||
private String getServer0URL() {
|
||||
return "tcp://localhost:61616";
|
||||
}
|
||||
|
||||
private String getServer1URL() {
|
||||
return "tcp://localhost:61617";
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getBrokerAmqpConnectionURI() {
|
||||
try {
|
||||
return new URI(getServer0URL());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
server0 = createServer(false, createBasicConfig());
|
||||
server1 = createServer(false, createBasicConfig());
|
||||
|
||||
server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
|
||||
server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL());
|
||||
server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
|
||||
|
||||
DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true).setTransformerConfiguration(new TransformerConfiguration(DivertApplicationPropertiesTransformer.class.getCanonicalName()));
|
||||
DivertConfiguration frameworkNotificationsDivert = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
|
||||
|
||||
server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);
|
||||
server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivert);
|
||||
|
||||
customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
|
||||
frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications");
|
||||
bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
|
||||
notificationsQueue = SimpleString.toSimpleString("Notifications");
|
||||
|
||||
server0.start();
|
||||
server1.start();
|
||||
|
||||
server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false);
|
||||
server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false);
|
||||
server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
|
||||
server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);
|
||||
|
||||
server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")).setTransformerConfiguration(new TransformerConfiguration(BridgeApplicationPropertiesTransformer.class.getCanonicalName())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplicationPropertiesFromTransformerForwardBridge() throws Exception {
|
||||
Map<String, Object> applicationProperties = new HashMap<>();
|
||||
applicationProperties.put(DivertApplicationPropertiesTransformer.TRX_ID, "100");
|
||||
|
||||
sendMessages("uswest.Provider.AMC.Agent.f261d0fa-51bd-44bd-abe0-ce22d2a387cd.CustomNotification", 1, RoutingType.ANYCAST, true);
|
||||
|
||||
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {
|
||||
|
||||
session.start();
|
||||
|
||||
ClientMessage message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
|
||||
assertEquals("1", message.getStringProperty("A"));
|
||||
assertEquals("2", message.getStringProperty("B"));
|
||||
assertEquals("3", message.getStringProperty("C"));
|
||||
assertEquals("4", message.getStringProperty("D"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,15 +16,15 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerFactory;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerFactory;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
|
@ -316,7 +316,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
sendMessages(destinationName, count, routingType, false);
|
||||
}
|
||||
|
||||
protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception {
|
||||
protected void sendMessages(String destinationName,
|
||||
int count,
|
||||
RoutingType routingType,
|
||||
boolean durable) throws Exception {
|
||||
sendMessages(destinationName, count, routingType, durable, Collections.emptyMap());
|
||||
}
|
||||
|
||||
protected void sendMessages(String destinationName,
|
||||
int count,
|
||||
RoutingType routingType,
|
||||
boolean durable,
|
||||
Map<String, Object> applicationProperties) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
try {
|
||||
|
@ -325,6 +336,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
|||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
for (Map.Entry<String, Object> entry : applicationProperties.entrySet()) {
|
||||
message.setApplicationProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
message.setMessageId("MessageID:" + i);
|
||||
message.setDurable(durable);
|
||||
if (routingType != null) {
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
|
||||
public class BridgeApplicationPropertiesTransformer implements Transformer {
|
||||
|
||||
@Override
|
||||
public Message transform(final Message message) {
|
||||
|
||||
message.putStringProperty("C", "3");
|
||||
message.putStringProperty("D", "4");
|
||||
|
||||
return message;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
|
||||
public class DivertApplicationPropertiesTransformer implements Transformer {
|
||||
|
||||
public static final String TRX_ID = "trxId";
|
||||
|
||||
@Override
|
||||
public Message transform(final Message message) {
|
||||
|
||||
message.putStringProperty("A", "1");
|
||||
message.putStringProperty("B", "2");
|
||||
|
||||
return message;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue