ARTEMIS-450 Fixing deadlock over lots of rollbacks and Queue.addHead

This commit is contained in:
Clebert Suconic 2017-10-17 15:54:49 -04:00
parent c66a7975e6
commit fdcae9d32a
2 changed files with 146 additions and 4 deletions

View File

@ -244,6 +244,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
private volatile boolean caused = false;
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@ -593,7 +595,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
flushDeliveriesInTransit();
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
@ -613,7 +614,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
flushDeliveriesInTransit();
for (MessageReference ref : refs) {
addHead(ref, scheduling);
}
@ -717,6 +717,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private boolean flushDeliveriesInTransit() {
try {
if (!deliveriesInTransit.await(100, TimeUnit.MILLISECONDS)) {
caused = true;
System.err.println("There are currently " + deliveriesInTransit.getCount() + " credits");
}
if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
return true;
} else {
@ -835,8 +839,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
flushDeliveriesInTransit();
consumersChanged = true;
if (!consumer.supportsDirectDelivery()) {

View File

@ -0,0 +1,140 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ExtremeCancelsTest extends JMSClientTestSupport {
private SimpleString anycastAddress = new SimpleString("theQueue");
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private boolean isAMQP;
public ExtremeCancelsTest(boolean isAMQP) {
this.isAMQP = isAMQP;
}
@Parameterized.Parameters(name = "{index}: isAMQP={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}
@Test(timeout = 120000)
public void testLotsOfCloseOpenConsumer() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastAddress, null, true, false);
AtomicInteger errors = new AtomicInteger(0);
AtomicBoolean runnning = new AtomicBoolean(true);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
ConnectionFactory factory = createCF();
Connection connection = factory.createConnection();
Session session = connection.createSession();
connection.start();
Queue queue = session.createQueue(anycastAddress.toString());
while (runnning.get()) {
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage)consumer.receive(100);
if (message != null) {
consumer.close();
}
}
connection.close();
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
Thread[] consumers = new Thread[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Thread(runnable);
consumers[i].start();
}
ConnectionFactory factory = createCF();
Connection connection = factory.createConnection();
Session session = connection.createSession();
Queue queue = session.createQueue(anycastAddress.toString());
MessageProducer producer = session.createProducer(queue);
final int NUMBER_OF_MESSAGES = 500;
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("Hello guys " + i));
}
runnning.set(false);
for (Thread c : consumers) {
c.join();
}
Assert.assertEquals(0, errors.get());
}
private ConnectionFactory createCF() {
if (isAMQP) {
return new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
} else {
return new ActiveMQConnectionFactory("tcp://localhost:5672");
}
}
}