From b664022a1ed772e74b518dd5d12f44ca53fb2f16 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 18 May 2023 13:17:10 +0100 Subject: [PATCH] ARTEMIS-4284 - sync operwire remove consumer with the operation context to ensure prefetched messages are available to the next consumer in order This closes #4483 --- .../protocol/openwire/amq/AMQConsumer.java | 5 + .../PrefetchRedeliveryCountOpenwireTest.java | 96 +++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index fee1f6c849..8397e5f84e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -416,6 +416,11 @@ public class AMQConsumer { if (delayedDispatchPrompter != null) { delayedDispatchPrompter.cancel(false); } + if (info.getPrefetchSize() > 1) { + // because response required is false on a RemoveConsumerCommand, a new consumer could miss canceled prefetched messages + // we await the operation context completion before handling a subsequent command + session.getCoreSession().getSessionContext().waitCompletion(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java new file mode 100644 index 0000000000..5fef3c4a9c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.util.Map; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Assert; +import org.junit.Test; + +public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { + + @Override + public void setUp() throws Exception { + realStore = true; + super.setUp(); + } + + @Override + protected void configureAddressSettings(Map addressSettingsMap) { + super.configureAddressSettings(addressSettingsMap); + // force send to dlq early + addressSettingsMap.get("#").setMaxDeliveryAttempts(2); + } + + @Test(timeout = 60_000) + public void testConsumerSingleMessageLoop() throws Exception { + Connection exConn = null; + + SimpleString durableQueue = new SimpleString("exampleQueue"); + this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true)); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + + Queue queue = new ActiveMQQueue("exampleQueue"); + + exConn = exFact.createConnection(); + + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + + int numMessages = 20; + for (int i = 0; i < numMessages; i++) { + producer.send(message); + } + + for (int i = 0; i < numMessages; i++) { + // consumer per message + MessageConsumer messageConsumer = session.createConsumer(queue); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + Assert.assertNotNull(messageReceived); + + assertEquals("This is a text message", messageReceived.getText()); + messageConsumer.close(); + } + } finally { + if (exConn != null) { + exConn.close(); + } + } + } +}