ARTEMIS-1529 Fixing Ref count over asynchronous ack

This commit is contained in:
Clebert Suconic 2017-11-28 21:08:05 -05:00
parent dbb3aaddf6
commit 8b7282d849
13 changed files with 214 additions and 120 deletions

View File

@ -21,4 +21,19 @@ public interface ReferenceCounter {
int increment(); int increment();
int decrement(); int decrement();
int getCount();
void setTask(Runnable task);
Runnable getTask();
/**
* Some asynchronous operations (like ack) may delay certain conditions.
* After met, during afterCompletion we may need to recheck certain values
* to make sure we won't get into a situation where the condition was met asynchronously and queues not removed.
*/
void check();
} }

View File

@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ReferenceCounterUtil implements ReferenceCounter { public class ReferenceCounterUtil implements ReferenceCounter {
private final Runnable runnable; private Runnable task;
/** /**
* If executor is null the runnable will be called within the same thread, otherwise the executor will be used * If executor is null the runnable will be called within the same thread, otherwise the executor will be used
@ -30,15 +30,35 @@ public class ReferenceCounterUtil implements ReferenceCounter {
private final AtomicInteger uses = new AtomicInteger(0); private final AtomicInteger uses = new AtomicInteger(0);
public ReferenceCounterUtil(Runnable runnable) { public ReferenceCounterUtil() {
this(runnable, null); this.executor = null;
this.task = null;
}
public ReferenceCounterUtil(Executor executor) {
this.executor = executor;
} }
public ReferenceCounterUtil(Runnable runnable, Executor executor) { public ReferenceCounterUtil(Runnable runnable, Executor executor) {
this.runnable = runnable; this.setTask(runnable);
this.executor = executor; this.executor = executor;
} }
public ReferenceCounterUtil(Runnable runnable) {
this.setTask(runnable);
this.executor = null;
}
@Override
public void setTask(Runnable task) {
this.task = task;
}
@Override
public Runnable getTask() {
return task;
}
@Override @Override
public int increment() { public int increment() {
return uses.incrementAndGet(); return uses.incrementAndGet();
@ -48,13 +68,29 @@ public class ReferenceCounterUtil implements ReferenceCounter {
public int decrement() { public int decrement() {
int value = uses.decrementAndGet(); int value = uses.decrementAndGet();
if (value == 0) { if (value == 0) {
if (executor != null) { execute();
executor.execute(runnable);
} else {
runnable.run();
}
} }
return value; return value;
} }
private void execute() {
if (executor != null) {
executor.execute(task);
} else {
task.run();
}
}
@Override
public void check() {
if (getCount() <= 0) {
execute();
}
}
@Override
public int getCount() {
return uses.get();
}
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -30,12 +29,13 @@ public class ReferenceCounterTest extends Assert {
class LatchRunner implements Runnable { class LatchRunner implements Runnable {
final CountDownLatch latch = new CountDownLatch(1); final ReusableLatch latch = new ReusableLatch(1);
final AtomicInteger counts = new AtomicInteger(0); final AtomicInteger counts = new AtomicInteger(0);
volatile Thread lastThreadUsed; volatile Thread lastThreadUsed = Thread.currentThread();
@Override @Override
public void run() { public void run() {
lastThreadUsed = Thread.currentThread();
counts.incrementAndGet(); counts.incrementAndGet();
latch.countDown(); latch.countDown();
} }
@ -65,6 +65,15 @@ public class ReferenceCounterTest extends Assert {
assertNotSame(runner.lastThreadUsed, Thread.currentThread()); assertNotSame(runner.lastThreadUsed, Thread.currentThread());
runner.latch.setCount(1);
runner.lastThreadUsed = Thread.currentThread();
// force a recheck
counter.check();
runner.latch.await(5, TimeUnit.SECONDS);
assertNotSame(runner.lastThreadUsed, Thread.currentThread());
executor.shutdown(); executor.shutdown();
} }

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.broker; package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
@ -43,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@ -344,22 +347,40 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception { public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
final CountDownLatch latch = new CountDownLatch(1);
serverSession.getSessionContext().executeOnCompletion(new IOCallback() { Runnable runnable = new Runnable() {
@Override @Override
public void done() { public void run() {
try { try {
consumer.close(false); consumer.close(false);
latch.countDown();
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e);
} }
} }
};
// Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
// to avoid deadlocks the close has to be done outside of the main thread on an executor
// otherwise you could get a deadlock
Executor executor = protonSPI.getExeuctor();
if (executor != null) {
executor.execute(runnable);
} else {
runnable.run();
}
@Override try {
public void onError(int errorCode, String errorMessage) { // a short timeout will do.. 1 second is already long enough
if (!latch.await(1, TimeUnit.SECONDS)) {
logger.debug("Could not close consumer on time");
}
} catch (InterruptedException e) {
throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
} }
});
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
} }
public String tempQueueName() { public String tempQueueName() {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -298,4 +299,7 @@ public interface Queue extends Bindable,CriticalComponent {
void decDelivering(int size); void decDelivering(int size);
/** This is to perform a check on the counter again */
void recheckRefCount(OperationContext context);
} }

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueStatus; import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
@ -2942,6 +2943,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
} }
@Override
public void recheckRefCount(OperationContext context) {
ReferenceCounter refCount = refCountForConsumers;
if (refCount != null) {
context.executeOnCompletion(new IOCallback() {
@Override
public void done() {
refCount.check();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
}
}
// Inner classes // Inner classes
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------

View File

@ -19,20 +19,18 @@ package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.QueueManager;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueManager;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil; import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
public class QueueManagerImpl implements QueueManager { public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager {
private final SimpleString queueName; private final SimpleString queueName;
private final ActiveMQServer server; private final ActiveMQServer server;
private final Runnable runnable = new Runnable() { private void doIt() {
@Override
public void run() {
Queue queue = server.locateQueue(queueName); Queue queue = server.locateQueue(queueName);
//the queue may already have been deleted and this is a result of that //the queue may already have been deleted and this is a result of that
if (queue == null) { if (queue == null) {
@ -46,7 +44,7 @@ public class QueueManagerImpl implements QueueManager {
long consumerCount = queue.getConsumerCount(); long consumerCount = queue.getConsumerCount();
long messageCount = queue.getMessageCount(); long messageCount = queue.getMessageCount();
if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) { if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues()); ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
} }
@ -67,23 +65,11 @@ public class QueueManagerImpl implements QueueManager {
} }
} }
} }
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) { public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
this.server = server; this.server = server;
this.queueName = queueName; this.queueName = queueName;
} this.setTask(this::doIt);
@Override
public int increment() {
return referenceCounterUtil.increment();
}
@Override
public int decrement() {
return referenceCounterUtil.decrement();
} }
@Override @Override

View File

@ -508,6 +508,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.rollback(); tx.rollback();
messageQueue.recheckRefCount(session.getSessionContext());
if (!browseOnly) { if (!browseOnly) {
TypedProperties props = new TypedProperties(); TypedProperties props = new TypedProperties();

View File

@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.server.TransientQueueManager;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil; import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public class TransientQueueManagerImpl implements TransientQueueManager { public class TransientQueueManagerImpl extends ReferenceCounterUtil implements TransientQueueManager {
private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class); private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class);
@ -32,9 +32,7 @@ public class TransientQueueManagerImpl implements TransientQueueManager {
private final ActiveMQServer server; private final ActiveMQServer server;
private final Runnable runnable = new Runnable() { private void doIt() {
@Override
public void run() {
try { try {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("deleting temporary queue " + queueName); logger.debug("deleting temporary queue " + queueName);
@ -49,24 +47,13 @@ public class TransientQueueManagerImpl implements TransientQueueManager {
ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName); ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
} }
} }
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) { public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
this.server = server; this.server = server;
this.queueName = queueName; this.queueName = queueName;
}
@Override this.setTask(this::doIt);
public int increment() {
return referenceCounterUtil.increment();
}
@Override
public int decrement() {
return referenceCounterUtil.decrement();
} }
@Override @Override

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -777,6 +778,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override
public void recheckRefCount(OperationContext context) {
}
@Override @Override
public void unproposed(SimpleString groupID) { public void unproposed(SimpleString groupID) {

View File

@ -42,11 +42,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -61,19 +58,11 @@ public class TopicDurableTests extends JMSClientTestSupport {
@Test @Test
public void testMessageDurableSubscription() throws Exception { public void testMessageDurableSubscription() throws Exception {
for (int i = 0; i < 100; i++) {
testLoop();
tearDown();
setUp();
}
}
private void testLoop() throws Exception {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient"); JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
Connection connection = connectionFactory.createConnection(); Connection connection = connectionFactory.createConnection();
connection.start(); connection.start();
System.err.println("testMessageDurableSubscription"); System.out.println("testMessageDurableSubscription");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic testTopic = session.createTopic("jmsTopic"); Topic testTopic = session.createTopic("jmsTopic");
@ -87,39 +76,39 @@ public class TopicDurableTests extends JMSClientTestSupport {
String batchPrefix = "First"; String batchPrefix = "First";
List<Message> listMsgs = generateMessages(session, batchPrefix, count); List<Message> listMsgs = generateMessages(session, batchPrefix, count);
sendMessages(messageProducer, listMsgs); sendMessages(messageProducer, listMsgs);
System.err.println("First batch messages sent"); System.out.println("First batch messages sent");
List<Message> recvd1 = receiveMessages(subscriber1, count); List<Message> recvd1 = receiveMessages(subscriber1, count);
List<Message> recvd2 = receiveMessages(subscriber2, count); List<Message> recvd2 = receiveMessages(subscriber2, count);
assertThat(recvd1.size(), is(count)); assertThat(recvd1.size(), is(count));
assertMessageContent(recvd1, batchPrefix); assertMessageContent(recvd1, batchPrefix);
System.err.println(sub1ID + " :First batch messages received"); System.out.println(sub1ID + " :First batch messages received");
assertThat(recvd2.size(), is(count)); assertThat(recvd2.size(), is(count));
assertMessageContent(recvd2, batchPrefix); assertMessageContent(recvd2, batchPrefix);
System.err.println(sub2ID + " :First batch messages received"); System.out.println(sub2ID + " :First batch messages received");
subscriber1.close(); subscriber1.close();
System.err.println(sub1ID + " : closed"); System.out.println(sub1ID + " : closed");
batchPrefix = "Second"; batchPrefix = "Second";
listMsgs = generateMessages(session, batchPrefix, count); listMsgs = generateMessages(session, batchPrefix, count);
sendMessages(messageProducer, listMsgs); sendMessages(messageProducer, listMsgs);
System.err.println("Second batch messages sent"); System.out.println("Second batch messages sent");
recvd2 = receiveMessages(subscriber2, count); recvd2 = receiveMessages(subscriber2, count);
assertThat(recvd2.size(), is(count)); assertThat(recvd2.size(), is(count));
assertMessageContent(recvd2, batchPrefix); assertMessageContent(recvd2, batchPrefix);
System.err.println(sub2ID + " :Second batch messages received"); System.out.println(sub2ID + " :Second batch messages received");
subscriber1 = session.createDurableSubscriber(testTopic, sub1ID); subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
System.err.println(sub1ID + " :connected"); System.out.println(sub1ID + " :connected");
recvd1 = receiveMessages(subscriber1, count); recvd1 = receiveMessages(subscriber1, count);
assertThat(recvd1.size(), is(count)); assertThat(recvd1.size(), is(count));
assertMessageContent(recvd1, batchPrefix); assertMessageContent(recvd1, batchPrefix);
System.err.println(sub1ID + " :Second batch messages received"); System.out.println(sub1ID + " :Second batch messages received");
subscriber1.close(); subscriber1.close();
subscriber2.close(); subscriber2.close();
@ -131,9 +120,9 @@ public class TopicDurableTests extends JMSClientTestSupport {
@Test @Test
public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException { public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
int iterations = 100; int iterations = 10;
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
System.err.println("testSharedNonDurableSubscription; iteration: " + i); System.out.println("testSharedNonDurableSubscription; iteration: " + i);
//SETUP-START //SETUP-START
JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI()); JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
Connection connection1 = connectionFactory1.createConnection(); Connection connection1 = connectionFactory1.createConnection();
@ -167,14 +156,14 @@ public class TopicDurableTests extends JMSClientTestSupport {
List<Message> listMsgs = generateMessages(session, count); List<Message> listMsgs = generateMessages(session, count);
List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3); List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
sendMessages(messageProducer, listMsgs); sendMessages(messageProducer, listMsgs);
System.err.println("messages sent"); System.out.println("messages sent");
assertThat("Each message should be received only by one consumer", assertThat("Each message should be received only by one consumer",
results.get(0).get(20, TimeUnit.SECONDS).size() + results.get(0).get(20, TimeUnit.SECONDS).size() +
results.get(1).get(20, TimeUnit.SECONDS).size() + results.get(1).get(20, TimeUnit.SECONDS).size() +
results.get(2).get(20, TimeUnit.SECONDS).size(), results.get(2).get(20, TimeUnit.SECONDS).size(),
is(count)); is(count));
System.err.println("messages received"); System.out.println("messages received");
//BODY-E //BODY-E
//TEAR-DOWN-S //TEAR-DOWN-S
@ -255,7 +244,7 @@ public class TopicDurableTests extends JMSClientTestSupport {
resultsList.add(new CompletableFuture<>()); resultsList.add(new CompletableFuture<>());
receivedResList.add(new ArrayList<>()); receivedResList.add(new ArrayList<>());
MessageListener myListener = message -> { MessageListener myListener = message -> {
System.err.println("Mesages received" + message + " count: " + totalCount.get()); System.out.println("Mesages received" + message + " count: " + totalCount.get());
receivedResList.get(index).add(message); receivedResList.get(index).add(message);
if (totalCount.decrementAndGet() == 0) { if (totalCount.decrementAndGet() == 0) {
for (int j = 0; j < consumer.length; j++) { for (int j = 0; j < consumer.length; j++) {

View File

@ -266,7 +266,21 @@ public class ConsumerTest extends ActiveMQTestBase {
} }
@Test @Test
public void testAutoCreateCOnConsumer() throws Throwable { public void testAutoCreateCOnConsumerAMQP() throws Throwable {
testAutoCreate(2);
}
@Test
public void testAutoCreateCOnConsumerCore() throws Throwable {
testAutoCreate(1);
}
@Test
public void testAutoCreateCOnConsumerOpenWire() throws Throwable {
testAutoCreate(3);
}
private void testAutoCreate(int protocol) throws Throwable {
final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue"); final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
if (!isNetty()) { if (!isNetty()) {
@ -275,7 +289,7 @@ public class ConsumerTest extends ActiveMQTestBase {
} }
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
ConnectionFactory factorySend = createFactory(2); ConnectionFactory factorySend = createFactory(protocol);
Connection connection = factorySend.createConnection(); Connection connection = factorySend.createConnection();
try { try {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -75,6 +76,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
} }
@Override
public void recheckRefCount(OperationContext context) {
}
@Override @Override
public boolean isPersistedPause() { public boolean isPersistedPause() {
return false; return false;