ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K elements instead of amqpCredits
In this commit I'm storing a binding record with the address-settings for the correct size. This also validates eventual merges of the AddressSettings in the same namespace.
This commit is contained in:
parent
2a43c53bb2
commit
cd563b49ad
|
@ -377,6 +377,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
|||
|
||||
List<AbstractPersistedAddressSetting> recoverAddressSettings() throws Exception;
|
||||
|
||||
AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address);
|
||||
|
||||
void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception;
|
||||
|
||||
void deleteSecuritySetting(SimpleString addressMatch) throws Exception;
|
||||
|
|
|
@ -751,6 +751,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
return new ArrayList<>(mapPersistedAddressSettings.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
|
||||
return mapPersistedAddressSettings.get(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistedSecuritySetting> recoverSecuritySettings() throws Exception {
|
||||
return new ArrayList<>(mapPersistedSecuritySettings.values());
|
||||
|
|
|
@ -80,6 +80,11 @@ public class NullStorageManager implements StorageManager {
|
|||
|
||||
private final IOCriticalErrorListener ioCriticalErrorListener;
|
||||
|
||||
@Override
|
||||
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public NullStorageManager(IOCriticalErrorListener ioCriticalErrorListener) {
   // Keep the listener so critical I/O errors can still be reported even
   // though this implementation performs no real storage.
   this.ioCriticalErrorListener = ioCriticalErrorListener;
}
|
||||
|
|
|
@ -31,6 +31,8 @@ public interface DuplicateIDCache {
|
|||
|
||||
void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
|
||||
|
||||
int getSize();
|
||||
|
||||
/**
|
||||
* it will add the data to the cache.
|
||||
* If TX == null it won't use a transaction.
|
||||
|
|
|
@ -277,4 +277,9 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSize() {
|
||||
return cacheSize;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,4 +74,9 @@ public final class NoOpDuplicateIDCache implements DuplicateIDCache {
|
|||
public List<Pair<byte[], Long>> getMap() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -396,4 +396,9 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSize() {
|
||||
return cacheSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -60,6 +60,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
|||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON;
|
||||
import org.apache.activemq.artemis.core.postoffice.AddressManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingType;
|
||||
|
@ -1443,15 +1445,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
@Override
|
||||
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
|
||||
int resolvedIdCacheSize = resolveIdCacheSize(address);
|
||||
return getDuplicateIDCache(address, resolvedIdCacheSize);
|
||||
return getDuplicateIDCache(address, resolvedIdCacheSize, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cacheSizeToUse) {
|
||||
return getDuplicateIDCache(address, cacheSizeToUse, true);
|
||||
}
|
||||
|
||||
private DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cacheSizeToUse, boolean allowRegistration) {
|
||||
DuplicateIDCache cache = duplicateIDCaches.get(address);
|
||||
|
||||
if (cache == null) {
|
||||
if (persistIDCache) {
|
||||
if (allowRegistration) {
|
||||
registerCacheSize(address, cacheSizeToUse);
|
||||
}
|
||||
cache = DuplicateIDCaches.persistent(address, cacheSizeToUse, storageManager);
|
||||
} else {
|
||||
cache = DuplicateIDCaches.inMemory(address, cacheSizeToUse);
|
||||
|
@ -1467,6 +1476,22 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
return cache;
|
||||
}
|
||||
|
||||
private void registerCacheSize(SimpleString address, int cacheSizeToUse) {
|
||||
AbstractPersistedAddressSetting recordedSetting = storageManager.recoverAddressSettings(address);
|
||||
if (recordedSetting == null || recordedSetting.getSetting().getIDCacheSize() == null || recordedSetting.getSetting().getIDCacheSize().intValue() != cacheSizeToUse) {
|
||||
AddressSettings settings = recordedSetting != null ? recordedSetting.getSetting() : new AddressSettings();
|
||||
settings.setIDCacheSize(cacheSizeToUse);
|
||||
server.getAddressSettingsRepository().addMatch(address.toString(), settings);
|
||||
try {
|
||||
storageManager.storeAddressSetting(new PersistedAddressSettingJSON(address, settings, settings.toJSON()));
|
||||
} catch (Exception e) {
|
||||
// nothing could be done here, we just log
|
||||
// if an exception is happening, if IO is compromised the server will eventually be shutdown
|
||||
ActiveMQServerLogger.LOGGER.errorRegisteringDuplicateCacheSize(String.valueOf(address), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ConcurrentMap<SimpleString, DuplicateIDCache> getDuplicateIDCaches() {
|
||||
return duplicateIDCaches;
|
||||
}
|
||||
|
|
|
@ -1614,4 +1614,7 @@ public interface ActiveMQServerLogger {
|
|||
|
||||
@LogMessage(id = 224137, value = "Skipping SSL auto reload for sources of store {} because of {}", level = LogMessage.Level.WARN)
|
||||
void skipSSLAutoReloadForSourcesOfStore(String storePath, String reason);
|
||||
|
||||
@LogMessage(id = 224138, value = "Error Registering DuplicateCacheSize on namespace {}", level = LogMessage.Level.WARN)
|
||||
void errorRegisteringDuplicateCacheSize(String address, Exception reason);
|
||||
}
|
||||
|
|
|
@ -332,6 +332,11 @@ public class TransactionImplTest extends ServerTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncCommit(long txID) throws Exception {
|
||||
|
||||
|
|
|
@ -303,6 +303,11 @@ public class SendAckFailTest extends SpawnedTestBase {
|
|||
return manager.onLargeMessageCreate(id, largeMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
manager.stop();
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.persistence;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ResizeDuplicateCacheTest extends ActiveMQTestBase {
|
||||
|
||||
@Test
|
||||
public void testReloadCache() throws Exception {
|
||||
internalReloadCache(false, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReloadCacheAdditionalSettings() throws Exception {
|
||||
// this variance will add a settings on the same namespace for something else
|
||||
// this will validate that the setting should be merged
|
||||
internalReloadCache(true, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReloadCacheAdditionalSettingsValueSet() throws Exception {
|
||||
// this variance will add a settings on the same namespace for something else
|
||||
// this will validate that the setting should be merged
|
||||
internalReloadCache(true, 50_000);
|
||||
}
|
||||
|
||||
public void internalReloadCache(boolean additionalSettings, Integer preExistingCacheValue) throws Exception {
|
||||
int duplicateSize = 30;
|
||||
SimpleString randomString = RandomUtil.randomSimpleString();
|
||||
|
||||
ActiveMQServer server = createServer(true, false);
|
||||
server.start();
|
||||
|
||||
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
|
||||
|
||||
if (additionalSettings) {
|
||||
AddressSettings settings = new AddressSettings().setDefaultRingSize(3333);
|
||||
if (preExistingCacheValue != null) {
|
||||
settings.setIDCacheSize(preExistingCacheValue);
|
||||
}
|
||||
serverControl.addAddressSettings(randomString.toString(), settings.toJSON());
|
||||
String json = serverControl.getAddressSettingsAsJSON(randomString.toString());
|
||||
AddressSettings settingsFromJson = AddressSettings.fromJSON(json);
|
||||
Assert.assertEquals(preExistingCacheValue, settingsFromJson.getIDCacheSize());
|
||||
Assert.assertEquals(3333, settingsFromJson.getDefaultRingSize());
|
||||
}
|
||||
|
||||
DuplicateIDCache duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
|
||||
|
||||
for (int i = 0; i < duplicateSize * 2; i++) {
|
||||
duplicateIDCache.addToCache(("a" + i).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
server.stop();
|
||||
server = createServer(true, false);
|
||||
server.start();
|
||||
serverControl = server.getActiveMQServerControl();
|
||||
|
||||
duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
|
||||
|
||||
Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());
|
||||
|
||||
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
|
||||
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
|
||||
|
||||
AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
|
||||
Assert.assertNotNull(duplicateRecordsCount);
|
||||
Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());
|
||||
|
||||
if (additionalSettings) {
|
||||
String json = serverControl.getAddressSettingsAsJSON(randomString.toString());
|
||||
|
||||
// checking if the previous value was merged as expected
|
||||
AddressSettings settingsFromJson = AddressSettings.fromJSON(json);
|
||||
Assert.assertEquals(Integer.valueOf(duplicateSize), settingsFromJson.getIDCacheSize());
|
||||
Assert.assertEquals(3333, settingsFromJson.getDefaultRingSize());
|
||||
}
|
||||
|
||||
server.stop();
|
||||
}
|
||||
}
|
|
@ -27,14 +27,18 @@ import javax.jms.Topic;
|
|||
import java.io.File;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
|
@ -62,6 +66,13 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
|
||||
private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
|
||||
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
|
||||
private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
|
||||
|
||||
/*
|
||||
* Time each consumer takes to process a message received to allow some messages accumulating.
|
||||
* This sleep happens right before the commit.
|
||||
*/
|
||||
private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
|
||||
|
||||
private static final String TOPIC_NAME = "topicTest";
|
||||
|
||||
|
@ -82,12 +93,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
volatile Process processDC2;
|
||||
|
||||
@After
|
||||
public void destroyServers() {
|
||||
public void destroyServers() throws Exception {
|
||||
if (processDC1 != null) {
|
||||
processDC1.destroyForcibly();
|
||||
processDC1.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC1 = null;
|
||||
}
|
||||
if (processDC2 != null) {
|
||||
processDC2.destroyForcibly();
|
||||
processDC2.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC2 = null;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -121,6 +136,15 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
brokerProperties.put("largeMessageSync", "false");
|
||||
brokerProperties.put("mirrorAckManagerPageAttempts", "10");
|
||||
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
|
||||
|
||||
if (paging) {
|
||||
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
|
||||
brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
|
||||
// un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl
|
||||
// brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
|
||||
}
|
||||
// if we don't use pageTransactions we may eventually get a few duplicates
|
||||
brokerProperties.put("mirrorPageTransaction", "true");
|
||||
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
|
||||
|
@ -130,11 +154,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
Assert.assertTrue(brokerXml.exists());
|
||||
// Adding redistribution delay to broker configuration
|
||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
|
||||
if (paging) {
|
||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
|
||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-bytes>20M</max-read-page-bytes>", "<max-read-page-bytes>-1</max-read-page-bytes>"));
|
||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-messages>-1</max-read-page-messages>", "<max-read-page-messages>100000</max-read-page-messages>\n" + " <prefetch-page-messages>10000</prefetch-page-messages>"));
|
||||
}
|
||||
|
||||
if (TRACE_LOGS) {
|
||||
File log4j = new File(serverLocation, "/etc/log4j2.properties");
|
||||
|
@ -248,10 +267,27 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
|
||||
destroyServers();
|
||||
|
||||
// counting the number of records on duplicate cache
|
||||
// to validate if ARTEMIS-4765 is fixed
|
||||
ActiveMQServer server = createServer(true, false);
|
||||
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal");
|
||||
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings");
|
||||
server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging");
|
||||
server.start();
|
||||
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
|
||||
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
|
||||
AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
|
||||
Assert.assertNotNull(duplicateRecordsCount);
|
||||
// 1000 credits by default
|
||||
Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
|
||||
|
||||
}
|
||||
|
||||
private static void consume(ConnectionFactory factory,
|
||||
|
@ -283,6 +319,9 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
|
||||
pendingCommit++;
|
||||
if (pendingCommit >= batchCommit) {
|
||||
if (CONSUMER_PROCESSING_TIME > 0) {
|
||||
Thread.sleep(CONSUMER_PROCESSING_TIME);
|
||||
}
|
||||
logger.info("received {}", i);
|
||||
session.commit();
|
||||
pendingCommit = 0;
|
||||
|
@ -301,7 +340,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
|
|||
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
|
||||
try {
|
||||
long value = simpleManagement.getMessageCountOnQueue(queue);
|
||||
logger.debug("count on queue {} is {}", queue, value);
|
||||
return value;
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
|
|
Loading…
Reference in New Issue