This closes #1907
This commit is contained in:
commit
838859f59a
|
@ -1484,9 +1484,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
return iterQueue(flushLimit, filter1, new QueueIterateAction() {
|
return iterQueue(flushLimit, filter1, new QueueIterateAction() {
|
||||||
@Override
|
@Override
|
||||||
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
|
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
|
||||||
|
actMessage(tx, ref, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
|
||||||
incDelivering(ref);
|
incDelivering(ref);
|
||||||
acknowledge(tx, ref, ackReason);
|
acknowledge(tx, ref, ackReason);
|
||||||
refRemoved(ref);
|
if (fromMessageReferences) {
|
||||||
|
refRemoved(ref);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1558,7 +1565,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
if (filter1 == null || filter1.match(reference.getMessage())) {
|
if (filter1 == null || filter1.match(reference.getMessage())) {
|
||||||
count++;
|
count++;
|
||||||
txCount++;
|
txCount++;
|
||||||
messageAction.actMessage(tx, reference);
|
messageAction.actMessage(tx, reference, false);
|
||||||
} else {
|
} else {
|
||||||
addTail(reference, false);
|
addTail(reference, false);
|
||||||
}
|
}
|
||||||
|
@ -2764,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
ref.acknowledge(tx, AckReason.KILLED);
|
ref.acknowledge(tx, AckReason.KILLED);
|
||||||
} else {
|
} else {
|
||||||
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
|
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
|
||||||
move(tx, deadLetterAddress,null, ref, false, AckReason.KILLED);
|
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
|
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
|
||||||
|
@ -3191,6 +3198,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
abstract class QueueIterateAction {
|
abstract class QueueIterateAction {
|
||||||
|
|
||||||
public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
|
public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
|
||||||
|
|
||||||
|
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
|
||||||
|
actMessage(tx, ref);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For external use we need to use a synchronized version since the list is not thread safe */
|
/* For external use we need to use a synchronized version since the list is not thread safe */
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.management;
|
||||||
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -36,6 +37,7 @@ import javax.json.JsonObject;
|
||||||
import javax.management.Notification;
|
import javax.management.Notification;
|
||||||
import javax.management.openmbean.CompositeData;
|
import javax.management.openmbean.CompositeData;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
@ -57,6 +59,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
||||||
|
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
|
@ -946,8 +949,8 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
session.createAddress(myTopic, RoutingType.MULTICAST, false);
|
session.createAddress(myTopic, RoutingType.MULTICAST, false);
|
||||||
|
|
||||||
DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
|
DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
|
||||||
.setRoutingName("some-name").setAddress(myTopic.toString())
|
.setRoutingName("some-name").setAddress(myTopic.toString())
|
||||||
.setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
|
.setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
|
||||||
server.deployDivert(divert);
|
server.deployDivert(divert);
|
||||||
|
|
||||||
// Send message to topic.
|
// Send message to topic.
|
||||||
|
@ -1505,6 +1508,63 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
session.deleteQueue(queue);
|
session.deleteQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAllWithPagingMode() throws Exception {
|
||||||
|
|
||||||
|
final int MESSAGE_SIZE = 1024 * 3; // 3k
|
||||||
|
|
||||||
|
// reset maxSize for Paging mode
|
||||||
|
Field maxSizField = PagingManagerImpl.class.getDeclaredField("maxSize");
|
||||||
|
maxSizField.setAccessible(true);
|
||||||
|
maxSizField.setLong(server.getPagingManager(), 10240);
|
||||||
|
clearDataRecreateServerDirs();
|
||||||
|
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queueName = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session.createQueue(address, RoutingType.MULTICAST, queueName, null, durable);
|
||||||
|
|
||||||
|
Queue queue = server.locateQueue(queueName);
|
||||||
|
Assert.assertEquals(false, queue.getPageSubscription().isPaging());
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
|
||||||
|
byte[] body = new byte[MESSAGE_SIZE];
|
||||||
|
|
||||||
|
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||||
|
|
||||||
|
for (int j = 1; j <= MESSAGE_SIZE; j++) {
|
||||||
|
bb.put(getSamplebyte(j));
|
||||||
|
}
|
||||||
|
|
||||||
|
final int numberOfMessages = 8000;
|
||||||
|
ClientMessage message;
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
message = session.createMessage(true);
|
||||||
|
|
||||||
|
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||||
|
|
||||||
|
bodyLocal.writeBytes(body);
|
||||||
|
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(true, queue.getPageSubscription().isPaging());
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queueName);
|
||||||
|
assertMessageMetrics(queueControl, numberOfMessages, durable);
|
||||||
|
int removedMatchedMessagesCount = queueControl.removeAllMessages();
|
||||||
|
Assert.assertEquals(numberOfMessages, removedMatchedMessagesCount);
|
||||||
|
assertMessageMetrics(queueControl, 0, durable);
|
||||||
|
|
||||||
|
Field queueMemoprySizeField = QueueImpl.class.getDeclaredField("queueMemorySize");
|
||||||
|
queueMemoprySizeField.setAccessible(true);
|
||||||
|
AtomicInteger queueMemorySize = (AtomicInteger) queueMemoprySizeField.get(queue);
|
||||||
|
Assert.assertEquals(0, queueMemorySize.get());
|
||||||
|
|
||||||
|
session.deleteQueue(queueName);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveMessagesWithEmptyFilter() throws Exception {
|
public void testRemoveMessagesWithEmptyFilter() throws Exception {
|
||||||
SimpleString address = RandomUtil.randomSimpleString();
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
@ -2491,8 +2551,8 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable,
|
protected void assertMetrics(final QueueControl queueControl, long messageCount, boolean durable,
|
||||||
Supplier<Number> count, Supplier<Number> size,
|
Supplier<Number> count, Supplier<Number> size,
|
||||||
Supplier<Number>durableCount, Supplier<Number> durableSize) throws Exception {
|
Supplier<Number> durableCount, Supplier<Number> durableSize) throws Exception {
|
||||||
|
|
||||||
//make sure count stat equals message count
|
//make sure count stat equals message count
|
||||||
Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100));
|
Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == messageCount, 3, 100));
|
||||||
|
|
Loading…
Reference in New Issue