ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K elements instead of amqpCredits

This commit is contained in:
Clebert Suconic 2024-05-07 18:56:57 -04:00 committed by clebertsuconic
parent 3c46871f79
commit 355c600ea8
7 changed files with 159 additions and 13 deletions

View File

@ -31,6 +31,10 @@ public interface DuplicateIDCache {
void addToCache(byte[] duplicateID, Transaction tx) throws Exception; void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
int getSize();
DuplicateIDCache resize(int newSize);
/** /**
* it will add the data to the cache. * it will add the data to the cache.
* If TX == null it won't use a transaction. * If TX == null it won't use a transaction.

View File

@ -58,7 +58,7 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
private int pos; private int pos;
private final int cacheSize; private int cacheSize;
InMemoryDuplicateIDCache(final SimpleString address, final int size) { InMemoryDuplicateIDCache(final SimpleString address, final int size) {
this.address = address; this.address = address;
@ -75,6 +75,18 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
logger.debug("address = {} ignore loading ids: in memory cache won't load previously stored ids", address); logger.debug("address = {} ignore loading ids: in memory cache won't load previously stored ids", address);
} }
@Override
public int getSize() {
return cacheSize;
}
@Override
public synchronized DuplicateIDCache resize(int newSize) {
newSize = Math.max(cache.size(), newSize);
this.cacheSize = newSize;
return this;
}
@Override @Override
public void deleteFromCache(byte[] duplicateID) { public void deleteFromCache(byte[] duplicateID) {
deleteFromCache(new ByteArray(duplicateID)); deleteFromCache(new ByteArray(duplicateID));

View File

@ -70,6 +70,16 @@ public final class NoOpDuplicateIDCache implements DuplicateIDCache {
} }
@Override
public int getSize() {
return 0;
}
@Override
public DuplicateIDCache resize(int newSize) {
return null;
}
@Override @Override
public List<Pair<byte[], Long>> getMap() { public List<Pair<byte[], Long>> getMap() {
return Collections.emptyList(); return Collections.emptyList();

View File

@ -62,7 +62,7 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
private int pos; private int pos;
private final int cacheSize; private int cacheSize;
private final StorageManager storageManager; private final StorageManager storageManager;
@ -173,6 +173,20 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
} }
@Override
public int getSize() {
return cacheSize;
}
@Override
public synchronized DuplicateIDCache resize(int newSize) {
// We won't be shrinking items here
newSize = Math.max(cache.size(), newSize);
this.cacheSize = newSize;
logger.trace("newSize = {} after math.min check", newSize);
return this;
}
private static String describeID(byte[] duplicateID) { private static String describeID(byte[] duplicateID) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
} }

View File

@ -1464,6 +1464,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
if (cache.getSize() != cacheSizeToUse) {
cache.resize(cacheSizeToUse);
}
return cache; return cache;
} }

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
public class ResizeDuplicateCacheTest extends ActiveMQTestBase {
@Test
public void testResizeCache() throws Exception {
int duplicateSize = 30;
SimpleString randomString = RandomUtil.randomSimpleString();
ActiveMQServer server = createServer(true, false);
server.start();
DuplicateIDCache duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
for (int i = 0; i < duplicateSize * 2; i++) {
duplicateIDCache.addToCache(("a" + i).getBytes(StandardCharsets.UTF_8));
}
server.stop();
server.start();
duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
Assert.assertNotNull(duplicateRecordsCount);
Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());
server.stop();
}
}

View File

@ -27,14 +27,18 @@ import javax.jms.Topic;
import java.io.File; import java.io.File;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.util.ServerUtil;
@ -62,6 +66,13 @@ public class SingleMirrorSoakTest extends SoakTestBase {
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100); private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500); private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000); private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
/*
* Time each consumer takes to process a message received to allow some messages accumulating.
* This sleep happens right before the commit.
*/
private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
private static final String TOPIC_NAME = "topicTest"; private static final String TOPIC_NAME = "topicTest";
@ -82,12 +93,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
volatile Process processDC2; volatile Process processDC2;
@After @After
public void destroyServers() { public void destroyServers() throws Exception {
if (processDC1 != null) { if (processDC1 != null) {
processDC1.destroyForcibly(); processDC1.destroyForcibly();
processDC1.waitFor(1, TimeUnit.MINUTES);
processDC1 = null;
} }
if (processDC2 != null) { if (processDC2 != null) {
processDC2.destroyForcibly(); processDC2.destroyForcibly();
processDC2.waitFor(1, TimeUnit.MINUTES);
processDC2 = null;
} }
} }
@ -121,6 +136,15 @@ public class SingleMirrorSoakTest extends SoakTestBase {
brokerProperties.put("largeMessageSync", "false"); brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("mirrorAckManagerPageAttempts", "10"); brokerProperties.put("mirrorAckManagerPageAttempts", "10");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000"); brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
if (paging) {
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "100000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "10000");
// un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl
// brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
}
// if we don't use pageTransactions we may eventually get a few duplicates // if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true"); brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation, "broker.properties"); File brokerPropertiesFile = new File(serverLocation, "broker.properties");
@ -130,11 +154,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
Assert.assertTrue(brokerXml.exists()); Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration // Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n")); Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
if (paging) {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-bytes>20M</max-read-page-bytes>", "<max-read-page-bytes>-1</max-read-page-bytes>"));
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-messages>-1</max-read-page-messages>", "<max-read-page-messages>100000</max-read-page-messages>\n" + " <prefetch-page-messages>10000</prefetch-page-messages>"));
}
if (TRACE_LOGS) { if (TRACE_LOGS) {
File log4j = new File(serverLocation, "/etc/log4j2.properties"); File log4j = new File(serverLocation, "/etc/log4j2.properties");
@ -248,10 +267,27 @@ public class SingleMirrorSoakTest extends SoakTestBase {
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT); Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT); Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000); Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000); Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000); Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000); Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
destroyServers();
// counting the number of records on duplicate cache
// to validate if ARTEMIS-4765 is fixed
ActiveMQServer server = createServer(true, false);
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal");
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings");
server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging");
server.start();
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
Assert.assertNotNull(duplicateRecordsCount);
// 1000 credits by default
Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
} }
private static void consume(ConnectionFactory factory, private static void consume(ConnectionFactory factory,
@ -283,6 +319,9 @@ public class SingleMirrorSoakTest extends SoakTestBase {
logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large")); logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
pendingCommit++; pendingCommit++;
if (pendingCommit >= batchCommit) { if (pendingCommit >= batchCommit) {
if (CONSUMER_PROCESSING_TIME > 0) {
Thread.sleep(CONSUMER_PROCESSING_TIME);
}
logger.info("received {}", i); logger.info("received {}", i);
session.commit(); session.commit();
pendingCommit = 0; pendingCommit = 0;
@ -301,7 +340,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception { public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
try { try {
long value = simpleManagement.getMessageCountOnQueue(queue); long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value; return value;
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);