mirror of https://github.com/apache/activemq.git
Allow fallback of message transformers when configured one can't handle the incoming message.
This commit is contained in:
parent
7043f32bb2
commit
13b915ad19
|
@ -24,6 +24,16 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
|
|||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerName() {
|
||||
return TRANSFORMER_NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InboundTransformer getFallbackTransformer() {
|
||||
return new AMQPRawInboundTransformer(getVendor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||
|
|
|
@ -25,6 +25,16 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
|
|||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerName() {
|
||||
return TRANSFORMER_RAW;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InboundTransformer getFallbackTransformer() {
|
||||
return null; // No fallback from full raw transform
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
BytesMessage rc = vendor.createBytesMessage();
|
||||
|
|
|
@ -59,7 +59,11 @@ public abstract class InboundTransformer {
|
|||
this.vendor = vendor;
|
||||
}
|
||||
|
||||
abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
|
||||
public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
|
||||
|
||||
public abstract String getTransformerName();
|
||||
|
||||
public abstract InboundTransformer getFallbackTransformer();
|
||||
|
||||
public int getDefaultDeliveryMode() {
|
||||
return defaultDeliveryMode;
|
||||
|
|
|
@ -40,6 +40,16 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
|
|||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerName() {
|
||||
return TRANSFORMER_JMS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InboundTransformer getFallbackTransformer() {
|
||||
return new AMQPNativeInboundTransformer(getVendor());
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked" })
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
|
|
|
@ -125,7 +125,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
|
|||
|
||||
//----- Internal Implementation ------------------------------------------//
|
||||
|
||||
protected InboundTransformer getInboundTransformer() {
|
||||
protected InboundTransformer getTransformer() {
|
||||
if (inboundTransformer == null) {
|
||||
String transformer = session.getConnection().getConfiguredTransformer();
|
||||
if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
|
||||
|
@ -146,7 +146,26 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
|
|||
protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
|
||||
if (!isClosed()) {
|
||||
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
|
||||
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
|
||||
|
||||
InboundTransformer transformer = getTransformer();
|
||||
ActiveMQMessage message = null;
|
||||
|
||||
while (transformer != null) {
|
||||
try {
|
||||
message = (ActiveMQMessage) transformer.transform(em);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
|
||||
LOG.trace("Transformation error:", e);
|
||||
|
||||
transformer = transformer.getFallbackTransformer();
|
||||
}
|
||||
}
|
||||
|
||||
if (message == null) {
|
||||
throw new IOException("Failed to transform incoming delivery, skipping.");
|
||||
}
|
||||
|
||||
current = null;
|
||||
|
||||
if (isAnonymous()) {
|
||||
|
|
|
@ -18,10 +18,12 @@ package org.apache.activemq.transport.amqp.client;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
|
||||
import org.apache.qpid.proton.Proton;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
|
@ -341,6 +343,8 @@ public class AmqpMessage {
|
|||
return deliveryAnnotationsMap.get(Symbol.valueOf(key));
|
||||
}
|
||||
|
||||
//----- Methods for manipulating the Message body ------------------------//
|
||||
|
||||
/**
|
||||
* Sets a String value into the body of an outgoing Message, throws
|
||||
* an exception if this is an incoming message instance.
|
||||
|
@ -371,6 +375,50 @@ public class AmqpMessage {
|
|||
getWrappedMessage().setBody(body);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a byte array value into the body of an outgoing Message, throws
|
||||
* an exception if this is an incoming message instance.
|
||||
*
|
||||
* @param value
|
||||
* the byte array value to store in the Message body.
|
||||
*
|
||||
* @throws IllegalStateException if the message is read only.
|
||||
*/
|
||||
public void setDescribedType(DescribedType described) throws IllegalStateException {
|
||||
checkReadOnly();
|
||||
AmqpValue body = new AmqpValue(described);
|
||||
getWrappedMessage().setBody(body);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to retrieve the message body as an DescribedType instance.
|
||||
*
|
||||
* @return an DescribedType instance if one is stored in the message body.
|
||||
*
|
||||
* @throws NoSuchElementException if the body does not contain a DescribedType.
|
||||
*/
|
||||
public DescribedType getDescribedType() throws NoSuchElementException {
|
||||
DescribedType result = null;
|
||||
|
||||
if (getWrappedMessage().getBody() == null) {
|
||||
return null;
|
||||
} else {
|
||||
if (getWrappedMessage().getBody() instanceof AmqpValue) {
|
||||
AmqpValue value = (AmqpValue) getWrappedMessage().getBody();
|
||||
|
||||
if (value.getValue() == null) {
|
||||
result = null;
|
||||
} else if (value.getValue() instanceof DescribedType) {
|
||||
result = (DescribedType) value.getValue();
|
||||
} else {
|
||||
throw new NoSuchElementException("Message does not contain a DescribedType body");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
//----- Internal implementation ------------------------------------------//
|
||||
|
||||
private void checkReadOnly() throws IllegalStateException {
|
||||
|
|
|
@ -398,7 +398,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
}
|
||||
|
||||
private void checkClosed() {
|
||||
if (isClosed()) {
|
||||
if (isClosed() || connection.isClosed()) {
|
||||
throw new IllegalStateException("Session is already closed");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.interop;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
/**
|
||||
* Test that the broker can pass through an AMQP message with a described type
|
||||
* in the message body regardless of transformer in use.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
|
||||
|
||||
private final String transformer;
|
||||
|
||||
@Parameters(name="{0}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{"jms"},
|
||||
{"native"},
|
||||
{"raw"}
|
||||
});
|
||||
}
|
||||
|
||||
public AmqpDescribedTypePayloadTest(String transformer) {
|
||||
this.transformer = transformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAmqpTransformer() {
|
||||
return transformer;
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageWithDescribedTypeInBody() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDescribedType(new AmqpNoLocalFilter());
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
QueueViewMBean queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
assertNotNull(received.getDescribedType());
|
||||
receiver.close();
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDescribedType(new AmqpNoLocalFilter());
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
connection.close();
|
||||
|
||||
QueueViewMBean queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
|
||||
Connection jmsConnection = factory.createConnection();
|
||||
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = jmsSession.createQueue(getTestName());
|
||||
MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
|
||||
jmsConnection.start();
|
||||
|
||||
Message received = jmsConsumer.receive(5000);
|
||||
assertNotNull(received);
|
||||
assertTrue(received instanceof BytesMessage);
|
||||
jmsConnection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDescribedTypeMessageRoundTrips() throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
// Send with AMQP client.
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setDescribedType(new AmqpNoLocalFilter());
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
QueueViewMBean queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
// Receive and resend with OpenWire JMS client
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
|
||||
Connection jmsConnection = factory.createConnection();
|
||||
Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = jmsSession.createQueue(getTestName());
|
||||
MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
|
||||
jmsConnection.start();
|
||||
|
||||
Message received = jmsConsumer.receive(5000);
|
||||
assertNotNull(received);
|
||||
assertTrue(received instanceof BytesMessage);
|
||||
|
||||
MessageProducer jmsProducer = jmsSession.createProducer(destination);
|
||||
jmsProducer.send(received);
|
||||
jmsConnection.close();
|
||||
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
// Now lets receive it with AMQP and see that we get back what we expected.
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(returned);
|
||||
assertNotNull(returned.getDescribedType());
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue