mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-06 01:10:14 +00:00
ARTEMIS-1604 Artemis deadlock using MQTT Protocol
Address code review comment not address when PR was merged.
This commit is contained in:
parent
ef7d9800be
commit
fafbd7e2e5
@ -3094,60 +3094,64 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
synchronized (this) {
|
||||
if (!supportsDirectDeliver) {
|
||||
return false;
|
||||
}
|
||||
if (paused || !canDispatch() && redistributor == null) {
|
||||
return false;
|
||||
return deliver(ref);
|
||||
} finally {
|
||||
deliverLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean deliver(final MessageReference ref) {
|
||||
synchronized (this) {
|
||||
if (!supportsDirectDeliver) {
|
||||
return false;
|
||||
}
|
||||
if (paused || !canDispatch() && redistributor == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (checkExpired(ref)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
consumers.reset();
|
||||
|
||||
while (consumers.hasNext() || redistributor != null) {
|
||||
|
||||
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
|
||||
Consumer consumer = holder.consumer;
|
||||
|
||||
final SimpleString groupID = extractGroupID(ref);
|
||||
Consumer groupConsumer = getGroupConsumer(groupID);
|
||||
|
||||
if (groupConsumer != null) {
|
||||
consumer = groupConsumer;
|
||||
}
|
||||
|
||||
if (checkExpired(ref)) {
|
||||
HandleStatus status = handle(ref, consumer);
|
||||
|
||||
if (status == HandleStatus.HANDLED) {
|
||||
|
||||
if (redistributor == null) {
|
||||
handleMessageGroup(ref, consumer, groupConsumer, groupID);
|
||||
}
|
||||
|
||||
messagesAdded.incrementAndGet();
|
||||
|
||||
deliveriesInTransit.countUp();
|
||||
proceedDeliver(consumer, ref);
|
||||
consumers.reset();
|
||||
return true;
|
||||
}
|
||||
|
||||
consumers.reset();
|
||||
|
||||
while (consumers.hasNext() || redistributor != null) {
|
||||
|
||||
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
|
||||
Consumer consumer = holder.consumer;
|
||||
|
||||
final SimpleString groupID = extractGroupID(ref);
|
||||
Consumer groupConsumer = getGroupConsumer(groupID);
|
||||
|
||||
if (groupConsumer != null) {
|
||||
consumer = groupConsumer;
|
||||
}
|
||||
|
||||
HandleStatus status = handle(ref, consumer);
|
||||
|
||||
if (status == HandleStatus.HANDLED) {
|
||||
|
||||
if (redistributor == null) {
|
||||
handleMessageGroup(ref, consumer, groupConsumer, groupID);
|
||||
}
|
||||
|
||||
messagesAdded.incrementAndGet();
|
||||
|
||||
deliveriesInTransit.countUp();
|
||||
proceedDeliver(consumer, ref);
|
||||
consumers.reset();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (redistributor != null || groupConsumer != null) {
|
||||
break;
|
||||
}
|
||||
if (redistributor != null || groupConsumer != null) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
deliverLock.unlock();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user