This commit is contained in:
Clebert Suconic 2019-09-13 11:44:24 -04:00
commit de0da642b0
2 changed files with 89 additions and 7 deletions

View File

@ -928,7 +928,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
synchronized (this) { synchronized (this) {
try { try {
if (ringSize != -1) { if (ringSize != -1) {
enforceRing(ref, scheduling); enforceRing(ref, scheduling, true);
} }
if (!ref.isAlreadyAcked()) { if (!ref.isAlreadyAcked()) {
@ -1026,8 +1026,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void addTail(final MessageReference ref, final boolean direct) { public void addTail(final MessageReference ref, final boolean direct) {
enterCritical(CRITICAL_PATH_ADD_TAIL); enterCritical(CRITICAL_PATH_ADD_TAIL);
try { try {
enforceRing();
if (scheduleIfPossible(ref)) { if (scheduleIfPossible(ref)) {
return; return;
} }
@ -2592,6 +2590,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refAdded(ref); refAdded(ref);
messageReferences.addTail(ref, getPriority(ref)); messageReferences.addTail(ref, getPriority(ref));
pendingMetrics.incrementMetrics(ref); pendingMetrics.incrementMetrics(ref);
enforceRing(false);
} }
/** /**
@ -4072,14 +4071,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
private void enforceRing() { private void enforceRing(boolean head) {
if (ringSize != -1) { // better escaping & inlining when ring isn't being used if (ringSize != -1) { // better escaping & inlining when ring isn't being used
enforceRing(null, false); enforceRing(null, false, head);
} }
} }
private void enforceRing(MessageReference refToAck, boolean scheduling) { private void enforceRing(MessageReference refToAck, boolean scheduling, boolean head) {
if (getMessageCountForRing() >= ringSize) { int adjustment = head ? 1 : 0;
if (getMessageCountForRing() + adjustment > ringSize) {
refToAck = refToAck == null ? messageReferences.poll() : refToAck; refToAck = refToAck == null ? messageReferences.poll() : refToAck;
if (refToAck != null) { if (refToAck != null) {

View File

@ -20,6 +20,8 @@ import javax.jms.Connection;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueAttributes; import org.apache.activemq.artemis.api.core.QueueAttributes;
@ -330,6 +332,42 @@ public class RingQueueTest extends ActiveMQTestBase {
Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100); Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
} }
@Test
public void testMultipleConcurrentProducers() throws Exception {
final long RING_SIZE = 25;
ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
clientSession.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(RING_SIZE).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
clientSession.start();
final Queue queue = server.locateQueue(qName);
assertEquals(RING_SIZE, queue.getRingSize());
final int nThreads = 25;
final long numberOfMessages = RING_SIZE;
SomeProducer[] producers = new SomeProducer[nThreads];
try {
for (int i = 0; i < nThreads; i++) {
producers[i] = new SomeProducer(numberOfMessages, nThreads, address);
}
for (int i = 0; i < nThreads; i++) {
producers[i].start();
}
for (SomeProducer producer : producers) {
producer.join();
assertEquals(0, producer.errors.get());
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
Wait.assertTrue("message count should be " + RING_SIZE + " but it's actually " + queue.getMessageCount(), () -> queue.getMessageCount() == RING_SIZE, 2000, 100);
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -339,4 +377,47 @@ public class RingQueueTest extends ActiveMQTestBase {
// start the server // start the server
server.start(); server.start();
} }
class SomeProducer extends Thread {
final ClientSessionFactory factory;
final ServerLocator locator;
final ClientSession prodSession;
public final AtomicInteger errors = new AtomicInteger(0);
final long numberOfMessages;
final int nThreads;
final SimpleString address;
SomeProducer(long numberOfMessages, int nThreads, SimpleString address) throws Exception {
locator = createNettyNonHALocator();
factory = locator.createSessionFactory();
prodSession = factory.createSession(true, false);
this.numberOfMessages = numberOfMessages;
this.nThreads = nThreads;
this.address = address;
}
@Override
public void run() {
try {
ClientProducer producer = prodSession.createProducer(address);
for (int i = 0; i < numberOfMessages; i++) {
ClientMessage message = prodSession.createMessage(true);
message.putIntProperty("prodNR", i % nThreads);
producer.send(message);
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
} finally {
try {
prodSession.close();
locator.close();
} catch (Throwable ignored) {
ignored.printStackTrace();
}
}
}
}
} }