ARTEMIS-3067, ARTEMIS-3135 - rework accounting for lazy decoding by directly referencing the owning page store, owner now tracked on a message rather than the message reference. This avoids the error prone checks around potential decoding sites

This commit is contained in:
gtully 2021-02-23 21:11:48 +00:00 committed by Gary Tully
parent 66040b009c
commit a0ce3812ba
21 changed files with 103 additions and 110 deletions

View File

@ -758,4 +758,7 @@ public interface Message {
*/
long getPersistentSize() throws ActiveMQException;
Object getOwner();
void setOwner(Object object);
}

View File

@ -101,6 +101,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private final CoreMessageObjectPools coreMessageObjectPools;
private volatile Object owner;
public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
this.coreMessageObjectPools = coreMessageObjectPools;
}
@ -1259,4 +1261,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public long getPersistentSize() throws ActiveMQException {
return getEncodeSize();
}
@Override
public Object getOwner() {
return owner;
}
@Override
public void setOwner(Object object) {
this.owner = object;
}
}

View File

@ -709,4 +709,14 @@ public class MessageInternalImpl implements MessageInternal {
return message.getPersistentSize();
}
@Override
public Object getOwner() {
return message.getOwner();
}
@Override
public void setOwner(Object object) {
message.setOwner(object);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -213,6 +214,8 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
// These are properties set at the broker level and carried only internally by broker storage.
protected volatile TypedProperties extraProperties;
private volatile Object owner;
/**
* Creates a new {@link AMQPMessage} instance from binary encoded message data.
*
@ -490,12 +493,30 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
memoryEstimate = -1;
if (owner != null && memoryEstimate != -1) {
// the memory has already been tracked and needs to be updated to reflect the new decoding
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
((PagingStore)owner).addSize(addition);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
}
}
return applicationProperties;
}
protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
if (applicationProperties != null) {
// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
}
return 0;
}
@SuppressWarnings("unchecked")
protected Map<String, Object> getApplicationPropertiesMap(boolean createIfAbsent) {
ApplicationProperties appMap = lazyDecodeApplicationProperties();
@ -1692,4 +1713,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
}
@Override
public Object getOwner() {
return owner;
}
@Override
public void setOwner(Object object) {
this.owner = object;
}
}

View File

@ -187,23 +187,12 @@ public class AMQPStandardMessage extends AMQPMessage {
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData() : 0);
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
}
return memoryEstimate;
}
private int unmarshalledApplicationPropertiesMemoryEstimateFromData() {
if (applicationProperties != null) {
// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
}
return 0;
}
@Override
public void persist(ActiveMQBuffer targetRecord) {

View File

@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -153,15 +152,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
try {
context.setReusable(false);
PagingStore storeOwner = null;
if (refs.size() > 0) {
storeOwner = refs.get(0).getOwner();
}
if (storeOwner != null && !storeOwner.getAddress().equals(message.getAddressSimpleString())) {
storeOwner = server.getPagingManager().getPageStore(message.getAddressSimpleString());
}
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue, storeOwner);
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
snfQueue.refUp(ref);
Map<Symbol, Object> daMap = new HashMap<>();

View File

@ -498,6 +498,15 @@ public class OpenwireMessage implements Message {
return 0;
}
@Override
public Object getOwner() {
return null;
}
@Override
public void setOwner(Object object) {
}
@Override
public int getUsage() {
return 0;

View File

@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -793,9 +792,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
int i = 0;
for (MessageReference ref : refs) {
Message message = ref.getMessage();
final int currentMemoryEstimate = message.getMemoryEstimate();
messages[i++] = message.toMap();
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
return messages;
}
@ -856,9 +853,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
Message message = ref.getMessage();
final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
}
} catch (NoSuchElementException ignored) {
@ -903,9 +898,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
Message message = ref.getMessage();
final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
return messages.toArray(new Map[1]);
}

View File

@ -23,7 +23,6 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -406,16 +405,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return messageSize;
}
@Override
public PagingStore getOwner() {
return null;
}
@Override
public void setOwner(PagingStore owner) {
}
@Override
public boolean isDurable() {
if (durable == UNDEFINED_IS_DURABLE) {

View File

@ -1244,7 +1244,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
MessageReference reference = MessageReference.Factory.createReference(message, queue, pagingManager.getPageStore(message.getAddressSimpleString()));
MessageReference reference = MessageReference.Factory.createReference(message, queue);
Long scheduledDeliveryTime;
if (message.hasScheduledDeliveryTime()) {
@ -1499,6 +1499,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
message.setOwner(owningStore);
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store;
if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
@ -1518,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
for (Queue queue : entry.getValue().getNonDurableQueues()) {
MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (deliveryTime != null) {
reference.setScheduledDeliveryTime(deliveryTime);
@ -1533,7 +1534,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
while (iter.hasNext()) {
Queue queue = iter.next();
MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
reference.setAlreadyAcked();

View File

@ -22,7 +22,6 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -35,8 +34,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public interface MessageReference {
final class Factory {
public static MessageReference createReference(Message encode, final Queue queue, PagingStore pageStore) {
return new MessageReferenceImpl(encode, queue, pageStore);
public static MessageReference createReference(Message encode, final Queue queue) {
return new MessageReferenceImpl(encode, queue);
}
}
boolean isPaged();
@ -138,7 +137,4 @@ public interface MessageReference {
*/
long getPersistentSize() throws ActiveMQException;
PagingStore getOwner();
void setOwner(PagingStore owner);
}

