ARTEMIS-1535 - settle delivery in same lock as sending the disposition

https://issues.apache.org/jira/browse/ARTEMIS-1535
This commit is contained in:
Andy Taylor 2017-12-05 12:12:54 +00:00 committed by Clebert Suconic
parent e129fc795b
commit 34a912b644
1 changed files with 23 additions and 25 deletions

View File

@ -101,17 +101,29 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
Object action = ((AmqpValue) msg.getBody()).getValue();
if (action instanceof Declare) {
Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
connection.lock();
try {
delivery.disposition(declared);
} finally {
connection.unlock();
}
IOCallback ioAction = new IOCallback() {
@Override
public void done() {
connection.lock();
try {
delivery.settle();
delivery.disposition(declared);
} finally {
connection.unlock();
connection.flush();
}
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
sessionSPI.afterIO(ioAction);
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
@ -124,9 +136,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
public void done() {
connection.lock();
try {
delivery.settle();
delivery.disposition(new Accepted());
} finally {
connection.unlock();
connection.flush();
}
}
@ -148,6 +162,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
log.warn(amqpE.getMessage(), amqpE);
connection.lock();
try {
delivery.settle();
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
} finally {
connection.unlock();
@ -157,29 +172,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
log.warn(e.getMessage(), e);
connection.lock();
try {
delivery.settle();
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
} finally {
connection.unlock();
}
connection.flush();
} finally {
sessionSPI.afterIO(new IOCallback() {
@Override
public void done() {
connection.lock();
try {
delivery.settle();
} finally {
connection.unlock();
}
connection.flush();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
}
}