diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 76d94eac05..81ca168588 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -377,6 +377,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { List recoverAddressSettings() throws Exception; + AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address); + void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception; void deleteSecuritySetting(SimpleString addressMatch) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index b63b88a9e7..4afcdb3430 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -751,6 +751,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp return new ArrayList<>(mapPersistedAddressSettings.values()); } + @Override + public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) { + return mapPersistedAddressSettings.get(address); + } + @Override public List recoverSecuritySettings() throws Exception { return new ArrayList<>(mapPersistedSecuritySettings.values()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 366f3040d9..6e09b3f76a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -80,6 +80,11 @@ public class NullStorageManager implements StorageManager { private final IOCriticalErrorListener ioCriticalErrorListener; + @Override + public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) { + return null; + } + public NullStorageManager(IOCriticalErrorListener ioCriticalErrorListener) { this.ioCriticalErrorListener = ioCriticalErrorListener; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java index 75cc17b7d3..11d8535d03 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java @@ -31,6 +31,8 @@ public interface DuplicateIDCache { void addToCache(byte[] duplicateID, Transaction tx) throws Exception; + int getSize(); + /** * it will add the data to the cache. * If TX == null it won't use a transaction. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java index 63c94d7123..7b07d95fdb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java @@ -277,4 +277,9 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache { return null; } } + + @Override + public int getSize() { + return cacheSize; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java index 998074ed05..4cea18e87d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/NoOpDuplicateIDCache.java @@ -74,4 +74,9 @@ public final class NoOpDuplicateIDCache implements DuplicateIDCache { public List> getMap() { return Collections.emptyList(); } + + @Override + public int getSize() { + return 0; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java index dcaed7c2fe..2ce82aebc7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java @@ -396,4 +396,9 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache { } } + @Override + public int getSize() { + return cacheSize; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 1e21bb2639..cad4d3f3d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -60,6 +60,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting; +import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON; import org.apache.activemq.artemis.core.postoffice.AddressManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.BindingType; @@ -1443,15 +1445,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { int resolvedIdCacheSize = resolveIdCacheSize(address); - return getDuplicateIDCache(address, resolvedIdCacheSize); + return getDuplicateIDCache(address, resolvedIdCacheSize, false); } @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cacheSizeToUse) { + return getDuplicateIDCache(address, cacheSizeToUse, true); + } + + private DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cacheSizeToUse, boolean allowRegistration) { DuplicateIDCache cache = duplicateIDCaches.get(address); if (cache == null) { if (persistIDCache) { + if (allowRegistration) { + registerCacheSize(address, cacheSizeToUse); + } cache = DuplicateIDCaches.persistent(address, cacheSizeToUse, storageManager); } else { cache = DuplicateIDCaches.inMemory(address, cacheSizeToUse); @@ -1467,6 +1476,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return cache; } + private void registerCacheSize(SimpleString address, int cacheSizeToUse) { + AbstractPersistedAddressSetting recordedSetting = storageManager.recoverAddressSettings(address); + if (recordedSetting == null || recordedSetting.getSetting().getIDCacheSize() == null || recordedSetting.getSetting().getIDCacheSize().intValue() != cacheSizeToUse) { + AddressSettings settings = recordedSetting != null ? recordedSetting.getSetting() : new AddressSettings(); + settings.setIDCacheSize(cacheSizeToUse); + server.getAddressSettingsRepository().addMatch(address.toString(), settings); + try { + storageManager.storeAddressSetting(new PersistedAddressSettingJSON(address, settings, settings.toJSON())); + } catch (Exception e) { + // nothing could be done here, we just log + // if an exception is happening, if IO is compromised the server will eventually be shutdown + ActiveMQServerLogger.LOGGER.errorRegisteringDuplicateCacheSize(String.valueOf(address), e); + } + } + } + public ConcurrentMap getDuplicateIDCaches() { return duplicateIDCaches; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 19e7facfe3..b9fba03ffe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1614,4 +1614,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224137, value = "Skipping SSL auto reload for sources of store {} because of {}", level = LogMessage.Level.WARN) void skipSSLAutoReloadForSourcesOfStore(String storePath, String reason); + + @LogMessage(id = 224138, value = "Error Registering DuplicateCacheSize on namespace {}", level = LogMessage.Level.WARN) + void errorRegisteringDuplicateCacheSize(String address, Exception reason); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index e486690b17..34eced285b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -332,6 +332,11 @@ public class TransactionImplTest extends ServerTestBase { } + @Override + public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) { + return null; + } + @Override public void asyncCommit(long txID) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 41904d7c8b..3ff194c8cd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -303,6 +303,11 @@ public class SendAckFailTest extends SpawnedTestBase { return manager.onLargeMessageCreate(id, largeMessage); } + @Override + public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) { + return null; + } + @Override public void stop() throws Exception { manager.stop(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java new file mode 100644 index 0000000000..eac81d9cba --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ResizeDuplicateCacheTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.persistence; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Test; + +public class ResizeDuplicateCacheTest extends ActiveMQTestBase { + + @Test + public void testReloadCache() throws Exception { + internalReloadCache(false, null); + } + + @Test + public void testReloadCacheAdditionalSettings() throws Exception { + // this variance will add a settings on the same namespace for something else + // this will validate that the setting should be merged + internalReloadCache(true, null); + } + + @Test + public void testReloadCacheAdditionalSettingsValueSet() throws Exception { + // this variance will add a settings on the same namespace for something else + // this will validate that the setting should be merged + internalReloadCache(true, 50_000); + } + + public void internalReloadCache(boolean additionalSettings, Integer preExistingCacheValue) throws Exception { + int duplicateSize = 30; + SimpleString randomString = RandomUtil.randomSimpleString(); + + ActiveMQServer server = createServer(true, false); + server.start(); + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + + if (additionalSettings) { + AddressSettings settings = new AddressSettings().setDefaultRingSize(3333); + if (preExistingCacheValue != null) { + settings.setIDCacheSize(preExistingCacheValue); + } + serverControl.addAddressSettings(randomString.toString(), settings.toJSON()); + String json = serverControl.getAddressSettingsAsJSON(randomString.toString()); + AddressSettings settingsFromJson = AddressSettings.fromJSON(json); + Assert.assertEquals(preExistingCacheValue, settingsFromJson.getIDCacheSize()); + Assert.assertEquals(3333, settingsFromJson.getDefaultRingSize()); + } + + DuplicateIDCache duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize); + + for (int i = 0; i < duplicateSize * 2; i++) { + duplicateIDCache.addToCache(("a" + i).getBytes(StandardCharsets.UTF_8)); + } + + server.stop(); + server = createServer(true, false); + server.start(); + serverControl = server.getActiveMQServerControl(); + + duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize); + + Assert.assertEquals(duplicateSize, duplicateIDCache.getSize()); + + server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000); + HashMap records = countJournal(server.getConfiguration()); + + AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID); + Assert.assertNotNull(duplicateRecordsCount); + Assert.assertEquals(duplicateSize, duplicateRecordsCount.get()); + + if (additionalSettings) { + String json = serverControl.getAddressSettingsAsJSON(randomString.toString()); + + // checking if the previous value was merged as expected + AddressSettings settingsFromJson = AddressSettings.fromJSON(json); + Assert.assertEquals(Integer.valueOf(duplicateSize), settingsFromJson.getIDCacheSize()); + Assert.assertEquals(3333, settingsFromJson.getDefaultRingSize()); + } + + server.stop(); + } +} \ No newline at end of file diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java index c93d419ff9..196bbc4767 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java @@ -27,14 +27,18 @@ import javax.jms.Topic; import java.io.File; import java.io.StringWriter; import java.lang.invoke.MethodHandles; +import java.util.HashMap; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.util.ServerUtil; @@ -62,6 +66,13 @@ public class SingleMirrorSoakTest extends SoakTestBase { private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100); private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500); private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000); + private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000); + + /* + * Time each consumer takes to process a message received to allow some messages accumulating. + * This sleep happens right before the commit. + */ + private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0); private static final String TOPIC_NAME = "topicTest"; @@ -82,12 +93,16 @@ public class SingleMirrorSoakTest extends SoakTestBase { volatile Process processDC2; @After - public void destroyServers() { + public void destroyServers() throws Exception { if (processDC1 != null) { processDC1.destroyForcibly(); + processDC1.waitFor(1, TimeUnit.MINUTES); + processDC1 = null; } if (processDC2 != null) { processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; } } @@ -121,6 +136,15 @@ public class SingleMirrorSoakTest extends SoakTestBase { brokerProperties.put("largeMessageSync", "false"); brokerProperties.put("mirrorAckManagerPageAttempts", "10"); brokerProperties.put("mirrorAckManagerRetryDelay", "1000"); + + if (paging) { + brokerProperties.put("addressSettings.#.maxSizeMessages", "1"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "100"); + // un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl + // brokerProperties.put("addressSettings.#.iDCacheSize", "1000"); + } // if we don't use pageTransactions we may eventually get a few duplicates brokerProperties.put("mirrorPageTransaction", "true"); File brokerPropertiesFile = new File(serverLocation, "broker.properties"); @@ -130,11 +154,6 @@ public class SingleMirrorSoakTest extends SoakTestBase { Assert.assertTrue(brokerXml.exists()); // Adding redistribution delay to broker configuration Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); - if (paging) { - Assert.assertTrue(FileUtil.findReplace(brokerXml, "-1", "1")); - Assert.assertTrue(FileUtil.findReplace(brokerXml, "20M", "-1")); - Assert.assertTrue(FileUtil.findReplace(brokerXml, "-1", "100000\n" + " 10000")); - } if (TRACE_LOGS) { File log4j = new File(serverLocation, "/etc/log4j2.properties"); @@ -248,10 +267,27 @@ public class SingleMirrorSoakTest extends SoakTestBase { Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT); Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT); - Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000); - Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000); - Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000); - Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000); + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + + destroyServers(); + + // counting the number of records on duplicate cache + // to validate if ARTEMIS-4765 is fixed + ActiveMQServer server = createServer(true, false); + server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal"); + server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings"); + server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging"); + server.start(); + server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000); + HashMap records = countJournal(server.getConfiguration()); + AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID); + Assert.assertNotNull(duplicateRecordsCount); + // 1000 credits by default + Assert.assertTrue(duplicateRecordsCount.get() <= 1000); + } private static void consume(ConnectionFactory factory, @@ -283,6 +319,9 @@ public class SingleMirrorSoakTest extends SoakTestBase { logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large")); pendingCommit++; if (pendingCommit >= batchCommit) { + if (CONSUMER_PROCESSING_TIME > 0) { + Thread.sleep(CONSUMER_PROCESSING_TIME); + } logger.info("received {}", i); session.commit(); pendingCommit = 0; @@ -301,7 +340,6 @@ public class SingleMirrorSoakTest extends SoakTestBase { public long getCount(SimpleManagement simpleManagement, String queue) throws Exception { try { long value = simpleManagement.getMessageCountOnQueue(queue); - logger.debug("count on queue {} is {}", queue, value); return value; } catch (Exception e) { logger.warn(e.getMessage(), e);