diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 616da5b474..f850cc185e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -186,6 +186,11 @@ public class AMQPSessionCallback implements SessionCallback {
(String) null, this, true, operationContext, manager.getPrefixes());
}
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
public void start() {
}
@@ -599,11 +604,6 @@ public class AMQPSessionCallback implements SessionCallback {
}
- @Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
- }
-
@Override
public int sendLargeMessage(MessageReference ref,
Message message,
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index bb3853931b..abcfe3f3d5 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -59,12 +59,6 @@ public class MQTTPublishManager {
private MQTTSessionState.OutboundStore outboundStore;
- /** this is the last qos that happened during delivery.
- * since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery
- * so it is safe to keep this value here.
- */
- private Integer currentQos;
-
public MQTTPublishManager(MQTTSession session) {
this.session = session;
}
@@ -114,6 +108,7 @@ public class MQTTPublishManager {
boolean isManagementConsumer(ServerConsumer consumer) {
return consumer == managementConsumer;
}
+
/**
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
@@ -124,35 +119,20 @@ public class MQTTPublishManager {
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
// This is to allow retries of PubRel.
if (isManagementConsumer(consumer)) {
- currentQos = null;
sendPubRelMessage(message);
} else {
int qos = decideQoS(message, consumer);
- currentQos = qos;
if (qos == 0) {
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
+ session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
} else if (qos == 1 || qos == 2) {
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
sendServerMessage(mqttid, message, deliveryCount, qos);
- } else {
- // this will happen during afterDeliver
- }
- }
- }
-
- protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
- if (currentQos != null) {
- int qos = currentQos.intValue();
- if (qos == 0) {
- session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
- } else if (qos == 1 || qos == 2) {
- // everything happened in delivery
} else {
// Client must have disconnected and it's Subscription QoS cleared
consumer.individualCancel(message.getMessageID(), false);
}
-
}
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 168b7fac77..50d57327a4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -59,15 +59,6 @@ public class MQTTSessionCallback implements SessionCallback {
return 1;
}
- @Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
- try {
- session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
- } catch (Exception e) {
- log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
- }
- }
-
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
@@ -99,6 +90,11 @@ public class MQTTSessionCallback implements SessionCallback {
}
}
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
@Override
public void browserFinished(ServerConsumer consumer) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e4ecd48e29..b780563ff2 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -270,6 +270,12 @@ public class AMQSession implements SessionCallback {
}
+ // rename actualDest to destination
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
@Override
public void browserFinished(ServerConsumer consumer) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -306,14 +312,6 @@ public class AMQSession implements SessionCallback {
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
}
- @Override
- public void afterDeliver(MessageReference ref,
- org.apache.activemq.artemis.api.core.Message message,
- ServerConsumer consumerID,
- int deliveryCount) {
-
- }
-
@Override
public int sendLargeMessage(MessageReference reference,
org.apache.activemq.artemis.api.core.Message message,
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 03f9b4433c..80bbbe8613 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -110,9 +110,8 @@ public class StompSession implements SessionCallback {
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}
-
@Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
+ public void afterDelivery() throws Exception {
PendingTask task;
while ((task = afterDeliveryTasks.poll()) != null) {
task.run();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 35eab8a659..7e760c139d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -211,6 +211,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
return false;
}
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
@@ -231,11 +236,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
return 0;
}
- @Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
- }
-
@Override
public int sendLargeMessage(MessageReference reference,
Message message,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index fce0dd5346..f53d028981 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -132,11 +132,6 @@ public final class CoreSessionCallback implements SessionCallback {
return size;
}
- @Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
- // no op
- }
-
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
Packet packet = new SessionProducerCreditsMessage(credits, address);
@@ -149,6 +144,11 @@ public final class CoreSessionCallback implements SessionCallback {
}
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
@Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
Packet packet = new SessionProducerCreditsFailMessage(credits, address);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 269c74fd79..babddc22cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -25,10 +25,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public interface Consumer extends PriorityAware {
- interface GroupHandler {
- MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
- }
-
/**
*
* @see SessionCallback#supportsDirectDelivery()
@@ -38,7 +34,13 @@ public interface Consumer extends PriorityAware {
}
/**
-
+ * There was a change on semantic during 2.3 here.
+ * We now first accept the message, and the actual deliver is done as part of
+ * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
+ * the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
+ * consumers.
+ *
+ * This should return busy if handle is called before proceed deliver is called * * @param reference * @return @@ -46,29 +48,19 @@ public interface Consumer extends PriorityAware { */ HandleStatus handle(MessageReference reference) throws Exception; - /** - * This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled - * This should return busy if handle is called before proceed deliver is called - * @param groupHandler - * @param reference - * @return - * @throws Exception - */ - default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception { - return handle(reference); - } - /** wakes up internal threads to deliver more messages */ default void promptDelivery() { } /** - * This will called after delivery - * Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks + * This will proceed with the actual delivery. + * Notice that handle should hold a readLock and proceedDelivery should release the readLock + * any lock operation on Consumer should also get a writeLock on the readWriteLock + * to guarantee there are no pending deliveries * * @throws Exception */ - void afterDeliver(MessageReference reference) throws Exception; + void proceedDeliver(MessageReference reference) throws Exception; Filter getFilter(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 4c73261749..7d5bbe8ac7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // FailureListener implementation -------------------------------- @Override - public void afterDeliver(MessageReference ref) { + public void proceedDeliver(MessageReference ref) { // no op } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 44a5e0b93c..79820186bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -193,7 +193,7 @@ public class Redistributor implements Consumer { } @Override - public void afterDeliver(MessageReference ref) { + public void proceedDeliver(MessageReference ref) { // no op } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 292d15f1bc..998ef8cc4b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -98,6 +98,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.BooleanUtil; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ReferenceCounter; +import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; @@ -113,7 +114,7 @@ import org.jboss.logging.Logger; *
* Completely non blocking between adding to queue and delivering to consumers. */ -public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler { +public class QueueImpl extends CriticalComponentImpl implements Queue { protected static final int CRITICAL_PATHS = 5; protected static final int CRITICAL_PATH_ADD_TAIL = 0; @@ -267,6 +268,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. private final ExpiryScanner expiryScanner = new ExpiryScanner(); + private final ReusableLatch deliveriesInTransit = new ReusableLatch(0); + private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); private final AtomicLong messagesAddedSnapshot = new AtomicLong(0); @@ -952,7 +955,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (getExecutor().isFlushed() && + if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && pageIterator != null && !pageIterator.hasNext() && pageSubscription != null && !pageSubscription.isPaging()) { @@ -971,7 +974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. } } - if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) { + if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) { return; } @@ -1002,6 +1005,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. return false; } + /** + * This will wait for any pending deliveries to finish + */ + private boolean flushDeliveriesInTransit() { + try { + if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) { + return true; + } else { + ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString()); + return false; + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e); + return false; + } + } + @Override public void forceDelivery() { if (pageSubscription != null && pageSubscription.isPaging()) { @@ -2346,6 +2366,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. @Override public synchronized void pause(boolean persist) { try { + this.flushDeliveriesInTransit(); if (persist && isDurable()) { if (pauseStatusRecord >= 0) { storageManager.deleteQueueStatus(pauseStatusRecord); @@ -2586,16 +2607,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. consumer = groupConsumer; } - Object handleValue = handle(ref, consumer, groupConsumer == null); - - HandleStatus status; - - if (handleValue instanceof MessageReference) { - ref = (MessageReference) handleValue; - status = HandleStatus.HANDLED; - } else { - status = (HandleStatus) handleValue; - } + HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { @@ -2603,6 +2615,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. // this is to avoid breaks on the loop when checking for any other factors. noDelivery = 0; + if (redistributor == null) { + ref = handleMessageGroup(ref, consumer, groupConsumer, groupID); + } + + deliveriesInTransit.countUp(); + + removeMessageReference(holder, ref); handledconsumer = consumer; handled++; @@ -2634,10 +2653,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. // Round robin'd all if (noDelivery == this.consumers.size()) { - if (logger.isDebugEnabled()) { - logger.debug(this + "::All the consumers were busy, giving up now"); + if (handledconsumer != null) { + // this shouldn't really happen, + // however I'm keeping this as an assertion case future developers ever change the logic here on this class + ActiveMQServerLogger.LOGGER.nonDeliveryHandled(); + } else { + if (logger.isDebugEnabled()) { + logger.debug(this + "::All the consumers were busy, giving up now"); + } + break; } - break; } noDelivery = 0; @@ -2645,7 +2670,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. } if (handledconsumer != null) { - afterDeliver(handledconsumer, ref); + proceedDeliver(handledconsumer, ref); } } @@ -3173,7 +3198,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. } } - private boolean deliver(MessageReference ref) { + private boolean deliver(final MessageReference ref) { synchronized (this) { if (!supportsDirectDeliver) { return false; @@ -3200,24 +3225,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. consumer = groupConsumer; } - Object handleValue = handle(ref, consumer, groupConsumer == null); - - - HandleStatus status; - - final MessageReference reference; - if (handleValue instanceof MessageReference) { - reference = (MessageReference) handleValue; - status = HandleStatus.HANDLED; - } else { - reference = ref; - status = (HandleStatus) handleValue; - } - + HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { + final MessageReference reference; + if (redistributor == null) { + reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); + } else { + reference = ref; + } + messagesAdded.incrementAndGet(); + deliveriesInTransit.countUp(); + proceedDeliver(consumer, reference); consumers.reset(); return true; } @@ -3248,16 +3269,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. return groupConsumer; } - /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */ - @Override - public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) { - if (redistributor != null) { - // no grouping work on this case - return ref; - } - SimpleString groupID = extractGroupID(ref); + private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { if (exclusive) { - if (newGroup) { + if (groupConsumer == null) { exclusiveConsumer = consumer; if (groupFirstKey != null) { return new GroupFirstMessageReference(groupFirstKey, ref); @@ -3268,7 +3282,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. if (extractGroupSequence(ref) == -1) { groups.remove(groupID); consumers.repeat(); - } else if (newGroup) { + } else if (groupConsumer == null) { groups.put(groupID, consumer); if (groupFirstKey != null) { return new GroupFirstMessageReference(groupFirstKey, ref); @@ -3280,11 +3294,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. return ref; } - private void afterDeliver(Consumer consumer, MessageReference reference) { + private void proceedDeliver(Consumer consumer, MessageReference reference) { try { - consumer.afterDeliver(reference); + consumer.proceedDeliver(reference); } catch (Throwable t) { errorProcessing(consumer, t, reference); + } finally { + deliveriesInTransit.countDown(); } } @@ -3329,10 +3345,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer. } } - private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) { - Object status; + private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) { + HandleStatus status; try { - status = consumer.handleWithGroup(this, newGroup, reference); + status = consumer.handle(reference); } catch (Throwable t) { ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index dde8d87d45..c709d4ec77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -24,8 +24,11 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -104,6 +107,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private SlowConsumerDetectionListener slowConsumerListener; + /** + * We get a readLock when a message is handled, and return the readLock when the message is finally delivered + * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished + * otherwise a rollback may get message sneaking in + */ + private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock(); + private volatile AtomicInteger availableCredits = new AtomicInteger(0); private boolean started; @@ -382,20 +392,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { messageQueue.errorProcessing(this, e, deliveryObject); } - /** This is in case someone is using direct old API */ @Override - public HandleStatus handle(MessageReference ref) throws Exception { - Object refReturn = handleWithGroup(null, false, ref); - - if (refReturn instanceof MessageReference) { - return HandleStatus.HANDLED; - } else { - return (HandleStatus) refReturn; - } - - } - @Override - public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception { + public HandleStatus handle(final MessageReference ref) throws Exception { // available credits can be set back to null with a flow control option. AtomicInteger checkInteger = availableCredits; if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) { @@ -483,46 +481,42 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } - MessageReference deliveryReference = ref; - - if (handler != null) { - deliveryReference = handler.handleMessageGroup(ref, this, newGroup); - } - - proceedDeliver(deliveryReference); + lockDelivery.readLock().lock(); return HandleStatus.HANDLED; } } - private void proceedDeliver(MessageReference reference) throws Exception { - Message message = reference.getMessage(); - - if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); - } - - if (message.isLargeMessage() && supportLargeMessage) { - if (largeMessageDeliverer == null) { - // This can't really happen as handle had already crated the deliverer - // instead of throwing an exception in weird cases there is no problem on just go ahead and create it - // again here - largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference); - } - // The deliverer was prepared during handle, as we can't have more than one pending large message - // as it would return busy if there is anything pending - largeMessageDeliverer.deliver(); - } else { - deliverStandardMessage(reference, message); - } - } - @Override - public void afterDeliver(MessageReference reference) throws Exception { - callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount()); - if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference)); + public void proceedDeliver(MessageReference reference) throws Exception { + try { + Message message = reference.getMessage(); + + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); + } + + if (message.isLargeMessage() && supportLargeMessage) { + if (largeMessageDeliverer == null) { + // This can't really happen as handle had already crated the deliverer + // instead of throwing an exception in weird cases there is no problem on just go ahead and create it + // again here + largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference); + } + // The deliverer was prepared during handle, as we can't have more than one pending large message + // as it would return busy if there is anything pending + largeMessageDeliverer.deliver(); + } else { + deliverStandardMessage(reference, message); + } + } finally { + lockDelivery.readLock().unlock(); + callback.afterDelivery(); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference)); + } } + } @Override @@ -632,7 +626,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * there are no other messages to be delivered. */ @Override - public void forceDelivery(final long sequence) { + public void forceDelivery(final long sequence) { forceDelivery(sequence, () -> { Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue); @@ -736,7 +730,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { synchronized (lock) { - this.started = browseOnly || started; + boolean locked = lockDelivery(); + + // This is to make sure nothing would sneak to the client while started = false + // the client will stop the session and perform a rollback in certain cases. + // in case something sneaks to the client you could get to messaging delivering forever until + // you restart the server + try { + this.started = browseOnly || started; + } finally { + if (locked) { + lockDelivery.writeLock().unlock(); + } + } } // Outside the lock @@ -745,10 +751,35 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private boolean lockDelivery() { + try { + if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) { + ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); + if (server != null) { + server.threadDump(); + } + return false; + } + return true; + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e); + return false; + } + } + @Override public void setTransferring(final boolean transferring) { synchronized (lock) { - this.transferring = transferring; + // This is to make sure that the delivery process has finished any pending delivery + // otherwise a message may sneak in on the client while we are trying to stop the consumer + boolean locked = lockDelivery(); + try { + this.transferring = transferring; + } finally { + if (locked) { + lockDelivery.writeLock().unlock(); + } + } } // Outside the lock @@ -1244,111 +1275,125 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } public boolean deliver() throws Exception { - if (!started) { - return false; - } - - LargeServerMessage currentLargeMessage = largeMessage; - if (currentLargeMessage == null) { - return true; - } - - if (availableCredits != null && availableCredits.get() <= 0) { - if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits); - } - releaseHeapBodyBuffer(); - return false; - } - - if (!sentInitialPacket) { - context = currentLargeMessage.getBodyEncoder(); - - sizePendingLargeMessage = context.getLargeBodySize(); - - context.open(); - - sentInitialPacket = true; - - int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); - - if (availableCredits != null) { - final int credits = availableCredits.addAndGet(-packetSize); - - if (credits <= 0) { - releaseHeapBodyBuffer(); - } - - if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits); - } + lockDelivery.readLock().lock(); + try { + if (!started) { + return false; } - // Execute the rest of the large message on a different thread so as not to tie up the delivery thread - // for too long + LargeServerMessage currentLargeMessage = largeMessage; + if (currentLargeMessage == null) { + return true; + } - resumeLargeMessage(); - - return false; - } else { if (availableCredits != null && availableCredits.get() <= 0) { if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits); + logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + + availableCredits); } releaseHeapBodyBuffer(); return false; } - final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); + if (!sentInitialPacket) { + context = currentLargeMessage.getBodyEncoder(); - final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen); + sizePendingLargeMessage = context.getLargeBodySize(); - assert bodyBuffer.remaining() == localChunkLen; + context.open(); - final int readBytes = context.encode(bodyBuffer); + sentInitialPacket = true; - assert readBytes == localChunkLen; + int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); - final byte[] body = bodyBuffer.array(); + if (availableCredits != null) { + final int credits = availableCredits.addAndGet(-packetSize); - assert body.length == readBytes; + if (credits <= 0) { + releaseHeapBodyBuffer(); + } - //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation - //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if - //resendCache != null && packet.isRequiresConfirmations() - - int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); - - int chunkLen = body.length; - - if (availableCredits != null) { - final int credits = availableCredits.addAndGet(-packetSize); - - if (credits <= 0) { - releaseHeapBodyBuffer(); + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlowControl::" + + " deliver initialpackage with " + + packetSize + + " delivered, available now = " + + availableCredits); + } } - if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits); - } - } + // Execute the rest of the large message on a different thread so as not to tie up the delivery thread + // for too long - positionPendingLargeMessage += chunkLen; - - if (positionPendingLargeMessage < sizePendingLargeMessage) { resumeLargeMessage(); return false; + } else { + if (availableCredits != null && availableCredits.get() <= 0) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + + availableCredits); + } + releaseHeapBodyBuffer(); + return false; + } + + final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); + + final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen); + + assert bodyBuffer.remaining() == localChunkLen; + + final int readBytes = context.encode(bodyBuffer); + + assert readBytes == localChunkLen; + + final byte[] body = bodyBuffer.array(); + + assert body.length == readBytes; + + //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation + //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if + //resendCache != null && packet.isRequiresConfirmations() + + int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); + + int chunkLen = body.length; + + if (availableCredits != null) { + final int credits = availableCredits.addAndGet(-packetSize); + + if (credits <= 0) { + releaseHeapBodyBuffer(); + } + + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + + packetSize + + " available now=" + + availableCredits); + } + } + + positionPendingLargeMessage += chunkLen; + + if (positionPendingLargeMessage < sizePendingLargeMessage) { + resumeLargeMessage(); + + return false; + } } + + if (logger.isTraceEnabled()) { + logger.trace("Finished deliverLargeMessage"); + } + + finish(); + + return true; + } finally { + lockDelivery.readLock().unlock(); } - - if (logger.isTraceEnabled()) { - logger.trace("Finished deliverLargeMessage"); - } - - finish(); - - return true; } public void finish() throws Exception { @@ -1408,7 +1453,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } if (status == HandleStatus.HANDLED) { - afterDeliver(current); + proceedDeliver(current); } current = null; @@ -1436,7 +1481,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } if (status == HandleStatus.HANDLED) { - afterDeliver(ref); + proceedDeliver(ref); } else if (status == HandleStatus.BUSY) { // keep a reference on the current message reference // to handle it next time the browser deliverer is executed diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 18ef2537f8..5577522aca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -41,10 +41,11 @@ public interface SessionCallback { */ boolean hasCredits(ServerConsumer consumerID); - // Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS - // and these need to be done outside of the main lock. - // otherwise we could dead-lock during delivery - void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception; + /** + * This can be used to complete certain operations outside of the lock, + * like acks or other operations. + */ + void afterDelivery() throws Exception; /** * Use this to updates specifics on the message after a redelivery happened. @@ -68,7 +69,6 @@ public interface SessionCallback { // Future developments may change this, but beware why I have chosen to keep the parameter separated here int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount); - int sendLargeMessage(MessageReference reference, Message message, ServerConsumer consumerID, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index 9bc0ea26cf..58bf2d3d5a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer { } @Override - public void afterDeliver(MessageReference reference) throws Exception { + public void proceedDeliver(MessageReference reference) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 0006752894..3e64ac5fc7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -527,6 +527,11 @@ public class HangConsumerTest extends ActiveMQTestBase { return true; } + @Override + public void afterDelivery() throws Exception { + + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { targetCallback.sendProducerCreditsFailMessage(credits, address); @@ -553,11 +558,6 @@ public class HangConsumerTest extends ActiveMQTestBase { } } - @Override - public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) { - - } - /* (non-Javadoc) * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int) */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java index fc32c44c2d..ba8cd95304 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java @@ -271,6 +271,7 @@ public class GroupingTest extends JMSTestBase { assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID); } + Thread.sleep(2000); //session.rollback(); //session.close(); //consume all msgs from 2nd first consumer diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 076e164968..f20cce3d56 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1312,21 +1312,12 @@ public class QueueImplTest extends ActiveMQTestBase { @Override public synchronized HandleStatus handle(MessageReference reference) { - return HandleStatus.HANDLED; - } - - @Override - public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception { if (count == 0) { //the first message is handled and will be used to determine this consumer //to be the group consumer count++; firstMessageHandled.countDown(); - if (groupHandler != null) { - return groupHandler.handleMessageGroup(reference, this, newGroup); - } else { - return HandleStatus.HANDLED; - } + return HandleStatus.HANDLED; } else if (count <= 2) { //the next two attempts to send the second message will be done //attempting a direct delivery and an async one after that @@ -1338,11 +1329,7 @@ public class QueueImplTest extends ActiveMQTestBase { //the second message should have stop the delivery loop: //it will succeed just to let the message being handled and //reduce the message count to 0 - if (groupHandler != null) { - return groupHandler.handleMessageGroup(reference, this, newGroup); - } else { - return HandleStatus.HANDLED; - } + return HandleStatus.HANDLED; } } }; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java index 47b042b0d2..2a5a33061f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java @@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer { } @Override - public void afterDeliver(MessageReference ref) throws Exception { + public void proceedDeliver(MessageReference ref) throws Exception { // no op }