This commit is contained in:
Clebert Suconic 2021-02-22 17:32:38 -05:00
commit 4de4329cb1
18 changed files with 177 additions and 95 deletions

View File

@ -398,7 +398,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
consumer.close(false, true);
consumer.close(false);
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}
@ -440,7 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
OperationContext oldContext = recoverContext();
try {
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext(oldContext);

View File

@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
private boolean discharged;
public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
super(xid, storageManager, timeoutSeconds, true);
super(xid, storageManager, timeoutSeconds);
addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {

View File

@ -133,7 +133,7 @@ public class MQTTPublishManager {
sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
// Client must have disconnected and it's Subscription QoS cleared
consumer.individualCancel(message.getMessageID(), false, true);
consumer.individualCancel(message.getMessageID(), false);
}
}
}

View File

@ -220,8 +220,7 @@ public interface Queue extends Bindable,CriticalComponent {
void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
/** @param sorted it should use the messageID as a reference to where to add it in the queue */
void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;
void cancel(MessageReference reference, long timeBase) throws Exception;
void deliverAsync();

View File

@ -64,8 +64,6 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void close(boolean failed) throws Exception;
void close(boolean failed, boolean sorted) throws Exception;
/**
* This method is just to remove itself from Queues.
* If for any reason during a close an exception occurred, the exception treatment
@ -101,7 +99,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
void reject(long messageID) throws Exception;
void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;
void individualCancel(long messageID, boolean failed) throws Exception;
void forceDelivery(long sequence);

View File

@ -355,7 +355,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
refqueue = ref.getQueue();
try {
refqueue.cancel(ref, timeBase, false);
refqueue.cancel(ref, timeBase);
} catch (Exception e) {
// There isn't much we can do besides log an error
ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -193,6 +194,18 @@ public class LastValueQueue extends QueueImpl {
}
}
/** LVQ has to use regular addHead due to last value queues calculations */
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
this.addHead(ref, scheduling);
}
/** LVQ has to use regular addHead due to last value queues calculations */
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
this.addHead(refs, scheduling);
}
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay

View File

@ -1115,13 +1115,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
if (ringSize != -1) {
enforceRing(ref, false, true);
}
internalAddSorted(ref);
if (!ref.isAlreadyAcked()) {
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
internalAddSorted(ref);
directDeliver = false;
directDeliver = false;
}
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
@ -1948,15 +1953,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
if (sorted) {
internalAddSorted(reference);
} else {
internalAddHead(reference);
}
internalAddSorted(reference);
}
resetAllIterators();
@ -2862,6 +2863,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
int priority = getPriority(ref);
messageReferences.addSorted(ref, priority);
ref.setInDelivery(false);
}
private int getPriority(MessageReference ref) {
@ -3933,10 +3936,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
void postRollback(final LinkedList<MessageReference> refs) {
postRollback(refs, false);
}
void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
//if we have purged then ignore adding the messages back
if (purgeOnNoConsumers && getConsumerCount() == 0) {
purgeAfterRollback(refs);
@ -3946,11 +3945,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
if (!isNonDestructive()) {
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);
}
addSorted(refs, false);
}
}

View File

