ARTEMIS-5093 support configurable onMessage timeout w/closing consumer

This commit is contained in:
Justin Bertram 2024-10-10 14:55:54 -05:00
parent 0ea5cfaf40
commit a5f317dcdb
13 changed files with 139 additions and 21 deletions

View File

@ -47,6 +47,7 @@ public class ServerLocatorConfig {
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public int onMessageCloseTimeout = ActiveMQClient.DEFAULT_ONMESSAGE_CLOSE_TIMEOUT;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;

View File

@ -138,6 +138,8 @@ public final class ActiveMQClient {
public static final int DEFAULT_INITIAL_MESSAGE_PACKET_SIZE = 1500;
public static final int DEFAULT_ONMESSAGE_CLOSE_TIMEOUT = 10_000;
public static final boolean DEFAULT_XA = false;
public static final boolean DEFAULT_HA = false;

View File

@ -744,6 +744,25 @@ public interface ServerLocator extends AutoCloseable {
*/
ServerLocator setInitialMessagePacketSize(int size);
/**
* Returns the timeout for onMessage completion when closing ClientConsumers created through this factory.
* <p>
* Value is in milliseconds, default value is {@link ActiveMQClient#DEFAULT_ONMESSAGE_CLOSE_TIMEOUT}.
*
* @return the timeout for onMessage completion when closing ClientConsumers created through this factory
*/
int getOnMessageCloseTimeout();
/**
* Sets the timeout in milliseconds for onMessage completion when closing ClientConsumers created through this factory.
* <p>
* A value of -1 means wait until the onMessage completes no matter how long it takes.
*
* @param onMessageCloseTimeout how long to wait in milliseconds for the ClientConsumer's MessageHandler's onMessage method to finish before closing or stopping the ClientConsumer.
* @return this ServerLocator
*/
ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout);
/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
*

View File

@ -36,8 +36,8 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 212001, value = "Error on clearing messages", level = LogMessage.Level.WARN)
void errorClearingMessages(Throwable e);
@LogMessage(id = 212002, value = "Timed out waiting for handler to complete processing", level = LogMessage.Level.WARN)
void timeOutWaitingForProcessing();
@LogMessage(id = 212002, value = "Timed out after waiting {}ms for handler to complete processing", level = LogMessage.Level.WARN)
void timeOutWaitingForProcessing(long duration);
@LogMessage(id = 212003, value = "Unable to close session", level = LogMessage.Level.WARN)
void unableToCloseSession(Exception e);

View File

@ -56,8 +56,6 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
private static final int NUM_PRIORITIES = 10;
public static final SimpleString FORCED_DELIVERY_MESSAGE = SimpleString.of("_hornetq.forced.delivery.seq");
@ -137,6 +135,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private final ClassLoader contextClassLoader;
private volatile boolean manualFlowManagement;
private final int onMessageCloseTimeout;
public ClientConsumerImpl(final ClientSessionInternal session,
final ConsumerContext consumerContext,
final SimpleString queueName,
@ -151,7 +151,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
final Executor flowControlExecutor,
final SessionContext sessionContext,
final ClientSession.QueueQuery queueInfo,
final ClassLoader contextClassLoader) {
final ClassLoader contextClassLoader,
final int onMessageCloseTimeout) {
this.consumerContext = consumerContext;
this.queueName = queueName;
@ -182,6 +183,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
this.flowControlExecutor = flowControlExecutor;
this.onMessageCloseTimeout = onMessageCloseTimeout;
if (logger.isTraceEnabled()) {
logger.trace("{}:: being created at", this, new Exception("trace"));
}
@ -921,10 +924,12 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
sessionExecutor.execute(future);
boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS);
if (!ok) {
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
if (onMessageCloseTimeout == -1) {
future.await();
} else {
if (!future.await(onMessageCloseTimeout)) {
ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing(onMessageCloseTimeout);
}
}
}

View File

@ -841,7 +841,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), serverLocator.getOnMessageCloseTimeout(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), flowControlExecutor, orderedExecutorFactory.getExecutor());
synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {

View File

@ -144,6 +144,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final String groupID;
private volatile int onMessageCloseTimeout;
private volatile boolean inClose;
private volatile boolean mayAttemptToFailover = true;
@ -189,6 +191,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final int compressionLevel,
final int initialMessagePacketSize,
final String groupID,
final int onMessageCloseTimeout,
final SessionContext sessionContext,
final Executor executor,
final Executor confirmationExecutor,
@ -246,6 +249,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
this.groupID = groupID;
this.onMessageCloseTimeout = onMessageCloseTimeout;
producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);
this.sessionContext = sessionContext;
@ -2012,7 +2017,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final boolean browseOnly) throws ActiveMQException {
checkClosed();
ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor, onMessageCloseTimeout);
addConsumer(consumer);

