diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 6071324559..cb202583b9 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -30,7 +30,7 @@ public class LinkedListImpl implements LinkedList { private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10; - private final Node head = new Node<>(null); + private final Node head = new NodeHolder<>(null); private Node tail = null; @@ -91,7 +91,7 @@ public class LinkedListImpl implements LinkedList { if (ret != null) { removeAfter(head); - return ret.val; + return ret.val(); } else { return null; } @@ -218,29 +218,37 @@ public class LinkedListImpl implements LinkedList { throw new IllegalStateException("Cannot find iter to remove"); } + private static final class NodeHolder extends Node { + + private final T val; + + //only the head is allowed to hold a null + private NodeHolder(T e) { + val = e; + } + + @Override + protected T val() { + return val; + } + } + public static class Node { private Node next; private Node prev; - private final T val; - private int iterCount; @SuppressWarnings("unchecked") - protected Node() { - val = (T)this; - } - - //only the head is allowed to hold a null - private Node(T e) { - val = e; + protected T val() { + return (T) this; } @Override public String toString() { - return val == this ? "Intrusive Node" : "Node, value = " + val; + return val() == this ? "Intrusive Node" : "Node, value = " + val(); } private static Node with(final T o) { @@ -254,7 +262,7 @@ public class LinkedListImpl implements LinkedList { return node; } } - return new Node(o); + return new NodeHolder<>(o); } } @@ -298,14 +306,14 @@ public class LinkedListImpl implements LinkedList { repeat = false; if (e != null) { - return e.val; + return e.val(); } else { if (canAdvance()) { advance(); e = getNode(); - return e.val; + return e.val(); } else { throw new NoSuchElementException(); } @@ -326,7 +334,7 @@ public class LinkedListImpl implements LinkedList { repeat = false; - return e.val; + return e.val(); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java index 64dd44d299..5754bcc054 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/RefCountMessage.java @@ -17,13 +17,16 @@ package org.apache.activemq.artemis.api.core; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public abstract class RefCountMessage implements Message { - private final AtomicInteger durableRefCount = new AtomicInteger(); + private static final AtomicIntegerFieldUpdater DURABLE_REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "durableRefCount"); + private static final AtomicIntegerFieldUpdater REF_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RefCountMessage.class, "refCount"); - private final AtomicInteger refCount = new AtomicInteger(); + private volatile int durableRefCount = 0; + + private volatile int refCount = 0; private RefCountMessageListener context; @@ -40,12 +43,12 @@ public abstract class RefCountMessage implements Message { @Override public int getRefCount() { - return refCount.get(); + return refCount; } @Override public int incrementRefCount() throws Exception { - int count = refCount.incrementAndGet(); + int count = REF_COUNT_UPDATER.incrementAndGet(this); if (context != null) { context.nonDurableUp(this, count); } @@ -54,7 +57,7 @@ public abstract class RefCountMessage implements Message { @Override public int incrementDurableRefCount() { - int count = durableRefCount.incrementAndGet(); + int count = DURABLE_REF_COUNT_UPDATER.incrementAndGet(this); if (context != null) { context.durableUp(this, count); } @@ -63,7 +66,7 @@ public abstract class RefCountMessage implements Message { @Override public int decrementDurableRefCount() { - int count = durableRefCount.decrementAndGet(); + int count = DURABLE_REF_COUNT_UPDATER.decrementAndGet(this); if (context != null) { context.durableDown(this, count); } @@ -72,7 +75,7 @@ public abstract class RefCountMessage implements Message { @Override public int decrementRefCount() throws Exception { - int count = refCount.decrementAndGet(); + int count = REF_COUNT_UPDATER.decrementAndGet(this); if (context != null) { context.nonDurableDown(this, count); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 499fb4bbaf..f671671e73 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1269,11 +1269,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se for (ListIterator referenceIterator = ackRefs.listIterator(ackRefs.size()); referenceIterator.hasPrevious(); ) { MessageReference ref = referenceIterator.previous(); - Long consumerID = ref.getConsumerId(); - ServerConsumer consumer = null; - if (consumerID != null) { - consumer = session.getCoreSession().locateConsumer(consumerID); + if (ref.hasConsumerId()) { + consumer = session.getCoreSession().locateConsumer(ref.getConsumerId()); } if (consumer != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 23f01f9577..9a37bd81f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.core.paging.cursor; import java.lang.ref.WeakReference; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -33,19 +33,26 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private static final Logger logger = Logger.getLogger(PagedReferenceImpl.class); + private static final AtomicIntegerFieldUpdater DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(PagedReferenceImpl.class, "deliveryCount"); + private final PagePosition position; private WeakReference message; - private Long deliveryTime = null; + private static final long UNDEFINED_DELIVERY_TIME = Long.MIN_VALUE; + private long deliveryTime = UNDEFINED_DELIVERY_TIME; private int persistedCount; private int messageEstimate = -1; - private Long consumerId; + private long consumerID; - private final AtomicInteger deliveryCount = new AtomicInteger(0); + private boolean hasConsumerID = false; + + @SuppressWarnings("unused") + private volatile int deliveryCount = 0; private final PageSubscription subscription; @@ -53,7 +60,11 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private Object protocolData; - private Boolean largeMessage; + //0 is false, 1 is true, 2 not defined + private static final byte IS_NOT_LARGE_MESSAGE = 0; + private static final byte IS_LARGE_MESSAGE = 1; + private static final byte UNDEFINED_IS_LARGE_MESSAGE = 2; + private byte largeMessage; private long transactionID = -1; @@ -104,14 +115,14 @@ public class PagedReferenceImpl extends LinkedListImpl.Node this.message = new WeakReference<>(message); this.subscription = subscription; if (message != null) { - this.largeMessage = message.getMessage().isLargeMessage(); + this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE; this.transactionID = message.getTransactionID(); this.messageID = message.getMessage().getMessageID(); //pre-cache the message size so we don't have to reload the message later if it is GC'd getPersistentSize(); } else { - this.largeMessage = null; + this.largeMessage = UNDEFINED_IS_LARGE_MESSAGE; this.transactionID = -1; this.messageID = -1; this.messageSize = -1; @@ -152,7 +163,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node @Override public long getScheduledDeliveryTime() { - if (deliveryTime == null) { + if (deliveryTime == UNDEFINED_DELIVERY_TIME) { try { Message msg = getMessage(); return msg.getScheduledDeliveryTime(); @@ -166,31 +177,31 @@ public class PagedReferenceImpl extends LinkedListImpl.Node @Override public void setScheduledDeliveryTime(final long scheduledDeliveryTime) { + assert scheduledDeliveryTime != UNDEFINED_DELIVERY_TIME : "can't use a reserved value"; deliveryTime = scheduledDeliveryTime; } @Override public int getDeliveryCount() { - return deliveryCount.get(); + return DELIVERY_COUNT_UPDATER.get(this); } @Override public void setDeliveryCount(final int deliveryCount) { - this.deliveryCount.set(deliveryCount); + DELIVERY_COUNT_UPDATER.set(this, deliveryCount); } @Override public void incrementDeliveryCount() { - deliveryCount.incrementAndGet(); + DELIVERY_COUNT_UPDATER.incrementAndGet(this); if (logger.isTraceEnabled()) { logger.trace("++deliveryCount = " + deliveryCount + " for " + this, new Exception("trace")); } - } @Override public void decrementDeliveryCount() { - deliveryCount.decrementAndGet(); + DELIVERY_COUNT_UPDATER.decrementAndGet(this); if (logger.isTraceEnabled()) { logger.trace("--deliveryCount = " + deliveryCount + " for " + this, new Exception("trace")); } @@ -251,7 +262,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node ", message=" + msgToString + ", deliveryTime=" + - deliveryTime + + (deliveryTime == UNDEFINED_DELIVERY_TIME ? null : deliveryTime) + ", persistedCount=" + persistedCount + ", deliveryCount=" + @@ -261,28 +272,41 @@ public class PagedReferenceImpl extends LinkedListImpl.Node "]"; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long) - */ @Override - public void setConsumerId(Long consumerID) { - this.consumerId = consumerID; + public void emptyConsumerID() { + this.hasConsumerID = false; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.server.MessageReference#getConsumerId() - */ @Override - public Long getConsumerId() { - return this.consumerId; + public void setConsumerId(long consumerID) { + this.hasConsumerID = true; + this.consumerID = consumerID; + } + + @Override + public boolean hasConsumerId() { + return hasConsumerID; + } + + @Override + public long getConsumerId() { + if (!this.hasConsumerID) { + throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first"); + } + return this.consumerID; } @Override public boolean isLargeMessage() { - if (largeMessage == null && message != null) { - largeMessage = getMessage().isLargeMessage(); + if (largeMessage == UNDEFINED_IS_LARGE_MESSAGE && message != null) { + initializeIsLargeMessage(); } - return largeMessage; + return largeMessage == IS_LARGE_MESSAGE; + } + + private void initializeIsLargeMessage() { + assert largeMessage == UNDEFINED_IS_LARGE_MESSAGE && message != null; + largeMessage = getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index d9145b1498..0db84c543b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -91,9 +91,13 @@ public interface MessageReference { void acknowledge(Transaction tx, AckReason reason) throws Exception; - void setConsumerId(Long consumerID); + void emptyConsumerID(); - Long getConsumerId(); + void setConsumerId(long consumerID); + + boolean hasConsumerId(); + + long getConsumerId(); void handled(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 2620cf92de..e3097d195a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -175,7 +175,9 @@ public class LastValueQueue extends QueueImpl { private volatile MessageReference ref; - private Long consumerId; + private long consumerID; + + private boolean hasConsumerID = false; HolderReference(final SimpleString prop, final MessageReference ref) { this.prop = prop; @@ -309,20 +311,28 @@ public class LastValueQueue extends QueueImpl { return ref.getMessage().getMemoryEstimate(); } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.server.MessageReference#setConsumerId(java.lang.Long) - */ @Override - public void setConsumerId(Long consumerID) { - this.consumerId = consumerID; + public void emptyConsumerID() { + this.hasConsumerID = false; } - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.server.MessageReference#getConsumerId() - */ @Override - public Long getConsumerId() { - return this.consumerId; + public void setConsumerId(long consumerID) { + this.hasConsumerID = true; + this.consumerID = consumerID; + } + + @Override + public boolean hasConsumerId() { + return hasConsumerID; + } + + @Override + public long getConsumerId() { + if (!this.hasConsumerID) { + throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first"); + } + return this.consumerID; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 2802740191..4d077aedfe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -30,7 +30,11 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl; */ public class MessageReferenceImpl extends LinkedListImpl.Node implements MessageReference { - private final AtomicInteger deliveryCount = new AtomicInteger(); + private static final AtomicIntegerFieldUpdater DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(MessageReferenceImpl.class, "deliveryCount"); + + @SuppressWarnings("unused") + private volatile int deliveryCount = 0; private volatile int persistedCount; @@ -40,7 +44,9 @@ public class MessageReferenceImpl extends LinkedListImpl.Node ackedRefs = new ArrayList<>(); for (MessageReference ref : refsToAck) { - ref.setConsumerId(null); + ref.emptyConsumerID(); if (logger.isTraceEnabled()) { logger.trace("rolling back " + ref); @@ -189,7 +189,7 @@ public class RefsOperation extends TransactionOperationAbstract { public synchronized List getListOnConsumer(long consumerID) { List list = new LinkedList<>(); for (MessageReference ref : refsToAck) { - if (ref.getConsumerId() != null && ref.getConsumerId().equals(consumerID)) { + if (ref.hasConsumerId() && ref.getConsumerId() == consumerID) { list.add(ref); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java index 70bb6ca632..3e66bf1c29 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java @@ -629,7 +629,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial // info level logging LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), - (ref == null ? UNAVAILABLE : Long.toString(ref.getConsumerId())), + (ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null), (queue == null ? UNAVAILABLE : queue.getName().toString()), reason); }