diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index d18ecbf50e..2f6e93b3d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -928,7 +928,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { synchronized (this) { try { if (ringSize != -1) { - enforceRing(ref, scheduling); + enforceRing(ref, scheduling, true); } if (!ref.isAlreadyAcked()) { @@ -1026,8 +1026,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { public void addTail(final MessageReference ref, final boolean direct) { enterCritical(CRITICAL_PATH_ADD_TAIL); try { - enforceRing(); - if (scheduleIfPossible(ref)) { return; } @@ -2592,6 +2590,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { refAdded(ref); messageReferences.addTail(ref, getPriority(ref)); pendingMetrics.incrementMetrics(ref); + enforceRing(false); } /** @@ -4072,14 +4071,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private void enforceRing() { + private void enforceRing(boolean head) { if (ringSize != -1) { // better escaping & inlining when ring isn't being used - enforceRing(null, false); + enforceRing(null, false, head); } } - private void enforceRing(MessageReference refToAck, boolean scheduling) { - if (getMessageCountForRing() >= ringSize) { + private void enforceRing(MessageReference refToAck, boolean scheduling, boolean head) { + int adjustment = head ? 1 : 0; + + if (getMessageCountForRing() + adjustment > ringSize) { refToAck = refToAck == null ? messageReferences.poll() : refToAck; if (refToAck != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java index 7343b6b52d..9b06a8473b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java @@ -20,6 +20,8 @@ import javax.jms.Connection; import javax.jms.MessageProducer; import javax.jms.Session; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueAttributes; @@ -330,6 +332,42 @@ public class RingQueueTest extends ActiveMQTestBase { Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); } + @Test + public void testMultipleConcurrentProducers() throws Exception { + final long RING_SIZE = 25; + ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession clientSession = addClientSession(sf.createSession(false, true, true)); + clientSession.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(RING_SIZE).setMaxConsumers(-1).setPurgeOnNoConsumers(false)); + clientSession.start(); + final Queue queue = server.locateQueue(qName); + assertEquals(RING_SIZE, queue.getRingSize()); + final int nThreads = 25; + final long numberOfMessages = RING_SIZE; + + SomeProducer[] producers = new SomeProducer[nThreads]; + + try { + for (int i = 0; i < nThreads; i++) { + producers[i] = new SomeProducer(numberOfMessages, nThreads, address); + } + + for (int i = 0; i < nThreads; i++) { + producers[i].start(); + } + + for (SomeProducer producer : producers) { + producer.join(); + assertEquals(0, producer.errors.get()); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + Wait.assertTrue("message count should be " + RING_SIZE + " but it's actually " + queue.getMessageCount(), () -> queue.getMessageCount() == RING_SIZE, 2000, 100); + } + @Override @Before public void setUp() throws Exception { @@ -339,4 +377,47 @@ public class RingQueueTest extends ActiveMQTestBase { // start the server server.start(); } + + class SomeProducer extends Thread { + + final ClientSessionFactory factory; + final ServerLocator locator; + final ClientSession prodSession; + public final AtomicInteger errors = new AtomicInteger(0); + final long numberOfMessages; + final int nThreads; + final SimpleString address; + + SomeProducer(long numberOfMessages, int nThreads, SimpleString address) throws Exception { + locator = createNettyNonHALocator(); + factory = locator.createSessionFactory(); + prodSession = factory.createSession(true, false); + this.numberOfMessages = numberOfMessages; + this.nThreads = nThreads; + this.address = address; + } + + @Override + public void run() { + try { + ClientProducer producer = prodSession.createProducer(address); + for (int i = 0; i < numberOfMessages; i++) { + ClientMessage message = prodSession.createMessage(true); + message.putIntProperty("prodNR", i % nThreads); + producer.send(message); + } + + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } finally { + try { + prodSession.close(); + locator.close(); + } catch (Throwable ignored) { + ignored.printStackTrace(); + } + } + } + } }