View File

@ -1298,6 +1298,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return this;
}
@Override
public int getOnMessageCloseTimeout() {
return config.onMessageCloseTimeout;
}
@Override
public ServerLocator setOnMessageCloseTimeout(int onMessageCloseTimeout) {
checkWrite();
config.onMessageCloseTimeout = onMessageCloseTimeout;
return this;
}
@Override
public ServerLocatorImpl setGroupID(final String groupID) {
checkWrite();

View File

@ -400,7 +400,8 @@ public class ActiveMQSessionContext extends SessionContext {
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
@ -420,7 +421,7 @@ public class ActiveMQSessionContext extends SessionContext {
// The value we send is just a hint
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}
@Override

View File

@ -340,7 +340,8 @@ public abstract class SessionContext {
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException;
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException;
/**
* Performs a round trip to the server requesting what is the current tx timeout on the session

View File

@ -41,6 +41,14 @@ public class FutureLatch implements Runnable {
}
}
public void await() {
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
}
@Override
public void run() {
latch.countDown();

View File

@ -83,7 +83,8 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
int ackBatchSize,
boolean browseOnly,
Executor executor,
Executor flowControlExecutor) throws ActiveMQException {
Executor flowControlExecutor,
int onMessageCloseTimeout) throws ActiveMQException {
long consumerID = idGenerator.generateID();
ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
@ -96,7 +97,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
// could be overridden on the queue settings
// The value we send is just a hint
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL(), onMessageCloseTimeout);
}
}

View File

@ -16,16 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -34,9 +31,17 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MessageHandlerTest extends ActiveMQTestBase {
@ -117,6 +122,64 @@ public class MessageHandlerTest extends ActiveMQTestBase {
session.close();
}
@Test
@Timeout(20)
public void testMessageHandlerCloseTimeout() throws Exception {
// create Netty acceptor so client can use new onMessageCloseTimeout URL parameter
server.getRemotingService().createAcceptor("netty", "tcp://127.0.0.1:61616").start();
final int TIMEOUT = 100;
locator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:61616?onMessageCloseTimeout=" + TIMEOUT);
sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true));
session.createQueue(QueueConfiguration.of(QUEUE).setDurable(false));
ClientProducer producer = session.createProducer(QUEUE);
producer.send(createTextMessage(session, "m"));
ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
session.start();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
CountDownLatch beginLatch = new CountDownLatch(1);
AtomicBoolean messageHandlerFinished = new AtomicBoolean(false);
CountDownLatch completedLatch = new CountDownLatch(1);
consumer.setMessageHandler(message -> {
try {
beginLatch.countDown();
// don't just Thread.sleep() here because it will be interrupted on
// ClientConsumer.close()
while (!messageHandlerFinished.get()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignore
}
}
} finally {
completedLatch.countDown();
}
});
try {
beginLatch.await();
long start = System.currentTimeMillis();
consumer.close();
long duration = System.currentTimeMillis() - start;
assertTrue(duration >= TIMEOUT, "Closing consumer took " + duration + "ms");
assertEquals(1, completedLatch.getCount(), "MessageHandler should still be working!");
} finally {
// don't let the MessageHandler stick around even if an assertion failed
messageHandlerFinished.set(true);
}
assertTrue(loggerHandler.findText("AMQ212002", TIMEOUT + "ms"), "timeout message not found in logs");
assertTrue(completedLatch.await(10, TimeUnit.SECONDS), "MessageHandler should complete!");
}
}
@Test
public void testSetResetMessageHandler() throws Exception {
final ClientSession session = sf.createSession(false, true, true);