ARTEMIS-1657 - Properly decrement memory usage when moving messages from
queue When messages are retried and moved froma DLQ to the original queue the memory usage tracker needs to be decremented
This commit is contained in:
parent
26a28d0686
commit
586487155c
|
@ -1813,6 +1813,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
if (!ignored) {
|
if (!ignored) {
|
||||||
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
|
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
|
||||||
|
refRemoved(ref);
|
||||||
//move(toAddress, tx, ref, false, rejectDuplicates);
|
//move(toAddress, tx, ref, false, rejectDuplicates);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1861,9 +1862,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
|
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
|
||||||
} else {
|
} else {
|
||||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
|
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
refRemoved(ref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.management;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -25,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.json.JsonArray;
|
import javax.json.JsonArray;
|
||||||
import javax.json.JsonObject;
|
import javax.json.JsonObject;
|
||||||
|
@ -52,9 +54,11 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
|
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
|
||||||
import org.apache.activemq.artemis.utils.Base64;
|
import org.apache.activemq.artemis.utils.Base64;
|
||||||
|
@ -977,6 +981,22 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
session.start();
|
session.start();
|
||||||
|
|
||||||
|
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(qName);
|
||||||
|
Queue q = binding.getQueue();
|
||||||
|
final LocalQueueBinding binding2 = (LocalQueueBinding) server.getPostOffice().getBinding(dlq);
|
||||||
|
Queue q2 = binding2.getQueue();
|
||||||
|
|
||||||
|
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
|
||||||
|
queueMemorySizeField.setAccessible(true);
|
||||||
|
|
||||||
|
//Get memory size counters to verify
|
||||||
|
AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q);
|
||||||
|
AtomicInteger queueMemorySize2 = (AtomicInteger) queueMemorySizeField.get(q2);
|
||||||
|
|
||||||
|
//Verify that original queue has a memory size greater than 0 and DLQ is 0
|
||||||
|
assertTrue(queueMemorySize1.get() > 0);
|
||||||
|
assertTrue(queueMemorySize2.get() == 0);
|
||||||
|
|
||||||
// Read and rollback all messages to DLQ
|
// Read and rollback all messages to DLQ
|
||||||
ClientConsumer clientConsumer = session.createConsumer(qName);
|
ClientConsumer clientConsumer = session.createConsumer(qName);
|
||||||
for (int i = 0; i < numMessagesToTest; i++) {
|
for (int i = 0; i < numMessagesToTest; i++) {
|
||||||
|
@ -989,6 +1009,10 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
Assert.assertNull(clientConsumer.receiveImmediate());
|
Assert.assertNull(clientConsumer.receiveImmediate());
|
||||||
|
|
||||||
|
//Verify that original queue has a memory size of 0 and DLQ is greater than 0 after rollback
|
||||||
|
assertTrue(queueMemorySize1.get() == 0);
|
||||||
|
assertTrue(queueMemorySize2.get() > 0);
|
||||||
|
|
||||||
QueueControl dlqQueueControl = createManagementControl(dla, dlq);
|
QueueControl dlqQueueControl = createManagementControl(dla, dlq);
|
||||||
Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
|
Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
|
||||||
|
|
||||||
|
@ -998,6 +1022,10 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
// Assert DLQ is empty...
|
// Assert DLQ is empty...
|
||||||
Assert.assertEquals(0, getMessageCount(dlqQueueControl));
|
Assert.assertEquals(0, getMessageCount(dlqQueueControl));
|
||||||
|
|
||||||
|
//Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move
|
||||||
|
assertTrue(queueMemorySize1.get() > 0);
|
||||||
|
assertTrue(queueMemorySize2.get() == 0);
|
||||||
|
|
||||||
// .. and that the messages is now on the original queue once more.
|
// .. and that the messages is now on the original queue once more.
|
||||||
for (int i = 0; i < numMessagesToTest; i++) {
|
for (int i = 0; i < numMessagesToTest; i++) {
|
||||||
ClientMessage clientMessage = clientConsumer.receive(500);
|
ClientMessage clientMessage = clientConsumer.receive(500);
|
||||||
|
@ -1007,6 +1035,10 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
clientConsumer.close();
|
clientConsumer.close();
|
||||||
|
|
||||||
|
//Verify that original queue and DLQ have a memory size of 0
|
||||||
|
assertTrue(queueMemorySize1.get() == 0);
|
||||||
|
assertTrue(queueMemorySize2.get() == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1035,14 +1067,28 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
message.putLongProperty(key, value);
|
message.putLongProperty(key, value);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
||||||
|
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
|
||||||
|
Queue q = binding.getQueue();
|
||||||
|
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
|
||||||
|
queueMemorySizeField.setAccessible(true);
|
||||||
|
|
||||||
|
//Get memory size counters to verify
|
||||||
|
AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q);
|
||||||
|
|
||||||
QueueControl queueControl = createManagementControl(address, queue);
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||||
|
|
||||||
|
//verify memory usage is greater than 0
|
||||||
|
Assert.assertTrue(queueMemorySize.get() > 0);
|
||||||
|
|
||||||
// moved all messages to otherQueue
|
// moved all messages to otherQueue
|
||||||
int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString());
|
int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString());
|
||||||
Assert.assertEquals(1, movedMessagesCount);
|
Assert.assertEquals(1, movedMessagesCount);
|
||||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||||
|
|
||||||
|
//verify memory usage is 0 after move
|
||||||
|
Assert.assertEquals(0, queueMemorySize.get());
|
||||||
|
|
||||||
// check there is no message to consume from queue
|
// check there is no message to consume from queue
|
||||||
consumeMessages(0, session, queue);
|
consumeMessages(0, session, queue);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue