ARTEMIS-3807 Simplify Redistributor avoid races, and fix ProtocolMessageLoadBalancingTest

The control existing in Redistributor is not needed as the Queue::deliver will already have a control on re-scheduling the loop and avoid holding references for too long.
This commit is contained in:
Clebert Suconic 2022-04-28 19:35:26 -04:00 committed by clebertsuconic
parent 1a273821ed
commit 48cd586ac5
5 changed files with 49 additions and 157 deletions

View File

@ -18,15 +18,12 @@ package org.apache.activemq.artemis.core.server.cluster.impl;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -44,14 +41,8 @@ public class Redistributor implements Consumer {
private final PostOffice postOffice;
private final Executor executor;
private final int batchSize;
private final Queue queue;
private int count;
private final long sequentialID;
// a Flush executor here is happening inside another executor.
@ -61,9 +52,7 @@ public class Redistributor implements Consumer {
public Redistributor(final Queue queue,
final StorageManager storageManager,
final PostOffice postOffice,
final Executor executor,
final int batchSize) {
final PostOffice postOffice) {
this.queue = queue;
this.sequentialID = storageManager.generateID();
@ -71,10 +60,6 @@ public class Redistributor implements Consumer {
this.storageManager = storageManager;
this.postOffice = postOffice;
this.executor = executor;
this.batchSize = batchSize;
}
@Override
@ -103,39 +88,17 @@ public class Redistributor implements Consumer {
}
public synchronized void start() {
active = true;
this.active = true;
}
public synchronized void stop() throws Exception {
active = false;
boolean ok = flushExecutor();
if (!ok) {
ActiveMQServerLogger.LOGGER.errorStoppingRedistributor();
}
this.active = false;
}
public synchronized void close() {
boolean ok = flushExecutor();
if (!ok) {
throw new IllegalStateException("Timed out waiting for executor to complete");
}
active = false;
}
private boolean flushExecutor() {
try {
boolean ok = pendingRuns.await(10000);
return ok;
} catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.failedToFlushExecutor(e);
return false;
}
}
@Override
public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
if (!active) {
@ -154,41 +117,9 @@ public class Redistributor implements Consumer {
return HandleStatus.BUSY;
}
if (!reference.getMessage().isLargeMessage()) {
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
ackRedistribution(reference, tx);
} else {
active = false;
executor.execute(new Runnable() {
@Override
public void run() {
try {
postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
ackRedistribution(reference, tx);
synchronized (Redistributor.this) {
active = true;
count++;
queue.deliverAsync();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRedistributing(e, toManagementString(), reference.getMessageID());
try {
tx.rollback();
} catch (Exception e2) {
// Nothing much we can do now
ActiveMQServerLogger.LOGGER.failedToRollback(e2);
}
}
}
});
}
return HandleStatus.HANDLED;
}
@ -198,68 +129,12 @@ public class Redistributor implements Consumer {
// no op
}
private void internalExecute(final Runnable runnable) {
pendingRuns.countUp();
executor.execute(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
pendingRuns.countDown();
}
}
});
}
private void ackRedistribution(final MessageReference reference, final Transaction tx) throws Exception {
reference.handled();
queue.acknowledge(tx, reference);
tx.commit();
storageManager.afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorRedistributing(errorCode, errorMessage);
}
@Override
public void done() {
execPrompter();
}
});
}
private void execPrompter() {
count++;
// We use >= as the large message redistribution will set count to max_int
// so we are use the prompter will get called
if (count >= batchSize) {
// We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
// long time in the case there are many messages in the queue
active = false;
executor.execute(new Prompter());
count = 0;
}
}
private class Prompter implements Runnable {
@Override
public void run() {
synchronized (Redistributor.this) {
active = true;
queue.deliverAsync();
}
}
}
/* (non-Javadoc)

View File

@ -141,8 +141,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private static final AtomicLongFieldUpdater<QueueImpl> consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp");
private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter");
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
public static final int NUM_PRIORITIES = 10;
public static final int MAX_DELIVERIES_IN_LOOP = 1000;
@ -1532,7 +1530,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
}
} else {
internalAddRedistributor(executor);
internalAddRedistributor();
}
}
@ -3257,12 +3255,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return messageReferences.removeWithID(serverID, id);
}
private void internalAddRedistributor(final ArtemisExecutor executor) {
private void internalAddRedistributor() {
if (redistributor == null && (consumers.isEmpty() || hasUnMatchedPending)) {
if (logger.isTraceEnabled()) {
logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
}
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE));
redistributor = new ConsumerHolder(new Redistributor(this, storageManager, postOffice));
redistributor.consumer.start();
consumers.add(redistributor);
hasUnMatchedPending = false;
@ -4130,7 +4128,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void run() {
synchronized (QueueImpl.this) {
internalAddRedistributor(executor1);
internalAddRedistributor();
clearRedistributorFuture();
}

View File

@ -147,6 +147,7 @@ import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.PortCheckRule;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RunnableEx;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -228,6 +229,31 @@ public abstract class ActiveMQTestBase extends Assert {
// There is a verification about thread leakages. We only fail a single thread when this happens
private static Set<Thread> alreadyFailedThread = new HashSet<>();
private LinkedList<RunnableEx> runAfter;
protected synchronized void runAfter(RunnableEx run) {
Assert.assertNotNull(run);
if (runAfter == null) {
runAfter = new LinkedList();
}
runAfter.add(run);
}
@After
public void runAfter() {
if (runAfter != null) {
runAfter.forEach((r) -> {
try {
r.run();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
}
private final Collection<ActiveMQServer> servers = new ArrayList<>();
private final Collection<ServerLocator> locators = new ArrayList<>();
private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<>();

View File

@ -49,21 +49,15 @@ import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
@Rule
public RetryRule retryRule = new RetryRule(2);
private static final int NUMBER_OF_SERVERS = 2;
private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
@ -166,6 +160,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
{
ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection();
runAfter(cn::close);
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
@ -217,11 +212,13 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
// Create Consumers
Connection cn0 = cf0.createConnection();
runAfter(cn0::close);
Session sn0 = cn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c0 = sn0.createConsumer(sn0.createQueue(queueName.toString()));
cn0.start();
Connection cn1 = cf1.createConnection();
runAfter(cn1::close);
Session sn1 = cn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = sn1.createConsumer(sn0.createQueue(queueName.toString()));
cn1.start();
@ -258,8 +255,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
cn0.close();
// Messages should stay in node 1 and note get redistributed.
assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount());
assertEquals(0, servers[1].locateQueue(queueName).getMessageCount());
Wait.assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName)::getMessageCount);
Wait.assertEquals(0, servers[1].locateQueue(queueName)::getMessageCount);
}
@Test
@ -293,6 +290,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
Thread.sleep(200);
Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
@ -322,6 +320,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
factory[node] = getJmsConnectionFactory(node);
connection[node] = factory[node].createConnection();
runAfter(connection[node]::close);
session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
}
@ -400,6 +399,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
factory[node] = getJmsConnectionFactory(node);
connection[node] = factory[node].createConnection();
runAfter(connection[node]::close);
session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
}
@ -413,8 +413,6 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
if (protocol.equals("AMQP")) {
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
locator.setMinLargeMessageSize(1024);
ClientSessionFactory coreFactory = locator.createSessionFactory();
@ -446,13 +444,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
StringBuffer stringbuffer = new StringBuffer();
stringbuffer.append("hello");
if (i % 3 == 0) {
// making 1/3 of the messages to be large message
for (int j = 0; j < 300 * 1024; j++) {
stringbuffer.append(" ");
}
}
pd.send(sn.createTextMessage(stringbuffer.toString()));
Message message = sn.createTextMessage(stringbuffer.toString());
pd.send(message);
}
cn.close();
@ -476,7 +469,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
for (int i = 0; i < messageCount; i++) {
Message msg = messageConsumer.receive(5000);
Assert.assertNotNull(msg);
Assert.assertNotNull("did not receive message at " + i, msg);
}
// this means no more messages received

View File

@ -1177,16 +1177,16 @@ public class MessageRedistributionTest extends ClusterTestBase {
waitForBindings(1, "queues.testaddress", 2, 1, false);
waitForBindings(2, "queues.testaddress", 2, 1, false);
send(0, "queues.testaddress", QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, false, null);
send(0, "queues.testaddress", 200, false, null);
removeConsumer(0);
addConsumer(1, 1, "queue0", null);
Queue queue = servers[1].locateQueue(SimpleString.toSimpleString("queue0"));
Assert.assertNotNull(queue);
Wait.waitFor(() -> queue.getMessageCount() == QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2);
Wait.waitFor(() -> queue.getMessageCount() == 200);
for (int i = 0; i < QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2; i++) {
for (int i = 0; i < 200; i++) {
ClientMessage message = consumers[1].getConsumer().receive(5000);
Assert.assertNotNull(message);
message.acknowledge();