From e43c5390cf11655a883b9f96f608e65bdd21d6fa Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Mon, 9 Sep 2019 18:45:48 +0800 Subject: [PATCH 1/2] ARTEMIS-2478 Expired message not removed in non destructive queue --- .../artemis/core/server/impl/QueueImpl.java | 14 +++++--------- .../integration/amqp/JMSNonDestructiveTest.java | 10 ++++++---- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index dd7d9cf79b..d18ecbf50e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2745,9 +2745,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.trace("Reference " + ref + " being expired"); } removeMessageReference(holder, ref); - - - handled++; consumers.reset(); continue; @@ -2778,8 +2775,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { deliveriesInTransit.countUp(); - - removeMessageReference(holder, ref); + if (!nonDestructive) { + removeMessageReference(holder, ref); + } ref.setInDelivery(true); handledconsumer = consumer; handled++; @@ -2836,10 +2834,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } protected void removeMessageReference(ConsumerHolder holder, MessageReference ref) { - if (!nonDestructive) { - holder.iter.remove(); - refRemoved(ref); - } + holder.iter.remove(); + refRemoved(ref); } private void checkDepage() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 32dcbb84fd..6fbb71d2fa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -53,14 +53,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { private ConnectionSupplier CoreConnection = () -> createCoreConnection(); protected final boolean persistenceEnabled; + protected final long scanPeriod; - public JMSNonDestructiveTest(boolean persistenceEnabled) { + public JMSNonDestructiveTest(boolean persistenceEnabled, long scanPeriod) { this.persistenceEnabled = persistenceEnabled; + this.scanPeriod = scanPeriod; } - @Parameterized.Parameters(name = "persistenceEnabled={0}") + @Parameterized.Parameters(name = "persistenceEnabled={0}, scanPeriod={1}") public static Collection data() { - Object[][] params = new Object[][]{{false}, {true}}; + Object[][] params = new Object[][]{{false, 100}, {true, 100}, {true, -1}}; return Arrays.asList(params); } @@ -72,7 +74,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { @Override protected void addConfiguration(ActiveMQServer server) { server.getConfiguration().setPersistenceEnabled(persistenceEnabled); - server.getConfiguration().setMessageExpiryScanPeriod(100); + server.getConfiguration().setMessageExpiryScanPeriod(scanPeriod); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true)); From 793a45f35a8fe5eca7df9e9657d89e93cf0bcdd2 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 11 Sep 2019 09:36:53 -0400 Subject: [PATCH 2/2] NO-JIRA Speeding up JMSNonDestructiveTest --- .../tests/integration/amqp/JMSNonDestructiveTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 6fbb71d2fa..9b31b792a1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -332,7 +332,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { assertNotNull(msg); assertEquals(Integer.toString(j), msg.getText()); } - TextMessage msg = (TextMessage) consumer.receive(200); + TextMessage msg = (TextMessage) consumer.receiveNoWait(); assertNull(msg); consumer.close(); } @@ -356,7 +356,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue consumerQueue = consumerSession.createQueue(queueName); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); - TextMessage msg = (TextMessage) consumer.receive(2000); + TextMessage msg = (TextMessage) consumer.receiveNoWait(); assertNull(msg); consumer.close(); } @@ -395,9 +395,9 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { assertEquals(Integer.toString(j), msg.getText()); assertEquals(Integer.toString(j), msg2.getText()); } - TextMessage msg = (TextMessage) consumer.receive(200); + TextMessage msg = (TextMessage) consumer.receiveNoWait(); assertNull(msg); - TextMessage msg2 = (TextMessage) consumer2.receive(200); + TextMessage msg2 = (TextMessage) consumer2.receiveNoWait(); assertNull(msg2); consumer.close(); consumer2.close(); @@ -468,7 +468,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { try (Connection connection = consumerConnectionSupplier.createConnection(); Session session = connection.createSession(); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName))) { - TextMessage msg = (TextMessage) consumer.receive(1000); + TextMessage msg = (TextMessage) consumer.receiveNoWait(); assertNull(msg); } }