ARTEMIS-4141 Update credits even for expired messages

When big messages are produced if a consumer receives an expired message, the credits are not updated, so if the consumer is too slow and an expiry delay has been set, we can end up with a situation where there are no more credits which prevents the consumer from receiving any more messages.
This commit is contained in:
Nicolas Filotto 2023-09-22 09:06:11 +02:00 committed by clebertsuconic
parent b74f47867f
commit 8599917222
5 changed files with 120 additions and 3 deletions

View File

@ -32,4 +32,13 @@ public interface MessageHandler {
* @param message a message
*/
void onMessage(ClientMessage message);
/**
* Notifies the MessageHandler that an expired message has been received.
*
* @param message a message
*/
default void onMessageExpired(ClientMessage message) {
// Do nothing by default
}
}

View File

@ -992,12 +992,17 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
logger.trace("{}::Handler.onMessage done", this);
} else {
theHandler.onMessageExpired(message);
logger.trace("{}::Handler.onMessageExpired done", this);
session.expire(this, message);
}
if (message.isLargeMessage()) {
message.discardBody();
}
} else {
session.expire(this, message);
}
// If slow consumer, we need to send 1 credit to make sure we get another message
if (clientWindowSize == 0) {

View File

@ -143,4 +143,13 @@ public class JMSMessageListenerWrapper implements MessageHandler {
session.setRecoverCalled(false);
}
@Override
public void onMessageExpired(ClientMessage message) {
try {
message.checkCompletion();
} catch (ActiveMQException e) {
ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
}
}
}

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSClientLogger;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
@ -397,6 +398,16 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
}
@Override
public void onMessageExpired(ClientMessage message) {
try {
message.checkCompletion();
} catch (ActiveMQException e) {
ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
}
}
public void start() throws ActiveMQException {
session.start();
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Test;
public class SlowLargeMessageConsumerTest extends JMSTestBase {
private static final String TOPIC = "SlowLargeMessageConsumerTopic";
@Override
protected void extraServerConfig(ActiveMQServer server) {
server.getConfiguration().getAddressSettings().put(TOPIC, new AddressSettings().setExpiryDelay(100L).setMaxSizeBytes(1024));
}
/**
* @see <a href="https://issues.apache.org/jira/browse/ARTEMIS-4141">ARTEMIS-4141</a>
*/
@Test
public void ensureSlowConsumerOfLargeMessageNeverGetsStuck() throws Exception {
try (Connection conn = cf.createConnection()) {
conn.start();
try (Session sessionConsumer = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session sessionProducer = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
final Destination topic = sessionConsumer.createTopic(TOPIC);
final MessageConsumer consumer = sessionConsumer.createConsumer(topic);
final AtomicBoolean slow = new AtomicBoolean(true);
final CountDownLatch messageReceived = new CountDownLatch(1);
consumer.setMessageListener(message -> {
if (slow.get()) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
messageReceived.countDown();
}
});
final MessageProducer producer = sessionProducer.createProducer(topic);
int msgSize = 512 * 1024;
for (int i = 0; i < 100; i++) {
producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize)));
TimeUnit.MILLISECONDS.sleep(25);
}
TimeUnit.MILLISECONDS.sleep(100);
slow.set(false);
producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize)));
assertTrue(messageReceived.await(500, TimeUnit.MILLISECONDS));
}
}
}
}