From 7806807db12a4d62e221bee4adf118705b25e870 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 29 Oct 2012 18:41:44 +0000 Subject: [PATCH] Fixes problem /w amqp impl of message acks. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1403459 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/AmqpProtocolConverter.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 61be2e7a55..30ca69e925 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -482,7 +482,7 @@ class AmqpProtocolConverter { final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em); current = null; - if( message.getDestination()==null ) { + if( destination!=null ) { message.setJMSDestination(destination); } message.setProducerId(producerId); @@ -777,6 +777,7 @@ class AmqpProtocolConverter { ack.setLastMessageId(md.getMessage().getMessageId()); ack.setMessageCount(1); ack.setAckType((byte)ackType); + ack.setDestination(md.getDestination()); DeliveryState remoteState = delivery.getRemoteState(); if( remoteState!=null && remoteState instanceof TransactionalState) { @@ -788,7 +789,15 @@ class AmqpProtocolConverter { sendToActiveMQ(ack, new ResponseHandler() { @Override public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { - delivery.settle(); + if( response.isException() ) { + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + exception.printStackTrace(); + sender.close(); + } + } else { + delivery.settle(); + } pumpProtonToSocket(); } }); @@ -885,6 +894,7 @@ class AmqpProtocolConverter { sender.open(); if (response.isException()) { Throwable exception = ((ExceptionResponse) response).getException(); + exception.printStackTrace(); sender.close(); } pumpProtonToSocket();