diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 61be2e7a55..30ca69e925 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -482,7 +482,7 @@ class AmqpProtocolConverter { final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em); current = null; - if( message.getDestination()==null ) { + if( destination!=null ) { message.setJMSDestination(destination); } message.setProducerId(producerId); @@ -777,6 +777,7 @@ class AmqpProtocolConverter { ack.setLastMessageId(md.getMessage().getMessageId()); ack.setMessageCount(1); ack.setAckType((byte)ackType); + ack.setDestination(md.getDestination()); DeliveryState remoteState = delivery.getRemoteState(); if( remoteState!=null && remoteState instanceof TransactionalState) { @@ -788,7 +789,15 @@ class AmqpProtocolConverter { sendToActiveMQ(ack, new ResponseHandler() { @Override public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { - delivery.settle(); + if( response.isException() ) { + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + exception.printStackTrace(); + sender.close(); + } + } else { + delivery.settle(); + } pumpProtonToSocket(); } }); @@ -885,6 +894,7 @@ class AmqpProtocolConverter { sender.open(); if (response.isException()) { Throwable exception = ((ExceptionResponse) response).getException(); + exception.printStackTrace(); sender.close(); } pumpProtonToSocket();