View File

@ -218,13 +218,4 @@ public class GroupFirstMessageReference implements MessageReference {
return messageReference.getPersistentSize();
}
@Override
public PagingStore getOwner() {
return this.owner;
}
@Override
public void setOwner(PagingStore owner) {
this.owner = owner;
}
}

View File

@ -570,16 +570,6 @@ public class LastValueQueue extends QueueImpl {
return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
}
@Override
public PagingStore getOwner() {
return ref.getOwner();
}
@Override
public void setOwner(PagingStore owner) {
ref.setOwner(owner);
}
}
@Override

View File

@ -23,7 +23,6 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -36,7 +35,6 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
private volatile PagingStore owner;
public static Comparator<MessageReference> getIDComparator() {
return idComparator;
@ -107,15 +105,13 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
this.queue = queue;
this.owner = other.owner;
}
public MessageReferenceImpl(final Message message, final Queue queue, final PagingStore owner) {
public MessageReferenceImpl(final Message message, final Queue queue) {
this.message = message;
this.queue = queue;
this.owner = owner;
}
// MessageReference implementation -------------------------------
@ -179,15 +175,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return MessageReferenceImpl.memoryOffset;
}
public static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) {
final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate;
if (delta > 0) {
PagingStore pageStore = ref.getOwner();
if (pageStore != null) {
pageStore.addSize(delta);
}
}
}
@Override
public int getDeliveryCount() {
@ -367,13 +354,4 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return this.getMessage().getPersistentSize();
}
@Override
public PagingStore getOwner() {
return this.owner;
}
@Override
public void setOwner(PagingStore owner) {
this.owner = owner;
}
}

View File

@ -1005,8 +1005,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void refUp(MessageReference messageReference) {
int count = messageReference.getMessage().refUp();
if (count == 1) {
if (messageReference.getOwner() != null) {
messageReference.getOwner().addSize(messageReference.getMessageMemoryEstimate());
if (messageReference.getMessage().getOwner() != null) {
((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate());
}
}
if (pagingStore != null) {
@ -1018,8 +1018,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void refDown(MessageReference messageReference) {
int count = messageReference.getMessage().refDown();
if (count == 0) {
if (messageReference.getOwner() != null) {
messageReference.getOwner().addSize(-messageReference.getMessageMemoryEstimate());
if (messageReference.getMessage().getOwner() != null) {
((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate());
}
}
if (pagingStore != null) {
@ -3071,9 +3071,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
proceedDeliver(handledconsumer, ref);
}
if (existingMemoryEstimate > 0 ) {
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
}
}
return true;
@ -3697,13 +3694,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumer = groupConsumer;
}
// filter evaluation may cause properties to be lazyDecoded
final int existingMemoryEstimate = ref.getMessageMemoryEstimate();
HandleStatus status = handle(ref, consumer);
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (redistributor == null) {

View File

@ -644,7 +644,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue, null);
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
reference.setDeliveryCount(0);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);

View File

@ -251,7 +251,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
long nextMessageID,
long nextScheduledTime,
boolean tail) {
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null, null);
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.addInPlace(nextScheduledTime, refImpl, tail);
}
@ -261,7 +261,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
long nextScheduledTime,
boolean tail,
Queue queue) {
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue, null);
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
refImpl.setScheduledDeliveryTime(nextScheduledTime);
handler.checkAndSchedule(refImpl, tail);
}
@ -810,6 +810,15 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
@Override
public Object getOwner() {
return null;
}
@Override
public void setOwner(Object object) {
}
}
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {

View File

@ -2150,7 +2150,7 @@ public abstract class ActiveMQTestBase extends Assert {
protected MessageReference generateReference(final Queue queue, final long id) {
Message message = generateMessage(id);
return MessageReference.Factory.createReference(message, queue, null);
return MessageReference.Factory.createReference(message, queue);
}
protected int calculateRecordSize(final int size, final int alignment) {

View File

@ -839,5 +839,14 @@ public class AcknowledgeTest extends ActiveMQTestBase {
public long getPersistentSize() throws ActiveMQException {
return 0;
}
@Override
public Object getOwner() {
return null;
}
@Override
public void setOwner(Object object) {
}
}
}

View File

@ -334,7 +334,7 @@ public class GlobalPagingTest extends PagingTest {
int id = 1000;
try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
final Queue queue = server.locateQueue(replyQueue);
final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue, null);
final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
reference.getMessage().setMessageID(id++);
//it will cause QueueImpl::directDeliver -> false
queue.addHead(reference, false);

View File

@ -140,7 +140,7 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
while (System.currentTimeMillis() - start < testTime) {
Message message = generateMessage(i);
MessageReference ref = MessageReference.Factory.createReference(message, queue, null);
MessageReference ref = MessageReference.Factory.createReference(message, queue);
queue.addTail(ref, false);