diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index bd6b8aae7b..0f47af124e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -244,6 +244,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final ReusableLatch deliveriesInTransit = new ReusableLatch(0); + private volatile boolean caused = false; + private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); private final AtomicLong messagesAddedSnapshot = new AtomicLong(0); @@ -593,7 +595,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { enterCritical(CRITICAL_PATH_ADD_HEAD); synchronized (this) { try { - flushDeliveriesInTransit(); if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { return; } @@ -613,7 +614,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { enterCritical(CRITICAL_PATH_ADD_HEAD); synchronized (this) { try { - flushDeliveriesInTransit(); for (MessageReference ref : refs) { addHead(ref, scheduling); } @@ -717,6 +717,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private boolean flushDeliveriesInTransit() { try { + if (!deliveriesInTransit.await(100, TimeUnit.MILLISECONDS)) { + caused = true; + System.err.println("There are currently " + deliveriesInTransit.getCount() + " credits"); + } if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) { return true; } else { @@ -835,8 +839,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); } - flushDeliveriesInTransit(); - consumersChanged = true; if (!consumer.supportsDirectDelivery()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java new file mode 100644 index 0000000000..b2ac355c55 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ExtremeCancelsTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ExtremeCancelsTest extends JMSClientTestSupport { + + private SimpleString anycastAddress = new SimpleString("theQueue"); + + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + private boolean isAMQP; + + public ExtremeCancelsTest(boolean isAMQP) { + this.isAMQP = isAMQP; + } + + + @Parameterized.Parameters(name = "{index}: isAMQP={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + + @Test(timeout = 120000) + public void testLotsOfCloseOpenConsumer() throws Exception { + + server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastAddress, null, true, false); + + AtomicInteger errors = new AtomicInteger(0); + AtomicBoolean runnning = new AtomicBoolean(true); + Runnable runnable = new Runnable() { + @Override + public void run() { + + try { + ConnectionFactory factory = createCF(); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Queue queue = session.createQueue(anycastAddress.toString()); + + while (runnning.get()) { + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage)consumer.receive(100); + if (message != null) { + consumer.close(); + } + } + + + connection.close(); + + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + + Thread[] consumers = new Thread[10]; + + for (int i = 0; i < consumers.length; i++) { + consumers[i] = new Thread(runnable); + consumers[i].start(); + } + + ConnectionFactory factory = createCF(); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + Queue queue = session.createQueue(anycastAddress.toString()); + MessageProducer producer = session.createProducer(queue); + + + final int NUMBER_OF_MESSAGES = 500; + + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("Hello guys " + i)); + } + + runnning.set(false); + + + for (Thread c : consumers) { + c.join(); + } + + Assert.assertEquals(0, errors.get()); + } + + private ConnectionFactory createCF() { + if (isAMQP) { + return new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); + } else { + return new ActiveMQConnectionFactory("tcp://localhost:5672"); + } + } + +}