ARTEMIS-3386 Expiry messages using too many threads

This commit is contained in:
Clebert Suconic 2021-07-14 17:21:28 -04:00 committed by clebertsuconic
parent 18e9dee490
commit b4aef3fca8
9 changed files with 197 additions and 38 deletions

View File

@ -66,7 +66,7 @@ public class Wait {
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis) throws Exception {
assertEquals(size, condition, timeout, sleepMillis, true);
assertEquals(size, condition, timeout, sleepMillis, false);
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis, boolean printThreadDump) throws Exception {

View File

@ -30,9 +30,11 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@ -1824,7 +1826,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : iterableOf(getLocalQueues())) {
try {
queue.expireReferences();
CountDownLatch latch = new CountDownLatch(1);
queue.expireReferences(latch::countDown);
// the idea is in fact to block the Reaper while the Queue is executing reaping.
// This would avoid another eventual expiry to be called if the period for reaping is too small
// This should also avoid bursts in CPU consumption because of the expiry reaping
if (!latch.await(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(new TimeoutException(queue.getName().toString()));
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
}

View File

@ -325,7 +325,11 @@ public interface Queue extends Bindable,CriticalComponent {
*/
int expireReferences(Filter filter) throws Exception;
void expireReferences() throws Exception;
default void expireReferences() {
expireReferences((Runnable)null);
}
void expireReferences(Runnable done);
void expire(MessageReference ref) throws Exception;

View File

@ -2380,14 +2380,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public void expireReferences() {
public void expireReferences(Runnable done) {
if (isExpirationRedundant()) {
if (done != null) {
done.run();
}
return;
}
if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
expiryScanner.scannerRunning.incrementAndGet();
if (expiryScanner.scannerRunning.incrementAndGet() == 1) {
expiryScanner.doneCallback = done;
}
getExecutor().execute(expiryScanner);
} else {
// expire is already happening on this queue, move on!
if (done != null) {
done.run();
}
}
}
@ -2405,13 +2416,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
class ExpiryScanner implements Runnable {
public Runnable doneCallback;
public AtomicInteger scannerRunning = new AtomicInteger(0);
LinkedListIterator<MessageReference> iter = null;
@Override
public void run() {
boolean expired = false;
boolean hasElements = false;
int elementsIterated = 0;
int elementsExpired = 0;
LinkedList<MessageReference> expiredMessages = new LinkedList<>();
@ -2424,32 +2438,54 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
}
LinkedListIterator<MessageReference> iter = iterator();
if (iter == null) {
if (server.hasBrokerQueuePlugins()) {
try {
server.callBrokerQueuePlugins((p) -> p.beforeExpiryScan(QueueImpl.this));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
iter = iterator();
}
try {
while (postOffice.isStarted() && iter.hasNext()) {
hasElements = true;
MessageReference ref = iter.next();
if (ref.getMessage().isExpired()) {
elementsExpired++;
incDelivering(ref);
expired = true;
expiredMessages.add(ref);
iter.remove();
if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
logger.debug("Breaking loop of expiring");
scannerRunning.incrementAndGet();
getExecutor().execute(this);
break;
}
}
if (++elementsIterated >= MAX_DELIVERIES_IN_LOOP) {
logger.debug("Breaking loop of expiring");
scannerRunning.incrementAndGet();
getExecutor().execute(this);
break;
}
}
} finally {
try {
if (scannerRunning.decrementAndGet() == 0) {
if (server.hasBrokerQueuePlugins()) {
try {
server.callBrokerQueuePlugins((p) -> p.afterExpiryScan(QueueImpl.this));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
iter.close();
} catch (Throwable ignored) {
iter = null;
if (doneCallback != null) {
doneCallback.run();
doneCallback = null;
}
}
scannerRunning.decrementAndGet();
logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
}
@ -2473,7 +2509,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
logger.debug("Expired " + elementsExpired + " references");
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well

View File

@ -109,4 +109,18 @@ public interface ActiveMQServerQueuePlugin extends ActiveMQServerBasePlugin {
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
}
/**
* To be called before starting expiry scan on the queue
* @param queue
*/
default void beforeExpiryScan(Queue queue) {
}
/**
* To be called before starting expiry scan on the queue
* @param queue
*/
default void afterExpiryScan(Queue queue) {
}
}

View File

@ -863,6 +863,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public void expireReferences(Runnable done) {
}
@Override
public void refDown(MessageReference messageReference) {
@ -1401,7 +1406,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void expireReferences() throws Exception {
public void expireReferences() {
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -27,6 +30,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -74,6 +79,52 @@ public class ExpiryAddressTest extends ActiveMQTestBase {
m.acknowledge();
}
@Test
public void testExpireSingleMessage() throws Exception {
SimpleString ea = new SimpleString("EA");
SimpleString adSend = new SimpleString("a1");
SimpleString qName = new SimpleString("q1");
SimpleString eq = new SimpleString("EA1");
AddressSettings addressSettings = new AddressSettings().setExpiryAddress(ea);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
clientSession.createQueue(new QueueConfiguration(eq).setAddress(ea).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setAddress(adSend).setDurable(false));
ClientProducer producer = clientSession.createProducer(adSend);
for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2 + 100; i++) {
ClientMessage clientMessage = createTextMessage(clientSession, "notExpired!");
clientMessage.putIntProperty("i", i);
producer.send(clientMessage);
}
ClientMessage clientMessage = createTextMessage(clientSession, "heyho!");
clientMessage.setExpiration(System.currentTimeMillis());
producer.send(clientMessage);
Queue queueQ1 = server.locateQueue("q1");
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
// done should be called even for the ones that are ignored because the expiry is already running
queueQ1.expireReferences(latch::countDown);
}
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(eq);
ClientMessage m = clientConsumer.receive(5000);
Assert.assertNotNull(m);
Assert.assertEquals(qName.toString(), m.getStringProperty(Message.HDR_ORIGINAL_QUEUE));
Assert.assertEquals(adSend.toString(), m.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
m.acknowledge();
}
@Test
public void testBasicSendWithRetroActiveAddressSettings() throws Exception {
// apply "original" address settings

View File

@ -20,8 +20,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -33,8 +35,11 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -52,6 +57,9 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
private SimpleString expiryQueue;
private SimpleString expiryAddress;
Queue artemisExpiryQueue;
private ServerLocator locator;
@Test
@ -212,30 +220,57 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
thr.join();
}
//
// public static void main(final String[] args) throws Exception
// {
// for (int i = 0; i < 1000; i++)
// {
// TestSuite suite = new TestSuite();
// ExpiryRunnerTest expiryRunnerTest = new ExpiryRunnerTest();
// expiryRunnerTest.setName("testExpireWhilstConsuming");
// suite.addTest(expiryRunnerTest);
//
// TestResult result = TestRunner.run(suite);
// if (result.errorCount() > 0 || result.failureCount() > 0)
// {
// System.exit(1);
// }
// }
// }
@Test
public void testManyQueuesExpire() throws Exception {
AtomicInteger currentExpiryHappening = new AtomicInteger();
AtomicInteger maxExpiryHappening = new AtomicInteger(0);
server.registerBrokerPlugin(new ActiveMQServerQueuePlugin() {
@Override
public void beforeExpiryScan(Queue queue) {
currentExpiryHappening.incrementAndGet();
while (!maxExpiryHappening.compareAndSet(maxExpiryHappening.get(), Math.max(maxExpiryHappening.get(), currentExpiryHappening.get()))) {
Thread.yield();
}
}
@Override
public void afterExpiryScan(Queue queue) {
currentExpiryHappening.decrementAndGet();
}
});
Assert.assertTrue(server.hasBrokerQueuePlugins());
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setExpiryAddress(expiryAddress));
for (int ad = 0; ad < 1000; ad++) {
server.addAddressInfo(new AddressInfo("test" + ad));
server.createQueue(new QueueConfiguration("test" + ad).setAddress("test" + ad).setRoutingType(RoutingType.ANYCAST));
}
ClientProducer producer = clientSession.createProducer();
for (int i = 0; i < 1000; i++) {
ClientMessage message = clientSession.createMessage(true);
message.setExpiration(System.currentTimeMillis());
producer.send("test" + i, message);
}
Wait.assertEquals(1000, artemisExpiryQueue::getMessageCount);
// The system should not burst itself looking for expiration, that would use too many resources from the broker itself
Assert.assertTrue("The System had " + maxExpiryHappening + " threads in parallel scanning for expiration", maxExpiryHappening.get() == 1);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
ConfigurationImpl configuration = (ConfigurationImpl) createDefaultInVMConfig().setMessageExpiryScanPeriod(1000);
ConfigurationImpl configuration = (ConfigurationImpl) createDefaultInVMConfig().setMessageExpiryScanPeriod(100);
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
// start the server
server.start();
@ -251,7 +286,9 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings().setExpiryAddress(expiryAddress);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
server.getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
clientSession.createQueue(new QueueConfiguration(expiryQueue).setAddress(expiryAddress).setDurable(false));
clientSession.createQueue(new QueueConfiguration(expiryQueue).setAddress(expiryAddress).setDurable(true));
artemisExpiryQueue = server.locateQueue(expiryQueue);
Assert.assertNotNull(artemisExpiryQueue);
}
private static class DummyMessageHandler implements Runnable {

View File

@ -61,6 +61,10 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void expireReferences(Runnable done) {
}
@Override
public PagingStore getPagingStore() {
return null;
@ -503,7 +507,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void expireReferences() throws Exception {
public void expireReferences() {
// no-op
}