ARTEMIS-1705 Only messages from MessageReferences are subtracted from the queueMemorySize

This commit is contained in:
17103355 2018-02-28 11:38:00 +08:00 committed by Clebert Suconic
parent 66dbc9e3b4
commit c808f246e5
2 changed files with 78 additions and 7 deletions

View File

@ -1484,10 +1484,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return iterQueue(flushLimit, filter1, new QueueIterateAction() { return iterQueue(flushLimit, filter1, new QueueIterateAction() {
@Override @Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception { public void actMessage(Transaction tx, MessageReference ref) throws Exception {
actMessage(tx, ref, true);
}
@Override
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
incDelivering(ref); incDelivering(ref);
acknowledge(tx, ref, ackReason); acknowledge(tx, ref, ackReason);
if (fromMessageReferences) {
refRemoved(ref); refRemoved(ref);
} }
}
}); });
} }
@ -1558,7 +1565,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (filter1 == null || filter1.match(reference.getMessage())) { if (filter1 == null || filter1.match(reference.getMessage())) {
count++; count++;
txCount++; txCount++;
messageAction.actMessage(tx, reference); messageAction.actMessage(tx, reference, false);
} else { } else {
addTail(reference, false); addTail(reference, false);
} }
@ -3191,6 +3198,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
abstract class QueueIterateAction { abstract class QueueIterateAction {
public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception; public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
actMessage(tx, ref);
}
} }
/* For external use we need to use a synchronized version since the list is not thread safe */ /* For external use we need to use a synchronized version since the list is not thread safe */

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.management;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY; import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -36,6 +37,7 @@ import javax.json.JsonObject;
import javax.management.Notification; import javax.management.Notification;
import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeData;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
@ -57,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
@ -1505,6 +1508,63 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
@Test
public void testRemoveAllWithPagingMode() throws Exception {
final int MESSAGE_SIZE = 1024 * 3; // 3k
// reset maxSize for Paging mode
Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize");
maxSizField.setAccessible(true);
maxSizField.setLong(server.getPagingManager(), 10240);
clearDataRecreateServerDirs();
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queueName = RandomUtil.randomSimpleString();
session.createQueue(address, RoutingType.MULTICAST, queueName, null, durable);
Queue queue = server.locateQueue(queueName);
Assert.assertEquals(false, queue.getPageSubscription().isPaging());
ClientProducer producer = session.createProducer(address);
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
final int numberOfMessages = 8000;
ClientMessage message;
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
}
Assert.assertEquals(true, queue.getPageSubscription().isPaging());
QueueControl queueControl = createManagementControl(address, queueName);
assertMessageMetrics(queueControl, numberOfMessages, durable);
int removedMatchedMessagesCount = queueControl.removeAllMessages();
Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount);
assertMessageMetrics(queueControl, 0, durable);
Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
queueMemoprySizeField.setAccessible(true);
AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue);
Assert.assertEquals(0, queueMemorySize.get());
session.deleteQueue(queueName);
}
@Test @Test
public void testRemoveMessagesWithEmptyFilter() throws Exception { public void testRemoveMessagesWithEmptyFilter() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();