This closes #1840
This commit is contained in:
commit
8fc8ae2add
|
@ -1813,6 +1813,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
if (!ignored) {
|
||||
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
|
||||
refRemoved(ref);
|
||||
//move(toAddress, tx, ref, false, rejectDuplicates);
|
||||
}
|
||||
}
|
||||
|
@ -1861,9 +1862,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
|
||||
} else {
|
||||
move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
|
||||
|
||||
}
|
||||
|
||||
refRemoved(ref);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.management;
|
|||
|
||||
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
|
@ -25,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
|
@ -52,9 +54,11 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
|
||||
import org.apache.activemq.artemis.utils.Base64;
|
||||
|
@ -977,6 +981,22 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
|
||||
session.start();
|
||||
|
||||
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(qName);
|
||||
Queue q = binding.getQueue();
|
||||
final LocalQueueBinding binding2 = (LocalQueueBinding) server.getPostOffice().getBinding(dlq);
|
||||
Queue q2 = binding2.getQueue();
|
||||
|
||||
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
|
||||
queueMemorySizeField.setAccessible(true);
|
||||
|
||||
//Get memory size counters to verify
|
||||
AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q);
|
||||
AtomicInteger queueMemorySize2 = (AtomicInteger) queueMemorySizeField.get(q2);
|
||||
|
||||
//Verify that original queue has a memory size greater than 0 and DLQ is 0
|
||||
assertTrue(queueMemorySize1.get() > 0);
|
||||
assertTrue(queueMemorySize2.get() == 0);
|
||||
|
||||
// Read and rollback all messages to DLQ
|
||||
ClientConsumer clientConsumer = session.createConsumer(qName);
|
||||
for (int i = 0; i < numMessagesToTest; i++) {
|
||||
|
@ -989,6 +1009,10 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
|
||||
Assert.assertNull(clientConsumer.receiveImmediate());
|
||||
|
||||
//Verify that original queue has a memory size of 0 and DLQ is greater than 0 after rollback
|
||||
assertTrue(queueMemorySize1.get() == 0);
|
||||
assertTrue(queueMemorySize2.get() > 0);
|
||||
|
||||
QueueControl dlqQueueControl = createManagementControl(dla, dlq);
|
||||
Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
|
||||
|
||||
|
@ -998,6 +1022,10 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
// Assert DLQ is empty...
|
||||
Assert.assertEquals(0, getMessageCount(dlqQueueControl));
|
||||
|
||||
//Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move
|
||||
assertTrue(queueMemorySize1.get() > 0);
|
||||
assertTrue(queueMemorySize2.get() == 0);
|
||||
|
||||
// .. and that the messages is now on the original queue once more.
|
||||
for (int i = 0; i < numMessagesToTest; i++) {
|
||||
ClientMessage clientMessage = clientConsumer.receive(500);
|
||||
|
@ -1007,6 +1035,10 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
}
|
||||
|
||||
clientConsumer.close();
|
||||
|
||||
//Verify that original queue and DLQ have a memory size of 0
|
||||
assertTrue(queueMemorySize1.get() == 0);
|
||||
assertTrue(queueMemorySize2.get() == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1035,14 +1067,28 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
message.putLongProperty(key, value);
|
||||
producer.send(message);
|
||||
|
||||
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
|
||||
Queue q = binding.getQueue();
|
||||
Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
|
||||
queueMemorySizeField.setAccessible(true);
|
||||
|
||||
//Get memory size counters to verify
|
||||
AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q);
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
//verify memory usage is greater than 0
|
||||
Assert.assertTrue(queueMemorySize.get() > 0);
|
||||
|
||||
// moved all messages to otherQueue
|
||||
int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString());
|
||||
Assert.assertEquals(1, movedMessagesCount);
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
//verify memory usage is 0 after move
|
||||
Assert.assertEquals(0, queueMemorySize.get());
|
||||
|
||||
// check there is no message to consume from queue
|
||||
consumeMessages(0, session, queue);
|
||||
|
||||
|
|
Loading…
Reference in New Issue