From 8b7282d849fca896f6b9794a5ddfc251db947120 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 28 Nov 2017 21:08:05 -0500 Subject: [PATCH] ARTEMIS-1529 Fixing Ref count over asynchronous ack --- .../artemis/utils/ReferenceCounter.java | 15 ++++ .../artemis/utils/ReferenceCounterUtil.java | 54 ++++++++++--- .../artemis/utils/ReferenceCounterTest.java | 15 +++- .../amqp/broker/AMQPSessionCallback.java | 33 ++++++-- .../activemq/artemis/core/server/Queue.java | 4 + .../artemis/core/server/impl/QueueImpl.java | 20 +++++ .../core/server/impl/QueueManagerImpl.java | 80 ++++++++----------- .../core/server/impl/ServerConsumerImpl.java | 2 + .../impl/TransientQueueManagerImpl.java | 43 ++++------ .../impl/ScheduledDeliveryHandlerTest.java | 5 ++ .../integration/amqp/TopicDurableTests.java | 39 ++++----- .../integration/client/ConsumerTest.java | 18 ++++- .../unit/core/postoffice/impl/FakeQueue.java | 6 ++ 13 files changed, 214 insertions(+), 120 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java index 2f46fb1b94..423b6b4fcf 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java @@ -21,4 +21,19 @@ public interface ReferenceCounter { int increment(); int decrement(); + + int getCount(); + + + void setTask(Runnable task); + + Runnable getTask(); + + /** + * Some asynchronous operations (like ack) may delay certain conditions. + * After met, during afterCompletion we may need to recheck certain values + * to make sure we won't get into a situation where the condition was met asynchronously and queues not removed. + */ + void check(); + } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java index 3f971fdea5..3ef97a9372 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class ReferenceCounterUtil implements ReferenceCounter { - private final Runnable runnable; + private Runnable task; /** * If executor is null the runnable will be called within the same thread, otherwise the executor will be used @@ -30,15 +30,35 @@ public class ReferenceCounterUtil implements ReferenceCounter { private final AtomicInteger uses = new AtomicInteger(0); - public ReferenceCounterUtil(Runnable runnable) { - this(runnable, null); + public ReferenceCounterUtil() { + this.executor = null; + this.task = null; + } + + public ReferenceCounterUtil(Executor executor) { + this.executor = executor; } public ReferenceCounterUtil(Runnable runnable, Executor executor) { - this.runnable = runnable; + this.setTask(runnable); this.executor = executor; } + public ReferenceCounterUtil(Runnable runnable) { + this.setTask(runnable); + this.executor = null; + } + + @Override + public void setTask(Runnable task) { + this.task = task; + } + + @Override + public Runnable getTask() { + return task; + } + @Override public int increment() { return uses.incrementAndGet(); @@ -48,13 +68,29 @@ public class ReferenceCounterUtil implements ReferenceCounter { public int decrement() { int value = uses.decrementAndGet(); if (value == 0) { - if (executor != null) { - executor.execute(runnable); - } else { - runnable.run(); - } + execute(); } return value; } + + private void execute() { + if (executor != null) { + executor.execute(task); + } else { + task.run(); + } + } + + @Override + public void check() { + if (getCount() <= 0) { + execute(); + } + } + + @Override + public int getCount() { + return uses.get(); + } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java index 865afffd21..7dbc9fbf9c 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.utils; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,12 +29,13 @@ public class ReferenceCounterTest extends Assert { class LatchRunner implements Runnable { - final CountDownLatch latch = new CountDownLatch(1); + final ReusableLatch latch = new ReusableLatch(1); final AtomicInteger counts = new AtomicInteger(0); - volatile Thread lastThreadUsed; + volatile Thread lastThreadUsed = Thread.currentThread(); @Override public void run() { + lastThreadUsed = Thread.currentThread(); counts.incrementAndGet(); latch.countDown(); } @@ -65,6 +65,15 @@ public class ReferenceCounterTest extends Assert { assertNotSame(runner.lastThreadUsed, Thread.currentThread()); + runner.latch.setCount(1); + runner.lastThreadUsed = Thread.currentThread(); + + // force a recheck + counter.check(); + + runner.latch.await(5, TimeUnit.SECONDS); + assertNotSame(runner.lastThreadUsed, Thread.currentThread()); + executor.shutdown(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 14e13b11ed..587367bd8c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; @@ -43,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; @@ -344,22 +347,40 @@ public class AMQPSessionCallback implements SessionCallback { public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); + final CountDownLatch latch = new CountDownLatch(1); - serverSession.getSessionContext().executeOnCompletion(new IOCallback() { + Runnable runnable = new Runnable() { @Override - public void done() { + public void run() { try { consumer.close(false); + latch.countDown(); } catch (Exception e) { - logger.warn(e.getMessage(), e); } } + }; - @Override - public void onError(int errorCode, String errorMessage) { + // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol) + // to avoid deadlocks the close has to be done outside of the main thread on an executor + // otherwise you could get a deadlock + Executor executor = protonSPI.getExeuctor(); + + if (executor != null) { + executor.execute(runnable); + } else { + runnable.run(); + } + + try { + // a short timeout will do.. 1 second is already long enough + if (!latch.await(1, TimeUnit.SECONDS)) { + logger.debug("Could not close consumer on time"); } - }); + } catch (InterruptedException e) { + throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue()); + } + consumer.getQueue().recheckRefCount(serverSession.getSessionContext()); } public String tempQueueName() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 9a348375fe..844a49dc8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -298,4 +299,7 @@ public interface Queue extends Bindable,CriticalComponent { void decDelivering(int size); + /** This is to perform a check on the counter again */ + void recheckRefCount(OperationContext context); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 0f47af124e..31a4869510 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -2942,6 +2943,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); } + @Override + public void recheckRefCount(OperationContext context) { + ReferenceCounter refCount = refCountForConsumers; + if (refCount != null) { + context.executeOnCompletion(new IOCallback() { + @Override + public void done() { + refCount.check(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }); + } + + } + // Inner classes // -------------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java index 82a700f0a5..be83acaca9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java @@ -19,71 +19,57 @@ package org.apache.activemq.artemis.core.server.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.QueueManager; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueManager; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; -public class QueueManagerImpl implements QueueManager { +public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager { private final SimpleString queueName; private final ActiveMQServer server; - private final Runnable runnable = new Runnable() { - @Override - public void run() { - Queue queue = server.locateQueue(queueName); - //the queue may already have been deleted and this is a result of that - if (queue == null) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\""); - } - return; + private void doIt() { + Queue queue = server.locateQueue(queueName); + //the queue may already have been deleted and this is a result of that + if (queue == null) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\""); } - SimpleString address = queue.getAddress(); - AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); - long consumerCount = queue.getConsumerCount(); - long messageCount = queue.getMessageCount(); + return; + } + SimpleString address = queue.getAddress(); + AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString()); + long consumerCount = queue.getConsumerCount(); + long messageCount = queue.getMessageCount(); - if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); - } + if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); + } - try { - server.destroyQueue(queueName, null, true, false); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); - } - } else if (queue.isPurgeOnNoConsumers()) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount); - } - try { - queue.deleteAllReferences(); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName); - } + try { + server.destroyQueue(queueName, null, true, false); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName); + } + } else if (queue.isPurgeOnNoConsumers()) { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount); + } + try { + queue.deleteAllReferences(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName); } } - }; - - private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); + } public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) { this.server = server; this.queueName = queueName; - } - - @Override - public int increment() { - return referenceCounterUtil.increment(); - } - - @Override - public int decrement() { - return referenceCounterUtil.decrement(); + this.setTask(this::doIt); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 8e64a211e8..36aa4e2d83 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -508,6 +508,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { tx.rollback(); + messageQueue.recheckRefCount(session.getSessionContext()); + if (!browseOnly) { TypedProperties props = new TypedProperties(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java index 125c9fe762..ab14479fac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java @@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.server.TransientQueueManager; import org.apache.activemq.artemis.utils.ReferenceCounterUtil; import org.jboss.logging.Logger; -public class TransientQueueManagerImpl implements TransientQueueManager { +public class TransientQueueManagerImpl extends ReferenceCounterUtil implements TransientQueueManager { private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class); @@ -32,41 +32,28 @@ public class TransientQueueManagerImpl implements TransientQueueManager { private final ActiveMQServer server; - private final Runnable runnable = new Runnable() { - @Override - public void run() { - try { - if (logger.isDebugEnabled()) { - logger.debug("deleting temporary queue " + queueName); - } - - try { - server.destroyQueue(queueName, null, false); - } catch (ActiveMQException e) { - ActiveMQServerLogger.LOGGER.errorOnDeletingQueue(queueName.toString(), e); - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName); + private void doIt() { + try { + if (logger.isDebugEnabled()) { + logger.debug("deleting temporary queue " + queueName); } - } - }; - private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable); + try { + server.destroyQueue(queueName, null, false); + } catch (ActiveMQException e) { + ActiveMQServerLogger.LOGGER.errorOnDeletingQueue(queueName.toString(), e); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName); + } + } public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { this.server = server; this.queueName = queueName; - } - @Override - public int increment() { - return referenceCounterUtil.increment(); - } - - @Override - public int decrement() { - return referenceCounterUtil.decrement(); + this.setTask(this::doIt); } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index ddf702e53f..27071901bb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; @@ -777,6 +778,10 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void recheckRefCount(OperationContext context) { + } + @Override public void unproposed(SimpleString groupID) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java index 0a1a9d531b..8ba922d02a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java @@ -42,11 +42,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; -import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; import org.apache.qpid.jms.JmsConnectionFactory; -import org.junit.Assert; import org.junit.Test; import static org.hamcrest.CoreMatchers.is; @@ -61,19 +58,11 @@ public class TopicDurableTests extends JMSClientTestSupport { @Test public void testMessageDurableSubscription() throws Exception { - for (int i = 0; i < 100; i++) { - testLoop(); - tearDown(); - setUp(); - } - } - - private void testLoop() throws Exception { JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient"); Connection connection = connectionFactory.createConnection(); connection.start(); - System.err.println("testMessageDurableSubscription"); + System.out.println("testMessageDurableSubscription"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic testTopic = session.createTopic("jmsTopic"); @@ -87,39 +76,39 @@ public class TopicDurableTests extends JMSClientTestSupport { String batchPrefix = "First"; List listMsgs = generateMessages(session, batchPrefix, count); sendMessages(messageProducer, listMsgs); - System.err.println("First batch messages sent"); + System.out.println("First batch messages sent"); List recvd1 = receiveMessages(subscriber1, count); List recvd2 = receiveMessages(subscriber2, count); assertThat(recvd1.size(), is(count)); assertMessageContent(recvd1, batchPrefix); - System.err.println(sub1ID + " :First batch messages received"); + System.out.println(sub1ID + " :First batch messages received"); assertThat(recvd2.size(), is(count)); assertMessageContent(recvd2, batchPrefix); - System.err.println(sub2ID + " :First batch messages received"); + System.out.println(sub2ID + " :First batch messages received"); subscriber1.close(); - System.err.println(sub1ID + " : closed"); + System.out.println(sub1ID + " : closed"); batchPrefix = "Second"; listMsgs = generateMessages(session, batchPrefix, count); sendMessages(messageProducer, listMsgs); - System.err.println("Second batch messages sent"); + System.out.println("Second batch messages sent"); recvd2 = receiveMessages(subscriber2, count); assertThat(recvd2.size(), is(count)); assertMessageContent(recvd2, batchPrefix); - System.err.println(sub2ID + " :Second batch messages received"); + System.out.println(sub2ID + " :Second batch messages received"); subscriber1 = session.createDurableSubscriber(testTopic, sub1ID); - System.err.println(sub1ID + " :connected"); + System.out.println(sub1ID + " :connected"); recvd1 = receiveMessages(subscriber1, count); assertThat(recvd1.size(), is(count)); assertMessageContent(recvd1, batchPrefix); - System.err.println(sub1ID + " :Second batch messages received"); + System.out.println(sub1ID + " :Second batch messages received"); subscriber1.close(); subscriber2.close(); @@ -131,9 +120,9 @@ public class TopicDurableTests extends JMSClientTestSupport { @Test public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException { - int iterations = 100; + int iterations = 10; for (int i = 0; i < iterations; i++) { - System.err.println("testSharedNonDurableSubscription; iteration: " + i); + System.out.println("testSharedNonDurableSubscription; iteration: " + i); //SETUP-START JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); Connection connection1 = connectionFactory1.createConnection(); @@ -167,14 +156,14 @@ public class TopicDurableTests extends JMSClientTestSupport { List listMsgs = generateMessages(session, count); List>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3); sendMessages(messageProducer, listMsgs); - System.err.println("messages sent"); + System.out.println("messages sent"); assertThat("Each message should be received only by one consumer", results.get(0).get(20, TimeUnit.SECONDS).size() + results.get(1).get(20, TimeUnit.SECONDS).size() + results.get(2).get(20, TimeUnit.SECONDS).size(), is(count)); - System.err.println("messages received"); + System.out.println("messages received"); //BODY-E //TEAR-DOWN-S @@ -255,7 +244,7 @@ public class TopicDurableTests extends JMSClientTestSupport { resultsList.add(new CompletableFuture<>()); receivedResList.add(new ArrayList<>()); MessageListener myListener = message -> { - System.err.println("Mesages received" + message + " count: " + totalCount.get()); + System.out.println("Mesages received" + message + " count: " + totalCount.get()); receivedResList.get(index).add(message); if (totalCount.decrementAndGet() == 0) { for (int j = 0; j < consumer.length; j++) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index ef5334415a..0b36e18928 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -266,7 +266,21 @@ public class ConsumerTest extends ActiveMQTestBase { } @Test - public void testAutoCreateCOnConsumer() throws Throwable { + public void testAutoCreateCOnConsumerAMQP() throws Throwable { + testAutoCreate(2); + } + + @Test + public void testAutoCreateCOnConsumerCore() throws Throwable { + testAutoCreate(1); + } + + @Test + public void testAutoCreateCOnConsumerOpenWire() throws Throwable { + testAutoCreate(3); + } + + private void testAutoCreate(int protocol) throws Throwable { final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue"); if (!isNetty()) { @@ -275,7 +289,7 @@ public class ConsumerTest extends ActiveMQTestBase { } for (int i = 0; i < 10; i++) { - ConnectionFactory factorySend = createFactory(2); + ConnectionFactory factorySend = createFactory(protocol); Connection connection = factorySend.createConnection(); try { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 54cae7b062..f654ed5782 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -75,6 +76,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public void recheckRefCount(OperationContext context) { + + } + @Override public boolean isPersistedPause() { return false;