ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering

This commit is contained in:
Michael Pearce 2019-08-21 08:19:32 +01:00 committed by Clebert Suconic
parent 61eb379741
commit 25d0b511ce
13 changed files with 181 additions and 38 deletions

View File

@ -363,7 +363,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
consumer.close(false);
consumer.close(false, true);
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}

View File

@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
super(xid, storageManager, timeoutSeconds);
super(xid, storageManager, timeoutSeconds, true);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {

View File

@ -146,6 +146,9 @@ public interface Queue extends Bindable,CriticalComponent {
ReferenceCounter getConsumersRefCount();
/* Called when a message is cancelled back into the queue */
void addSorted(List<MessageReference> refs, boolean scheduling);
void reload(MessageReference ref);
void addTail(MessageReference ref);
@ -154,6 +157,9 @@ public interface Queue extends Bindable,CriticalComponent {
void addHead(MessageReference ref, boolean scheduling);
/* Called when a message is cancelled back into the queue */
void addSorted(MessageReference ref, boolean scheduling);
void addHead(List<MessageReference> refs, boolean scheduling);
void acknowledge(MessageReference ref) throws Exception;

View File

@ -63,6 +63,8 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void close(boolean failed) throws Exception;
void close(boolean failed, boolean sorted) throws Exception;
/**
* This method is just to remove itself from Queues.
* If for any reason during a close an exception occurred, the exception treatment

View File

@ -894,6 +894,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final MessageReference ref, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
internalAddSorted(ref);
directDeliver = false;
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}
/* Called when a message is cancelled back into the queue */
@Override
public void addHead(final List<MessageReference> refs, boolean scheduling) {
@ -913,6 +932,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final List<MessageReference> refs, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
for (MessageReference ref : refs) {
addSorted(ref, scheduling);
}
resetAllIterators();
deliverAsync();
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}
@Override
public synchronized void reload(final MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
@ -3461,13 +3499,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
void postRollback(final LinkedList<MessageReference> refs) {
postRollback(refs, false);
}
void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
//if we have purged then ignore adding the messages back
if (purgeOnNoConsumers && getConsumerCount() == 0) {
purgeAfterRollback(refs);
return;
}
addHead(refs, false);
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);
}
}
private void purgeAfterRollback(LinkedList<MessageReference> refs) {

View File

@ -79,6 +79,11 @@ public class RefsOperation extends TransactionOperationAbstract {
@Override
public void afterRollback(final Transaction tx) {
afterRollback(tx, false);
}
@Override
public void afterRollback(final Transaction tx, boolean sorted) {
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
long timeBase = System.currentTimeMillis();
@ -109,7 +114,7 @@ public class RefsOperation extends TransactionOperationAbstract {
QueueImpl queue = entry.getKey();
synchronized (queue) {
queue.postRollback(refs);
queue.postRollback(refs, sorted);
}
}

View File

@ -529,7 +529,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
public synchronized void close(final boolean failed) throws Exception {
public void close(final boolean failed) throws Exception {
close(failed, false);
}
@Override
public synchronized void close(final boolean failed, boolean sorted) throws Exception {
// Close should only ever be done once per consumer.
if (isClosed) return;
@ -555,7 +560,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
List<MessageReference> refs = cancelRefs(failed, false, null);
Transaction tx = new TransactionImpl(storageManager);
Transaction tx = new TransactionImpl(storageManager, sorted);
refs.forEach(ref -> {
if (logger.isTraceEnabled()) {

View File

@ -52,6 +52,10 @@ public interface TransactionOperation {
*/
void afterRollback(Transaction tx);
default void afterRollback(Transaction tx, boolean sorted) {
afterRollback(tx);
}
List<MessageReference> getRelatedMessageReferences();
List<MessageReference> getListOnConsumer(long consumerID);

View File

@ -63,6 +63,8 @@ public class TransactionImpl implements Transaction {
private final long createTime;
private final boolean sorted;
private volatile boolean containsPersistent;
private int timeoutSeconds = -1;
@ -96,47 +98,45 @@ public class TransactionImpl implements Transaction {
}
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
this.storageManager = storageManager;
xid = null;
id = storageManager.generateID();
createTime = System.currentTimeMillis();
this.timeoutSeconds = timeoutSeconds;
this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
}
public TransactionImpl(final StorageManager storageManager) {
this.storageManager = storageManager;
this(storageManager, false);
}
xid = null;
id = storageManager.generateID();
createTime = System.currentTimeMillis();
public TransactionImpl(final StorageManager storageManager, boolean sorted) {
this(storageManager.generateID(), null, storageManager,-1, sorted);
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
this.storageManager = storageManager;
this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
}
this.xid = xid;
id = storageManager.generateID();
createTime = System.currentTimeMillis();
this.timeoutSeconds = timeoutSeconds;
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
this(id, xid, storageManager, -1, false);
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
this(id, xid, storageManager, -1, sorted);
}
private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
this.storageManager = storageManager;
this.xid = xid;
this.id = id;
createTime = System.currentTimeMillis();
this.createTime = System.currentTimeMillis();
this.timeoutSeconds = timeoutSeconds;
this.sorted = sorted;
}
// Transaction implementation
@ -217,7 +217,7 @@ public class TransactionImpl implements Transaction {
logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
}
internalRollback();
internalRollback(sorted);
if (exception != null) {
throw exception;
@ -276,7 +276,7 @@ public class TransactionImpl implements Transaction {
return;
}
if (state == State.ROLLBACK_ONLY) {
internalRollback();
internalRollback(sorted);
if (exception != null) {
throw exception;
@ -379,11 +379,11 @@ public class TransactionImpl implements Transaction {
}
}
internalRollback();
internalRollback(sorted);
}
}
private void internalRollback() throws Exception {
private void internalRollback(boolean sorted) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("TransactionImpl::internalRollback " + this);
}
@ -418,7 +418,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
afterRollback(operationsToComplete);
afterRollback(operationsToComplete, sorted);
}
});
@ -432,7 +432,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
afterRollback(storeOperationsToComplete);
afterRollback(storeOperationsToComplete, sorted);
}
});
}
@ -562,10 +562,10 @@ public class TransactionImpl implements Transaction {
}
}
private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
if (operationsToComplete != null) {
for (TransactionOperation operation : operationsToComplete) {
operation.afterRollback(this);
operation.afterRollback(this, sorted);
}
// Help out GC here
operationsToComplete.clear();

View File

@ -1035,6 +1035,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
addHead(refs, scheduling);
}
@Override
public void reload(MessageReference ref) {
@ -1055,6 +1060,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
}
@Override
public void addHead(List<MessageReference> refs, boolean scheduling) {
for (MessageReference ref : refs) {

View File

@ -90,6 +90,11 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
public void close(boolean failed, boolean sorted) throws Exception {
}
@Override
public void removeItself() throws Exception {

View File

@ -129,4 +129,54 @@ public class JMSOrderTest extends JMSTestBase {
}
@Test(timeout = 60000)
public void testReceiveSomeThenClose() throws Exception {
Connection connection = protocolCF.createConnection();
try {
connection.start();
int totalCount = 5;
int consumeBeforeRollback = 2;
sendToAmqQueue(totalCount);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(name.getMethodName());
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 1; i <= consumeBeforeRollback; i++) {
Message message = consumer.receive(3000);
assertNotNull(message);
assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
}
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue(name.getMethodName());
consumer = session.createConsumer(queue);
// Consume again.. the previously consumed messages should get delivered
// again after the rollback and then the remainder should follow
List<Integer> messageNumbers = new ArrayList<>();
for (int i = 1; i <= totalCount; i++) {
Message message = consumer.receive(3000);
assertNotNull("Failed to receive message: " + i, message);
int msgNum = message.getIntProperty("nr");
System.out.println("Received " + msgNum);
messageNumbers.add(msgNum);
}
session.commit();
assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
for (int i = 0; i < messageNumbers.size(); i++) {
assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
}
} finally {
connection.close();
}
}
}

View File

@ -268,6 +268,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
}
@Override
public void addHead(List<MessageReference> ref, boolean scheduling) {
// no-op
@ -459,6 +464,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
}
@Override
public Set<Consumer> getConsumers() {
// no-op