This commit is contained in:
Justin Bertram 2020-01-09 19:41:14 -06:00
commit d6331b01f4
8 changed files with 156 additions and 5 deletions

View File

@ -27,7 +27,7 @@ public class Wait {
public static final long MAX_WAIT_MILLIS = 30 * 1000; public static final long MAX_WAIT_MILLIS = 30 * 1000;
public static final int SLEEP_MILLIS = 1000; public static final int SLEEP_MILLIS = 100;
public static final String DEFAULT_FAILURE_MESSAGE = "Condition wasn't met"; public static final String DEFAULT_FAILURE_MESSAGE = "Condition wasn't met";
public interface Condition { public interface Condition {

View File

@ -1949,6 +1949,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueIterateAction messageAction) throws Exception { QueueIterateAction messageAction) throws Exception {
int count = 0; int count = 0;
int txCount = 0; int txCount = 0;
// This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor.
depagePending = true;
depageLock.lock(); depageLock.lock();
@ -2037,6 +2040,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return count; return count;
} finally { } finally {
depageLock.unlock(); depageLock.unlock();
// to resume flow of depages, just in case
// as we disabled depaging during the execution of this method
depagePending = false;
forceDelivery();
} }
} }

View File

@ -93,6 +93,10 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
} }
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) { public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
// This needs to be an executor
// otherwise we may have dead-locks in certain cases such as failure,
// where consumers are closed after callbacks
super(server.getExecutorFactory().getExecutor());
this.server = server; this.server = server;
this.queueName = queueName; this.queueName = queueName;
this.setTask(this::doIt); this.setTask(this::doIt);

View File

@ -246,7 +246,7 @@ public class AddressingTest extends ActiveMQTestBase {
consumer.close(); consumer.close();
// the last consumer was closed so the queue should exist but be purged // the last consumer was closed so the queue should exist but be purged
assertNotNull(server.locateQueue(queueName)); assertNotNull(server.locateQueue(queueName));
assertEquals(0, queue.getMessageCount()); Wait.assertEquals(0, queue::getMessageCount);
// there are no consumers so no messages should be routed to the queue // there are no consumers so no messages should be routed to the queue
producer.send(session.createMessage(true)); producer.send(session.createMessage(true));

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -52,7 +53,7 @@ public class AutoDeleteAddressTest extends ActiveMQTestBase {
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.getAddressInfo(addressA)); assertNotNull(server.getAddressInfo(addressA));
cf.createSession().createConsumer(queueA).close(); cf.createSession().createConsumer(queueA).close();
assertNull(server.getAddressInfo(addressA)); Wait.assertTrue(() -> server.getAddressInfo(addressA) == null);
} }
@Test @Test

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -52,7 +53,7 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true); server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
assertNotNull(server.locateQueue(queueA)); assertNotNull(server.locateQueue(queueA));
cf.createSession().createConsumer(queueA).close(); cf.createSession().createConsumer(queueA).close();
assertNull(server.locateQueue(queueA)); Wait.assertTrue(() -> server.locateQueue(queueA) == null);
} }
@Test @Test

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestDeadlockOnPurgePagingTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(TestDeadlockOnPurgePagingTest.class);
protected ServerLocator locator;
protected ActiveMQServer server;
protected ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
static final int LARGE_MESSAGE_SIZE = 100 * 1024;
protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected static final int RECEIVE_TIMEOUT = 5000;
protected static final int PAGE_MAX = 100 * 1024;
protected static final int PAGE_SIZE = 10 * 1024;
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@Override
@Before
public void setUp() throws Exception {
super.setUp();
locator = createInVMNonHALocator();
}
@Test
public void testDeadlockOnPurge() throws Exception {
int NUMBER_OF_MESSAGES = 5000;
clearDataRecreateServerDirs();
Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, TestDeadlockOnPurgePagingTest.PAGE_SIZE, TestDeadlockOnPurgePagingTest.PAGE_MAX);
server.start();
String queue = "purgeQueue";
SimpleString ssQueue = new SimpleString(queue);
server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue jmsQueue = session.createQueue(queue);
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("hello" + i));
}
session.commit();
Wait.assertEquals(0, purgeQueue::getMessageCount);
Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize);
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES / 5; i++) {
producer.send(session.createTextMessage("hello" + i));
if (i == 10) {
purgeQueue.getPageSubscription().getPagingStore().startPaging();
}
if (i > 10 && i % 10 == 0) {
purgeQueue.getPageSubscription().getPagingStore().forceAnotherPage();
}
}
session.commit();
for (int i = 0; i < 100; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
}
session.commit();
consumer.close();
Wait.assertEquals(0L, purgeQueue::getMessageCount, 5000L, 10L);
Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging, 5000L, 10L);
Wait.assertEquals(0L, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize, 5000L, 10L);
}
}

View File

@ -306,7 +306,7 @@ public class StompTest extends StompTestBase {
// closing the consumer here should trigger auto-deletion // closing the consumer here should trigger auto-deletion
assertNotNull(server.getPostOffice().getBinding(new SimpleString(queue))); assertNotNull(server.getPostOffice().getBinding(new SimpleString(queue)));
consumer.close(); consumer.close();
assertNull(server.getPostOffice().getBinding(new SimpleString(queue))); Wait.assertTrue(() -> server.getPostOffice().getBinding(new SimpleString(queue)) == null);
} }
@Test @Test
@ -1662,6 +1662,8 @@ public class StompTest extends StompTestBase {
unsubscribe(conn, null, "/queue/" + ADDRESS, true, false); unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(ADDRESS)) == null);
// now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription // now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription
uuid = UUID.randomUUID().toString(); uuid = UUID.randomUUID().toString();
frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)