@ -84,11 +84,6 @@ public class RefsOperation extends TransactionOperationAbstract {
@Override
public void afterRollback(final Transaction tx) {
afterRollback(tx, false);
}
@Override
public void afterRollback(final Transaction tx, boolean sorted) {
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
long timeBase = System.currentTimeMillis();
@ -121,7 +116,7 @@ public class RefsOperation extends TransactionOperationAbstract {
QueueImpl queue = entry.getKey();
synchronized (queue) {
queue.postRollback(refs, sorted);
queue.postRollback(refs);
}
}

View File

@ -539,12 +539,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
public void close(final boolean failed) throws Exception {
close(failed, false);
}
@Override
public synchronized void close(final boolean failed, boolean sorted) throws Exception {
public synchronized void close(final boolean failed) throws Exception {
// Close should only ever be done once per consumer.
if (isClosed) return;
@ -570,7 +565,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
List<MessageReference> refs = cancelRefs(failed, false, null);
Transaction tx = new TransactionImpl(storageManager, sorted);
Transaction tx = new TransactionImpl(storageManager);
refs.forEach(ref -> {
if (logger.isTraceEnabled()) {
@ -1022,7 +1017,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception {
public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
if (browseOnly) {
return;
}
@ -1037,7 +1032,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.decrementDeliveryCount();
}
ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted);
ref.getQueue().cancel(ref, System.currentTimeMillis());
}

View File

@ -1268,7 +1268,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
ServerConsumer consumer = locateConsumer(consumerID);
if (consumer != null) {
consumer.individualCancel(messageID, failed, false);
consumer.individualCancel(messageID, failed);
}
}

View File

@ -52,10 +52,6 @@ public interface TransactionOperation {
*/
void afterRollback(Transaction tx);
default void afterRollback(Transaction tx, boolean sorted) {
afterRollback(tx);
}
List<MessageReference> getRelatedMessageReferences();
List<MessageReference> getListOnConsumer(long consumerID);

View File

@ -63,8 +63,6 @@ public class TransactionImpl implements Transaction {
private final long createTime;
private final boolean sorted;
private volatile boolean containsPersistent;
private int timeoutSeconds = -1;
@ -98,34 +96,23 @@ public class TransactionImpl implements Transaction {
}
public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
this(storageManager.generateID(), null, storageManager, timeoutSeconds);
}
public TransactionImpl(final StorageManager storageManager) {
this(storageManager, false);
this(storageManager.generateID(), null, storageManager,-1);
}
public TransactionImpl(final StorageManager storageManager, boolean sorted) {
this(storageManager.generateID(), null, storageManager,-1, sorted);
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
this(storageManager.generateID(), xid, storageManager, timeoutSeconds);
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
this(id, xid, storageManager, -1, false);
this(id, xid, storageManager, -1);
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
this(id, xid, storageManager, -1, sorted);
}
private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
this.storageManager = storageManager;
this.xid = xid;
@ -135,8 +122,6 @@ public class TransactionImpl implements Transaction {
this.createTime = System.currentTimeMillis();
this.timeoutSeconds = timeoutSeconds;
this.sorted = sorted;
}
// Transaction implementation
@ -217,7 +202,7 @@ public class TransactionImpl implements Transaction {
logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
}
internalRollback(sorted);
internalRollback();
if (exception != null) {
throw exception;
@ -276,7 +261,7 @@ public class TransactionImpl implements Transaction {
return;
}
if (state == State.ROLLBACK_ONLY) {
internalRollback(sorted);
internalRollback();
if (exception != null) {
throw exception;
@ -367,7 +352,7 @@ public class TransactionImpl implements Transaction {
}
if (state != State.PREPARED) {
try {
internalRollback(sorted);
internalRollback();
} catch (Exception e) {
// nothing we can do beyond logging
// no need to special handler here as this was not even supposed to happen at this point
@ -400,11 +385,11 @@ public class TransactionImpl implements Transaction {
}
}
internalRollback(sorted);
internalRollback();
}
}
private void internalRollback(boolean sorted) throws Exception {
private void internalRollback() throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("TransactionImpl::internalRollback " + this);
}
@ -439,7 +424,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
afterRollback(operationsToComplete, sorted);
afterRollback(operationsToComplete);
}
});
@ -453,7 +438,7 @@ public class TransactionImpl implements Transaction {
@Override
public void done() {
afterRollback(storeOperationsToComplete, sorted);
afterRollback(storeOperationsToComplete);
}
});
}
@ -583,10 +568,10 @@ public class TransactionImpl implements Transaction {
}
}
private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
if (operationsToComplete != null) {
for (TransactionOperation operation : operationsToComplete) {
operation.afterRollback(this, sorted);
operation.afterRollback(this);
}
// Help out GC here
operationsToComplete.clear();

View File

@ -1217,7 +1217,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception {
public void cancel(MessageReference reference, long timeBase) throws Exception {
}

View File

@ -91,11 +91,6 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
public void close(boolean failed, boolean sorted) throws Exception {
}
@Override
public void removeItself() throws Exception {
@ -156,7 +151,7 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception {
public void individualCancel(long messageID, boolean failed) throws Exception {
}

View File

@ -30,8 +30,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -177,4 +184,101 @@ public class JMSOrderTest extends JMSTestBase {
}
@Test
public void testMultipleConsumersRollback() throws Exception {
internalMultipleConsumers(true);
}
@Test
public void testMultipleConsumersClose() throws Exception {
internalMultipleConsumers(false);
}
private void internalMultipleConsumers(final boolean rollback) throws Exception {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(false));
int numberOfMessages = 100;
int numberOfConsumers = 3;
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
final javax.jms.Queue jmsQueue;
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
jmsQueue = session.createQueue(getName());
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = session.createTextMessage("test " + i);
message.setIntProperty("i", i);
producer.send(message);
}
}
Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
AtomicBoolean running = new AtomicBoolean(true);
AtomicInteger errors = new AtomicInteger(0);
Runnable r = () -> {
try (Connection c = factory.createConnection()) {
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cs = s.createConsumer(jmsQueue);
c.start();
int rollbacks = 0;
while (running.get()) {
TextMessage txt = (TextMessage)cs.receive(500);
if (txt != null) {
if (rollback) {
s.rollback();
rollbacks++;
if (rollbacks >= 3) {
break;
}
}
} else {
return;
}
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
running.set(false);
}
};
Thread[] threads = new Thread[numberOfConsumers];
for (int i = 0; i < numberOfConsumers; i++) {
threads[i] = new Thread(r, "consumer " + i);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
Assert.assertEquals(0, errors.get());
Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount);
try (Connection c = factory.createConnection()) {
Session s = c.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cs = s.createConsumer(jmsQueue);
c.start();
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) cs.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
}
Assert.assertNull(cs.receiveNoWait());
}
}
}

