mirror of https://github.com/apache/activemq.git
Strip delivery annotations from the incoming messages when using JMS transformer, the other transformers don't currently have a way to do this.
This commit is contained in:
parent
c07d6c841d
commit
f05ff94e5c
|
@ -38,10 +38,7 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
|||
|
||||
@Override
|
||||
public EncodedMessage transform(Message msg) throws Exception {
|
||||
if (msg == null) {
|
||||
return null;
|
||||
}
|
||||
if (!(msg instanceof BytesMessage)) {
|
||||
if (msg == null || !(msg instanceof BytesMessage)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.qpid.proton.amqp.UnsignedInteger;
|
|||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.apache.qpid.proton.amqp.UnsignedShort;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Footer;
|
||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
|
@ -126,14 +125,6 @@ public abstract class InboundTransformer {
|
|||
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
|
||||
}
|
||||
|
||||
final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
|
||||
if (da != null) {
|
||||
for (Map.Entry<?, ?> entry : da.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
final MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||
if (ma != null) {
|
||||
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.qpid.proton.amqp.Symbol;
|
|||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
|
@ -37,6 +38,7 @@ public class AmqpMessage {
|
|||
private final Message message;
|
||||
private final Delivery delivery;
|
||||
|
||||
private Map<Symbol, Object> deliveryAnnotationsMap;
|
||||
private Map<Symbol, Object> messageAnnotationsMap;
|
||||
private Map<String, Object> applicationPropertiesMap;
|
||||
|
||||
|
@ -87,6 +89,10 @@ public class AmqpMessage {
|
|||
if (message.getApplicationProperties() != null) {
|
||||
applicationPropertiesMap = message.getApplicationProperties().getValue();
|
||||
}
|
||||
|
||||
if (message.getDeliveryAnnotations() != null) {
|
||||
deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue();
|
||||
}
|
||||
}
|
||||
|
||||
//----- Access to interal client resources -------------------------------//
|
||||
|
@ -302,6 +308,39 @@ public class AmqpMessage {
|
|||
return messageAnnotationsMap.get(Symbol.valueOf(key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform a proper delivery annotation set on the AMQP Message based on a Symbol
|
||||
* key and the target value to append to the current delivery annotations.
|
||||
*
|
||||
* @param key
|
||||
* The name of the Symbol whose value is being set.
|
||||
* @param value
|
||||
* The new value to set in the delivery annotations of this message.
|
||||
*/
|
||||
public void setDeliveryAnnotation(String key, Object value) {
|
||||
checkReadOnly();
|
||||
lazyCreateDeliveryAnnotations();
|
||||
deliveryAnnotationsMap.put(Symbol.valueOf(key), value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a message annotation name, lookup and return the value associated with
|
||||
* that annotation name. If the message annotations have not been created yet
|
||||
* then this method will always return null.
|
||||
*
|
||||
* @param key
|
||||
* the Symbol name that should be looked up in the message annotations.
|
||||
*
|
||||
* @return the value of the annotation if it exists, or null if not set or not accessible.
|
||||
*/
|
||||
public Object getDeliveryAnnotation(String key) {
|
||||
if (deliveryAnnotationsMap == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return deliveryAnnotationsMap.get(Symbol.valueOf(key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a String value into the body of an outgoing Message, throws
|
||||
* an exception if this is an incoming message instance.
|
||||
|
@ -347,6 +386,13 @@ public class AmqpMessage {
|
|||
}
|
||||
}
|
||||
|
||||
private void lazyCreateDeliveryAnnotations() {
|
||||
if (deliveryAnnotationsMap == null) {
|
||||
deliveryAnnotationsMap = new HashMap<Symbol,Object>();
|
||||
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
|
||||
}
|
||||
}
|
||||
|
||||
private void lazyCreateApplicationProperties() {
|
||||
if (applicationPropertiesMap == null) {
|
||||
applicationPropertiesMap = new HashMap<String, Object>();
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.interop;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
/**
|
||||
* Test around the handling of Deliver Annotations in messages sent and received.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
|
||||
|
||||
private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
|
||||
|
||||
private final String transformer;
|
||||
|
||||
@Parameters(name="{0}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{"jms"},
|
||||
// {"native"},
|
||||
// {"raw"} We cannot fix these now because proton has no way to selectively
|
||||
// prune the incoming message bytes from delivery annotations section
|
||||
// can be stripped from the message.
|
||||
});
|
||||
}
|
||||
|
||||
public AmqpDeliveryAnnotationsTest(String transformer) {
|
||||
this.transformer = transformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAmqpTransformer() {
|
||||
return transformer;
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
|
||||
message.setText("Test-Message");
|
||||
message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
|
||||
|
||||
sender.send(message);
|
||||
receiver.flow(1);
|
||||
|
||||
QueueViewMBean queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
AmqpMessage received = receiver.receive(); //5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
|
||||
|
||||
sender.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue