ARTEMIS-1772 Reduce memory footprint and allocations of QueueImpl

It includes:
- Message References: no longer uses boxed primitives and AtomicInteger
- Node: intrusive nodes no longer need a reference field holding itself
- RefCountMessage: no longer uses AtomicInteger, but AtomicIntegerFieldUpdater
This commit is contained in:
Francesco Nigro 2018-03-20 10:16:24 +01:00 committed by Clebert Suconic
parent c17f05de26
commit f6e8345dbe
9 changed files with 149 additions and 82 deletions

View File

@ -30,7 +30,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10; private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
private final Node<E> head = new Node<>(null); private final Node<E> head = new NodeHolder<>(null);
private Node<E> tail = null; private Node<E> tail = null;
@ -91,7 +91,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
if (ret != null) { if (ret != null) {
removeAfter(head); removeAfter(head);
return ret.val; return ret.val();
} else { } else {
return null; return null;
} }
@ -218,29 +218,37 @@ public class LinkedListImpl<E> implements LinkedList<E> {
throw new IllegalStateException("Cannot find iter to remove"); throw new IllegalStateException("Cannot find iter to remove");
} }
private static final class NodeHolder<T> extends Node<T> {
private final T val;
//only the head is allowed to hold a null
private NodeHolder(T e) {
val = e;
}
@Override
protected T val() {
return val;
}
}
public static class Node<T> { public static class Node<T> {
private Node<T> next; private Node<T> next;
private Node<T> prev; private Node<T> prev;
private final T val;
private int iterCount; private int iterCount;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected Node() { protected T val() {
val = (T)this; return (T) this;
}
//only the head is allowed to hold a null
private Node(T e) {
val = e;
} }
@Override @Override
public String toString() { public String toString() {
return val == this ? "Intrusive Node" : "Node, value = " + val; return val() == this ? "Intrusive Node" : "Node, value = " + val();
} }
private static <T> Node<T> with(final T o) { private static <T> Node<T> with(final T o) {
@ -254,7 +262,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
return node; return node;
} }
} }
return new Node(o); return new NodeHolder<>(o);
} }
} }
@ -298,14 +306,14 @@ public class LinkedListImpl<E> implements LinkedList<E> {
repeat = false; repeat = false;
if (e != null) { if (e != null) {
return e.val; return e.val();
} else { } else {
if (canAdvance()) { if (canAdvance()) {
advance(); advance();
e = getNode(); e = getNode();
return e.val; return e.val();
} else { } else {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
@ -326,7 +334,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
repeat = false; repeat = false;
return e.val; return e.val();
} }
@Override @Override

View File

@ -17,13 +17,16 @@
package org.apache.activemq.artemis.api.core; package org.apache.activemq.artemis.api.core;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public abstract class RefCountMessage implements Message { public abstract class RefCountMessage implements Message {
private final AtomicInteger durableRefCount = new AtomicInteger(); private static final AtomicIntegerFieldUpdater<RefCountMessage> DURABLE_REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "durableRefCount");
private static final AtomicIntegerFieldUpdater<RefCountMessage> REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "refCount");
private final AtomicInteger refCount = new AtomicInteger(); private volatile int durableRefCount = 0;
private volatile int refCount = 0;
private RefCountMessageListener context; private RefCountMessageListener context;
@ -40,12 +43,12 @@ public abstract class RefCountMessage implements Message {
@Override @Override
public int getRefCount() { public int getRefCount() {
return refCount.get(); return refCount;
} }
@Override @Override
public int incrementRefCount() throws Exception { public int incrementRefCount() throws Exception {
int count = refCount.incrementAndGet(); int count = REF_COUNT_UPDATER.incrementAndGet(this);
if (context != null) { if (context != null) {
context.nonDurableUp(this, count); context.nonDurableUp(this, count);
} }
@ -54,7 +57,7 @@ public abstract class RefCountMessage implements Message {
@Override @Override
public int incrementDurableRefCount() { public int incrementDurableRefCount() {
int count = durableRefCount.incrementAndGet(); int count = DURABLE_REF_COUNT_UPDATER.incrementAndGet(this);
if (context != null) { if (context != null) {
context.durableUp(this, count); context.durableUp(this, count);
} }
@ -63,7 +66,7 @@ public abstract class RefCountMessage implements Message {
@Override @Override
public int decrementDurableRefCount() { public int decrementDurableRefCount() {
int count = durableRefCount.decrementAndGet(); int count = DURABLE_REF_COUNT_UPDATER.decrementAndGet(this);
if (context != null) { if (context != null) {
context.durableDown(this, count); context.durableDown(this, count);
} }
@ -72,7 +75,7 @@ public abstract class RefCountMessage implements Message {
@Override @Override
public int decrementRefCount() throws Exception { public int decrementRefCount() throws Exception {
int count = refCount.decrementAndGet(); int count = REF_COUNT_UPDATER.decrementAndGet(this);
if (context != null) { if (context != null) {
context.nonDurableDown(this, count); context.nonDurableDown(this, count);
} }

View File

@ -1269,11 +1269,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
for (ListIterator<MessageReference> referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious(); ) { for (ListIterator<MessageReference> referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious(); ) {
MessageReference ref = referenceIterator.previous(); MessageReference ref = referenceIterator.previous();
Long consumerID = ref.getConsumerId();
ServerConsumer consumer = null; ServerConsumer consumer = null;
if (consumerID != null) { if (ref.hasConsumerId()) {
consumer = session.getCoreSession().locateConsumer(consumerID); consumer = session.getCoreSession().locateConsumer(ref.getConsumerId());
} }
if (consumer != null) { if (consumer != null) {

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.core.paging.cursor; package org.apache.activemq.artemis.core.paging.cursor;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
@ -33,19 +33,26 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class); private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class);
private static final AtomicIntegerFieldUpdater<PagedReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(PagedReferenceImpl.class, "deliveryCount");
private final PagePosition position; private final PagePosition position;
private WeakReference<PagedMessage> message; private WeakReference<PagedMessage> message;
private Long deliveryTime = null; private static final long UNDEFINED_DELIVERY_TIME = Long.MIN_VALUE;
private long deliveryTime = UNDEFINED_DELIVERY_TIME;
private int persistedCount; private int persistedCount;
private int messageEstimate = -1; private int messageEstimate = -1;
private Long consumerId; private long consumerID;
private final AtomicInteger deliveryCount = new AtomicInteger(0); private boolean hasConsumerID = false;
@SuppressWarnings("unused")
private volatile int deliveryCount = 0;
private final PageSubscription subscription; private final PageSubscription subscription;
@ -53,7 +60,11 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private Object protocolData; private Object protocolData;
private Boolean largeMessage; //0 is false, 1 is true, 2 not defined
private static final byte IS_NOT_LARGE_MESSAGE = 0;
private static final byte IS_LARGE_MESSAGE = 1;
private static final byte UNDEFINED_IS_LARGE_MESSAGE = 2;
private byte largeMessage;
private long transactionID = -1; private long transactionID = -1;
@ -104,14 +115,14 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
this.message = new WeakReference<>(message); this.message = new WeakReference<>(message);
this.subscription = subscription; this.subscription = subscription;
if (message != null) { if (message != null) {
this.largeMessage = message.getMessage().isLargeMessage(); this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
this.transactionID = message.getTransactionID(); this.transactionID = message.getTransactionID();
this.messageID = message.getMessage().getMessageID(); this.messageID = message.getMessage().getMessageID();
//pre-cache the message size so we don't have to reload the message later if it is GC'd //pre-cache the message size so we don't have to reload the message later if it is GC'd
getPersistentSize(); getPersistentSize();
} else { } else {
this.largeMessage = null; this.largeMessage = UNDEFINED_IS_LARGE_MESSAGE;
this.transactionID = -1; this.transactionID = -1;
this.messageID = -1; this.messageID = -1;
this.messageSize = -1; this.messageSize = -1;
@ -152,7 +163,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override @Override
public long getScheduledDeliveryTime() { public long getScheduledDeliveryTime() {
if (deliveryTime == null) { if (deliveryTime == UNDEFINED_DELIVERY_TIME) {
try { try {
Message msg = getMessage(); Message msg = getMessage();
return msg.getScheduledDeliveryTime(); return msg.getScheduledDeliveryTime();
@ -166,31 +177,31 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override @Override
public void setScheduledDeliveryTime(final long scheduledDeliveryTime) { public void setScheduledDeliveryTime(final long scheduledDeliveryTime) {
assert scheduledDeliveryTime != UNDEFINED_DELIVERY_TIME : "can't use a reserved value";
deliveryTime = scheduledDeliveryTime; deliveryTime = scheduledDeliveryTime;
} }
@Override @Override
public int getDeliveryCount() { public int getDeliveryCount() {
return deliveryCount.get(); return DELIVERY_COUNT_UPDATER.get(this);
} }
@Override @Override
public void setDeliveryCount(final int deliveryCount) { public void setDeliveryCount(final int deliveryCount) {
this.deliveryCount.set(deliveryCount); DELIVERY_COUNT_UPDATER.set(this, deliveryCount);
} }
@Override @Override
public void incrementDeliveryCount() { public void incrementDeliveryCount() {
deliveryCount.incrementAndGet(); DELIVERY_COUNT_UPDATER.incrementAndGet(this);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("++deliveryCount = " + deliveryCount + " for " + this, new Exception("trace")); logger.trace("++deliveryCount = " + deliveryCount + " for " + this, new Exception("trace"));
} }
} }
@Override @Override
public void decrementDeliveryCount() { public void decrementDeliveryCount() {
deliveryCount.decrementAndGet(); DELIVERY_COUNT_UPDATER.decrementAndGet(this);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("--deliveryCount = " + deliveryCount + " for " + this, new Exception("trace")); logger.trace("--deliveryCount = " + deliveryCount + " for " + this, new Exception("trace"));
} }
@ -251,7 +262,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
", message=" + ", message=" +
msgToString + msgToString +
", deliveryTime=" + ", deliveryTime=" +
deliveryTime + (deliveryTime == UNDEFINED_DELIVERY_TIME ? null : deliveryTime) +
", persistedCount=" + ", persistedCount=" +
persistedCount + persistedCount +
", deliveryCount=" + ", deliveryCount=" +
@ -261,28 +272,41 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
"]"; "]";
} }
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long)
*/
@Override @Override
public void setConsumerId(Long consumerID) { public void emptyConsumerID() {
this.consumerId = consumerID; this.hasConsumerID = false;
} }
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#getConsumerId()
*/
@Override @Override
public Long getConsumerId() { public void setConsumerId(long consumerID) {
return this.consumerId; this.hasConsumerID = true;
this.consumerID = consumerID;
}
@Override
public boolean hasConsumerId() {
return hasConsumerID;
}
@Override
public long getConsumerId() {
if (!this.hasConsumerID) {
throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
}
return this.consumerID;
} }
@Override @Override
public boolean isLargeMessage() { public boolean isLargeMessage() {
if (largeMessage == null && message != null) { if (largeMessage == UNDEFINED_IS_LARGE_MESSAGE && message != null) {
largeMessage = getMessage().isLargeMessage(); initializeIsLargeMessage();
} }
return largeMessage; return largeMessage == IS_LARGE_MESSAGE;
}
private void initializeIsLargeMessage() {
assert largeMessage == UNDEFINED_IS_LARGE_MESSAGE && message != null;
largeMessage = getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
} }
@Override @Override

View File

@ -91,9 +91,13 @@ public interface MessageReference {
void acknowledge(Transaction tx, AckReason reason) throws Exception; void acknowledge(Transaction tx, AckReason reason) throws Exception;
void setConsumerId(Long consumerID); void emptyConsumerID();
Long getConsumerId(); void setConsumerId(long consumerID);
boolean hasConsumerId();
long getConsumerId();
void handled(); void handled();

View File

@ -175,7 +175,9 @@ public class LastValueQueue extends QueueImpl {
private volatile MessageReference ref; private volatile MessageReference ref;
private Long consumerId; private long consumerID;
private boolean hasConsumerID = false;
HolderReference(final SimpleString prop, final MessageReference ref) { HolderReference(final SimpleString prop, final MessageReference ref) {
this.prop = prop; this.prop = prop;
@ -309,20 +311,28 @@ public class LastValueQueue extends QueueImpl {
return ref.getMessage().getMemoryEstimate(); return ref.getMessage().getMemoryEstimate();
} }
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long)
*/
@Override @Override
public void setConsumerId(Long consumerID) { public void emptyConsumerID() {
this.consumerId = consumerID; this.hasConsumerID = false;
} }
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#getConsumerId()
*/
@Override @Override
public Long getConsumerId() { public void setConsumerId(long consumerID) {
return this.consumerId; this.hasConsumerID = true;
this.consumerID = consumerID;
}
@Override
public boolean hasConsumerId() {
return hasConsumerID;
}
@Override
public long getConsumerId() {
if (!this.hasConsumerID) {
throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
}
return this.consumerID;
} }
@Override @Override

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -30,7 +30,11 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
*/ */
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference { public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
private final AtomicInteger deliveryCount = new AtomicInteger(); private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MessageReferenceImpl.class, "deliveryCount");
@SuppressWarnings("unused")
private volatile int deliveryCount = 0;
private volatile int persistedCount; private volatile int persistedCount;
@ -40,7 +44,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
private final Queue queue; private final Queue queue;
private Long consumerID; private long consumerID;
private boolean hasConsumerID = false;
private boolean alreadyAcked; private boolean alreadyAcked;
@ -59,7 +65,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
} }
public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue) { public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue) {
deliveryCount.set(other.deliveryCount.get()); DELIVERY_COUNT_UPDATER.set(this, other.getDeliveryCount());
scheduledDeliveryTime = other.scheduledDeliveryTime; scheduledDeliveryTime = other.scheduledDeliveryTime;
@ -113,23 +119,23 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
@Override @Override
public int getDeliveryCount() { public int getDeliveryCount() {
return deliveryCount.get(); return DELIVERY_COUNT_UPDATER.get(this);
} }
@Override @Override
public void setDeliveryCount(final int deliveryCount) { public void setDeliveryCount(final int deliveryCount) {
this.deliveryCount.set(deliveryCount); DELIVERY_COUNT_UPDATER.set(this, deliveryCount);
this.persistedCount = this.deliveryCount.get(); this.persistedCount = deliveryCount;
} }
@Override @Override
public void incrementDeliveryCount() { public void incrementDeliveryCount() {
deliveryCount.incrementAndGet(); DELIVERY_COUNT_UPDATER.incrementAndGet(this);
} }
@Override @Override
public void decrementDeliveryCount() { public void decrementDeliveryCount() {
deliveryCount.decrementAndGet(); DELIVERY_COUNT_UPDATER.decrementAndGet(this);
} }
@Override @Override
@ -197,12 +203,26 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
} }
@Override @Override
public void setConsumerId(Long consumerID) { public void emptyConsumerID() {
this.hasConsumerID = false;
}
@Override
public void setConsumerId(long consumerID) {
this.hasConsumerID = true;
this.consumerID = consumerID; this.consumerID = consumerID;
} }
@Override @Override
public Long getConsumerId() { public boolean hasConsumerId() {
return hasConsumerID;
}
@Override
public long getConsumerId() {
if (!this.hasConsumerID) {
throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
}
return this.consumerID; return this.consumerID;
} }

View File

@ -81,7 +81,7 @@ public class RefsOperation extends TransactionOperationAbstract {
List<MessageReference> ackedRefs = new ArrayList<>(); List<MessageReference> ackedRefs = new ArrayList<>();
for (MessageReference ref : refsToAck) { for (MessageReference ref : refsToAck) {
ref.setConsumerId(null); ref.emptyConsumerID();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("rolling back " + ref); logger.trace("rolling back " + ref);
@ -189,7 +189,7 @@ public class RefsOperation extends TransactionOperationAbstract {
public synchronized List<MessageReference> getListOnConsumer(long consumerID) { public synchronized List<MessageReference> getListOnConsumer(long consumerID) {
List<MessageReference> list = new LinkedList<>(); List<MessageReference> list = new LinkedList<>();
for (MessageReference ref : refsToAck) { for (MessageReference ref : refsToAck) {
if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID)) { if (ref.hasConsumerId() && ref.getConsumerId() == consumerID) {
list.add(ref); list.add(ref);
} }
} }

View File

@ -629,7 +629,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
// info level logging // info level logging
LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
(ref == null ? UNAVAILABLE : Long.toString(ref.getConsumerId())), (ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null),
(queue == null ? UNAVAILABLE : queue.getName().toString()), (queue == null ? UNAVAILABLE : queue.getName().toString()),
reason); reason);
} }