diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java index 870fbf2dc2..6cb011ba47 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java @@ -27,7 +27,7 @@ public class Wait { public static final long MAX_WAIT_MILLIS = 30 * 1000; - public static final int SLEEP_MILLIS = 1000; + public static final int SLEEP_MILLIS = 100; public static final String DEFAULT_FAILURE_MESSAGE = "Condition wasn't met"; public interface Condition { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 212a68e5ce..d03494a704 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1949,6 +1949,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { QueueIterateAction messageAction) throws Exception { int count = 0; int txCount = 0; + // This is to avoid scheduling depaging while iterQueue is happening + // this should minimize the use of the paged executor. + depagePending = true; depageLock.lock(); @@ -2037,6 +2040,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return count; } finally { depageLock.unlock(); + // to resume flow of depages, just in case + // as we disabled depaging during the execution of this method + depagePending = false; + forceDelivery(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java index eab87712a9..b20b537923 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -93,6 +93,10 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag } public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) { + // This needs to be an executor + // otherwise we may have dead-locks in certain cases such as failure, + // where consumers are closed after callbacks + super(server.getExecutorFactory().getExecutor()); this.server = server; this.queueName = queueName; this.setTask(this::doIt); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 37e365ba80..5a905b823a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -246,7 +246,7 @@ public class AddressingTest extends ActiveMQTestBase { consumer.close(); // the last consumer was closed so the queue should exist but be purged assertNotNull(server.locateQueue(queueName)); - assertEquals(0, queue.getMessageCount()); + Wait.assertEquals(0, queue::getMessageCount); // there are no consumers so no messages should be routed to the queue producer.send(session.createMessage(true)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java index 5242de2fb2..ee78bda2ad 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteAddressTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Before; import org.junit.Test; @@ -52,7 +53,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase { server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); assertNotNull(server.getAddressInfo(addressA)); cf.createSession().createConsumer(queueA).close(); - assertNull(server.getAddressInfo(addressA)); + Wait.assertTrue(() -> server.getAddressInfo(addressA) == null); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java index 0f9c09eadd..8603e7b166 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Before; import org.junit.Test; @@ -52,7 +53,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase { server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); assertNotNull(server.locateQueue(queueA)); cf.createSession().createConsumer(queueA).close(); - assertNull(server.locateQueue(queueA)); + Wait.assertTrue(() -> server.locateQueue(queueA) == null); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/TestDeadlockOnPurgePagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/TestDeadlockOnPurgePagingTest.java new file mode 100644 index 0000000000..425a2ad00a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/TestDeadlockOnPurgePagingTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestDeadlockOnPurgePagingTest extends ActiveMQTestBase { + + private static final Logger logger = Logger.getLogger(TestDeadlockOnPurgePagingTest.class); + + protected ServerLocator locator; + protected ActiveMQServer server; + protected ClientSessionFactory sf; + static final int MESSAGE_SIZE = 1024; // 1k + static final int LARGE_MESSAGE_SIZE = 100 * 1024; + + protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + + protected static final int RECEIVE_TIMEOUT = 5000; + + protected static final int PAGE_MAX = 100 * 1024; + + protected static final int PAGE_SIZE = 10 * 1024; + + static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + locator = createInVMNonHALocator(); + } + + @Test + public void testDeadlockOnPurge() throws Exception { + + int NUMBER_OF_MESSAGES = 5000; + clearDataRecreateServerDirs(); + + Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, TestDeadlockOnPurgePagingTest.PAGE_SIZE, TestDeadlockOnPurgePagingTest.PAGE_MAX); + + server.start(); + + String queue = "purgeQueue"; + SimpleString ssQueue = new SimpleString(queue); + server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST)); + QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); + Connection connection = cf.createConnection(); + + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue jmsQueue = session.createQueue(queue); + + MessageProducer producer = session.createProducer(jmsQueue); + + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("hello" + i)); + } + session.commit(); + + Wait.assertEquals(0, purgeQueue::getMessageCount); + + Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize); + + MessageConsumer consumer = session.createConsumer(jmsQueue); + + for (int i = 0; i < NUMBER_OF_MESSAGES / 5; i++) { + producer.send(session.createTextMessage("hello" + i)); + if (i == 10) { + purgeQueue.getPageSubscription().getPagingStore().startPaging(); + } + + if (i > 10 && i % 10 == 0) { + purgeQueue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + } + session.commit(); + + + for (int i = 0; i < 100; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + } + session.commit(); + + consumer.close(); + + Wait.assertEquals(0L, purgeQueue::getMessageCount, 5000L, 10L); + + Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging, 5000L, 10L); + + Wait.assertEquals(0L, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize, 5000L, 10L); + } + + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 3bc332bb08..73d74c094a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -306,7 +306,7 @@ public class StompTest extends StompTestBase { // closing the consumer here should trigger auto-deletion assertNotNull(server.getPostOffice().getBinding(new SimpleString(queue))); consumer.close(); - assertNull(server.getPostOffice().getBinding(new SimpleString(queue))); + Wait.assertTrue(() -> server.getPostOffice().getBinding(new SimpleString(queue)) == null); } @Test @@ -1662,6 +1662,8 @@ public class StompTest extends StompTestBase { unsubscribe(conn, null, "/queue/" + ADDRESS, true, false); + Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(ADDRESS)) == null); + // now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription uuid = UUID.randomUUID().toString(); frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)