This closes #3282
This commit is contained in:
commit
4395a951b5
|
@ -23,6 +23,7 @@ import java.util.function.Consumer;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
@ -405,6 +406,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
|
||||||
return messageSize;
|
return messageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PagingStore getOwner() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setOwner(PagingStore owner) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isDurable() {
|
public boolean isDurable() {
|
||||||
if (durable == UNDEFINED_IS_DURABLE) {
|
if (durable == UNDEFINED_IS_DURABLE) {
|
||||||
|
|
|
@ -966,11 +966,7 @@ public class PagingStoreImpl implements PagingStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refUp(Message message, int count) {
|
public void refUp(Message message, int count) {
|
||||||
if (count == 1) {
|
this.addSize(MessageReferenceImpl.getMemoryEstimate());
|
||||||
this.addSize(message.getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate());
|
|
||||||
} else {
|
|
||||||
this.addSize(MessageReferenceImpl.getMemoryEstimate());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -979,15 +975,7 @@ public class PagingStoreImpl implements PagingStore {
|
||||||
// this could happen on paged messages since they are not routed and refUp is never called
|
// this could happen on paged messages since they are not routed and refUp is never called
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
this.addSize(-MessageReferenceImpl.getMemoryEstimate());
|
||||||
if (count == 0) {
|
|
||||||
this.addSize(-message.getMemoryEstimate() - MessageReferenceImpl.getMemoryEstimate());
|
|
||||||
|
|
||||||
} else {
|
|
||||||
this.addSize(-MessageReferenceImpl.getMemoryEstimate());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
|
private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception {
|
||||||
|
|
|
@ -1201,7 +1201,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
@Override
|
@Override
|
||||||
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
|
public MessageReference reload(final Message message, final Queue queue, final Transaction tx) throws Exception {
|
||||||
|
|
||||||
MessageReference reference = MessageReference.Factory.createReference(message, queue);
|
MessageReference reference = MessageReference.Factory.createReference(message, queue, pagingManager.getPageStore(message.getAddressSimpleString()));
|
||||||
|
|
||||||
Long scheduledDeliveryTime;
|
Long scheduledDeliveryTime;
|
||||||
if (message.hasScheduledDeliveryTime()) {
|
if (message.hasScheduledDeliveryTime()) {
|
||||||
|
@ -1211,6 +1211,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queue.refUp(reference);
|
||||||
queue.durableUp(message);
|
queue.durableUp(message);
|
||||||
|
|
||||||
if (tx == null) {
|
if (tx == null) {
|
||||||
|
@ -1455,8 +1456,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
deliveryTime = message.getScheduledDeliveryTime();
|
deliveryTime = message.getScheduledDeliveryTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PagingStore owningStore = pagingManager.getPageStore(message.getAddressSimpleString());
|
||||||
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
|
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
|
||||||
PagingStore store = pagingManager.getPageStore(entry.getKey());
|
PagingStore store;
|
||||||
|
if (entry.getKey() == message.getAddressSimpleString() || entry.getKey().equals(message.getAddressSimpleString())) {
|
||||||
|
store = owningStore;
|
||||||
|
} else {
|
||||||
|
store = pagingManager.getPageStore(entry.getKey());
|
||||||
|
}
|
||||||
|
|
||||||
if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
|
if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
|
||||||
if (message.isLargeMessage()) {
|
if (message.isLargeMessage()) {
|
||||||
|
@ -1469,14 +1476,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Queue queue : entry.getValue().getNonDurableQueues()) {
|
for (Queue queue : entry.getValue().getNonDurableQueues()) {
|
||||||
MessageReference reference = MessageReference.Factory.createReference(message, queue);
|
MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
|
||||||
|
|
||||||
if (deliveryTime != null) {
|
if (deliveryTime != null) {
|
||||||
reference.setScheduledDeliveryTime(deliveryTime);
|
reference.setScheduledDeliveryTime(deliveryTime);
|
||||||
}
|
}
|
||||||
refs.add(reference);
|
refs.add(reference);
|
||||||
|
|
||||||
queue.refUp(message);
|
queue.refUp(reference);
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
|
Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
|
||||||
|
@ -1484,7 +1491,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
Queue queue = iter.next();
|
Queue queue = iter.next();
|
||||||
|
|
||||||
MessageReference reference = MessageReference.Factory.createReference(message, queue);
|
MessageReference reference = MessageReference.Factory.createReference(message, queue, owningStore);
|
||||||
|
|
||||||
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
|
if (context.isAlreadyAcked(context.getAddress(message), queue)) {
|
||||||
reference.setAlreadyAcked();
|
reference.setAlreadyAcked();
|
||||||
|
@ -1497,6 +1504,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
reference.setScheduledDeliveryTime(deliveryTime);
|
reference.setScheduledDeliveryTime(deliveryTime);
|
||||||
}
|
}
|
||||||
refs.add(reference);
|
refs.add(reference);
|
||||||
|
queue.refUp(reference);
|
||||||
|
|
||||||
if (message.isDurable()) {
|
if (message.isDurable()) {
|
||||||
int durableRefCount = queue.durableUp(message);
|
int durableRefCount = queue.durableUp(message);
|
||||||
|
@ -1528,8 +1536,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
storageManager.updateScheduledDeliveryTime(reference);
|
storageManager.updateScheduledDeliveryTime(reference);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
queue.refUp(message);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1852,12 +1858,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
// Reverse the ref counts, and paging sizes
|
// Reverse the ref counts, and paging sizes
|
||||||
|
|
||||||
for (MessageReference ref : refs) {
|
for (MessageReference ref : refs) {
|
||||||
|
ref.getQueue().refDown(ref);
|
||||||
Message message = ref.getMessage();
|
Message message = ref.getMessage();
|
||||||
|
|
||||||
if (message.isDurable() && ref.getQueue().isDurable()) {
|
if (message.isDurable() && ref.getQueue().isDurable()) {
|
||||||
ref.getQueue().durableDown(message);
|
ref.getQueue().durableDown(message);
|
||||||
} else {
|
|
||||||
ref.getQueue().refDown(message);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.function.Consumer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
|
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
@ -34,8 +35,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
public interface MessageReference {
|
public interface MessageReference {
|
||||||
|
|
||||||
final class Factory {
|
final class Factory {
|
||||||
public static MessageReference createReference(Message encode, final Queue queue) {
|
public static MessageReference createReference(Message encode, final Queue queue, PagingStore pageStore) {
|
||||||
return new MessageReferenceImpl(encode, queue);
|
return new MessageReferenceImpl(encode, queue, pageStore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
boolean isPaged();
|
boolean isPaged();
|
||||||
|
@ -136,4 +137,8 @@ public interface MessageReference {
|
||||||
* @throws ActiveMQException
|
* @throws ActiveMQException
|
||||||
*/
|
*/
|
||||||
long getPersistentSize() throws ActiveMQException;
|
long getPersistentSize() throws ActiveMQException;
|
||||||
|
|
||||||
|
PagingStore getOwner();
|
||||||
|
|
||||||
|
void setOwner(PagingStore owner);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,9 +68,9 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
|
|
||||||
int durableDown(Message message);
|
int durableDown(Message message);
|
||||||
|
|
||||||
void refUp(Message message);
|
void refUp(MessageReference messageReference);
|
||||||
|
|
||||||
void refDown(Message message);
|
void refDown(MessageReference messageReference);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.function.Consumer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
@ -37,6 +38,7 @@ public class GroupFirstMessageReference implements MessageReference {
|
||||||
private final MessageReference messageReference;
|
private final MessageReference messageReference;
|
||||||
private final SimpleString key;
|
private final SimpleString key;
|
||||||
private volatile Message message;
|
private volatile Message message;
|
||||||
|
private volatile PagingStore owner;
|
||||||
|
|
||||||
public GroupFirstMessageReference(SimpleString key, MessageReference messageReference) {
|
public GroupFirstMessageReference(SimpleString key, MessageReference messageReference) {
|
||||||
this.messageReference = messageReference;
|
this.messageReference = messageReference;
|
||||||
|
@ -215,4 +217,14 @@ public class GroupFirstMessageReference implements MessageReference {
|
||||||
public long getPersistentSize() throws ActiveMQException {
|
public long getPersistentSize() throws ActiveMQException {
|
||||||
return messageReference.getPersistentSize();
|
return messageReference.getPersistentSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PagingStore getOwner() {
|
||||||
|
return this.owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setOwner(PagingStore owner) {
|
||||||
|
this.owner = owner;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -538,6 +538,16 @@ public class LastValueQueue extends QueueImpl {
|
||||||
public long getPersistentSize() throws ActiveMQException {
|
public long getPersistentSize() throws ActiveMQException {
|
||||||
return ref.getPersistentSize();
|
return ref.getPersistentSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PagingStore getOwner() {
|
||||||
|
return ref.getOwner();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setOwner(PagingStore owner) {
|
||||||
|
ref.setOwner(owner);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.function.Consumer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
|
||||||
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
|
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
|
||||||
|
|
||||||
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
|
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
|
||||||
|
private volatile PagingStore owner;
|
||||||
|
|
||||||
public static Comparator<MessageReference> getIDComparator() {
|
public static Comparator<MessageReference> getIDComparator() {
|
||||||
return idComparator;
|
return idComparator;
|
||||||
|
@ -102,12 +104,16 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
|
||||||
message = other.message;
|
message = other.message;
|
||||||
|
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
|
|
||||||
|
this.owner = other.owner;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MessageReferenceImpl(final Message message, final Queue queue) {
|
public MessageReferenceImpl(final Message message, final Queue queue, final PagingStore owner) {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
|
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
|
|
||||||
|
this.owner = owner;
|
||||||
}
|
}
|
||||||
|
|
||||||
// MessageReference implementation -------------------------------
|
// MessageReference implementation -------------------------------
|
||||||
|
@ -348,4 +354,14 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
|
||||||
public long getPersistentSize() throws ActiveMQException {
|
public long getPersistentSize() throws ActiveMQException {
|
||||||
return this.getMessage().getPersistentSize();
|
return this.getMessage().getPersistentSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PagingStore getOwner() {
|
||||||
|
return this.owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setOwner(PagingStore owner) {
|
||||||
|
this.owner = owner;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -970,36 +970,37 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int durableUp(Message message) {
|
public int durableUp(Message message) {
|
||||||
int count = message.durableUp();
|
return message.durableUp();
|
||||||
if (pagingStore != null) {
|
|
||||||
pagingStore.durableUp(message, count);
|
|
||||||
}
|
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int durableDown(Message message) {
|
public int durableDown(Message message) {
|
||||||
int count = message.durableDown();
|
return message.durableDown();
|
||||||
if (pagingStore != null) {
|
|
||||||
pagingStore.durableDown(message, count);
|
|
||||||
}
|
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refUp(Message message) {
|
public void refUp(MessageReference messageReference) {
|
||||||
int count = message.refUp();
|
int count = messageReference.getMessage().refUp();
|
||||||
if (pagingStore != null) {
|
if (count == 1) {
|
||||||
pagingStore.refUp(message, count);
|
if (messageReference.getOwner() != null) {
|
||||||
|
messageReference.getOwner().addSize(messageReference.getMessageMemoryEstimate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pagingStore != null) {
|
||||||
|
pagingStore.refUp(messageReference.getMessage(), count);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refDown(Message message) {
|
public void refDown(MessageReference messageReference) {
|
||||||
int count = message.refDown();
|
int count = messageReference.getMessage().refDown();
|
||||||
|
if (count == 0) {
|
||||||
|
if (messageReference.getOwner() != null) {
|
||||||
|
messageReference.getOwner().addSize(-messageReference.getMessageMemoryEstimate());
|
||||||
|
}
|
||||||
|
}
|
||||||
if (pagingStore != null) {
|
if (pagingStore != null) {
|
||||||
pagingStore.refDown(message, count);
|
pagingStore.refDown(messageReference.getMessage(), count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3826,6 +3827,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
if (message == null || (nonDestructive && reason == AckReason.NORMAL))
|
if (message == null || (nonDestructive && reason == AckReason.NORMAL))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
queue.refDown(ref);
|
||||||
|
|
||||||
boolean durableRef = message.isDurable() && queue.isDurable();
|
boolean durableRef = message.isDurable() && queue.isDurable();
|
||||||
|
|
||||||
if (durableRef) {
|
if (durableRef) {
|
||||||
|
@ -3854,8 +3857,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
|
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
queue.refDown(message);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -144,7 +144,7 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
ackedTX.setContainsPersistent();
|
ackedTX.setContainsPersistent();
|
||||||
}
|
}
|
||||||
|
|
||||||
ref.getQueue().refUp(message);
|
ref.getQueue().refUp(ref);
|
||||||
}
|
}
|
||||||
ackedTX.commit(true);
|
ackedTX.commit(true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -188,7 +188,7 @@ public class RefsOperation extends TransactionOperationAbstract {
|
||||||
for (MessageReference refmsg : pagedMessagesToPostACK) {
|
for (MessageReference refmsg : pagedMessagesToPostACK) {
|
||||||
((PagedReference)refmsg).removePendingFlag();
|
((PagedReference)refmsg).removePendingFlag();
|
||||||
if (((PagedReference) refmsg).isLargeMessage()) {
|
if (((PagedReference) refmsg).isLargeMessage()) {
|
||||||
refmsg.getQueue().refDown(refmsg.getMessage());
|
refmsg.getQueue().refDown(refmsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -646,7 +646,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
||||||
public void forceDelivery(final long sequence) {
|
public void forceDelivery(final long sequence) {
|
||||||
forceDelivery(sequence, () -> {
|
forceDelivery(sequence, () -> {
|
||||||
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
|
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
|
||||||
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
|
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue, null);
|
||||||
reference.setDeliveryCount(0);
|
reference.setDeliveryCount(0);
|
||||||
|
|
||||||
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
|
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
|
||||||
|
|
|
@ -250,7 +250,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
long nextMessageID,
|
long nextMessageID,
|
||||||
long nextScheduledTime,
|
long nextScheduledTime,
|
||||||
boolean tail) {
|
boolean tail) {
|
||||||
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null);
|
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), null, null);
|
||||||
refImpl.setScheduledDeliveryTime(nextScheduledTime);
|
refImpl.setScheduledDeliveryTime(nextScheduledTime);
|
||||||
handler.addInPlace(nextScheduledTime, refImpl, tail);
|
handler.addInPlace(nextScheduledTime, refImpl, tail);
|
||||||
}
|
}
|
||||||
|
@ -260,7 +260,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
long nextScheduledTime,
|
long nextScheduledTime,
|
||||||
boolean tail,
|
boolean tail,
|
||||||
Queue queue) {
|
Queue queue) {
|
||||||
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue);
|
MessageReferenceImpl refImpl = new MessageReferenceImpl(new FakeMessage(nextMessageID), queue, null);
|
||||||
refImpl.setScheduledDeliveryTime(nextScheduledTime);
|
refImpl.setScheduledDeliveryTime(nextScheduledTime);
|
||||||
handler.checkAndSchedule(refImpl, tail);
|
handler.checkAndSchedule(refImpl, tail);
|
||||||
}
|
}
|
||||||
|
@ -808,6 +808,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
public long getPersistentSize() throws ActiveMQException {
|
public long getPersistentSize() throws ActiveMQException {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
|
public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
|
||||||
|
@ -843,12 +844,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refUp(Message message) {
|
public void refUp(MessageReference messageReference) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refDown(Message message) {
|
public void refDown(MessageReference messageReference) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2138,7 +2138,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||||
protected MessageReference generateReference(final Queue queue, final long id) {
|
protected MessageReference generateReference(final Queue queue, final long id) {
|
||||||
Message message = generateMessage(id);
|
Message message = generateMessage(id);
|
||||||
|
|
||||||
return MessageReference.Factory.createReference(message, queue);
|
return MessageReference.Factory.createReference(message, queue, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int calculateRecordSize(final int size, final int alignment) {
|
protected int calculateRecordSize(final int size, final int alignment) {
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.jms.cluster;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
|
||||||
|
|
||||||
|
public static final String TOPIC = "jms.t1";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPageStoreSizeWithClusteredDurableSub() throws Exception {
|
||||||
|
doTestPageStoreSizeWithClusteredDurableSub(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPageStoreSizeWithClusteredDurableSubWithPaging() throws Exception {
|
||||||
|
doTestPageStoreSizeWithClusteredDurableSub(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doTestPageStoreSizeWithClusteredDurableSub(boolean forcePaging) throws Exception {
|
||||||
|
|
||||||
|
Connection conn1 = cf1.createConnection();
|
||||||
|
|
||||||
|
conn1.setClientID("someClient1");
|
||||||
|
|
||||||
|
Connection conn2 = cf2.createConnection();
|
||||||
|
|
||||||
|
conn2.setClientID("someClient2");
|
||||||
|
|
||||||
|
conn1.start();
|
||||||
|
|
||||||
|
conn2.start();
|
||||||
|
|
||||||
|
Topic topic1 = createTopic(TOPIC, true);
|
||||||
|
|
||||||
|
Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageProducer prod1 = session1.createProducer(null);
|
||||||
|
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
MessageConsumer cons1 = session1.createDurableSubscriber(topic1, "sub1");
|
||||||
|
MessageConsumer cons2 = session2.createDurableSubscriber(topic1, "sub2");
|
||||||
|
|
||||||
|
waitForBindings(server1, TOPIC, true, 1, 1, 2000);
|
||||||
|
waitForBindings(server2, TOPIC, true, 1, 1, 2000);
|
||||||
|
waitForBindings(server1, TOPIC, false, 1, 1, 2000);
|
||||||
|
waitForBindings(server2, TOPIC, false, 1, 1, 2000);
|
||||||
|
|
||||||
|
if (forcePaging) {
|
||||||
|
for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
|
||||||
|
server1.getPagingManager().getPageStore(psName).startPaging();
|
||||||
|
}
|
||||||
|
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
|
||||||
|
server2.getPagingManager().getPageStore(psName).startPaging();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
prod1.send(topic1, session1.createTextMessage("someMessage"));
|
||||||
|
|
||||||
|
TextMessage m2 = (TextMessage) cons2.receive(5000);
|
||||||
|
assertNotNull(m2);
|
||||||
|
TextMessage m1 = (TextMessage) cons1.receive(5000);
|
||||||
|
assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
|
||||||
|
|
||||||
|
assertNotNull(m1);
|
||||||
|
|
||||||
|
conn1.close();
|
||||||
|
conn2.close();
|
||||||
|
|
||||||
|
for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
|
||||||
|
assertTrue("non negative size: " + psName, server1.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
|
||||||
|
assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -334,7 +334,7 @@ public class GlobalPagingTest extends PagingTest {
|
||||||
int id = 1000;
|
int id = 1000;
|
||||||
try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
|
try (ClientConsumer consumer = session.createConsumer(replyQueue)) {
|
||||||
final Queue queue = server.locateQueue(replyQueue);
|
final Queue queue = server.locateQueue(replyQueue);
|
||||||
final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue);
|
final MessageReference reference = MessageReference.Factory.createReference(session.createMessage(false), queue, null);
|
||||||
reference.getMessage().setMessageID(id++);
|
reference.getMessage().setMessageID(id++);
|
||||||
//it will cause QueueImpl::directDeliver -> false
|
//it will cause QueueImpl::directDeliver -> false
|
||||||
queue.addHead(reference, false);
|
queue.addHead(reference, false);
|
||||||
|
|
|
@ -140,7 +140,7 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
|
||||||
while (System.currentTimeMillis() - start < testTime) {
|
while (System.currentTimeMillis() - start < testTime) {
|
||||||
Message message = generateMessage(i);
|
Message message = generateMessage(i);
|
||||||
|
|
||||||
MessageReference ref = MessageReference.Factory.createReference(message, queue);
|
MessageReference ref = MessageReference.Factory.createReference(message, queue, null);
|
||||||
|
|
||||||
queue.addTail(ref, false);
|
queue.addTail(ref, false);
|
||||||
|
|
||||||
|
|
|
@ -312,11 +312,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void delete() {
|
public void delete() {
|
||||||
|
fileMap.remove(fileName);
|
||||||
if (open) {
|
if (open) {
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
fileMap.remove(fileName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -76,12 +76,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refUp(Message message) {
|
public void refUp(MessageReference messageReference) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void refDown(Message message) {
|
public void refDown(MessageReference messageReference) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue