Fixes problem /w amqp impl of message acks.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1403459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-29 18:41:44 +00:00
parent afe73c1861
commit 7806807db1
1 changed files with 12 additions and 2 deletions

View File

@ -482,7 +482,7 @@ class AmqpProtocolConverter {
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em); final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null; current = null;
if( message.getDestination()==null ) { if( destination!=null ) {
message.setJMSDestination(destination); message.setJMSDestination(destination);
} }
message.setProducerId(producerId); message.setProducerId(producerId);
@ -777,6 +777,7 @@ class AmqpProtocolConverter {
ack.setLastMessageId(md.getMessage().getMessageId()); ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1); ack.setMessageCount(1);
ack.setAckType((byte)ackType); ack.setAckType((byte)ackType);
ack.setDestination(md.getDestination());
DeliveryState remoteState = delivery.getRemoteState(); DeliveryState remoteState = delivery.getRemoteState();
if( remoteState!=null && remoteState instanceof TransactionalState) { if( remoteState!=null && remoteState instanceof TransactionalState) {
@ -788,7 +789,15 @@ class AmqpProtocolConverter {
sendToActiveMQ(ack, new ResponseHandler() { sendToActiveMQ(ack, new ResponseHandler() {
@Override @Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if( response.isException() ) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
sender.close();
}
} else {
delivery.settle(); delivery.settle();
}
pumpProtonToSocket(); pumpProtonToSocket();
} }
}); });
@ -885,6 +894,7 @@ class AmqpProtocolConverter {
sender.open(); sender.open();
if (response.isException()) { if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException(); Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
sender.close(); sender.close();
} }
pumpProtonToSocket(); pumpProtonToSocket();