This commit is contained in:
Clebert Suconic 2019-01-17 10:25:52 -05:00
commit 4179c44e7e
1 changed files with 64 additions and 52 deletions

View File

@ -18,10 +18,12 @@ package org.apache.activemq.artemis.core.server.impl;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -117,7 +119,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public String debug() {
return toString() + "::Delivering " + this.deliveringRefs.size();
String debug = toString() + "::Delivering ";
synchronized (lock) {
return debug + this.deliveringRefs.size();
}
}
/**
@ -132,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private final StorageManager storageManager;
protected final java.util.Deque<MessageReference> deliveringRefs = new ConcurrentLinkedDeque<>();
private final java.util.Deque<MessageReference> deliveringRefs = new ArrayDeque<>();
private final SessionCallback callback;
@ -334,16 +339,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public List<MessageReference> getDeliveringMessages() {
List<MessageReference> refs = new LinkedList<>();
synchronized (lock) {
int expectedSize = 0;
List<MessageReference> refsOnConsumer = session.getInTXMessagesForConsumer(this.id);
if (refsOnConsumer != null) {
expectedSize = refsOnConsumer.size();
}
expectedSize += deliveringRefs.size();
final List<MessageReference> refs = new ArrayList<>(expectedSize);
if (refsOnConsumer != null) {
refs.addAll(refsOnConsumer);
}
refs.addAll(deliveringRefs);
return refs;
}
return refs;
}
/** i
@ -522,21 +531,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
removeItself();
LinkedList<MessageReference> refs = cancelRefs(failed, false, null);
Iterator<MessageReference> iter = refs.iterator();
List<MessageReference> refs = cancelRefs(failed, false, null);
Transaction tx = new TransactionImpl(storageManager);
while (iter.hasNext()) {
MessageReference ref = iter.next();
refs.forEach(ref -> {
if (logger.isTraceEnabled()) {
logger.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref);
}
ref.getQueue().cancel(tx, ref, true);
}
});
tx.rollback();
@ -638,9 +642,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
@Override
public LinkedList<MessageReference> cancelRefs(final boolean failed,
final boolean lastConsumedAsDelivered,
final Transaction tx) throws Exception {
public List<MessageReference> cancelRefs(final boolean failed,
final boolean lastConsumedAsDelivered,
final Transaction tx) throws Exception {
boolean performACK = lastConsumedAsDelivered;
try {
@ -654,30 +658,30 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
largeMessageDeliverer = null;
}
LinkedList<MessageReference> refs = new LinkedList<>();
synchronized (lock) {
if (!deliveringRefs.isEmpty()) {
for (MessageReference ref : deliveringRefs) {
if (performACK) {
ref.acknowledge(tx, this);
if (deliveringRefs.isEmpty()) {
return Collections.emptyList();
}
final List<MessageReference> refs = new ArrayList<>(deliveringRefs.size());
MessageReference ref;
while ((ref = deliveringRefs.poll()) != null) {
if (performACK) {
ref.acknowledge(tx, this);
performACK = false;
} else {
refs.add(ref);
updateDeliveryCountForCanceledRef(ref, failed);
}
if (logger.isTraceEnabled()) {
logger.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref);
}
performACK = false;
} else {
refs.add(ref);
updateDeliveryCountForCanceledRef(ref, failed);
}
deliveringRefs.clear();
if (logger.isTraceEnabled()) {
logger.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref);
}
}
return refs;
}
return refs;
}
protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
@ -1017,7 +1021,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public synchronized void backToDelivering(MessageReference reference) {
deliveringRefs.addFirst(reference);
synchronized (lock) {
deliveringRefs.addFirst(reference);
}
}
@Override
@ -1038,26 +1044,32 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (deliveringRefs.peek().getMessage().getMessageID() == messageID) {
return deliveringRefs.poll();
}
Iterator<MessageReference> iter = deliveringRefs.iterator();
MessageReference ref = null;
while (iter.hasNext()) {
MessageReference theRef = iter.next();
if (theRef.getMessage().getMessageID() == messageID) {
iter.remove();
ref = theRef;
break;
}
}
return ref;
//slow path in a separate method
return removeDeliveringRefById(messageID);
}
}
private MessageReference removeDeliveringRefById(long messageID) {
assert deliveringRefs.peek().getMessage().getMessageID() != messageID;
Iterator<MessageReference> iter = deliveringRefs.iterator();
MessageReference ref = null;
while (iter.hasNext()) {
MessageReference theRef = iter.next();
if (theRef.getMessage().getMessageID() == messageID) {
iter.remove();
ref = theRef;
break;
}
}
return ref;
}
/**
* To be used on tests only
*/