View File

@ -149,12 +149,12 @@ public class RingQueueTest extends ActiveMQTestBase {
producer.send(message);
message = createTextMessage(clientSession, "hello1");
producer.send(message);
Wait.assertTrue(() -> queue.getMessageCount() == 2);
Wait.assertTrue(() -> queue.getDeliveringCount() == 2);
Wait.assertEquals(2, queue::getMessageCount);
Wait.assertEquals(2, queue::getDeliveringCount);
consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 1);
Wait.assertTrue(() -> queue.getDeliveringCount() == 0);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
Wait.assertEquals(1, queue::getMessageCount);
Wait.assertEquals(0, queue::getDeliveringCount);
Wait.assertEquals(1, queue::getMessagesReplaced);
consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate();
assertNotNull(message);
@ -242,13 +242,20 @@ public class RingQueueTest extends ActiveMQTestBase {
message.acknowledge();
}
consumer.close();
Wait.assertTrue(() -> queue.getMessageCount() == 5);
Wait.assertEquals(5, queue::getMessageCount);
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 5; i++) {
producer.send(clientSession.createMessage(true));
}
Wait.assertTrue(() -> queue.getMessageCount() == 10);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 5);
Wait.assertEquals(10, queue::getMessageCount);
// these sends will be replacing the old values
for (int i = 0; i < 5; i++) {
producer.send(clientSession.createMessage(true));
Wait.assertEquals(10, queue::getMessageCount);
}
Wait.assertEquals(5, queue::getMessagesReplaced);
consumer = clientSession.createConsumer(qName);
message = consumer.receiveImmediate();
assertNotNull(message);

View File

@ -423,7 +423,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
public void cancel(final MessageReference reference, final long timeBase) throws Exception {
// no-op
}