ensure dispatched Messages are set to Read Only mode before passing onto the transformer. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1486951 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-28 15:04:25 +00:00
parent 9490793f2c
commit b62648b8d7
1 changed files with 71 additions and 16 deletions

View File

@ -16,41 +16,91 @@
*/ */
package org.apache.activemq.transport.amqp; package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.amqp.*; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.*; import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.*; import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.*; import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.engine.*; import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl; import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.LinkImpl; import org.apache.qpid.proton.engine.impl.LinkImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl; import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame; import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.jms.*; import org.apache.qpid.proton.jms.AMQPNativeInboundTransformer;
import org.apache.qpid.proton.jms.AMQPRawInboundTransformer;
import org.apache.qpid.proton.jms.AutoOutboundTransformer;
import org.apache.qpid.proton.jms.EncodedMessage;
import org.apache.qpid.proton.jms.InboundTransformer;
import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
import org.apache.qpid.proton.jms.OutboundTransformer;
import org.apache.qpid.proton.message.impl.MessageImpl; import org.apache.qpid.proton.message.impl.MessageImpl;
import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream; import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.InvalidSelectorException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
class AmqpProtocolConverter { class AmqpProtocolConverter {
public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED); public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
@ -312,7 +362,7 @@ class AmqpProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private ConnectionInfo connectionInfo = new ConnectionInfo(); private final ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0; private long nextSessionId = 0;
private long nextTempDestinationId = 0; private long nextTempDestinationId = 0;
@ -336,6 +386,7 @@ class AmqpProtocolConverter {
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() { sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.open(); protonConnection.open();
pumpProtonToSocket(); pumpProtonToSocket();
@ -609,6 +660,7 @@ class AmqpProtocolConverter {
ProducerInfo producerInfo = new ProducerInfo(producerId); ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest); producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() { sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) { if (response.isException()) {
receiver.setTarget(null); receiver.setTarget(null);
@ -658,7 +710,7 @@ class AmqpProtocolConverter {
class ConsumerContext extends AmqpDeliveryListener { class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId; private final ConsumerId consumerId;
private final Sender sender; private final Sender sender;
private boolean presettle; private final boolean presettle;
private boolean closed; private boolean closed;
public ConsumerContext(ConsumerId consumerId, Sender sender) { public ConsumerContext(ConsumerId consumerId, Sender sender) {
@ -748,6 +800,7 @@ class AmqpProtocolConverter {
sender.drained(); sender.drained();
} else { } else {
jms.setRedeliveryCounter(md.getRedeliveryCounter()); jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms); final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) { if( amqp!=null && amqp.getLength() > 0 ) {
@ -906,6 +959,7 @@ class AmqpProtocolConverter {
consumerContext.closed=true; consumerContext.closed=true;
sendToActiveMQ(rsi, new ResponseHandler() { sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) { if (response.isException()) {
sender.setSource(null); sender.setSource(null);
@ -957,6 +1011,7 @@ class AmqpProtocolConverter {
} }
sendToActiveMQ(consumerInfo, new ResponseHandler() { sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) { if (response.isException()) {
sender.setSource(null); sender.setSource(null);