This closes #1678
This commit is contained in:
commit
a822af4712
|
@ -21,4 +21,19 @@ public interface ReferenceCounter {
|
||||||
int increment();
|
int increment();
|
||||||
|
|
||||||
int decrement();
|
int decrement();
|
||||||
|
|
||||||
|
int getCount();
|
||||||
|
|
||||||
|
|
||||||
|
void setTask(Runnable task);
|
||||||
|
|
||||||
|
Runnable getTask();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Some asynchronous operations (like ack) may delay certain conditions.
|
||||||
|
* After met, during afterCompletion we may need to recheck certain values
|
||||||
|
* to make sure we won't get into a situation where the condition was met asynchronously and queues not removed.
|
||||||
|
*/
|
||||||
|
void check();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class ReferenceCounterUtil implements ReferenceCounter {
|
public class ReferenceCounterUtil implements ReferenceCounter {
|
||||||
|
|
||||||
private final Runnable runnable;
|
private Runnable task;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If executor is null the runnable will be called within the same thread, otherwise the executor will be used
|
* If executor is null the runnable will be called within the same thread, otherwise the executor will be used
|
||||||
|
@ -30,15 +30,35 @@ public class ReferenceCounterUtil implements ReferenceCounter {
|
||||||
|
|
||||||
private final AtomicInteger uses = new AtomicInteger(0);
|
private final AtomicInteger uses = new AtomicInteger(0);
|
||||||
|
|
||||||
public ReferenceCounterUtil(Runnable runnable) {
|
public ReferenceCounterUtil() {
|
||||||
this(runnable, null);
|
this.executor = null;
|
||||||
|
this.task = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReferenceCounterUtil(Executor executor) {
|
||||||
|
this.executor = executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReferenceCounterUtil(Runnable runnable, Executor executor) {
|
public ReferenceCounterUtil(Runnable runnable, Executor executor) {
|
||||||
this.runnable = runnable;
|
this.setTask(runnable);
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ReferenceCounterUtil(Runnable runnable) {
|
||||||
|
this.setTask(runnable);
|
||||||
|
this.executor = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setTask(Runnable task) {
|
||||||
|
this.task = task;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Runnable getTask() {
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int increment() {
|
public int increment() {
|
||||||
return uses.incrementAndGet();
|
return uses.incrementAndGet();
|
||||||
|
@ -48,13 +68,29 @@ public class ReferenceCounterUtil implements ReferenceCounter {
|
||||||
public int decrement() {
|
public int decrement() {
|
||||||
int value = uses.decrementAndGet();
|
int value = uses.decrementAndGet();
|
||||||
if (value == 0) {
|
if (value == 0) {
|
||||||
if (executor != null) {
|
execute();
|
||||||
executor.execute(runnable);
|
|
||||||
} else {
|
|
||||||
runnable.run();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void execute() {
|
||||||
|
if (executor != null) {
|
||||||
|
executor.execute(task);
|
||||||
|
} else {
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void check() {
|
||||||
|
if (getCount() <= 0) {
|
||||||
|
execute();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getCount() {
|
||||||
|
return uses.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.utils;
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -30,12 +29,13 @@ public class ReferenceCounterTest extends Assert {
|
||||||
|
|
||||||
class LatchRunner implements Runnable {
|
class LatchRunner implements Runnable {
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final ReusableLatch latch = new ReusableLatch(1);
|
||||||
final AtomicInteger counts = new AtomicInteger(0);
|
final AtomicInteger counts = new AtomicInteger(0);
|
||||||
volatile Thread lastThreadUsed;
|
volatile Thread lastThreadUsed = Thread.currentThread();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
lastThreadUsed = Thread.currentThread();
|
||||||
counts.incrementAndGet();
|
counts.incrementAndGet();
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,15 @@ public class ReferenceCounterTest extends Assert {
|
||||||
|
|
||||||
assertNotSame(runner.lastThreadUsed, Thread.currentThread());
|
assertNotSame(runner.lastThreadUsed, Thread.currentThread());
|
||||||
|
|
||||||
|
runner.latch.setCount(1);
|
||||||
|
runner.lastThreadUsed = Thread.currentThread();
|
||||||
|
|
||||||
|
// force a recheck
|
||||||
|
counter.check();
|
||||||
|
|
||||||
|
runner.latch.await(5, TimeUnit.SECONDS);
|
||||||
|
assertNotSame(runner.lastThreadUsed, Thread.currentThread());
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.protocol.amqp.broker;
|
package org.apache.activemq.artemis.protocol.amqp.broker;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
|
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
|
||||||
|
@ -43,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
|
@ -344,22 +347,40 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
public void closeSender(final Object brokerConsumer) throws Exception {
|
public void closeSender(final Object brokerConsumer) throws Exception {
|
||||||
|
|
||||||
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
|
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
serverSession.getSessionContext().executeOnCompletion(new IOCallback() {
|
Runnable runnable = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void done() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
consumer.close(false);
|
consumer.close(false);
|
||||||
|
latch.countDown();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn(e.getMessage(), e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
|
||||||
|
// to avoid deadlocks the close has to be done outside of the main thread on an executor
|
||||||
|
// otherwise you could get a deadlock
|
||||||
|
Executor executor = protonSPI.getExeuctor();
|
||||||
|
|
||||||
|
if (executor != null) {
|
||||||
|
executor.execute(runnable);
|
||||||
|
} else {
|
||||||
|
runnable.run();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
try {
|
||||||
public void onError(int errorCode, String errorMessage) {
|
// a short timeout will do.. 1 second is already long enough
|
||||||
|
if (!latch.await(1, TimeUnit.SECONDS)) {
|
||||||
|
logger.debug("Could not close consumer on time");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
|
consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
public String tempQueueName() {
|
public String tempQueueName() {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
||||||
|
@ -298,4 +299,7 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
void decDelivering(int size);
|
void decDelivering(int size);
|
||||||
|
|
||||||
|
/** This is to perform a check on the counter again */
|
||||||
|
void recheckRefCount(OperationContext context);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
import org.apache.activemq.artemis.core.persistence.QueueStatus;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
|
@ -2942,6 +2943,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
|
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recheckRefCount(OperationContext context) {
|
||||||
|
ReferenceCounter refCount = refCountForConsumers;
|
||||||
|
if (refCount != null) {
|
||||||
|
context.executeOnCompletion(new IOCallback() {
|
||||||
|
@Override
|
||||||
|
public void done() {
|
||||||
|
refCount.check();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(int errorCode, String errorMessage) {
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Inner classes
|
// Inner classes
|
||||||
// --------------------------------------------------------------------------
|
// --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -19,20 +19,18 @@ package org.apache.activemq.artemis.core.server.impl;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.QueueManager;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.QueueManager;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
||||||
|
|
||||||
public class QueueManagerImpl implements QueueManager {
|
public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager {
|
||||||
|
|
||||||
private final SimpleString queueName;
|
private final SimpleString queueName;
|
||||||
|
|
||||||
private final ActiveMQServer server;
|
private final ActiveMQServer server;
|
||||||
|
|
||||||
private final Runnable runnable = new Runnable() {
|
private void doIt() {
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
Queue queue = server.locateQueue(queueName);
|
Queue queue = server.locateQueue(queueName);
|
||||||
//the queue may already have been deleted and this is a result of that
|
//the queue may already have been deleted and this is a result of that
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
|
@ -46,7 +44,7 @@ public class QueueManagerImpl implements QueueManager {
|
||||||
long consumerCount = queue.getConsumerCount();
|
long consumerCount = queue.getConsumerCount();
|
||||||
long messageCount = queue.getMessageCount();
|
long messageCount = queue.getMessageCount();
|
||||||
|
|
||||||
if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) {
|
if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0) {
|
||||||
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
|
||||||
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
|
ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
|
||||||
}
|
}
|
||||||
|
@ -67,23 +65,11 @@ public class QueueManagerImpl implements QueueManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
|
|
||||||
|
|
||||||
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
}
|
this.setTask(this::doIt);
|
||||||
|
|
||||||
@Override
|
|
||||||
public int increment() {
|
|
||||||
return referenceCounterUtil.increment();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int decrement() {
|
|
||||||
return referenceCounterUtil.decrement();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -508,6 +508,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
|
|
||||||
tx.rollback();
|
tx.rollback();
|
||||||
|
|
||||||
|
messageQueue.recheckRefCount(session.getSessionContext());
|
||||||
|
|
||||||
if (!browseOnly) {
|
if (!browseOnly) {
|
||||||
TypedProperties props = new TypedProperties();
|
TypedProperties props = new TypedProperties();
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.server.TransientQueueManager;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
public class TransientQueueManagerImpl implements TransientQueueManager {
|
public class TransientQueueManagerImpl extends ReferenceCounterUtil implements TransientQueueManager {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class);
|
private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class);
|
||||||
|
|
||||||
|
@ -32,9 +32,7 @@ public class TransientQueueManagerImpl implements TransientQueueManager {
|
||||||
|
|
||||||
private final ActiveMQServer server;
|
private final ActiveMQServer server;
|
||||||
|
|
||||||
private final Runnable runnable = new Runnable() {
|
private void doIt() {
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
try {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("deleting temporary queue " + queueName);
|
logger.debug("deleting temporary queue " + queueName);
|
||||||
|
@ -49,24 +47,13 @@ public class TransientQueueManagerImpl implements TransientQueueManager {
|
||||||
ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
|
ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
|
|
||||||
|
|
||||||
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
this.setTask(this::doIt);
|
||||||
public int increment() {
|
|
||||||
return referenceCounterUtil.increment();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int decrement() {
|
|
||||||
return referenceCounterUtil.decrement();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
@ -777,6 +778,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recheckRefCount(OperationContext context) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unproposed(SimpleString groupID) {
|
public void unproposed(SimpleString groupID) {
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,259 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import javax.naming.Context;
|
||||||
|
import javax.naming.InitialContext;
|
||||||
|
import javax.naming.NamingException;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
|
||||||
|
public class TopicDurableTests extends JMSClientTestSupport {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
|
||||||
|
// do not create unnecessary queues
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageDurableSubscription() throws Exception {
|
||||||
|
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
|
||||||
|
Connection connection = connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
System.out.println("testMessageDurableSubscription");
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic testTopic = session.createTopic("jmsTopic");
|
||||||
|
|
||||||
|
String sub1ID = "sub1DurSub";
|
||||||
|
String sub2ID = "sub2DurSub";
|
||||||
|
MessageConsumer subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
|
||||||
|
MessageConsumer subscriber2 = session.createDurableSubscriber(testTopic, sub2ID);
|
||||||
|
MessageProducer messageProducer = session.createProducer(testTopic);
|
||||||
|
|
||||||
|
int count = 100;
|
||||||
|
String batchPrefix = "First";
|
||||||
|
List<Message> listMsgs = generateMessages(session, batchPrefix, count);
|
||||||
|
sendMessages(messageProducer, listMsgs);
|
||||||
|
System.out.println("First batch messages sent");
|
||||||
|
|
||||||
|
List<Message> recvd1 = receiveMessages(subscriber1, count);
|
||||||
|
List<Message> recvd2 = receiveMessages(subscriber2, count);
|
||||||
|
|
||||||
|
assertThat(recvd1.size(), is(count));
|
||||||
|
assertMessageContent(recvd1, batchPrefix);
|
||||||
|
System.out.println(sub1ID + " :First batch messages received");
|
||||||
|
|
||||||
|
assertThat(recvd2.size(), is(count));
|
||||||
|
assertMessageContent(recvd2, batchPrefix);
|
||||||
|
System.out.println(sub2ID + " :First batch messages received");
|
||||||
|
|
||||||
|
subscriber1.close();
|
||||||
|
System.out.println(sub1ID + " : closed");
|
||||||
|
|
||||||
|
batchPrefix = "Second";
|
||||||
|
listMsgs = generateMessages(session, batchPrefix, count);
|
||||||
|
sendMessages(messageProducer, listMsgs);
|
||||||
|
System.out.println("Second batch messages sent");
|
||||||
|
|
||||||
|
recvd2 = receiveMessages(subscriber2, count);
|
||||||
|
assertThat(recvd2.size(), is(count));
|
||||||
|
assertMessageContent(recvd2, batchPrefix);
|
||||||
|
System.out.println(sub2ID + " :Second batch messages received");
|
||||||
|
|
||||||
|
subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
|
||||||
|
System.out.println(sub1ID + " :connected");
|
||||||
|
|
||||||
|
recvd1 = receiveMessages(subscriber1, count);
|
||||||
|
assertThat(recvd1.size(), is(count));
|
||||||
|
assertMessageContent(recvd1, batchPrefix);
|
||||||
|
System.out.println(sub1ID + " :Second batch messages received");
|
||||||
|
|
||||||
|
subscriber1.close();
|
||||||
|
subscriber2.close();
|
||||||
|
|
||||||
|
session.unsubscribe(sub1ID);
|
||||||
|
session.unsubscribe(sub2ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
|
||||||
|
int iterations = 10;
|
||||||
|
for (int i = 0; i < iterations; i++) {
|
||||||
|
System.out.println("testSharedNonDurableSubscription; iteration: " + i);
|
||||||
|
//SETUP-START
|
||||||
|
JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
|
||||||
|
Connection connection1 = connectionFactory1.createConnection();
|
||||||
|
|
||||||
|
|
||||||
|
Hashtable env2 = new Hashtable<Object, Object>();
|
||||||
|
env2.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
|
||||||
|
env2.put("connectionfactory.qpidConnectionFactory", "amqp://localhost:5672");
|
||||||
|
env2.put("topic." + "jmsTopic", "jmsTopic");
|
||||||
|
Context context2 = new InitialContext(env2);
|
||||||
|
ConnectionFactory connectionFactory2 = (ConnectionFactory) context2.lookup("qpidConnectionFactory");
|
||||||
|
Connection connection2 = connectionFactory2.createConnection();
|
||||||
|
|
||||||
|
connection1.start();
|
||||||
|
connection2.start();
|
||||||
|
|
||||||
|
Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic testTopic = session.createTopic("jmsTopic");
|
||||||
|
//SETUP-END
|
||||||
|
|
||||||
|
//BODY-S
|
||||||
|
String subID = "sharedConsumerNonDurable123";
|
||||||
|
MessageConsumer subscriber1 = session.createSharedConsumer(testTopic, subID);
|
||||||
|
MessageConsumer subscriber2 = session2.createSharedConsumer(testTopic, subID);
|
||||||
|
MessageConsumer subscriber3 = session2.createSharedConsumer(testTopic, subID);
|
||||||
|
MessageProducer messageProducer = session.createProducer(testTopic);
|
||||||
|
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
|
||||||
|
int count = 10;
|
||||||
|
List<Message> listMsgs = generateMessages(session, count);
|
||||||
|
List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
|
||||||
|
sendMessages(messageProducer, listMsgs);
|
||||||
|
System.out.println("messages sent");
|
||||||
|
|
||||||
|
assertThat("Each message should be received only by one consumer",
|
||||||
|
results.get(0).get(20, TimeUnit.SECONDS).size() +
|
||||||
|
results.get(1).get(20, TimeUnit.SECONDS).size() +
|
||||||
|
results.get(2).get(20, TimeUnit.SECONDS).size(),
|
||||||
|
is(count));
|
||||||
|
System.out.println("messages received");
|
||||||
|
//BODY-E
|
||||||
|
|
||||||
|
//TEAR-DOWN-S
|
||||||
|
connection1.stop();
|
||||||
|
connection2.stop();
|
||||||
|
subscriber1.close();
|
||||||
|
subscriber2.close();
|
||||||
|
session.close();
|
||||||
|
session2.close();
|
||||||
|
connection1.close();
|
||||||
|
connection2.close();
|
||||||
|
//TEAR-DOWN-E
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void sendMessages(MessageProducer producer, List<Message> messages) {
|
||||||
|
messages.forEach(m -> {
|
||||||
|
try {
|
||||||
|
producer.send(m);
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<Message> receiveMessages(MessageConsumer consumer, int count) {
|
||||||
|
return receiveMessages(consumer, count, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<Message> receiveMessages(MessageConsumer consumer, int count, long timeout) {
|
||||||
|
List<Message> recvd = new ArrayList<>();
|
||||||
|
IntStream.range(0, count).forEach(i -> {
|
||||||
|
try {
|
||||||
|
recvd.add(timeout > 0 ? consumer.receive(timeout) : consumer.receive());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return recvd;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void assertMessageContent(List<Message> msgs, String content) {
|
||||||
|
msgs.forEach(m -> {
|
||||||
|
try {
|
||||||
|
assertTrue(((TextMessage) m).getText().contains(content));
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<Message> generateMessages(Session session, int count) {
|
||||||
|
return generateMessages(session, "", count);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<Message> generateMessages(Session session, String prefix, int count) {
|
||||||
|
List<Message> messages = new ArrayList<>();
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
IntStream.range(0, count).forEach(i -> {
|
||||||
|
try {
|
||||||
|
messages.add(session.createTextMessage(sb.append(prefix).append("testMessage").append(i).toString()));
|
||||||
|
sb.setLength(0);
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
|
||||||
|
AtomicInteger totalCount = new AtomicInteger(count);
|
||||||
|
List<CompletableFuture<List<Message>>> resultsList = new ArrayList<>();
|
||||||
|
List<List<Message>> receivedResList = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < consumer.length; i++) {
|
||||||
|
final int index = i;
|
||||||
|
resultsList.add(new CompletableFuture<>());
|
||||||
|
receivedResList.add(new ArrayList<>());
|
||||||
|
MessageListener myListener = message -> {
|
||||||
|
System.out.println("Mesages received" + message + " count: " + totalCount.get());
|
||||||
|
receivedResList.get(index).add(message);
|
||||||
|
if (totalCount.decrementAndGet() == 0) {
|
||||||
|
for (int j = 0; j < consumer.length; j++) {
|
||||||
|
resultsList.get(j).complete(receivedResList.get(j));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
consumer[i].setMessageListener(myListener);
|
||||||
|
}
|
||||||
|
return resultsList;
|
||||||
|
}
|
||||||
|
}
|
|
@ -266,7 +266,21 @@ public class ConsumerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateCOnConsumer() throws Throwable {
|
public void testAutoCreateCOnConsumerAMQP() throws Throwable {
|
||||||
|
testAutoCreate(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoCreateCOnConsumerCore() throws Throwable {
|
||||||
|
testAutoCreate(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoCreateCOnConsumerOpenWire() throws Throwable {
|
||||||
|
testAutoCreate(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAutoCreate(int protocol) throws Throwable {
|
||||||
|
|
||||||
final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
|
final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
|
||||||
if (!isNetty()) {
|
if (!isNetty()) {
|
||||||
|
@ -275,7 +289,7 @@ public class ConsumerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
ConnectionFactory factorySend = createFactory(2);
|
ConnectionFactory factorySend = createFactory(protocol);
|
||||||
Connection connection = factorySend.createConnection();
|
Connection connection = factorySend.createConnection();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
@ -75,6 +76,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recheckRefCount(OperationContext context) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isPersistedPause() {
|
public boolean isPersistedPause() {
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue