This commit is contained in:
Clebert Suconic 2019-09-06 14:26:30 -04:00
commit 0a0b0fcb23
3 changed files with 29 additions and 1 deletions

View File

@ -166,7 +166,8 @@ public class LastValueQueue extends QueueImpl {
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
MessageReference oldRef = hr.getReference();
referenceHandled(ref);
referenceHandled(oldRef);
super.refRemoved(oldRef);
try {
oldRef.acknowledge(null, AckReason.REPLACED, null);
@ -175,6 +176,8 @@ public class LastValueQueue extends QueueImpl {
}
hr.setReference(ref);
addRefSize(ref);
refAdded(ref);
}
@Override

View File

@ -2902,6 +2902,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
protected void addRefSize(MessageReference ref) {
queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
pendingMetrics.incrementMetrics(ref);
}
protected void refAdded(final MessageReference ref) {
if (ref.isPaged()) {
pagedReferences.incrementAndGet();

View File

@ -671,6 +671,26 @@ public class LVQTest extends ActiveMQTestBase {
clientSessionTxReceives.commit();
}
@Test
public void testSizeInReplace() throws Exception {
ClientProducer producer = clientSession.createProducer(address);
ClientMessage m1 = createTextMessage(clientSession, "m1");
SimpleString rh = new SimpleString("SMID1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
ClientMessage m2 = clientSession.createMessage(true);
m2.setBodyInputStream(createFakeLargeStream(10 * 1024));
m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
Queue queue = server.locateQueue(qName1);
producer.send(m1);
long oldSize = queue.getPersistentSize();
producer.send(m2);
assertEquals(queue.getDeliveringSize(), 0);
assertNotEquals(queue.getPersistentSize(), oldSize);
assertTrue(queue.getPersistentSize() > 10 * 1024);
}
@Override
@Before
public void setUp() throws Exception {