diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index f850cc185e..616da5b474 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -186,11 +186,6 @@ public class AMQPSessionCallback implements SessionCallback { (String) null, this, true, operationContext, manager.getPrefixes()); } - @Override - public void afterDelivery() throws Exception { - - } - public void start() { } @@ -604,6 +599,11 @@ public class AMQPSessionCallback implements SessionCallback { } + @Override + public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) { + + } + @Override public int sendLargeMessage(MessageReference ref, Message message, diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index abcfe3f3d5..bb3853931b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -59,6 +59,12 @@ public class MQTTPublishManager { private MQTTSessionState.OutboundStore outboundStore; + /** this is the last qos that happened during delivery. + * since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery + * so it is safe to keep this value here. + */ + private Integer currentQos; + public MQTTPublishManager(MQTTSession session) { this.session = session; } @@ -108,7 +114,6 @@ public class MQTTPublishManager { boolean isManagementConsumer(ServerConsumer consumer) { return consumer == managementConsumer; } - /** * Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client * returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we @@ -119,20 +124,35 @@ public class MQTTPublishManager { protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { // This is to allow retries of PubRel. if (isManagementConsumer(consumer)) { + currentQos = null; sendPubRelMessage(message); } else { int qos = decideQoS(message, consumer); + currentQos = qos; if (qos == 0) { sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos); - session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID()); } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); sendServerMessage(mqttid, message, deliveryCount, qos); + } else { + // this will happen during afterDeliver + } + } + } + + protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { + if (currentQos != null) { + int qos = currentQos.intValue(); + if (qos == 0) { + session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID()); + } else if (qos == 1 || qos == 2) { + // everything happened in delivery } else { // Client must have disconnected and it's Subscription QoS cleared consumer.individualCancel(message.getMessageID(), false); } + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 50d57327a4..168b7fac77 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -59,6 +59,15 @@ public class MQTTSessionCallback implements SessionCallback { return 1; } + @Override + public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { + try { + session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount); + } catch (Exception e) { + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); + } + } + @Override public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { return false; @@ -90,11 +99,6 @@ public class MQTTSessionCallback implements SessionCallback { } } - @Override - public void afterDelivery() throws Exception { - - } - @Override public void browserFinished(ServerConsumer consumer) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index b780563ff2..e4ecd48e29 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -270,12 +270,6 @@ public class AMQSession implements SessionCallback { } - // rename actualDest to destination - @Override - public void afterDelivery() throws Exception { - - } - @Override public void browserFinished(ServerConsumer consumer) { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); @@ -312,6 +306,14 @@ public class AMQSession implements SessionCallback { return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); } + @Override + public void afterDeliver(MessageReference ref, + org.apache.activemq.artemis.api.core.Message message, + ServerConsumer consumerID, + int deliveryCount) { + + } + @Override public int sendLargeMessage(MessageReference reference, org.apache.activemq.artemis.api.core.Message message, diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 80bbbe8613..03f9b4433c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -110,8 +110,9 @@ public class StompSession implements SessionCallback { public void sendProducerCreditsFailMessage(int credits, SimpleString address) { } + @Override - public void afterDelivery() throws Exception { + public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception { PendingTask task; while ((task = afterDeliveryTasks.poll()) != null) { task.run(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java index 7e760c139d..35eab8a659 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java @@ -211,11 +211,6 @@ public class ManagementRemotingConnection implements RemotingConnection { return false; } - @Override - public void afterDelivery() throws Exception { - - } - @Override public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { return false; @@ -236,6 +231,11 @@ public class ManagementRemotingConnection implements RemotingConnection { return 0; } + @Override + public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) { + + } + @Override public int sendLargeMessage(MessageReference reference, Message message, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index f53d028981..fce0dd5346 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -132,6 +132,11 @@ public final class CoreSessionCallback implements SessionCallback { return size; } + @Override + public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) { + // no op + } + @Override public void sendProducerCreditsMessage(int credits, SimpleString address) { Packet packet = new SessionProducerCreditsMessage(credits, address); @@ -144,11 +149,6 @@ public final class CoreSessionCallback implements SessionCallback { } - @Override - public void afterDelivery() throws Exception { - - } - @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { Packet packet = new SessionProducerCreditsFailMessage(credits, address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index babddc22cf..269c74fd79 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -25,6 +25,10 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; public interface Consumer extends PriorityAware { + interface GroupHandler { + MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup); + } + /** * * @see SessionCallback#supportsDirectDelivery() @@ -34,13 +38,7 @@ public interface Consumer extends PriorityAware { } /** - * There was a change on semantic during 2.3 here.
- * We now first accept the message, and the actual deliver is done as part of - * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while - * the delivery is being accomplished To avoid a lock on the queue in case of misbehaving - * consumers. - *

- * This should return busy if handle is called before proceed deliver is called + * * @param reference * @return @@ -48,19 +46,29 @@ public interface Consumer extends PriorityAware { */ HandleStatus handle(MessageReference reference) throws Exception; + /** + * This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled + * This should return busy if handle is called before proceed deliver is called + * @param groupHandler + * @param reference + * @return + * @throws Exception + */ + default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception { + return handle(reference); + } + /** wakes up internal threads to deliver more messages */ default void promptDelivery() { } /** - * This will proceed with the actual delivery. - * Notice that handle should hold a readLock and proceedDelivery should release the readLock - * any lock operation on Consumer should also get a writeLock on the readWriteLock - * to guarantee there are no pending deliveries + * This will called after delivery + * Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks * * @throws Exception */ - void proceedDeliver(MessageReference reference) throws Exception; + void afterDeliver(MessageReference reference) throws Exception; Filter getFilter(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 7d5bbe8ac7..4c73261749 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled // FailureListener implementation -------------------------------- @Override - public void proceedDeliver(MessageReference ref) { + public void afterDeliver(MessageReference ref) { // no op } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index 79820186bb..44a5e0b93c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -193,7 +193,7 @@ public class Redistributor implements Consumer { } @Override - public void proceedDeliver(MessageReference ref) { + public void afterDeliver(MessageReference ref) { // no op } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 998ef8cc4b..292d15f1bc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -98,7 +98,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.BooleanUtil; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ReferenceCounter; -import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; @@ -114,7 +113,7 @@ import org.jboss.logging.Logger; *

* Completely non blocking between adding to queue and delivering to consumers. */ -public class QueueImpl extends CriticalComponentImpl implements Queue { +public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler { protected static final int CRITICAL_PATHS = 5; protected static final int CRITICAL_PATH_ADD_TAIL = 0; @@ -268,8 +267,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final ExpiryScanner expiryScanner = new ExpiryScanner(); - private final ReusableLatch deliveriesInTransit = new ReusableLatch(0); - private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); private final AtomicLong messagesAddedSnapshot = new AtomicLong(0); @@ -955,7 +952,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && + if (getExecutor().isFlushed() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && pageIterator != null && !pageIterator.hasNext() && pageSubscription != null && !pageSubscription.isPaging()) { @@ -974,7 +971,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) { + if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) { return; } @@ -1005,23 +1002,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return false; } - /** - * This will wait for any pending deliveries to finish - */ - private boolean flushDeliveriesInTransit() { - try { - if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) { - return true; - } else { - ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString()); - return false; - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e); - return false; - } - } - @Override public void forceDelivery() { if (pageSubscription != null && pageSubscription.isPaging()) { @@ -2366,7 +2346,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public synchronized void pause(boolean persist) { try { - this.flushDeliveriesInTransit(); if (persist && isDurable()) { if (pauseStatusRecord >= 0) { storageManager.deleteQueueStatus(pauseStatusRecord); @@ -2607,7 +2586,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumer = groupConsumer; } - HandleStatus status = handle(ref, consumer); + Object handleValue = handle(ref, consumer, groupConsumer == null); + + HandleStatus status; + + if (handleValue instanceof MessageReference) { + ref = (MessageReference) handleValue; + status = HandleStatus.HANDLED; + } else { + status = (HandleStatus) handleValue; + } if (status == HandleStatus.HANDLED) { @@ -2615,13 +2603,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // this is to avoid breaks on the loop when checking for any other factors. noDelivery = 0; - if (redistributor == null) { - ref = handleMessageGroup(ref, consumer, groupConsumer, groupID); - } - - deliveriesInTransit.countUp(); - - removeMessageReference(holder, ref); handledconsumer = consumer; handled++; @@ -2653,16 +2634,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Round robin'd all if (noDelivery == this.consumers.size()) { - if (handledconsumer != null) { - // this shouldn't really happen, - // however I'm keeping this as an assertion case future developers ever change the logic here on this class - ActiveMQServerLogger.LOGGER.nonDeliveryHandled(); - } else { - if (logger.isDebugEnabled()) { - logger.debug(this + "::All the consumers were busy, giving up now"); - } - break; + if (logger.isDebugEnabled()) { + logger.debug(this + "::All the consumers were busy, giving up now"); } + break; } noDelivery = 0; @@ -2670,7 +2645,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (handledconsumer != null) { - proceedDeliver(handledconsumer, ref); + afterDeliver(handledconsumer, ref); } } @@ -3198,7 +3173,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private boolean deliver(final MessageReference ref) { + private boolean deliver(MessageReference ref) { synchronized (this) { if (!supportsDirectDeliver) { return false; @@ -3225,20 +3200,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumer = groupConsumer; } - HandleStatus status = handle(ref, consumer); + Object handleValue = handle(ref, consumer, groupConsumer == null); + + + HandleStatus status; + + final MessageReference reference; + if (handleValue instanceof MessageReference) { + reference = (MessageReference) handleValue; + status = HandleStatus.HANDLED; + } else { + reference = ref; + status = (HandleStatus) handleValue; + } + if (status == HandleStatus.HANDLED) { - final MessageReference reference; - if (redistributor == null) { - reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); - } else { - reference = ref; - } - messagesAdded.incrementAndGet(); - deliveriesInTransit.countUp(); - proceedDeliver(consumer, reference); consumers.reset(); return true; } @@ -3269,9 +3248,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return groupConsumer; } - private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { + /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */ + @Override + public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) { + if (redistributor != null) { + // no grouping work on this case + return ref; + } + SimpleString groupID = extractGroupID(ref); if (exclusive) { - if (groupConsumer == null) { + if (newGroup) { exclusiveConsumer = consumer; if (groupFirstKey != null) { return new GroupFirstMessageReference(groupFirstKey, ref); @@ -3282,7 +3268,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (extractGroupSequence(ref) == -1) { groups.remove(groupID); consumers.repeat(); - } else if (groupConsumer == null) { + } else if (newGroup) { groups.put(groupID, consumer); if (groupFirstKey != null) { return new GroupFirstMessageReference(groupFirstKey, ref); @@ -3294,13 +3280,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return ref; } - private void proceedDeliver(Consumer consumer, MessageReference reference) { + private void afterDeliver(Consumer consumer, MessageReference reference) { try { - consumer.proceedDeliver(reference); + consumer.afterDeliver(reference); } catch (Throwable t) { errorProcessing(consumer, t, reference); - } finally { - deliveriesInTransit.countDown(); } } @@ -3345,10 +3329,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) { - HandleStatus status; + private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) { + Object status; try { - status = consumer.handle(reference); + status = consumer.handleWithGroup(this, newGroup, reference); } catch (Throwable t) { ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index e19b1e5c03..dde8d87d45 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -24,12 +24,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -108,13 +104,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private SlowConsumerDetectionListener slowConsumerListener; - /** - * We get a readLock when a message is handled, and return the readLock when the message is finally delivered - * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished - * otherwise a rollback may get message sneaking in - */ - private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock(); - private volatile AtomicInteger availableCredits = new AtomicInteger(0); private boolean started; @@ -393,8 +382,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { messageQueue.errorProcessing(this, e, deliveryObject); } + /** This is in case someone is using direct old API */ @Override - public HandleStatus handle(final MessageReference ref) throws Exception { + public HandleStatus handle(MessageReference ref) throws Exception { + Object refReturn = handleWithGroup(null, false, ref); + + if (refReturn instanceof MessageReference) { + return HandleStatus.HANDLED; + } else { + return (HandleStatus) refReturn; + } + + } + @Override + public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception { // available credits can be set back to null with a flow control option. AtomicInteger checkInteger = availableCredits; if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) { @@ -482,42 +483,46 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } - lockDelivery.readLock().lock(); + MessageReference deliveryReference = ref; + + if (handler != null) { + deliveryReference = handler.handleMessageGroup(ref, this, newGroup); + } + + proceedDeliver(deliveryReference); return HandleStatus.HANDLED; } } - @Override - public void proceedDeliver(MessageReference reference) throws Exception { - try { - Message message = reference.getMessage(); + private void proceedDeliver(MessageReference reference) throws Exception { + Message message = reference.getMessage(); - if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); - } - - if (message.isLargeMessage() && supportLargeMessage) { - if (largeMessageDeliverer == null) { - // This can't really happen as handle had already crated the deliverer - // instead of throwing an exception in weird cases there is no problem on just go ahead and create it - // again here - largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference); - } - // The deliverer was prepared during handle, as we can't have more than one pending large message - // as it would return busy if there is anything pending - largeMessageDeliverer.deliver(); - } else { - deliverStandardMessage(reference, message); - } - } finally { - lockDelivery.readLock().unlock(); - callback.afterDelivery(); - if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference)); - } + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); } + if (message.isLargeMessage() && supportLargeMessage) { + if (largeMessageDeliverer == null) { + // This can't really happen as handle had already crated the deliverer + // instead of throwing an exception in weird cases there is no problem on just go ahead and create it + // again here + largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference); + } + // The deliverer was prepared during handle, as we can't have more than one pending large message + // as it would return busy if there is anything pending + largeMessageDeliverer.deliver(); + } else { + deliverStandardMessage(reference, message); + } + } + + @Override + public void afterDeliver(MessageReference reference) throws Exception { + callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount()); + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference)); + } } @Override @@ -627,7 +632,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * there are no other messages to be delivered. */ @Override - public void forceDelivery(final long sequence) { + public void forceDelivery(final long sequence) { forceDelivery(sequence, () -> { Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue); @@ -730,68 +735,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { - lockDelivery(locked -> { - // This is to make sure nothing would sneak to the client while started = false - // the client will stop the session and perform a rollback in certain cases. - // in case something sneaks to the client you could get to messaging delivering forever until - // you restart the server + synchronized (lock) { this.started = browseOnly || started; - }); + } + // Outside the lock if (started) { promptDelivery(); } } - private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30); - private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100); - - private boolean lockDelivery(java.util.function.Consumer task) { - final long startWait = System.nanoTime(); - long now; - while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) { - try { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e); - synchronized (lock) { - task.accept(false); - } - return false; - } - synchronized (lock) { - if (lockDelivery.writeLock().tryLock()) { - try { - task.accept(true); - } finally { - lockDelivery.writeLock().unlock(); - } - return true; - } - } - //entering the lock can take some time: discount that time from the - //time before attempting to lock delivery - final long timeToLock = System.nanoTime() - now; - if (timeToLock < TRY_LOCK_NS) { - final long timeToWait = TRY_LOCK_NS - timeToLock; - LockSupport.parkNanos(timeToWait); - } - } - ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); - if (server != null) { - server.threadDump(); - } - synchronized (lock) { - task.accept(false); - } - return false; - } - @Override public void setTransferring(final boolean transferring) { - lockDelivery(locked -> this.transferring = transferring); + synchronized (lock) { + this.transferring = transferring; + } // Outside the lock if (transferring) { @@ -1286,125 +1244,111 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } public boolean deliver() throws Exception { - lockDelivery.readLock().lock(); - try { - if (!started) { - return false; + if (!started) { + return false; + } + + LargeServerMessage currentLargeMessage = largeMessage; + if (currentLargeMessage == null) { + return true; + } + + if (availableCredits != null && availableCredits.get() <= 0) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits); + } + releaseHeapBodyBuffer(); + return false; + } + + if (!sentInitialPacket) { + context = currentLargeMessage.getBodyEncoder(); + + sizePendingLargeMessage = context.getLargeBodySize(); + + context.open(); + + sentInitialPacket = true; + + int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); + + if (availableCredits != null) { + final int credits = availableCredits.addAndGet(-packetSize); + + if (credits <= 0) { + releaseHeapBodyBuffer(); + } + + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits); + } } - LargeServerMessage currentLargeMessage = largeMessage; - if (currentLargeMessage == null) { - return true; - } + // Execute the rest of the large message on a different thread so as not to tie up the delivery thread + // for too long + resumeLargeMessage(); + + return false; + } else { if (availableCredits != null && availableCredits.get() <= 0) { if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + - availableCredits); + logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits); } releaseHeapBodyBuffer(); return false; } - if (!sentInitialPacket) { - context = currentLargeMessage.getBodyEncoder(); + final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); - sizePendingLargeMessage = context.getLargeBodySize(); + final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen); - context.open(); + assert bodyBuffer.remaining() == localChunkLen; - sentInitialPacket = true; + final int readBytes = context.encode(bodyBuffer); - int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); + assert readBytes == localChunkLen; - if (availableCredits != null) { - final int credits = availableCredits.addAndGet(-packetSize); + final byte[] body = bodyBuffer.array(); - if (credits <= 0) { - releaseHeapBodyBuffer(); - } + assert body.length == readBytes; - if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::" + - " deliver initialpackage with " + - packetSize + - " delivered, available now = " + - availableCredits); - } + //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation + //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if + //resendCache != null && packet.isRequiresConfirmations() + + int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); + + int chunkLen = body.length; + + if (availableCredits != null) { + final int credits = availableCredits.addAndGet(-packetSize); + + if (credits <= 0) { + releaseHeapBodyBuffer(); } - // Execute the rest of the large message on a different thread so as not to tie up the delivery thread - // for too long + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits); + } + } + positionPendingLargeMessage += chunkLen; + + if (positionPendingLargeMessage < sizePendingLargeMessage) { resumeLargeMessage(); return false; - } else { - if (availableCredits != null && availableCredits.get() <= 0) { - if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + - availableCredits); - } - releaseHeapBodyBuffer(); - return false; - } - - final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize); - - final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen); - - assert bodyBuffer.remaining() == localChunkLen; - - final int readBytes = context.encode(bodyBuffer); - - assert readBytes == localChunkLen; - - final byte[] body = bodyBuffer.array(); - - assert body.length == readBytes; - - //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation - //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if - //resendCache != null && packet.isRequiresConfirmations() - - int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); - - int chunkLen = body.length; - - if (availableCredits != null) { - final int credits = availableCredits.addAndGet(-packetSize); - - if (credits <= 0) { - releaseHeapBodyBuffer(); - } - - if (logger.isTraceEnabled()) { - logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + - packetSize + - " available now=" + - availableCredits); - } - } - - positionPendingLargeMessage += chunkLen; - - if (positionPendingLargeMessage < sizePendingLargeMessage) { - resumeLargeMessage(); - - return false; - } } - - if (logger.isTraceEnabled()) { - logger.trace("Finished deliverLargeMessage"); - } - - finish(); - - return true; - } finally { - lockDelivery.readLock().unlock(); } + + if (logger.isTraceEnabled()) { + logger.trace("Finished deliverLargeMessage"); + } + + finish(); + + return true; } public void finish() throws Exception { @@ -1464,7 +1408,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } if (status == HandleStatus.HANDLED) { - proceedDeliver(current); + afterDeliver(current); } current = null; @@ -1492,7 +1436,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } if (status == HandleStatus.HANDLED) { - proceedDeliver(ref); + afterDeliver(ref); } else if (status == HandleStatus.BUSY) { // keep a reference on the current message reference // to handle it next time the browser deliverer is executed diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 5577522aca..18ef2537f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -41,11 +41,10 @@ public interface SessionCallback { */ boolean hasCredits(ServerConsumer consumerID); - /** - * This can be used to complete certain operations outside of the lock, - * like acks or other operations. - */ - void afterDelivery() throws Exception; + // Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS + // and these need to be done outside of the main lock. + // otherwise we could dead-lock during delivery + void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception; /** * Use this to updates specifics on the message after a redelivery happened. @@ -69,6 +68,7 @@ public interface SessionCallback { // Future developments may change this, but beware why I have chosen to keep the parameter separated here int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount); + int sendLargeMessage(MessageReference reference, Message message, ServerConsumer consumerID, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index 58bf2d3d5a..9bc0ea26cf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer { } @Override - public void proceedDeliver(MessageReference reference) throws Exception { + public void afterDeliver(MessageReference reference) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 3e64ac5fc7..0006752894 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -527,11 +527,6 @@ public class HangConsumerTest extends ActiveMQTestBase { return true; } - @Override - public void afterDelivery() throws Exception { - - } - @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { targetCallback.sendProducerCreditsFailMessage(credits, address); @@ -558,6 +553,11 @@ public class HangConsumerTest extends ActiveMQTestBase { } } + @Override + public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) { + + } + /* (non-Javadoc) * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int) */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java index ba8cd95304..fc32c44c2d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java @@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase { assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID); } - Thread.sleep(2000); //session.rollback(); //session.close(); //consume all msgs from 2nd first consumer diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index f20cce3d56..076e164968 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1312,12 +1312,21 @@ public class QueueImplTest extends ActiveMQTestBase { @Override public synchronized HandleStatus handle(MessageReference reference) { + return HandleStatus.HANDLED; + } + + @Override + public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception { if (count == 0) { //the first message is handled and will be used to determine this consumer //to be the group consumer count++; firstMessageHandled.countDown(); - return HandleStatus.HANDLED; + if (groupHandler != null) { + return groupHandler.handleMessageGroup(reference, this, newGroup); + } else { + return HandleStatus.HANDLED; + } } else if (count <= 2) { //the next two attempts to send the second message will be done //attempting a direct delivery and an async one after that @@ -1329,7 +1338,11 @@ public class QueueImplTest extends ActiveMQTestBase { //the second message should have stop the delivery loop: //it will succeed just to let the message being handled and //reduce the message count to 0 - return HandleStatus.HANDLED; + if (groupHandler != null) { + return groupHandler.handleMessageGroup(reference, this, newGroup); + } else { + return HandleStatus.HANDLED; + } } } }; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java index 2a5a33061f..47b042b0d2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java @@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer { } @Override - public void proceedDeliver(MessageReference ref) throws Exception { + public void afterDeliver(MessageReference ref) throws Exception { // no op }