From 11214f0ca9ea8b5c58c7d3a9c46249cb95ac3cfc Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 12 Dec 2018 17:47:33 +0100 Subject: [PATCH] ARTEMIS-2212 Avoid using CLQ on ServerConsumerImpl It would deliver a better performance for the most common operations eg offer, poll, iterations, size. --- .../core/server/impl/ServerConsumerImpl.java | 116 ++++++++++-------- 1 file changed, 64 insertions(+), 52 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 19e395646b..18bb9871ab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -18,10 +18,12 @@ package org.apache.activemq.artemis.core.server.impl; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.ArrayDeque; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -117,7 +119,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public String debug() { - return toString() + "::Delivering " + this.deliveringRefs.size(); + String debug = toString() + "::Delivering "; + synchronized (lock) { + return debug + this.deliveringRefs.size(); + } } /** @@ -132,7 +137,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private final StorageManager storageManager; - protected final java.util.Deque deliveringRefs = new ConcurrentLinkedDeque<>(); + private final java.util.Deque deliveringRefs = new ArrayDeque<>(); private final SessionCallback callback; @@ -334,16 +339,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public List getDeliveringMessages() { - List refs = new LinkedList<>(); synchronized (lock) { + int expectedSize = 0; List refsOnConsumer = session.getInTXMessagesForConsumer(this.id); + if (refsOnConsumer != null) { + expectedSize = refsOnConsumer.size(); + } + expectedSize += deliveringRefs.size(); + final List refs = new ArrayList<>(expectedSize); if (refsOnConsumer != null) { refs.addAll(refsOnConsumer); } refs.addAll(deliveringRefs); + return refs; } - - return refs; } /** i @@ -522,21 +531,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { removeItself(); - LinkedList refs = cancelRefs(failed, false, null); - - Iterator iter = refs.iterator(); + List refs = cancelRefs(failed, false, null); Transaction tx = new TransactionImpl(storageManager); - while (iter.hasNext()) { - MessageReference ref = iter.next(); - + refs.forEach(ref -> { if (logger.isTraceEnabled()) { logger.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref); } - ref.getQueue().cancel(tx, ref, true); - } + }); tx.rollback(); @@ -638,9 +642,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public LinkedList cancelRefs(final boolean failed, - final boolean lastConsumedAsDelivered, - final Transaction tx) throws Exception { + public List cancelRefs(final boolean failed, + final boolean lastConsumedAsDelivered, + final Transaction tx) throws Exception { boolean performACK = lastConsumedAsDelivered; try { @@ -654,30 +658,30 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { largeMessageDeliverer = null; } - LinkedList refs = new LinkedList<>(); - synchronized (lock) { - if (!deliveringRefs.isEmpty()) { - for (MessageReference ref : deliveringRefs) { - if (performACK) { - ref.acknowledge(tx, this); + if (deliveringRefs.isEmpty()) { + return Collections.emptyList(); + } + final List refs = new ArrayList<>(deliveringRefs.size()); + MessageReference ref; + while ((ref = deliveringRefs.poll()) != null) { + if (performACK) { + ref.acknowledge(tx, this); - performACK = false; - } else { - refs.add(ref); - updateDeliveryCountForCanceledRef(ref, failed); - } - - if (logger.isTraceEnabled()) { - logger.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref); - } + performACK = false; + } else { + refs.add(ref); + updateDeliveryCountForCanceledRef(ref, failed); } - deliveringRefs.clear(); + if (logger.isTraceEnabled()) { + logger.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + ref.getMessage().getMessageID() + ", ref = " + ref); + } } + + return refs; } - return refs; } protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { @@ -1017,7 +1021,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public synchronized void backToDelivering(MessageReference reference) { - deliveringRefs.addFirst(reference); + synchronized (lock) { + deliveringRefs.addFirst(reference); + } } @Override @@ -1038,26 +1044,32 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (deliveringRefs.peek().getMessage().getMessageID() == messageID) { return deliveringRefs.poll(); } - - Iterator iter = deliveringRefs.iterator(); - - MessageReference ref = null; - - while (iter.hasNext()) { - MessageReference theRef = iter.next(); - - if (theRef.getMessage().getMessageID() == messageID) { - iter.remove(); - - ref = theRef; - - break; - } - } - return ref; + //slow path in a separate method + return removeDeliveringRefById(messageID); } } + private MessageReference removeDeliveringRefById(long messageID) { + assert deliveringRefs.peek().getMessage().getMessageID() != messageID; + + Iterator iter = deliveringRefs.iterator(); + + MessageReference ref = null; + + while (iter.hasNext()) { + MessageReference theRef = iter.next(); + + if (theRef.getMessage().getMessageID() == messageID) { + iter.remove(); + + ref = theRef; + + break; + } + } + return ref; + } + /** * To be used on tests only */