ARTEMIS-2508 Crititical analyser trigger shutdown if removeAllMessages

The crititical analyser trigger the broker shutdown if try to
removeAllMessages with a huge queue. The iterQueue is split so as
not to keep the lock too time.
This commit is contained in:
brusdev 2019-10-08 06:26:16 +02:00 committed by Clebert Suconic
parent 5ad8f89e1e
commit 28d1a53630
2 changed files with 124 additions and 36 deletions

View File

@ -1915,7 +1915,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
@Override @Override
public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception { public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason)); return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason));
} }
@ -1947,7 +1947,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @return * @return
* @throws Exception * @throws Exception
*/ */
private synchronized int iterQueue(final int flushLimit, private int iterQueue(final int flushLimit,
final Filter filter1, final Filter filter1,
QueueIterateAction messageAction) throws Exception { QueueIterateAction messageAction) throws Exception {
int count = 0; int count = 0;
@ -1955,45 +1955,47 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Transaction tx = new TransactionImpl(storageManager); Transaction tx = new TransactionImpl(storageManager);
try (LinkedListIterator<MessageReference> iter = iterator()) { synchronized (this) {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) { while (iter.hasNext()) {
MessageReference ref = iter.next(); MessageReference ref = iter.next();
if (ref.isPaged() && queueDestroyed) { if (ref.isPaged() && queueDestroyed) {
// this means the queue is being removed // this means the queue is being removed
// hence paged references are just going away through // hence paged references are just going away through
// page cleanup // page cleanup
continue; continue;
}
if (filter1 == null || filter1.match(ref.getMessage())) {
messageAction.actMessage(tx, ref);
iter.remove();
txCount++;
count++;
}
} }
if (filter1 == null || filter1.match(ref.getMessage())) { if (txCount > 0) {
messageAction.actMessage(tx, ref); tx.commit();
iter.remove();
txCount++; tx = new TransactionImpl(storageManager);
txCount = 0;
}
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference, false);
count++; count++;
txCount++;
} }
}
if (txCount > 0) { if (txCount > 0) {
tx.commit(); tx.commit();
tx = new TransactionImpl(storageManager);
tx = new TransactionImpl(storageManager); txCount = 0;
}
txCount = 0;
}
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference, false);
count++;
txCount++;
}
if (txCount > 0) {
tx.commit();
tx = new TransactionImpl(storageManager);
txCount = 0;
} }
if (pageIterator != null && !queueDestroyed) { if (pageIterator != null && !queueDestroyed) {
@ -2350,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
@Override @Override
public synchronized int moveReferences(final int flushLimit, public int moveReferences(final int flushLimit,
final Filter filter, final Filter filter,
final SimpleString toAddress, final SimpleString toAddress,
final boolean rejectDuplicates, final boolean rejectDuplicates,
@ -2384,7 +2386,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}); });
} }
public synchronized int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception { public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() { return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
@Override @Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception { public void actMessage(Transaction tx, MessageReference ref) throws Exception {

View File

@ -57,6 +57,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
@ -492,6 +494,90 @@ public class PagingTest extends ActiveMQTestBase {
System.out.println("pgComplete = " + pgComplete); System.out.println("pgComplete = " + pgComplete);
} }
@Test
public void testQueueRemoveAll() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
final int numberOfMessages = 5000;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
session.close();
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
producer.send(session.createMessage(true));
session.rollback();
producer.close();
session.close();
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
producer.close();
session.close();
Queue queue = server.locateQueue(PagingTest.ADDRESS);
Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount);
QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS);
int removedMessages = queueControl.removeAllMessages();
Assert.assertEquals(numberOfMessages * 2, removedMessages);
}
@Test @Test
public void testEmptyAddress() throws Exception { public void testEmptyAddress() throws Exception {
if (storeType == StoreConfiguration.StoreType.FILE) { if (storeType == StoreConfiguration.StoreType.FILE) {