ARTEMIS-4410 - process deliveries before removing consumer on session close, ensure strict order for a single consumer

This commit is contained in:
Gary Tully 2023-08-30 17:52:33 +01:00 committed by Robbie Gemmell
parent 60ac0f32a5
commit b11945e0c7
3 changed files with 92 additions and 7 deletions

View File

@ -573,8 +573,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
del.finish(); del.finish();
} }
removeItself();
List<MessageReference> refs = cancelRefs(failed, false, null); List<MessageReference> refs = cancelRefs(failed, false, null);
Transaction tx = new TransactionImpl(storageManager); Transaction tx = new TransactionImpl(storageManager);
@ -587,6 +585,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.rollback(); tx.rollback();
// started is false, leaving remove till after cancel ensures order for a single exclusive consumer
removeItself();
addLingerRefs(); addLingerRefs();
if (!browseOnly) { if (!browseOnly) {

View File

@ -35,6 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
@ -50,11 +53,12 @@ import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFact
public class JMSOrderTest extends JMSTestBase { public class JMSOrderTest extends JMSTestBase {
String protocol; String protocol;
boolean exclusive;
ConnectionFactory protocolCF; ConnectionFactory protocolCF;
public JMSOrderTest(String protocol) { public JMSOrderTest(String protocol, boolean exclusive) {
this.protocol = protocol; this.protocol = protocol;
this.exclusive = exclusive;
} }
@Before @Before
@ -62,9 +66,16 @@ public class JMSOrderTest extends JMSTestBase {
protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616"); protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
} }
@Parameterized.Parameters(name = "protocol={0}") @Parameterized.Parameters(name = "protocol={0}&exclusive={1}")
public static Collection getParameters() { public static Collection getParameters() {
return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}}); return Arrays.asList(new Object[][]{{"AMQP", true}, {"AMQP", false}, {"OPENWIRE", true}, {"OPENWIRE", false}, {"CORE", true}, {"CORE", false}});
}
@Override
protected void extraServerConfig(ActiveMQServer server) {
if (exclusive) {
server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setDefaultExclusiveQueue(true));
}
} }
protected void sendToAmqQueue(int count) throws Exception { protected void sendToAmqQueue(int count) throws Exception {

View File

@ -26,11 +26,14 @@ import javax.jms.TextMessage;
import java.util.Map; import java.util.Map;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -46,7 +49,9 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) { protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) {
super.configureAddressSettings(addressSettingsMap); super.configureAddressSettings(addressSettingsMap);
// force send to dlq early // force send to dlq early
addressSettingsMap.get("#").setMaxDeliveryAttempts(2); addressSettingsMap.put("exampleQueue", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
// force send to dlq late
addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
} }
@Test(timeout = 60_000) @Test(timeout = 60_000)
@ -93,4 +98,72 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
} }
} }
} }
@Test(timeout = 60_000)
public void testExclusiveConsumerOrderOnReconnectionLargePrefetch() throws Exception {
Connection exConn = null;
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy();
prefetchPastMaxDeliveriesInLoop.setAll(2000);
exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(4000);
exFact.setRedeliveryPolicy(redeliveryPolicy);
Queue queue = new ActiveMQQueue("exampleQueueTwo");
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("This is a text message");
int numMessages = 2000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
}
session.commit();
exConn.close();
final int batch = 100;
for (int i = 0; i < numMessages; i += batch) {
// connection per batch
exConn = exFact.createConnection();
exConn.start();
session = exConn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage messageReceived = null;
for (int j = 0; j < batch; j++) { // a small batch
messageReceived = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull("null @ i=" + i, messageReceived);
Assert.assertEquals(i + j, messageReceived.getIntProperty("SEQ"));
assertEquals("This is a text message", messageReceived.getText());
}
session.commit();
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
exConn.close();
}
} finally {
if (exConn != null) {
exConn.close();
}
}
}
} }