Revert "ARTEMIS-2423 Improving Consumer/Queue Delivery lock"
This reverts commit 7507a9fd4b
.
This commit is contained in:
parent
f73d7f5dd3
commit
8a1f267bd5
|
@ -186,6 +186,11 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
(String) null, this, true, operationContext, manager.getPrefixes());
|
(String) null, this, true, operationContext, manager.getPrefixes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -599,11 +604,6 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int sendLargeMessage(MessageReference ref,
|
public int sendLargeMessage(MessageReference ref,
|
||||||
Message message,
|
Message message,
|
||||||
|
|
|
@ -59,12 +59,6 @@ public class MQTTPublishManager {
|
||||||
|
|
||||||
private MQTTSessionState.OutboundStore outboundStore;
|
private MQTTSessionState.OutboundStore outboundStore;
|
||||||
|
|
||||||
/** this is the last qos that happened during delivery.
|
|
||||||
* since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery
|
|
||||||
* so it is safe to keep this value here.
|
|
||||||
*/
|
|
||||||
private Integer currentQos;
|
|
||||||
|
|
||||||
public MQTTPublishManager(MQTTSession session) {
|
public MQTTPublishManager(MQTTSession session) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
@ -114,6 +108,7 @@ public class MQTTPublishManager {
|
||||||
boolean isManagementConsumer(ServerConsumer consumer) {
|
boolean isManagementConsumer(ServerConsumer consumer) {
|
||||||
return consumer == managementConsumer;
|
return consumer == managementConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
|
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
|
||||||
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
|
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
|
||||||
|
@ -124,35 +119,20 @@ public class MQTTPublishManager {
|
||||||
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
|
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
|
||||||
// This is to allow retries of PubRel.
|
// This is to allow retries of PubRel.
|
||||||
if (isManagementConsumer(consumer)) {
|
if (isManagementConsumer(consumer)) {
|
||||||
currentQos = null;
|
|
||||||
sendPubRelMessage(message);
|
sendPubRelMessage(message);
|
||||||
} else {
|
} else {
|
||||||
int qos = decideQoS(message, consumer);
|
int qos = decideQoS(message, consumer);
|
||||||
currentQos = qos;
|
|
||||||
if (qos == 0) {
|
if (qos == 0) {
|
||||||
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
|
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
|
||||||
|
session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
|
||||||
} else if (qos == 1 || qos == 2) {
|
} else if (qos == 1 || qos == 2) {
|
||||||
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
|
||||||
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
|
||||||
sendServerMessage(mqttid, message, deliveryCount, qos);
|
sendServerMessage(mqttid, message, deliveryCount, qos);
|
||||||
} else {
|
|
||||||
// this will happen during afterDeliver
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
|
|
||||||
if (currentQos != null) {
|
|
||||||
int qos = currentQos.intValue();
|
|
||||||
if (qos == 0) {
|
|
||||||
session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
|
|
||||||
} else if (qos == 1 || qos == 2) {
|
|
||||||
// everything happened in delivery
|
|
||||||
} else {
|
} else {
|
||||||
// Client must have disconnected and it's Subscription QoS cleared
|
// Client must have disconnected and it's Subscription QoS cleared
|
||||||
consumer.individualCancel(message.getMessageID(), false);
|
consumer.individualCancel(message.getMessageID(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,15 +59,6 @@ public class MQTTSessionCallback implements SessionCallback {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
|
|
||||||
try {
|
|
||||||
session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
|
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -99,6 +90,11 @@ public class MQTTSessionCallback implements SessionCallback {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void browserFinished(ServerConsumer consumer) {
|
public void browserFinished(ServerConsumer consumer) {
|
||||||
|
|
||||||
|
|
|
@ -270,6 +270,12 @@ public class AMQSession implements SessionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rename actualDest to destination
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void browserFinished(ServerConsumer consumer) {
|
public void browserFinished(ServerConsumer consumer) {
|
||||||
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
|
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
|
||||||
|
@ -306,14 +312,6 @@ public class AMQSession implements SessionCallback {
|
||||||
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
|
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterDeliver(MessageReference ref,
|
|
||||||
org.apache.activemq.artemis.api.core.Message message,
|
|
||||||
ServerConsumer consumerID,
|
|
||||||
int deliveryCount) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int sendLargeMessage(MessageReference reference,
|
public int sendLargeMessage(MessageReference reference,
|
||||||
org.apache.activemq.artemis.api.core.Message message,
|
org.apache.activemq.artemis.api.core.Message message,
|
||||||
|
|
|
@ -110,9 +110,8 @@ public class StompSession implements SessionCallback {
|
||||||
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
|
public void afterDelivery() throws Exception {
|
||||||
PendingTask task;
|
PendingTask task;
|
||||||
while ((task = afterDeliveryTasks.poll()) != null) {
|
while ((task = afterDeliveryTasks.poll()) != null) {
|
||||||
task.run();
|
task.run();
|
||||||
|
|
|
@ -211,6 +211,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
|
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -231,11 +236,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int sendLargeMessage(MessageReference reference,
|
public int sendLargeMessage(MessageReference reference,
|
||||||
Message message,
|
Message message,
|
||||||
|
|
|
@ -132,11 +132,6 @@ public final class CoreSessionCallback implements SessionCallback {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
|
|
||||||
// no op
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendProducerCreditsMessage(int credits, SimpleString address) {
|
public void sendProducerCreditsMessage(int credits, SimpleString address) {
|
||||||
Packet packet = new SessionProducerCreditsMessage(credits, address);
|
Packet packet = new SessionProducerCreditsMessage(credits, address);
|
||||||
|
@ -149,6 +144,11 @@ public final class CoreSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
||||||
Packet packet = new SessionProducerCreditsFailMessage(credits, address);
|
Packet packet = new SessionProducerCreditsFailMessage(credits, address);
|
||||||
|
|
|
@ -25,10 +25,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||||
|
|
||||||
public interface Consumer extends PriorityAware {
|
public interface Consumer extends PriorityAware {
|
||||||
|
|
||||||
interface GroupHandler {
|
|
||||||
MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @see SessionCallback#supportsDirectDelivery()
|
* @see SessionCallback#supportsDirectDelivery()
|
||||||
|
@ -38,7 +34,13 @@ public interface Consumer extends PriorityAware {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* There was a change on semantic during 2.3 here.<br>
|
||||||
|
* We now first accept the message, and the actual deliver is done as part of
|
||||||
|
* {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
|
||||||
|
* the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
|
||||||
|
* consumers.
|
||||||
|
* <p>
|
||||||
|
* This should return busy if handle is called before proceed deliver is called
|
||||||
*
|
*
|
||||||
* @param reference
|
* @param reference
|
||||||
* @return
|
* @return
|
||||||
|
@ -46,29 +48,19 @@ public interface Consumer extends PriorityAware {
|
||||||
*/
|
*/
|
||||||
HandleStatus handle(MessageReference reference) throws Exception;
|
HandleStatus handle(MessageReference reference) throws Exception;
|
||||||
|
|
||||||
/**
|
|
||||||
* This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled
|
|
||||||
* This should return busy if handle is called before proceed deliver is called
|
|
||||||
* @param groupHandler
|
|
||||||
* @param reference
|
|
||||||
* @return
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
|
|
||||||
return handle(reference);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** wakes up internal threads to deliver more messages */
|
/** wakes up internal threads to deliver more messages */
|
||||||
default void promptDelivery() {
|
default void promptDelivery() {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This will called after delivery
|
* This will proceed with the actual delivery.
|
||||||
* Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks
|
* Notice that handle should hold a readLock and proceedDelivery should release the readLock
|
||||||
|
* any lock operation on Consumer should also get a writeLock on the readWriteLock
|
||||||
|
* to guarantee there are no pending deliveries
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
void afterDeliver(MessageReference reference) throws Exception;
|
void proceedDeliver(MessageReference reference) throws Exception;
|
||||||
|
|
||||||
Filter getFilter();
|
Filter getFilter();
|
||||||
|
|
||||||
|
|
|
@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
||||||
// FailureListener implementation --------------------------------
|
// FailureListener implementation --------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterDeliver(MessageReference ref) {
|
public void proceedDeliver(MessageReference ref) {
|
||||||
// no op
|
// no op
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -193,7 +193,7 @@ public class Redistributor implements Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterDeliver(MessageReference ref) {
|
public void proceedDeliver(MessageReference ref) {
|
||||||
// no op
|
// no op
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,6 +98,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.utils.BooleanUtil;
|
import org.apache.activemq.artemis.utils.BooleanUtil;
|
||||||
import org.apache.activemq.artemis.utils.Env;
|
import org.apache.activemq.artemis.utils.Env;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||||
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
||||||
|
@ -113,7 +114,7 @@ import org.jboss.logging.Logger;
|
||||||
* <p>
|
* <p>
|
||||||
* Completely non blocking between adding to queue and delivering to consumers.
|
* Completely non blocking between adding to queue and delivering to consumers.
|
||||||
*/
|
*/
|
||||||
public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler {
|
public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
protected static final int CRITICAL_PATHS = 5;
|
protected static final int CRITICAL_PATHS = 5;
|
||||||
protected static final int CRITICAL_PATH_ADD_TAIL = 0;
|
protected static final int CRITICAL_PATH_ADD_TAIL = 0;
|
||||||
|
@ -267,6 +268,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
|
|
||||||
private final ExpiryScanner expiryScanner = new ExpiryScanner();
|
private final ExpiryScanner expiryScanner = new ExpiryScanner();
|
||||||
|
|
||||||
|
private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
|
||||||
|
|
||||||
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
|
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
|
||||||
|
|
||||||
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
|
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
|
||||||
|
@ -952,7 +955,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
|
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
|
||||||
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
|
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
|
||||||
|
|
||||||
if (getExecutor().isFlushed() &&
|
if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() &&
|
||||||
intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
|
intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
|
||||||
pageIterator != null && !pageIterator.hasNext() &&
|
pageIterator != null && !pageIterator.hasNext() &&
|
||||||
pageSubscription != null && !pageSubscription.isPaging()) {
|
pageSubscription != null && !pageSubscription.isPaging()) {
|
||||||
|
@ -971,7 +974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) {
|
if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1002,6 +1005,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will wait for any pending deliveries to finish
|
||||||
|
*/
|
||||||
|
private boolean flushDeliveriesInTransit() {
|
||||||
|
try {
|
||||||
|
if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void forceDelivery() {
|
public void forceDelivery() {
|
||||||
if (pageSubscription != null && pageSubscription.isPaging()) {
|
if (pageSubscription != null && pageSubscription.isPaging()) {
|
||||||
|
@ -2346,6 +2366,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
@Override
|
@Override
|
||||||
public synchronized void pause(boolean persist) {
|
public synchronized void pause(boolean persist) {
|
||||||
try {
|
try {
|
||||||
|
this.flushDeliveriesInTransit();
|
||||||
if (persist && isDurable()) {
|
if (persist && isDurable()) {
|
||||||
if (pauseStatusRecord >= 0) {
|
if (pauseStatusRecord >= 0) {
|
||||||
storageManager.deleteQueueStatus(pauseStatusRecord);
|
storageManager.deleteQueueStatus(pauseStatusRecord);
|
||||||
|
@ -2586,16 +2607,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
consumer = groupConsumer;
|
consumer = groupConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
Object handleValue = handle(ref, consumer, groupConsumer == null);
|
HandleStatus status = handle(ref, consumer);
|
||||||
|
|
||||||
HandleStatus status;
|
|
||||||
|
|
||||||
if (handleValue instanceof MessageReference) {
|
|
||||||
ref = (MessageReference) handleValue;
|
|
||||||
status = HandleStatus.HANDLED;
|
|
||||||
} else {
|
|
||||||
status = (HandleStatus) handleValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status == HandleStatus.HANDLED) {
|
if (status == HandleStatus.HANDLED) {
|
||||||
|
|
||||||
|
@ -2603,6 +2615,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
// this is to avoid breaks on the loop when checking for any other factors.
|
// this is to avoid breaks on the loop when checking for any other factors.
|
||||||
noDelivery = 0;
|
noDelivery = 0;
|
||||||
|
|
||||||
|
if (redistributor == null) {
|
||||||
|
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
|
||||||
|
}
|
||||||
|
|
||||||
|
deliveriesInTransit.countUp();
|
||||||
|
|
||||||
|
|
||||||
removeMessageReference(holder, ref);
|
removeMessageReference(holder, ref);
|
||||||
handledconsumer = consumer;
|
handledconsumer = consumer;
|
||||||
handled++;
|
handled++;
|
||||||
|
@ -2634,10 +2653,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
// Round robin'd all
|
// Round robin'd all
|
||||||
|
|
||||||
if (noDelivery == this.consumers.size()) {
|
if (noDelivery == this.consumers.size()) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (handledconsumer != null) {
|
||||||
logger.debug(this + "::All the consumers were busy, giving up now");
|
// this shouldn't really happen,
|
||||||
|
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
|
||||||
|
ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
|
||||||
|
} else {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug(this + "::All the consumers were busy, giving up now");
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
noDelivery = 0;
|
noDelivery = 0;
|
||||||
|
@ -2645,7 +2670,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handledconsumer != null) {
|
if (handledconsumer != null) {
|
||||||
afterDeliver(handledconsumer, ref);
|
proceedDeliver(handledconsumer, ref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3173,7 +3198,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean deliver(MessageReference ref) {
|
private boolean deliver(final MessageReference ref) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (!supportsDirectDeliver) {
|
if (!supportsDirectDeliver) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -3200,24 +3225,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
consumer = groupConsumer;
|
consumer = groupConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
Object handleValue = handle(ref, consumer, groupConsumer == null);
|
HandleStatus status = handle(ref, consumer);
|
||||||
|
|
||||||
|
|
||||||
HandleStatus status;
|
|
||||||
|
|
||||||
final MessageReference reference;
|
|
||||||
if (handleValue instanceof MessageReference) {
|
|
||||||
reference = (MessageReference) handleValue;
|
|
||||||
status = HandleStatus.HANDLED;
|
|
||||||
} else {
|
|
||||||
reference = ref;
|
|
||||||
status = (HandleStatus) handleValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (status == HandleStatus.HANDLED) {
|
if (status == HandleStatus.HANDLED) {
|
||||||
|
final MessageReference reference;
|
||||||
|
if (redistributor == null) {
|
||||||
|
reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
|
||||||
|
} else {
|
||||||
|
reference = ref;
|
||||||
|
}
|
||||||
|
|
||||||
messagesAdded.incrementAndGet();
|
messagesAdded.incrementAndGet();
|
||||||
|
|
||||||
|
deliveriesInTransit.countUp();
|
||||||
|
proceedDeliver(consumer, reference);
|
||||||
consumers.reset();
|
consumers.reset();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -3248,16 +3269,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
return groupConsumer;
|
return groupConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */
|
private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
|
||||||
@Override
|
|
||||||
public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) {
|
|
||||||
if (redistributor != null) {
|
|
||||||
// no grouping work on this case
|
|
||||||
return ref;
|
|
||||||
}
|
|
||||||
SimpleString groupID = extractGroupID(ref);
|
|
||||||
if (exclusive) {
|
if (exclusive) {
|
||||||
if (newGroup) {
|
if (groupConsumer == null) {
|
||||||
exclusiveConsumer = consumer;
|
exclusiveConsumer = consumer;
|
||||||
if (groupFirstKey != null) {
|
if (groupFirstKey != null) {
|
||||||
return new GroupFirstMessageReference(groupFirstKey, ref);
|
return new GroupFirstMessageReference(groupFirstKey, ref);
|
||||||
|
@ -3268,7 +3282,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
if (extractGroupSequence(ref) == -1) {
|
if (extractGroupSequence(ref) == -1) {
|
||||||
groups.remove(groupID);
|
groups.remove(groupID);
|
||||||
consumers.repeat();
|
consumers.repeat();
|
||||||
} else if (newGroup) {
|
} else if (groupConsumer == null) {
|
||||||
groups.put(groupID, consumer);
|
groups.put(groupID, consumer);
|
||||||
if (groupFirstKey != null) {
|
if (groupFirstKey != null) {
|
||||||
return new GroupFirstMessageReference(groupFirstKey, ref);
|
return new GroupFirstMessageReference(groupFirstKey, ref);
|
||||||
|
@ -3280,11 +3294,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
return ref;
|
return ref;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void afterDeliver(Consumer consumer, MessageReference reference) {
|
private void proceedDeliver(Consumer consumer, MessageReference reference) {
|
||||||
try {
|
try {
|
||||||
consumer.afterDeliver(reference);
|
consumer.proceedDeliver(reference);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
errorProcessing(consumer, t, reference);
|
errorProcessing(consumer, t, reference);
|
||||||
|
} finally {
|
||||||
|
deliveriesInTransit.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3329,10 +3345,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) {
|
private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
|
||||||
Object status;
|
HandleStatus status;
|
||||||
try {
|
try {
|
||||||
status = consumer.handleWithGroup(this, newGroup, reference);
|
status = consumer.handle(reference);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
|
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,11 @@ import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -104,6 +107,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
|
|
||||||
private SlowConsumerDetectionListener slowConsumerListener;
|
private SlowConsumerDetectionListener slowConsumerListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
|
||||||
|
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
|
||||||
|
* otherwise a rollback may get message sneaking in
|
||||||
|
*/
|
||||||
|
private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
private volatile AtomicInteger availableCredits = new AtomicInteger(0);
|
private volatile AtomicInteger availableCredits = new AtomicInteger(0);
|
||||||
|
|
||||||
private boolean started;
|
private boolean started;
|
||||||
|
@ -382,20 +392,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
messageQueue.errorProcessing(this, e, deliveryObject);
|
messageQueue.errorProcessing(this, e, deliveryObject);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** This is in case someone is using direct old API */
|
|
||||||
@Override
|
@Override
|
||||||
public HandleStatus handle(MessageReference ref) throws Exception {
|
public HandleStatus handle(final MessageReference ref) throws Exception {
|
||||||
Object refReturn = handleWithGroup(null, false, ref);
|
|
||||||
|
|
||||||
if (refReturn instanceof MessageReference) {
|
|
||||||
return HandleStatus.HANDLED;
|
|
||||||
} else {
|
|
||||||
return (HandleStatus) refReturn;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception {
|
|
||||||
// available credits can be set back to null with a flow control option.
|
// available credits can be set back to null with a flow control option.
|
||||||
AtomicInteger checkInteger = availableCredits;
|
AtomicInteger checkInteger = availableCredits;
|
||||||
if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
|
if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
|
||||||
|
@ -483,46 +481,42 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageReference deliveryReference = ref;
|
lockDelivery.readLock().lock();
|
||||||
|
|
||||||
if (handler != null) {
|
|
||||||
deliveryReference = handler.handleMessageGroup(ref, this, newGroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
proceedDeliver(deliveryReference);
|
|
||||||
|
|
||||||
return HandleStatus.HANDLED;
|
return HandleStatus.HANDLED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void proceedDeliver(MessageReference reference) throws Exception {
|
|
||||||
Message message = reference.getMessage();
|
|
||||||
|
|
||||||
if (server.hasBrokerMessagePlugins()) {
|
|
||||||
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (message.isLargeMessage() && supportLargeMessage) {
|
|
||||||
if (largeMessageDeliverer == null) {
|
|
||||||
// This can't really happen as handle had already crated the deliverer
|
|
||||||
// instead of throwing an exception in weird cases there is no problem on just go ahead and create it
|
|
||||||
// again here
|
|
||||||
largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
|
|
||||||
}
|
|
||||||
// The deliverer was prepared during handle, as we can't have more than one pending large message
|
|
||||||
// as it would return busy if there is anything pending
|
|
||||||
largeMessageDeliverer.deliver();
|
|
||||||
} else {
|
|
||||||
deliverStandardMessage(reference, message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterDeliver(MessageReference reference) throws Exception {
|
public void proceedDeliver(MessageReference reference) throws Exception {
|
||||||
callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount());
|
try {
|
||||||
if (server.hasBrokerMessagePlugins()) {
|
Message message = reference.getMessage();
|
||||||
server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
|
|
||||||
|
if (server.hasBrokerMessagePlugins()) {
|
||||||
|
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.isLargeMessage() && supportLargeMessage) {
|
||||||
|
if (largeMessageDeliverer == null) {
|
||||||
|
// This can't really happen as handle had already crated the deliverer
|
||||||
|
// instead of throwing an exception in weird cases there is no problem on just go ahead and create it
|
||||||
|
// again here
|
||||||
|
largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
|
||||||
|
}
|
||||||
|
// The deliverer was prepared during handle, as we can't have more than one pending large message
|
||||||
|
// as it would return busy if there is anything pending
|
||||||
|
largeMessageDeliverer.deliver();
|
||||||
|
} else {
|
||||||
|
deliverStandardMessage(reference, message);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lockDelivery.readLock().unlock();
|
||||||
|
callback.afterDelivery();
|
||||||
|
if (server.hasBrokerMessagePlugins()) {
|
||||||
|
server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -632,7 +626,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
* there are no other messages to be delivered.
|
* there are no other messages to be delivered.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void forceDelivery(final long sequence) {
|
public void forceDelivery(final long sequence) {
|
||||||
forceDelivery(sequence, () -> {
|
forceDelivery(sequence, () -> {
|
||||||
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
|
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
|
||||||
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
|
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
|
||||||
|
@ -736,7 +730,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
@Override
|
@Override
|
||||||
public void setStarted(final boolean started) {
|
public void setStarted(final boolean started) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
this.started = browseOnly || started;
|
boolean locked = lockDelivery();
|
||||||
|
|
||||||
|
// This is to make sure nothing would sneak to the client while started = false
|
||||||
|
// the client will stop the session and perform a rollback in certain cases.
|
||||||
|
// in case something sneaks to the client you could get to messaging delivering forever until
|
||||||
|
// you restart the server
|
||||||
|
try {
|
||||||
|
this.started = browseOnly || started;
|
||||||
|
} finally {
|
||||||
|
if (locked) {
|
||||||
|
lockDelivery.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Outside the lock
|
// Outside the lock
|
||||||
|
@ -745,10 +751,35 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean lockDelivery() {
|
||||||
|
try {
|
||||||
|
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
|
||||||
|
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
|
||||||
|
if (server != null) {
|
||||||
|
server.threadDump();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setTransferring(final boolean transferring) {
|
public void setTransferring(final boolean transferring) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
this.transferring = transferring;
|
// This is to make sure that the delivery process has finished any pending delivery
|
||||||
|
// otherwise a message may sneak in on the client while we are trying to stop the consumer
|
||||||
|
boolean locked = lockDelivery();
|
||||||
|
try {
|
||||||
|
this.transferring = transferring;
|
||||||
|
} finally {
|
||||||
|
if (locked) {
|
||||||
|
lockDelivery.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Outside the lock
|
// Outside the lock
|
||||||
|
@ -1244,111 +1275,125 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean deliver() throws Exception {
|
public boolean deliver() throws Exception {
|
||||||
if (!started) {
|
lockDelivery.readLock().lock();
|
||||||
return false;
|
try {
|
||||||
}
|
if (!started) {
|
||||||
|
return false;
|
||||||
LargeServerMessage currentLargeMessage = largeMessage;
|
|
||||||
if (currentLargeMessage == null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (availableCredits != null && availableCredits.get() <= 0) {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits);
|
|
||||||
}
|
|
||||||
releaseHeapBodyBuffer();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!sentInitialPacket) {
|
|
||||||
context = currentLargeMessage.getBodyEncoder();
|
|
||||||
|
|
||||||
sizePendingLargeMessage = context.getLargeBodySize();
|
|
||||||
|
|
||||||
context.open();
|
|
||||||
|
|
||||||
sentInitialPacket = true;
|
|
||||||
|
|
||||||
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
|
|
||||||
|
|
||||||
if (availableCredits != null) {
|
|
||||||
final int credits = availableCredits.addAndGet(-packetSize);
|
|
||||||
|
|
||||||
if (credits <= 0) {
|
|
||||||
releaseHeapBodyBuffer();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
|
LargeServerMessage currentLargeMessage = largeMessage;
|
||||||
// for too long
|
if (currentLargeMessage == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
resumeLargeMessage();
|
|
||||||
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
if (availableCredits != null && availableCredits.get() <= 0) {
|
if (availableCredits != null && availableCredits.get() <= 0) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits);
|
logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
|
||||||
|
availableCredits);
|
||||||
}
|
}
|
||||||
releaseHeapBodyBuffer();
|
releaseHeapBodyBuffer();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
|
if (!sentInitialPacket) {
|
||||||
|
context = currentLargeMessage.getBodyEncoder();
|
||||||
|
|
||||||
final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
|
sizePendingLargeMessage = context.getLargeBodySize();
|
||||||
|
|
||||||
assert bodyBuffer.remaining() == localChunkLen;
|
context.open();
|
||||||
|
|
||||||
final int readBytes = context.encode(bodyBuffer);
|
sentInitialPacket = true;
|
||||||
|
|
||||||
assert readBytes == localChunkLen;
|
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
|
||||||
|
|
||||||
final byte[] body = bodyBuffer.array();
|
if (availableCredits != null) {
|
||||||
|
final int credits = availableCredits.addAndGet(-packetSize);
|
||||||
|
|
||||||
assert body.length == readBytes;
|
if (credits <= 0) {
|
||||||
|
releaseHeapBodyBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
//It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
|
if (logger.isTraceEnabled()) {
|
||||||
//given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
|
logger.trace(this + "::FlowControl::" +
|
||||||
//resendCache != null && packet.isRequiresConfirmations()
|
" deliver initialpackage with " +
|
||||||
|
packetSize +
|
||||||
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
|
" delivered, available now = " +
|
||||||
|
availableCredits);
|
||||||
int chunkLen = body.length;
|
}
|
||||||
|
|
||||||
if (availableCredits != null) {
|
|
||||||
final int credits = availableCredits.addAndGet(-packetSize);
|
|
||||||
|
|
||||||
if (credits <= 0) {
|
|
||||||
releaseHeapBodyBuffer();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
|
||||||
logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits);
|
// for too long
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
positionPendingLargeMessage += chunkLen;
|
|
||||||
|
|
||||||
if (positionPendingLargeMessage < sizePendingLargeMessage) {
|
|
||||||
resumeLargeMessage();
|
resumeLargeMessage();
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
} else {
|
||||||
|
if (availableCredits != null && availableCredits.get() <= 0) {
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
|
||||||
|
availableCredits);
|
||||||
|
}
|
||||||
|
releaseHeapBodyBuffer();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
|
||||||
|
|
||||||
|
final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
|
||||||
|
|
||||||
|
assert bodyBuffer.remaining() == localChunkLen;
|
||||||
|
|
||||||
|
final int readBytes = context.encode(bodyBuffer);
|
||||||
|
|
||||||
|
assert readBytes == localChunkLen;
|
||||||
|
|
||||||
|
final byte[] body = bodyBuffer.array();
|
||||||
|
|
||||||
|
assert body.length == readBytes;
|
||||||
|
|
||||||
|
//It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
|
||||||
|
//given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
|
||||||
|
//resendCache != null && packet.isRequiresConfirmations()
|
||||||
|
|
||||||
|
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
|
||||||
|
|
||||||
|
int chunkLen = body.length;
|
||||||
|
|
||||||
|
if (availableCredits != null) {
|
||||||
|
final int credits = availableCredits.addAndGet(-packetSize);
|
||||||
|
|
||||||
|
if (credits <= 0) {
|
||||||
|
releaseHeapBodyBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
|
||||||
|
packetSize +
|
||||||
|
" available now=" +
|
||||||
|
availableCredits);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
positionPendingLargeMessage += chunkLen;
|
||||||
|
|
||||||
|
if (positionPendingLargeMessage < sizePendingLargeMessage) {
|
||||||
|
resumeLargeMessage();
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Finished deliverLargeMessage");
|
||||||
|
}
|
||||||
|
|
||||||
|
finish();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} finally {
|
||||||
|
lockDelivery.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Finished deliverLargeMessage");
|
|
||||||
}
|
|
||||||
|
|
||||||
finish();
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void finish() throws Exception {
|
public void finish() throws Exception {
|
||||||
|
@ -1408,7 +1453,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == HandleStatus.HANDLED) {
|
if (status == HandleStatus.HANDLED) {
|
||||||
afterDeliver(current);
|
proceedDeliver(current);
|
||||||
}
|
}
|
||||||
|
|
||||||
current = null;
|
current = null;
|
||||||
|
@ -1436,7 +1481,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == HandleStatus.HANDLED) {
|
if (status == HandleStatus.HANDLED) {
|
||||||
afterDeliver(ref);
|
proceedDeliver(ref);
|
||||||
} else if (status == HandleStatus.BUSY) {
|
} else if (status == HandleStatus.BUSY) {
|
||||||
// keep a reference on the current message reference
|
// keep a reference on the current message reference
|
||||||
// to handle it next time the browser deliverer is executed
|
// to handle it next time the browser deliverer is executed
|
||||||
|
|
|
@ -41,10 +41,11 @@ public interface SessionCallback {
|
||||||
*/
|
*/
|
||||||
boolean hasCredits(ServerConsumer consumerID);
|
boolean hasCredits(ServerConsumer consumerID);
|
||||||
|
|
||||||
// Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS
|
/**
|
||||||
// and these need to be done outside of the main lock.
|
* This can be used to complete certain operations outside of the lock,
|
||||||
// otherwise we could dead-lock during delivery
|
* like acks or other operations.
|
||||||
void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception;
|
*/
|
||||||
|
void afterDelivery() throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use this to updates specifics on the message after a redelivery happened.
|
* Use this to updates specifics on the message after a redelivery happened.
|
||||||
|
@ -68,7 +69,6 @@ public interface SessionCallback {
|
||||||
// Future developments may change this, but beware why I have chosen to keep the parameter separated here
|
// Future developments may change this, but beware why I have chosen to keep the parameter separated here
|
||||||
int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
|
int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
|
||||||
|
|
||||||
|
|
||||||
int sendLargeMessage(MessageReference reference,
|
int sendLargeMessage(MessageReference reference,
|
||||||
Message message,
|
Message message,
|
||||||
ServerConsumer consumerID,
|
ServerConsumer consumerID,
|
||||||
|
|
|
@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterDeliver(MessageReference reference) throws Exception {
|
public void proceedDeliver(MessageReference reference) throws Exception {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -527,6 +527,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterDelivery() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
|
||||||
targetCallback.sendProducerCreditsFailMessage(credits, address);
|
targetCallback.sendProducerCreditsFailMessage(credits, address);
|
||||||
|
@ -553,11 +558,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
|
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -271,6 +271,7 @@ public class GroupingTest extends JMSTestBase {
|
||||||
|
|
||||||
assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
|
assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
|
||||||
}
|
}
|
||||||
|
Thread.sleep(2000);
|
||||||
//session.rollback();
|
//session.rollback();
|
||||||
//session.close();
|
//session.close();
|
||||||
//consume all msgs from 2nd first consumer
|
//consume all msgs from 2nd first consumer
|
||||||
|
|
|
@ -1312,21 +1312,12 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized HandleStatus handle(MessageReference reference) {
|
public synchronized HandleStatus handle(MessageReference reference) {
|
||||||
return HandleStatus.HANDLED;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
//the first message is handled and will be used to determine this consumer
|
//the first message is handled and will be used to determine this consumer
|
||||||
//to be the group consumer
|
//to be the group consumer
|
||||||
count++;
|
count++;
|
||||||
firstMessageHandled.countDown();
|
firstMessageHandled.countDown();
|
||||||
if (groupHandler != null) {
|
return HandleStatus.HANDLED;
|
||||||
return groupHandler.handleMessageGroup(reference, this, newGroup);
|
|
||||||
} else {
|
|
||||||
return HandleStatus.HANDLED;
|
|
||||||
}
|
|
||||||
} else if (count <= 2) {
|
} else if (count <= 2) {
|
||||||
//the next two attempts to send the second message will be done
|
//the next two attempts to send the second message will be done
|
||||||
//attempting a direct delivery and an async one after that
|
//attempting a direct delivery and an async one after that
|
||||||
|
@ -1338,11 +1329,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
//the second message should have stop the delivery loop:
|
//the second message should have stop the delivery loop:
|
||||||
//it will succeed just to let the message being handled and
|
//it will succeed just to let the message being handled and
|
||||||
//reduce the message count to 0
|
//reduce the message count to 0
|
||||||
if (groupHandler != null) {
|
return HandleStatus.HANDLED;
|
||||||
return groupHandler.handleMessageGroup(reference, this, newGroup);
|
|
||||||
} else {
|
|
||||||
return HandleStatus.HANDLED;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterDeliver(MessageReference ref) throws Exception {
|
public void proceedDeliver(MessageReference ref) throws Exception {
|
||||||
// no op
|
// no op
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue