This commit is contained in:
Clebert Suconic 2019-01-16 16:35:11 -05:00
commit 9cbe4519b0
6 changed files with 34 additions and 5 deletions

View File

@ -77,6 +77,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private Consumer<? super MessageReference> onDelivery;
//Durable field : 0 is false, 1 is true, -1 not defined
private static final byte IS_NOT_DURABLE = 0;
private static final byte IS_DURABLE = 1;
private static final byte UNDEFINED_IS_DURABLE = -1;
private byte durable = UNDEFINED_IS_DURABLE;
@Override
public Object getProtocolData() {
return protocolData;
@ -144,7 +150,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
this.transactionID = message.getTransactionID();
this.messageID = message.getMessage().getMessageID();
this.durable = message.getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE;
this.deliveryTime = message.getMessage().getScheduledDeliveryTime();
//pre-cache the message size so we don't have to reload the message later if it is GC'd
getPersistentSize();
} else {
@ -152,6 +159,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
this.transactionID = -2;
this.messageID = -1;
this.messageSize = -1;
this.durable = UNDEFINED_IS_DURABLE;
this.deliveryTime = UNDEFINED_DELIVERY_TIME;
}
}
@ -387,4 +396,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
return messageSize;
}
@Override
public boolean isDurable() {
if (durable == UNDEFINED_IS_DURABLE) {
durable = getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE;
}
return durable == IS_DURABLE;
}
}

View File

@ -44,6 +44,8 @@ public interface MessageReference {
long getMessageID();
boolean isDurable();
SimpleString getLastValueProperty();
/**

View File

@ -305,6 +305,11 @@ public class LastValueQueue extends QueueImpl {
return ref.getMessageID();
}
@Override
public boolean isDurable() {
return getMessage().isDurable();
}
@Override
public SimpleString getLastValueProperty() {
return prop;

View File

@ -189,6 +189,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return queue;
}
@Override
public boolean isDurable() {
return getMessage().isDurable();
}
@Override
public void handled() {
queue.referenceHandled(this);

View File

@ -2767,7 +2767,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
if (!internalQueue && message.isDurable() && isDurableMessage() && !reference.isPaged()) {
if (!internalQueue && reference.isDurable() && isDurableMessage() && !reference.isPaged()) {
storageManager.updateDeliveryCount(reference);
}
@ -2796,7 +2796,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
if (!reference.isPaged() && message.isDurable() && isDurableMessage()) {
if (!reference.isPaged() && reference.isDurable() && isDurableMessage()) {
storageManager.updateScheduledDeliveryTime(reference);
}
}

View File

@ -58,7 +58,7 @@ public class QueuePendingMessageMetrics {
long size = getPersistentSize(reference);
COUNT_UPDATER.incrementAndGet(this);
SIZE_UPDATER.addAndGet(this, size);
if (queue.isDurable() && reference.getMessage().isDurable()) {
if (queue.isDurable() && reference.isDurable()) {
DURABLE_COUNT_UPDATER.incrementAndGet(this);
DURABLE_SIZE_UPDATER.addAndGet(this, size);
}
@ -68,7 +68,7 @@ public class QueuePendingMessageMetrics {
long size = -getPersistentSize(reference);
COUNT_UPDATER.decrementAndGet(this);
SIZE_UPDATER.addAndGet(this, size);
if (queue.isDurable() && reference.getMessage().isDurable()) {
if (queue.isDurable() && reference.isDurable()) {
DURABLE_COUNT_UPDATER.decrementAndGet(this);
DURABLE_SIZE_UPDATER.addAndGet(this, size);
}