Convert to new proton APIs.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1421487 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-12-13 20:20:53 +00:00
parent c73027da1a
commit ebaacfd4f6
1 changed files with 41 additions and 30 deletions

View File

@ -18,25 +18,27 @@ package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.amqp.*;
import org.apache.qpid.proton.amqp.messaging.*;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.*;
import org.apache.qpid.proton.amqp.transport.*;
import org.apache.qpid.proton.engine.*; import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.*; import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.LinkImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame; import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.jms.*; import org.apache.qpid.proton.jms.*;
import org.apache.qpid.proton.type.Binary; import org.apache.qpid.proton.message.impl.MessageImpl;
import org.apache.qpid.proton.type.DescribedType;
import org.apache.qpid.proton.type.Symbol;
import org.apache.qpid.proton.type.UnsignedInteger;
import org.apache.qpid.proton.type.messaging.*;
import org.apache.qpid.proton.type.messaging.Modified;
import org.apache.qpid.proton.type.messaging.Rejected;
import org.apache.qpid.proton.type.messaging.Released;
import org.apache.qpid.proton.type.transaction.*;
import org.apache.qpid.proton.type.transport.DeliveryState;
import org.apache.qpid.proton.type.transport.SenderSettleMode;
import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream; import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -472,9 +474,10 @@ class AmqpProtocolConverter {
if( response.isException() ) { if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response; ExceptionResponse er = (ExceptionResponse)response;
Rejected rejected = new Rejected(); Rejected rejected = new Rejected();
ArrayList errors = new ArrayList(); ErrorCondition condition = new ErrorCondition();
errors.add(er.getException().getMessage()); condition.setCondition(Symbol.valueOf("failed"));
rejected.setError(errors); condition.setDescription(er.getException().getMessage());
rejected.setError(condition);
delivery.disposition(rejected); delivery.disposition(rejected);
} }
} }
@ -509,8 +512,7 @@ class AmqpProtocolConverter {
@Override @Override
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception { protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
org.apache.qpid.proton.message.Message msg = new org.apache.qpid.proton.message.Message(); MessageImpl msg = new MessageImpl();
int offset = buffer.offset; int offset = buffer.offset;
int len = buffer.length; int len = buffer.length;
while( len > 0 ) { while( len > 0 ) {
@ -557,9 +559,7 @@ class AmqpProtocolConverter {
if( response.isException() ) { if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response; ExceptionResponse er = (ExceptionResponse)response;
Rejected rejected = new Rejected(); Rejected rejected = new Rejected();
ArrayList errors = new ArrayList(); rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
errors.add(er.getException().getMessage());
rejected.setError(errors);
delivery.disposition(rejected); delivery.disposition(rejected);
} }
delivery.settle(); delivery.settle();
@ -577,7 +577,7 @@ class AmqpProtocolConverter {
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) { void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
// Client is producing to this receiver object // Client is producing to this receiver object
org.apache.qpid.proton.type.transport.Target remoteTarget = receiver.getRemoteTarget(); org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
if( remoteTarget instanceof Coordinator ) { if( remoteTarget instanceof Coordinator ) {
pumpProtonToSocket(); pumpProtonToSocket();
receiver.setContext(coordinatorContext); receiver.setContext(coordinatorContext);
@ -585,12 +585,12 @@ class AmqpProtocolConverter {
receiver.open(); receiver.open();
pumpProtonToSocket(); pumpProtonToSocket();
} else { } else {
org.apache.qpid.proton.type.messaging.Target target = (Target) remoteTarget; org.apache.qpid.proton.amqp.messaging.Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest; ActiveMQDestination dest;
if( target.getDynamic() ) { if( target.getDynamic() ) {
dest = createTempQueue(); dest = createTempQueue();
org.apache.qpid.proton.type.messaging.Target actualTarget = new org.apache.qpid.proton.type.messaging.Target(); org.apache.qpid.proton.amqp.messaging.Target actualTarget = new org.apache.qpid.proton.amqp.messaging.Target();
actualTarget.setAddress(dest.getQualifiedName()); actualTarget.setAddress(dest.getQualifiedName());
actualTarget.setDynamic(true); actualTarget.setDynamic(true);
receiver.setTarget(actualTarget); receiver.setTarget(actualTarget);
@ -625,11 +625,11 @@ class AmqpProtocolConverter {
private ActiveMQDestination createDestination(Object terminus) { private ActiveMQDestination createDestination(Object terminus) {
if( terminus == null ) { if( terminus == null ) {
return null; return null;
} else if( terminus instanceof org.apache.qpid.proton.type.messaging.Source) { } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)terminus; org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)terminus;
return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE); return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof org.apache.qpid.proton.type.messaging.Target) { } else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
org.apache.qpid.proton.type.messaging.Target target = (org.apache.qpid.proton.type.messaging.Target)terminus; org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target)terminus;
return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE); return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof Coordinator ) { } else if( terminus instanceof Coordinator ) {
Coordinator target = (Coordinator)terminus; Coordinator target = (Coordinator)terminus;
@ -849,7 +849,7 @@ class AmqpProtocolConverter {
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>(); private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) { void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource(); org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)sender.getRemoteSource();
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++); final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender); ConsumerContext consumerContext = new ConsumerContext(id, sender);
@ -879,7 +879,7 @@ class AmqpProtocolConverter {
ActiveMQDestination dest; ActiveMQDestination dest;
if( source == null ) { if( source == null ) {
source = new org.apache.qpid.proton.type.messaging.Source(); source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(""); source.setAddress("");
source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED); source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
sender.setSource(source); sender.setSource(source);
@ -912,7 +912,7 @@ class AmqpProtocolConverter {
} else if( source.getDynamic() ) { } else if( source.getDynamic() ) {
// lets create a temp dest. // lets create a temp dest.
dest = createTempQueue(); dest = createTempQueue();
source = new org.apache.qpid.proton.type.messaging.Source(); source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(dest.getQualifiedName()); source.setAddress(dest.getQualifiedName());
source.setDynamic(true); source.setDynamic(true);
sender.setSource(source); sender.setSource(source);
@ -1023,4 +1023,15 @@ class AmqpProtocolConverter {
} }
} }
ErrorCondition createErrorCondition(String name) {
return createErrorCondition(name, "");
}
ErrorCondition createErrorCondition(String name, String description) {
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf(name));
condition.setDescription(description);
return condition;
}
} }