ARTEMIS-2423 Improving Consumer/Queue Delivery lock

This commit is contained in:
Clebert Suconic 2019-07-25 12:10:24 -04:00
parent d2d21516ba
commit 7507a9fd4b
18 changed files with 282 additions and 296 deletions

View File

@ -186,11 +186,6 @@ public class AMQPSessionCallback implements SessionCallback {
(String) null, this, true, operationContext, manager.getPrefixes()); (String) null, this, true, operationContext, manager.getPrefixes());
} }
@Override
public void afterDelivery() throws Exception {
}
public void start() { public void start() {
} }
@ -604,6 +599,11 @@ public class AMQPSessionCallback implements SessionCallback {
} }
@Override
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
}
@Override @Override
public int sendLargeMessage(MessageReference ref, public int sendLargeMessage(MessageReference ref,
Message message, Message message,

View File

@ -59,6 +59,12 @@ public class MQTTPublishManager {
private MQTTSessionState.OutboundStore outboundStore; private MQTTSessionState.OutboundStore outboundStore;
/** this is the last qos that happened during delivery.
* since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery
* so it is safe to keep this value here.
*/
private Integer currentQos;
public MQTTPublishManager(MQTTSession session) { public MQTTPublishManager(MQTTSession session) {
this.session = session; this.session = session;
} }
@ -108,7 +114,6 @@ public class MQTTPublishManager {
boolean isManagementConsumer(ServerConsumer consumer) { boolean isManagementConsumer(ServerConsumer consumer) {
return consumer == managementConsumer; return consumer == managementConsumer;
} }
/** /**
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client * Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we * returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
@ -119,20 +124,35 @@ public class MQTTPublishManager {
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
// This is to allow retries of PubRel. // This is to allow retries of PubRel.
if (isManagementConsumer(consumer)) { if (isManagementConsumer(consumer)) {
currentQos = null;
sendPubRelMessage(message); sendPubRelMessage(message);
} else { } else {
int qos = decideQoS(message, consumer); int qos = decideQoS(message, consumer);
currentQos = qos;
if (qos == 0) { if (qos == 0) {
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos); sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
} else if (qos == 1 || qos == 2) { } else if (qos == 1 || qos == 2) {
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
sendServerMessage(mqttid, message, deliveryCount, qos); sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
// this will happen during afterDeliver
}
}
}
protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
if (currentQos != null) {
int qos = currentQos.intValue();
if (qos == 0) {
session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
} else if (qos == 1 || qos == 2) {
// everything happened in delivery
} else { } else {
// Client must have disconnected and it's Subscription QoS cleared // Client must have disconnected and it's Subscription QoS cleared
consumer.individualCancel(message.getMessageID(), false); consumer.individualCancel(message.getMessageID(), false);
} }
} }
} }

View File

@ -59,6 +59,15 @@ public class MQTTSessionCallback implements SessionCallback {
return 1; return 1;
} }
@Override
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
try {
session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
} catch (Exception e) {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
}
@Override @Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false; return false;
@ -90,11 +99,6 @@ public class MQTTSessionCallback implements SessionCallback {
} }
} }
@Override
public void afterDelivery() throws Exception {
}
@Override @Override
public void browserFinished(ServerConsumer consumer) { public void browserFinished(ServerConsumer consumer) {

View File

@ -270,12 +270,6 @@ public class AMQSession implements SessionCallback {
} }
// rename actualDest to destination
@Override
public void afterDelivery() throws Exception {
}
@Override @Override
public void browserFinished(ServerConsumer consumer) { public void browserFinished(ServerConsumer consumer) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@ -312,6 +306,14 @@ public class AMQSession implements SessionCallback {
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
} }
@Override
public void afterDeliver(MessageReference ref,
org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumerID,
int deliveryCount) {
}
@Override @Override
public int sendLargeMessage(MessageReference reference, public int sendLargeMessage(MessageReference reference,
org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message message,

View File

@ -110,8 +110,9 @@ public class StompSession implements SessionCallback {
public void sendProducerCreditsFailMessage(int credits, SimpleString address) { public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
} }
@Override @Override
public void afterDelivery() throws Exception { public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
PendingTask task; PendingTask task;
while ((task = afterDeliveryTasks.poll()) != null) { while ((task = afterDeliveryTasks.poll()) != null) {
task.run(); task.run();

View File

@ -211,11 +211,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
return false; return false;
} }
@Override
public void afterDelivery() throws Exception {
}
@Override @Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false; return false;
@ -236,6 +231,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
return 0; return 0;
} }
@Override
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
}
@Override @Override
public int sendLargeMessage(MessageReference reference, public int sendLargeMessage(MessageReference reference,
Message message, Message message,

View File

@ -132,6 +132,11 @@ public final class CoreSessionCallback implements SessionCallback {
return size; return size;
} }
@Override
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
// no op
}
@Override @Override
public void sendProducerCreditsMessage(int credits, SimpleString address) { public void sendProducerCreditsMessage(int credits, SimpleString address) {
Packet packet = new SessionProducerCreditsMessage(credits, address); Packet packet = new SessionProducerCreditsMessage(credits, address);
@ -144,11 +149,6 @@ public final class CoreSessionCallback implements SessionCallback {
} }
@Override
public void afterDelivery() throws Exception {
}
@Override @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) { public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
Packet packet = new SessionProducerCreditsFailMessage(credits, address); Packet packet = new SessionProducerCreditsFailMessage(credits, address);

View File

@ -25,6 +25,10 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public interface Consumer extends PriorityAware { public interface Consumer extends PriorityAware {
interface GroupHandler {
MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
}
/** /**
* *
* @see SessionCallback#supportsDirectDelivery() * @see SessionCallback#supportsDirectDelivery()
@ -34,13 +38,7 @@ public interface Consumer extends PriorityAware {
} }
/** /**
* There was a change on semantic during 2.3 here.<br>
* We now first accept the message, and the actual deliver is done as part of
* {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
* the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
* consumers.
* <p>
* This should return busy if handle is called before proceed deliver is called
* *
* @param reference * @param reference
* @return * @return
@ -48,19 +46,29 @@ public interface Consumer extends PriorityAware {
*/ */
HandleStatus handle(MessageReference reference) throws Exception; HandleStatus handle(MessageReference reference) throws Exception;
/**
* This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled
* This should return busy if handle is called before proceed deliver is called
* @param groupHandler
* @param reference
* @return
* @throws Exception
*/
default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
return handle(reference);
}
/** wakes up internal threads to deliver more messages */ /** wakes up internal threads to deliver more messages */
default void promptDelivery() { default void promptDelivery() {
} }
/** /**
* This will proceed with the actual delivery. * This will called after delivery
* Notice that handle should hold a readLock and proceedDelivery should release the readLock * Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks
* any lock operation on Consumer should also get a writeLock on the readWriteLock
* to guarantee there are no pending deliveries
* *
* @throws Exception * @throws Exception
*/ */
void proceedDeliver(MessageReference reference) throws Exception; void afterDeliver(MessageReference reference) throws Exception;
Filter getFilter(); Filter getFilter();

View File

@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// FailureListener implementation -------------------------------- // FailureListener implementation --------------------------------
@Override @Override
public void proceedDeliver(MessageReference ref) { public void afterDeliver(MessageReference ref) {
// no op // no op
} }

View File

@ -193,7 +193,7 @@ public class Redistributor implements Consumer {
} }
@Override @Override
public void proceedDeliver(MessageReference ref) { public void afterDeliver(MessageReference ref) {
// no op // no op
} }

View File

@ -98,7 +98,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.BooleanUtil; import org.apache.activemq.artemis.utils.BooleanUtil;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
@ -114,7 +113,7 @@ import org.jboss.logging.Logger;
* <p> * <p>
* Completely non blocking between adding to queue and delivering to consumers. * Completely non blocking between adding to queue and delivering to consumers.
*/ */
public class QueueImpl extends CriticalComponentImpl implements Queue { public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler {
protected static final int CRITICAL_PATHS = 5; protected static final int CRITICAL_PATHS = 5;
protected static final int CRITICAL_PATH_ADD_TAIL = 0; protected static final int CRITICAL_PATH_ADD_TAIL = 0;
@ -268,8 +267,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final ExpiryScanner expiryScanner = new ExpiryScanner(); private final ExpiryScanner expiryScanner = new ExpiryScanner();
private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0); private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@ -955,7 +952,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() && if (getExecutor().isFlushed() &&
intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
pageIterator != null && !pageIterator.hasNext() && pageIterator != null && !pageIterator.hasNext() &&
pageSubscription != null && !pageSubscription.isPaging()) { pageSubscription != null && !pageSubscription.isPaging()) {
@ -974,7 +971,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) { if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) {
return; return;
} }
@ -1005,23 +1002,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return false; return false;
} }
/**
* This will wait for any pending deliveries to finish
*/
private boolean flushDeliveriesInTransit() {
try {
if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
return true;
} else {
ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString());
return false;
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e);
return false;
}
}
@Override @Override
public void forceDelivery() { public void forceDelivery() {
if (pageSubscription != null && pageSubscription.isPaging()) { if (pageSubscription != null && pageSubscription.isPaging()) {
@ -2366,7 +2346,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public synchronized void pause(boolean persist) { public synchronized void pause(boolean persist) {
try { try {
this.flushDeliveriesInTransit();
if (persist && isDurable()) { if (persist && isDurable()) {
if (pauseStatusRecord >= 0) { if (pauseStatusRecord >= 0) {
storageManager.deleteQueueStatus(pauseStatusRecord); storageManager.deleteQueueStatus(pauseStatusRecord);
@ -2607,7 +2586,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumer = groupConsumer; consumer = groupConsumer;
} }
HandleStatus status = handle(ref, consumer); Object handleValue = handle(ref, consumer, groupConsumer == null);
HandleStatus status;
if (handleValue instanceof MessageReference) {
ref = (MessageReference) handleValue;
status = HandleStatus.HANDLED;
} else {
status = (HandleStatus) handleValue;
}
if (status == HandleStatus.HANDLED) { if (status == HandleStatus.HANDLED) {
@ -2615,13 +2603,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// this is to avoid breaks on the loop when checking for any other factors. // this is to avoid breaks on the loop when checking for any other factors.
noDelivery = 0; noDelivery = 0;
if (redistributor == null) {
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
deliveriesInTransit.countUp();
removeMessageReference(holder, ref); removeMessageReference(holder, ref);
handledconsumer = consumer; handledconsumer = consumer;
handled++; handled++;
@ -2653,16 +2634,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Round robin'd all // Round robin'd all
if (noDelivery == this.consumers.size()) { if (noDelivery == this.consumers.size()) {
if (handledconsumer != null) { if (logger.isDebugEnabled()) {
// this shouldn't really happen, logger.debug(this + "::All the consumers were busy, giving up now");
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
} else {
if (logger.isDebugEnabled()) {
logger.debug(this + "::All the consumers were busy, giving up now");
}
break;
} }
break;
} }
noDelivery = 0; noDelivery = 0;
@ -2670,7 +2645,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
if (handledconsumer != null) { if (handledconsumer != null) {
proceedDeliver(handledconsumer, ref); afterDeliver(handledconsumer, ref);
} }
} }
@ -3198,7 +3173,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
private boolean deliver(final MessageReference ref) { private boolean deliver(MessageReference ref) {
synchronized (this) { synchronized (this) {
if (!supportsDirectDeliver) { if (!supportsDirectDeliver) {
return false; return false;
@ -3225,20 +3200,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumer = groupConsumer; consumer = groupConsumer;
} }
HandleStatus status = handle(ref, consumer); Object handleValue = handle(ref, consumer, groupConsumer == null);
HandleStatus status;
final MessageReference reference;
if (handleValue instanceof MessageReference) {
reference = (MessageReference) handleValue;
status = HandleStatus.HANDLED;
} else {
reference = ref;
status = (HandleStatus) handleValue;
}
if (status == HandleStatus.HANDLED) { if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (redistributor == null) {
reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
} else {
reference = ref;
}
messagesAdded.incrementAndGet(); messagesAdded.incrementAndGet();
deliveriesInTransit.countUp();
proceedDeliver(consumer, reference);
consumers.reset(); consumers.reset();
return true; return true;
} }
@ -3269,9 +3248,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return groupConsumer; return groupConsumer;
} }
private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */
@Override
public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) {
if (redistributor != null) {
// no grouping work on this case
return ref;
}
SimpleString groupID = extractGroupID(ref);
if (exclusive) { if (exclusive) {
if (groupConsumer == null) { if (newGroup) {
exclusiveConsumer = consumer; exclusiveConsumer = consumer;
if (groupFirstKey != null) { if (groupFirstKey != null) {
return new GroupFirstMessageReference(groupFirstKey, ref); return new GroupFirstMessageReference(groupFirstKey, ref);
@ -3282,7 +3268,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (extractGroupSequence(ref) == -1) { if (extractGroupSequence(ref) == -1) {
groups.remove(groupID); groups.remove(groupID);
consumers.repeat(); consumers.repeat();
} else if (groupConsumer == null) { } else if (newGroup) {
groups.put(groupID, consumer); groups.put(groupID, consumer);
if (groupFirstKey != null) { if (groupFirstKey != null) {
return new GroupFirstMessageReference(groupFirstKey, ref); return new GroupFirstMessageReference(groupFirstKey, ref);
@ -3294,13 +3280,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return ref; return ref;
} }
private void proceedDeliver(Consumer consumer, MessageReference reference) { private void afterDeliver(Consumer consumer, MessageReference reference) {
try { try {
consumer.proceedDeliver(reference); consumer.afterDeliver(reference);
} catch (Throwable t) { } catch (Throwable t) {
errorProcessing(consumer, t, reference); errorProcessing(consumer, t, reference);
} finally {
deliveriesInTransit.countDown();
} }
} }
@ -3345,10 +3329,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) { private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) {
HandleStatus status; Object status;
try { try {
status = consumer.handle(reference); status = consumer.handleWithGroup(this, newGroup, reference);
} catch (Throwable t) { } catch (Throwable t) {
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);

View File

@ -24,11 +24,8 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -107,13 +104,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private SlowConsumerDetectionListener slowConsumerListener; private SlowConsumerDetectionListener slowConsumerListener;
/**
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
* otherwise a rollback may get message sneaking in
*/
private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
private volatile AtomicInteger availableCredits = new AtomicInteger(0); private volatile AtomicInteger availableCredits = new AtomicInteger(0);
private boolean started; private boolean started;
@ -392,8 +382,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
messageQueue.errorProcessing(this, e, deliveryObject); messageQueue.errorProcessing(this, e, deliveryObject);
} }
/** This is in case someone is using direct old API */
@Override @Override
public HandleStatus handle(final MessageReference ref) throws Exception { public HandleStatus handle(MessageReference ref) throws Exception {
Object refReturn = handleWithGroup(null, false, ref);
if (refReturn instanceof MessageReference) {
return HandleStatus.HANDLED;
} else {
return (HandleStatus) refReturn;
}
}
@Override
public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception {
// available credits can be set back to null with a flow control option. // available credits can be set back to null with a flow control option.
AtomicInteger checkInteger = availableCredits; AtomicInteger checkInteger = availableCredits;
if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) { if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
@ -481,42 +483,46 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
lockDelivery.readLock().lock(); MessageReference deliveryReference = ref;
if (handler != null) {
deliveryReference = handler.handleMessageGroup(ref, this, newGroup);
}
proceedDeliver(deliveryReference);
return HandleStatus.HANDLED; return HandleStatus.HANDLED;
} }
} }
@Override private void proceedDeliver(MessageReference reference) throws Exception {
public void proceedDeliver(MessageReference reference) throws Exception { Message message = reference.getMessage();
try {
Message message = reference.getMessage();
if (server.hasBrokerMessagePlugins()) { if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
}
if (message.isLargeMessage() && supportLargeMessage) {
if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer
// instead of throwing an exception in weird cases there is no problem on just go ahead and create it
// again here
largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
}
// The deliverer was prepared during handle, as we can't have more than one pending large message
// as it would return busy if there is anything pending
largeMessageDeliverer.deliver();
} else {
deliverStandardMessage(reference, message);
}
} finally {
lockDelivery.readLock().unlock();
callback.afterDelivery();
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
}
} }
if (message.isLargeMessage() && supportLargeMessage) {
if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer
// instead of throwing an exception in weird cases there is no problem on just go ahead and create it
// again here
largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
}
// The deliverer was prepared during handle, as we can't have more than one pending large message
// as it would return busy if there is anything pending
largeMessageDeliverer.deliver();
} else {
deliverStandardMessage(reference, message);
}
}
@Override
public void afterDeliver(MessageReference reference) throws Exception {
callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount());
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
}
} }
@Override @Override
@ -626,7 +632,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* there are no other messages to be delivered. * there are no other messages to be delivered.
*/ */
@Override @Override
public void forceDelivery(final long sequence) { public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> { forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue); MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
@ -730,19 +736,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void setStarted(final boolean started) { public void setStarted(final boolean started) {
synchronized (lock) { synchronized (lock) {
boolean locked = lockDelivery(); this.started = browseOnly || started;
// This is to make sure nothing would sneak to the client while started = false
// the client will stop the session and perform a rollback in certain cases.
// in case something sneaks to the client you could get to messaging delivering forever until
// you restart the server
try {
this.started = browseOnly || started;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
} }
// Outside the lock // Outside the lock
@ -751,35 +745,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
private boolean lockDelivery() {
try {
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
if (server != null) {
server.threadDump();
}
return false;
}
return true;
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
return false;
}
}
@Override @Override
public void setTransferring(final boolean transferring) { public void setTransferring(final boolean transferring) {
synchronized (lock) { synchronized (lock) {
// This is to make sure that the delivery process has finished any pending delivery this.transferring = transferring;
// otherwise a message may sneak in on the client while we are trying to stop the consumer
boolean locked = lockDelivery();
try {
this.transferring = transferring;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
} }
// Outside the lock // Outside the lock
@ -1275,125 +1244,111 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
public boolean deliver() throws Exception { public boolean deliver() throws Exception {
lockDelivery.readLock().lock(); if (!started) {
try { return false;
if (!started) { }
return false;
LargeServerMessage currentLargeMessage = largeMessage;
if (currentLargeMessage == null) {
return true;
}
if (availableCredits != null && availableCredits.get() <= 0) {
if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits);
}
releaseHeapBodyBuffer();
return false;
}
if (!sentInitialPacket) {
context = currentLargeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
context.open();
sentInitialPacket = true;
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
if (availableCredits != null) {
final int credits = availableCredits.addAndGet(-packetSize);
if (credits <= 0) {
releaseHeapBodyBuffer();
}
if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits);
}
} }
LargeServerMessage currentLargeMessage = largeMessage; // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
if (currentLargeMessage == null) { // for too long
return true;
}
resumeLargeMessage();
return false;
} else {
if (availableCredits != null && availableCredits.get() <= 0) { if (availableCredits != null && availableCredits.get() <= 0) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits);
availableCredits);
} }
releaseHeapBodyBuffer(); releaseHeapBodyBuffer();
return false; return false;
} }
if (!sentInitialPacket) { final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
context = currentLargeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize(); final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
context.open(); assert bodyBuffer.remaining() == localChunkLen;
sentInitialPacket = true; final int readBytes = context.encode(bodyBuffer);
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); assert readBytes == localChunkLen;
if (availableCredits != null) { final byte[] body = bodyBuffer.array();
final int credits = availableCredits.addAndGet(-packetSize);
if (credits <= 0) { assert body.length == readBytes;
releaseHeapBodyBuffer();
}
if (logger.isTraceEnabled()) { //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
logger.trace(this + "::FlowControl::" + //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
" deliver initialpackage with " + //resendCache != null && packet.isRequiresConfirmations()
packetSize +
" delivered, available now = " + int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
availableCredits);
} int chunkLen = body.length;
if (availableCredits != null) {
final int credits = availableCredits.addAndGet(-packetSize);
if (credits <= 0) {
releaseHeapBodyBuffer();
} }
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread if (logger.isTraceEnabled()) {
// for too long logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits);
}
}
positionPendingLargeMessage += chunkLen;
if (positionPendingLargeMessage < sizePendingLargeMessage) {
resumeLargeMessage(); resumeLargeMessage();
return false; return false;
} else {
if (availableCredits != null && availableCredits.get() <= 0) {
if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
availableCredits);
}
releaseHeapBodyBuffer();
return false;
}
final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
assert bodyBuffer.remaining() == localChunkLen;
final int readBytes = context.encode(bodyBuffer);
assert readBytes == localChunkLen;
final byte[] body = bodyBuffer.array();
assert body.length == readBytes;
//It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
//given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
//resendCache != null && packet.isRequiresConfirmations()
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
int chunkLen = body.length;
if (availableCredits != null) {
final int credits = availableCredits.addAndGet(-packetSize);
if (credits <= 0) {
releaseHeapBodyBuffer();
}
if (logger.isTraceEnabled()) {
logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
packetSize +
" available now=" +
availableCredits);
}
}
positionPendingLargeMessage += chunkLen;
if (positionPendingLargeMessage < sizePendingLargeMessage) {
resumeLargeMessage();
return false;
}
} }
if (logger.isTraceEnabled()) {
logger.trace("Finished deliverLargeMessage");
}
finish();
return true;
} finally {
lockDelivery.readLock().unlock();
} }
if (logger.isTraceEnabled()) {
logger.trace("Finished deliverLargeMessage");
}
finish();
return true;
} }
public void finish() throws Exception { public void finish() throws Exception {
@ -1453,7 +1408,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
if (status == HandleStatus.HANDLED) { if (status == HandleStatus.HANDLED) {
proceedDeliver(current); afterDeliver(current);
} }
current = null; current = null;
@ -1481,7 +1436,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
if (status == HandleStatus.HANDLED) { if (status == HandleStatus.HANDLED) {
proceedDeliver(ref); afterDeliver(ref);
} else if (status == HandleStatus.BUSY) { } else if (status == HandleStatus.BUSY) {
// keep a reference on the current message reference // keep a reference on the current message reference
// to handle it next time the browser deliverer is executed // to handle it next time the browser deliverer is executed

View File

@ -41,11 +41,10 @@ public interface SessionCallback {
*/ */
boolean hasCredits(ServerConsumer consumerID); boolean hasCredits(ServerConsumer consumerID);
/** // Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS
* This can be used to complete certain operations outside of the lock, // and these need to be done outside of the main lock.
* like acks or other operations. // otherwise we could dead-lock during delivery
*/ void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception;
void afterDelivery() throws Exception;
/** /**
* Use this to updates specifics on the message after a redelivery happened. * Use this to updates specifics on the message after a redelivery happened.
@ -69,6 +68,7 @@ public interface SessionCallback {
// Future developments may change this, but beware why I have chosen to keep the parameter separated here // Future developments may change this, but beware why I have chosen to keep the parameter separated here
int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount); int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
int sendLargeMessage(MessageReference reference, int sendLargeMessage(MessageReference reference,
Message message, Message message,
ServerConsumer consumerID, ServerConsumer consumerID,

View File

@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer {
} }
@Override @Override
public void proceedDeliver(MessageReference reference) throws Exception { public void afterDeliver(MessageReference reference) throws Exception {
} }

View File

@ -527,11 +527,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
return true; return true;
} }
@Override
public void afterDelivery() throws Exception {
}
@Override @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) { public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
targetCallback.sendProducerCreditsFailMessage(credits, address); targetCallback.sendProducerCreditsFailMessage(credits, address);
@ -558,6 +553,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
} }
} }
@Override
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
}
/* (non-Javadoc) /* (non-Javadoc)
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int) * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/ */

View File

@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID); assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
} }
Thread.sleep(2000);
//session.rollback(); //session.rollback();
//session.close(); //session.close();
//consume all msgs from 2nd first consumer //consume all msgs from 2nd first consumer

View File

@ -1312,12 +1312,21 @@ public class QueueImplTest extends ActiveMQTestBase {
@Override @Override
public synchronized HandleStatus handle(MessageReference reference) { public synchronized HandleStatus handle(MessageReference reference) {
return HandleStatus.HANDLED;
}
@Override
public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
if (count == 0) { if (count == 0) {
//the first message is handled and will be used to determine this consumer //the first message is handled and will be used to determine this consumer
//to be the group consumer //to be the group consumer
count++; count++;
firstMessageHandled.countDown(); firstMessageHandled.countDown();
return HandleStatus.HANDLED; if (groupHandler != null) {
return groupHandler.handleMessageGroup(reference, this, newGroup);
} else {
return HandleStatus.HANDLED;
}
} else if (count <= 2) { } else if (count <= 2) {
//the next two attempts to send the second message will be done //the next two attempts to send the second message will be done
//attempting a direct delivery and an async one after that //attempting a direct delivery and an async one after that
@ -1329,7 +1338,11 @@ public class QueueImplTest extends ActiveMQTestBase {
//the second message should have stop the delivery loop: //the second message should have stop the delivery loop:
//it will succeed just to let the message being handled and //it will succeed just to let the message being handled and
//reduce the message count to 0 //reduce the message count to 0
return HandleStatus.HANDLED; if (groupHandler != null) {
return groupHandler.handleMessageGroup(reference, this, newGroup);
} else {
return HandleStatus.HANDLED;
}
} }
} }
}; };

View File

@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer {
} }
@Override @Override
public void proceedDeliver(MessageReference ref) throws Exception { public void afterDeliver(MessageReference ref) throws Exception {
// no op // no op
} }