This commit is contained in:
Clebert Suconic 2019-03-12 12:41:00 -04:00
commit 2aa12ffc37
5 changed files with 143 additions and 50 deletions

View File

@ -36,6 +36,11 @@ public class MQTTSessionCallback implements SessionCallback {
this.connection = connection;
}
@Override
public boolean supportsDirectDelivery() {
return false;
}
@Override
public boolean isWritable(ReadyListener callback, Object protocolContext) {
return connection.isWritable(callback);

View File

@ -79,6 +79,11 @@ public class StompSession implements SessionCallback {
this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration());
}
@Override
public boolean supportsDirectDelivery() {
return false;
}
@Override
public boolean isWritable(ReadyListener callback, Object protocolContext) {
return connection.isWritable(callback);

View File

@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -210,6 +211,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final Runnable deliverRunner = new DeliverRunner();
//This lock is used to prevent deadlocks between direct and async deliveries
private final ReentrantLock deliverLock = new ReentrantLock();
private volatile boolean depagePending = false;
private final StorageManager storageManager;
@ -249,7 +253,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile boolean directDeliver = true;
private volatile boolean supportsDirectDeliver = true;
private volatile boolean supportsDirectDeliver = false;
private AddressSettingsRepositoryListener addressSettingsRepositoryListener;
@ -881,7 +885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return;
}
if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
if (direct && supportsDirectDeliver && !directDeliver && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
if (logger.isTraceEnabled()) {
logger.trace("Checking to re-enable direct deliver on queue " + this.getName());
}
@ -1074,8 +1078,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
if (!consumer.supportsDirectDelivery()) {
this.supportsDirectDeliver = false;
if (consumers.isEmpty()) {
this.supportsDirectDeliver = consumer.supportsDirectDelivery();
} else {
if (!consumer.supportsDirectDelivery()) {
this.supportsDirectDeliver = false;
}
}
cancelRedistributor();
@ -1154,6 +1162,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
private boolean checkConsumerDirectDeliver() {
if (consumers.isEmpty()) {
return false;
}
boolean supports = true;
for (ConsumerHolder consumerCheck : consumers) {
if (!consumerCheck.consumer.supportsDirectDelivery()) {
@ -2337,7 +2348,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean isDirectDeliver() {
return directDeliver;
return directDeliver && supportsDirectDeliver;
}
/**
@ -3069,57 +3080,74 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
*/
private boolean deliverDirect(final MessageReference ref) {
synchronized (this) {
if (!supportsDirectDeliver) {
return false;
}
if (paused || !canDispatch() && redistributor == null) {
return false;
}
if (checkExpired(ref)) {
return true;
}
consumers.reset();
while (consumers.hasNext() || redistributor != null) {
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
Consumer consumer = holder.consumer;
final SimpleString groupID = extractGroupID(ref);
Consumer groupConsumer = getGroupConsumer(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
//The order to enter the deliverLock re QueueImpl::this lock is very important:
//- acquire deliverLock::lock
//- acquire QueueImpl::this lock
//DeliverRunner::run is doing the same to avoid deadlocks.
//Without deliverLock, a directDeliver happening while a DeliverRunner::run
//could cause a deadlock.
//Both DeliverRunner::run and deliverDirect could trigger a ServerConsumerImpl::individualAcknowledge:
//- deliverDirect first acquire QueueImpl::this, then ServerConsumerImpl::this
//- DeliverRunner::run first acquire ServerConsumerImpl::this then QueueImpl::this
if (!deliverLock.tryLock()) {
logger.tracef("Cannot perform a directDelivery because there is a running async deliver");
return false;
}
try {
synchronized (this) {
if (!supportsDirectDeliver) {
return false;
}
if (paused || !canDispatch() && redistributor == null) {
return false;
}
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
messagesAdded.incrementAndGet();
deliveriesInTransit.countUp();
proceedDeliver(consumer, ref);
consumers.reset();
if (checkExpired(ref)) {
return true;
}
if (redistributor != null || groupConsumer != null) {
break;
}
}
consumers.reset();
if (logger.isTraceEnabled()) {
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
while (consumers.hasNext() || redistributor != null) {
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
Consumer consumer = holder.consumer;
final SimpleString groupID = extractGroupID(ref);
Consumer groupConsumer = getGroupConsumer(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
}
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
messagesAdded.incrementAndGet();
deliveriesInTransit.countUp();
proceedDeliver(consumer, ref);
consumers.reset();
return true;
}
if (redistributor != null || groupConsumer != null) {
break;
}
}
if (logger.isTraceEnabled()) {
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
}
return false;
}
return false;
} finally {
deliverLock.unlock();
}
}
@ -3464,8 +3492,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_DELIVER);
boolean needCheckDepage = false;
try {
synchronized (QueueImpl.this.deliverRunner) {
deliverLock.lock();
try {
needCheckDepage = deliver();
} finally {
deliverLock.unlock();
}
} finally {
leaveCritical(CRITICAL_DELIVER);

View File

@ -43,6 +43,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -63,6 +65,7 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -151,6 +154,23 @@ public class MQTTTest extends MQTTTestSupport {
publishProvider.disconnect();
}
@Test(timeout = 60 * 1000)
public void testDirectDeliverFalse() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
for (Binding b : server.getPostOffice().getAllBindings().values()) {
if (b instanceof QueueBinding) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
}
subscriptionProvider.disconnect();
}
@Test(timeout = 60 * 1000)
public void testUnsubscribeMQTT() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();

View File

@ -46,6 +46,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
@ -2009,4 +2011,34 @@ public class StompTest extends StompTestBase {
assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST));
Assert.assertNull(server.locateQueue(simpleQueueName));
}
@Test
public void directDeliverDisabledOnStomp() throws Exception {
String payload = "This is a test message";
// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
for (Binding b : server.getPostOffice().getAllBindings().values()) {
if (b instanceof QueueBinding) {
Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
}
}
// Send MQTT Message
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
clientProvider.connect("tcp://" + hostname + ":" + port);
clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0);
clientProvider.disconnect();
// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
assertTrue(frame.getBody()
.contains(payload));
}
}