From 12c8096a23840ede9c364cc184dddfe19846e2e0 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 22 Feb 2021 09:36:55 -0500 Subject: [PATCH] ARTEMIS-3093 Ordering on multiple consumers and core with rollback --- .../amqp/broker/AMQPSessionCallback.java | 4 +- .../transaction/ProtonTransactionImpl.java | 2 +- .../protocol/mqtt/MQTTPublishManager.java | 2 +- .../activemq/artemis/core/server/Queue.java | 3 +- .../artemis/core/server/ServerConsumer.java | 4 +- .../core/server/cluster/impl/BridgeImpl.java | 2 +- .../core/server/impl/LastValueQueue.java | 13 +++ .../artemis/core/server/impl/QueueImpl.java | 33 +++--- .../core/server/impl/RefsOperation.java | 7 +- .../core/server/impl/ServerConsumerImpl.java | 13 +-- .../core/server/impl/ServerSessionImpl.java | 2 +- .../transaction/TransactionOperation.java | 4 - .../transaction/impl/TransactionImpl.java | 43 +++----- .../impl/ScheduledDeliveryHandlerTest.java | 2 +- .../integration/cli/DummyServerConsumer.java | 7 +- .../integration/client/JMSOrderTest.java | 104 ++++++++++++++++++ .../integration/server/RingQueueTest.java | 25 +++-- .../unit/core/postoffice/impl/FakeQueue.java | 2 +- 18 files changed, 177 insertions(+), 95 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index eb628554f5..f0fb1efe45 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -398,7 +398,7 @@ public class AMQPSessionCallback implements SessionCallback { public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); - consumer.close(false, true); + consumer.close(false); consumer.getQueue().recheckRefCount(serverSession.getSessionContext()); } @@ -440,7 +440,7 @@ public class AMQPSessionCallback implements SessionCallback { public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { OperationContext oldContext = recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true); + ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); ((ServerConsumer) brokerConsumer).getQueue().forceDelivery(); } finally { resetContext(oldContext); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java index 83128e1f45..123dbb5d9b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java @@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl { private boolean discharged; public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) { - super(xid, storageManager, timeoutSeconds, true); + super(xid, storageManager, timeoutSeconds); addOperation(new TransactionOperationAbstract() { @Override public void afterCommit(Transaction tx) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 4b89636117..5d9c96dce6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -133,7 +133,7 @@ public class MQTTPublishManager { sendServerMessage(mqttid, message, deliveryCount, qos); } else { // Client must have disconnected and it's Subscription QoS cleared - consumer.individualCancel(message.getMessageID(), false, true); + consumer.individualCancel(message.getMessageID(), false); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 628e43e1f0..5b1d1285af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -220,8 +220,7 @@ public interface Queue extends Bindable,CriticalComponent { void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck); - /** @param sorted it should use the messageID as a reference to where to add it in the queue */ - void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception; + void cancel(MessageReference reference, long timeBase) throws Exception; void deliverAsync(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index 8528f68044..e743c0401c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -64,8 +64,6 @@ public interface ServerConsumer extends Consumer, ConsumerInfo { void close(boolean failed) throws Exception; - void close(boolean failed, boolean sorted) throws Exception; - /** * This method is just to remove itself from Queues. * If for any reason during a close an exception occurred, the exception treatment @@ -101,7 +99,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo { void reject(long messageID) throws Exception; - void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception; + void individualCancel(long messageID, boolean failed) throws Exception; void forceDelivery(long sequence); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 8a34ca375e..4458ac968c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -355,7 +355,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled refqueue = ref.getQueue(); try { - refqueue.cancel(ref, timeBase, false); + refqueue.cancel(ref, timeBase); } catch (Exception e) { // There isn't much we can do besides log an error ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 1df1bceee6..02031e784d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -193,6 +194,18 @@ public class LastValueQueue extends QueueImpl { } } + /** LVQ has to use regular addHead due to last value queues calculations */ + @Override + public void addSorted(MessageReference ref, boolean scheduling) { + this.addHead(ref, scheduling); + } + + /** LVQ has to use regular addHead due to last value queues calculations */ + @Override + public void addSorted(List refs, boolean scheduling) { + this.addHead(refs, scheduling); + } + @Override public synchronized void addHead(final MessageReference ref, boolean scheduling) { // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 44e1a3f76c..809c00c011 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1115,13 +1115,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { enterCritical(CRITICAL_PATH_ADD_HEAD); synchronized (this) { try { - if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { - return; + if (ringSize != -1) { + enforceRing(ref, false, true); } - internalAddSorted(ref); + if (!ref.isAlreadyAcked()) { + if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { + return; + } + internalAddSorted(ref); - directDeliver = false; + directDeliver = false; + } } finally { leaveCritical(CRITICAL_PATH_ADD_HEAD); } @@ -1948,15 +1953,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception { + public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception { Pair redeliveryResult = checkRedelivery(reference, timeBase, false); if (redeliveryResult.getA()) { if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) { - if (sorted) { - internalAddSorted(reference); - } else { - internalAddHead(reference); - } + internalAddSorted(reference); } resetAllIterators(); @@ -2862,6 +2863,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { int priority = getPriority(ref); messageReferences.addSorted(ref, priority); + + ref.setInDelivery(false); } private int getPriority(MessageReference ref) { @@ -3933,10 +3936,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } void postRollback(final LinkedList refs) { - postRollback(refs, false); - } - - void postRollback(final LinkedList refs, boolean sorted) { //if we have purged then ignore adding the messages back if (purgeOnNoConsumers && getConsumerCount() == 0) { purgeAfterRollback(refs); @@ -3946,11 +3945,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue if (!isNonDestructive()) { - if (sorted) { - addSorted(refs, false); - } else { - addHead(refs, false); - } + addSorted(refs, false); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 054ba7324f..c50a06dea2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -84,11 +84,6 @@ public class RefsOperation extends TransactionOperationAbstract { @Override public void afterRollback(final Transaction tx) { - afterRollback(tx, false); - } - - @Override - public void afterRollback(final Transaction tx, boolean sorted) { Map> queueMap = new HashMap<>(); long timeBase = System.currentTimeMillis(); @@ -121,7 +116,7 @@ public class RefsOperation extends TransactionOperationAbstract { QueueImpl queue = entry.getKey(); synchronized (queue) { - queue.postRollback(refs, sorted); + queue.postRollback(refs); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 15864db020..f7dbf9aaab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -539,12 +539,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public void close(final boolean failed) throws Exception { - close(failed, false); - } - - @Override - public synchronized void close(final boolean failed, boolean sorted) throws Exception { + public synchronized void close(final boolean failed) throws Exception { // Close should only ever be done once per consumer. if (isClosed) return; @@ -570,7 +565,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { List refs = cancelRefs(failed, false, null); - Transaction tx = new TransactionImpl(storageManager, sorted); + Transaction tx = new TransactionImpl(storageManager); refs.forEach(ref -> { if (logger.isTraceEnabled()) { @@ -1022,7 +1017,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception { + public synchronized void individualCancel(final long messageID, boolean failed) throws Exception { if (browseOnly) { return; } @@ -1037,7 +1032,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.decrementDeliveryCount(); } - ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted); + ref.getQueue().cancel(ref, System.currentTimeMillis()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index f66a57f64f..dbc68f23ca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1268,7 +1268,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ServerConsumer consumer = locateConsumer(consumerID); if (consumer != null) { - consumer.individualCancel(messageID, failed, false); + consumer.individualCancel(messageID, failed); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java index 5c7e7e6680..5da1d97d92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java @@ -52,10 +52,6 @@ public interface TransactionOperation { */ void afterRollback(Transaction tx); - default void afterRollback(Transaction tx, boolean sorted) { - afterRollback(tx); - } - List getRelatedMessageReferences(); List getListOnConsumer(long consumerID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 8e22bbb5d0..e14d31ddf9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -63,8 +63,6 @@ public class TransactionImpl implements Transaction { private final long createTime; - private final boolean sorted; - private volatile boolean containsPersistent; private int timeoutSeconds = -1; @@ -98,34 +96,23 @@ public class TransactionImpl implements Transaction { } public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) { - this(storageManager.generateID(), null, storageManager, timeoutSeconds, false); + this(storageManager.generateID(), null, storageManager, timeoutSeconds); } public TransactionImpl(final StorageManager storageManager) { - this(storageManager, false); + this(storageManager.generateID(), null, storageManager,-1); } - public TransactionImpl(final StorageManager storageManager, boolean sorted) { - this(storageManager.generateID(), null, storageManager,-1, sorted); - } public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { - this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false); - } - - public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) { - this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted); + this(storageManager.generateID(), xid, storageManager, timeoutSeconds); } public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) { - this(id, xid, storageManager, -1, false); + this(id, xid, storageManager, -1); } - public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) { - this(id, xid, storageManager, -1, sorted); - } - - private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) { + private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds) { this.storageManager = storageManager; this.xid = xid; @@ -135,8 +122,6 @@ public class TransactionImpl implements Transaction { this.createTime = System.currentTimeMillis(); this.timeoutSeconds = timeoutSeconds; - - this.sorted = sorted; } // Transaction implementation @@ -217,7 +202,7 @@ public class TransactionImpl implements Transaction { logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this); } - internalRollback(sorted); + internalRollback(); if (exception != null) { throw exception; @@ -276,7 +261,7 @@ public class TransactionImpl implements Transaction { return; } if (state == State.ROLLBACK_ONLY) { - internalRollback(sorted); + internalRollback(); if (exception != null) { throw exception; @@ -367,7 +352,7 @@ public class TransactionImpl implements Transaction { } if (state != State.PREPARED) { try { - internalRollback(sorted); + internalRollback(); } catch (Exception e) { // nothing we can do beyond logging // no need to special handler here as this was not even supposed to happen at this point @@ -400,11 +385,11 @@ public class TransactionImpl implements Transaction { } } - internalRollback(sorted); + internalRollback(); } } - private void internalRollback(boolean sorted) throws Exception { + private void internalRollback() throws Exception { if (logger.isTraceEnabled()) { logger.trace("TransactionImpl::internalRollback " + this); } @@ -439,7 +424,7 @@ public class TransactionImpl implements Transaction { @Override public void done() { - afterRollback(operationsToComplete, sorted); + afterRollback(operationsToComplete); } }); @@ -453,7 +438,7 @@ public class TransactionImpl implements Transaction { @Override public void done() { - afterRollback(storeOperationsToComplete, sorted); + afterRollback(storeOperationsToComplete); } }); } @@ -583,10 +568,10 @@ public class TransactionImpl implements Transaction { } } - private synchronized void afterRollback(List operationsToComplete, boolean sorted) { + private synchronized void afterRollback(List operationsToComplete) { if (operationsToComplete != null) { for (TransactionOperation operation : operationsToComplete) { - operation.afterRollback(this, sorted); + operation.afterRollback(this); } // Help out GC here operationsToComplete.clear(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5d3e1d149e..0fcd2b4bd2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1217,7 +1217,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception { + public void cancel(MessageReference reference, long timeBase) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index a8224235f7..169303623a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -91,11 +91,6 @@ public class DummyServerConsumer implements ServerConsumer { } - @Override - public void close(boolean failed, boolean sorted) throws Exception { - - } - @Override public void removeItself() throws Exception { @@ -156,7 +151,7 @@ public class DummyServerConsumer implements ServerConsumer { } @Override - public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception { + public void individualCancel(long messageID, boolean failed) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java index e883bf6b8f..333bcda901 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java @@ -30,8 +30,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -177,4 +184,101 @@ public class JMSOrderTest extends JMSTestBase { } + @Test + public void testMultipleConsumersRollback() throws Exception { + internalMultipleConsumers(true); + } + + @Test + public void testMultipleConsumersClose() throws Exception { + internalMultipleConsumers(false); + } + + private void internalMultipleConsumers(final boolean rollback) throws Exception { + + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(false)); + + int numberOfMessages = 100; + int numberOfConsumers = 3; + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + final javax.jms.Queue jmsQueue; + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + jmsQueue = session.createQueue(getName()); + MessageProducer producer = session.createProducer(jmsQueue); + + for (int i = 0; i < numberOfMessages; i++) { + TextMessage message = session.createTextMessage("test " + i); + message.setIntProperty("i", i); + producer.send(message); + } + } + + Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount); + + AtomicBoolean running = new AtomicBoolean(true); + AtomicInteger errors = new AtomicInteger(0); + Runnable r = () -> { + try (Connection c = factory.createConnection()) { + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cs = s.createConsumer(jmsQueue); + c.start(); + int rollbacks = 0; + while (running.get()) { + TextMessage txt = (TextMessage)cs.receive(500); + if (txt != null) { + if (rollback) { + s.rollback(); + rollbacks++; + + if (rollbacks >= 3) { + break; + } + } + } else { + return; + } + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + running.set(false); + } + }; + + Thread[] threads = new Thread[numberOfConsumers]; + + for (int i = 0; i < numberOfConsumers; i++) { + threads[i] = new Thread(r, "consumer " + i); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + Assert.assertEquals(0, errors.get()); + + Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount); + + try (Connection c = factory.createConnection()) { + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cs = s.createConsumer(jmsQueue); + c.start(); + + for (int i = 0; i < numberOfMessages; i++) { + TextMessage message = (TextMessage) cs.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("i")); + } + + Assert.assertNull(cs.receiveNoWait()); + } + + } + + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java index 3cfaf98234..8f7ae9faa4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java @@ -149,12 +149,12 @@ public class RingQueueTest extends ActiveMQTestBase { producer.send(message); message = createTextMessage(clientSession, "hello1"); producer.send(message); - Wait.assertTrue(() -> queue.getMessageCount() == 2); - Wait.assertTrue(() -> queue.getDeliveringCount() == 2); + Wait.assertEquals(2, queue::getMessageCount); + Wait.assertEquals(2, queue::getDeliveringCount); consumer.close(); - Wait.assertTrue(() -> queue.getMessageCount() == 1); - Wait.assertTrue(() -> queue.getDeliveringCount() == 0); - Wait.assertTrue(() -> queue.getMessagesReplaced() == 1); + Wait.assertEquals(1, queue::getMessageCount); + Wait.assertEquals(0, queue::getDeliveringCount); + Wait.assertEquals(1, queue::getMessagesReplaced); consumer = clientSession.createConsumer(qName); message = consumer.receiveImmediate(); assertNotNull(message); @@ -242,13 +242,20 @@ public class RingQueueTest extends ActiveMQTestBase { message.acknowledge(); } consumer.close(); - Wait.assertTrue(() -> queue.getMessageCount() == 5); + Wait.assertEquals(5, queue::getMessageCount); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 5; i++) { producer.send(clientSession.createMessage(true)); } - Wait.assertTrue(() -> queue.getMessageCount() == 10); - Wait.assertTrue(() -> queue.getMessagesReplaced() == 5); + Wait.assertEquals(10, queue::getMessageCount); + + // these sends will be replacing the old values + for (int i = 0; i < 5; i++) { + producer.send(clientSession.createMessage(true)); + Wait.assertEquals(10, queue::getMessageCount); + } + + Wait.assertEquals(5, queue::getMessagesReplaced); consumer = clientSession.createConsumer(qName); message = consumer.receiveImmediate(); assertNotNull(message); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index cc88b53563..cb2aff6ad2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -423,7 +423,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception { + public void cancel(final MessageReference reference, final long timeBase) throws Exception { // no-op }