Reverting ARTEMIS-4765
I am getting a few test failures when running the server in soak tests with mirroring. I will reapply the changes after further testing.
This commit is contained in:
parent 946d3338b3
commit abb1a7739e
@@ -31,10 +31,6 @@ public interface DuplicateIDCache {
 
    void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
 
-   int getSize();
-
-   DuplicateIDCache resize(int newSize);
-
    /**
     * it will add the data to the cache.
     * If TX == null it won't use a transaction.
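Taken together with the implementation hunks that follow, the pair being removed here formed a grow-only contract: getSize() reports the configured capacity and resize(int) grows the cache in place, clamping with Math.max(cache.size(), newSize). The sketch below shows how the two methods combine; it is illustrative only, compiles against the pre-revert interface, and the helper class and method names are not part of the codebase.

import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;

// Illustrative helper, not part of this commit; it only compiles against the
// pre-revert DuplicateIDCache that still declares getSize() and resize(int).
final class DuplicateCacheResizeSketch {

   /** Grow the cache to at least {@code requested} ids; implementations never shrink. */
   static DuplicateIDCache growTo(DuplicateIDCache cache, int requested) {
      if (cache.getSize() != requested) {
         // resize() clamps internally with Math.max(cache.size(), newSize)
         return cache.resize(requested);
      }
      return cache;
   }
}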
@@ -58,7 +58,7 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
 
    private int pos;
 
-   private int cacheSize;
+   private final int cacheSize;
 
    InMemoryDuplicateIDCache(final SimpleString address, final int size) {
       this.address = address;
@@ -75,18 +75,6 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
       logger.debug("address = {} ignore loading ids: in memory cache won't load previously stored ids", address);
    }
 
-   @Override
-   public int getSize() {
-      return cacheSize;
-   }
-
-   @Override
-   public synchronized DuplicateIDCache resize(int newSize) {
-      newSize = Math.max(cache.size(), newSize);
-      this.cacheSize = newSize;
-      return this;
-   }
-
    @Override
    public void deleteFromCache(byte[] duplicateID) {
      deleteFromCache(new ByteArray(duplicateID));
@@ -70,16 +70,6 @@ public final class NoOpDuplicateIDCache implements DuplicateIDCache {
 
    }
 
-   @Override
-   public int getSize() {
-      return 0;
-   }
-
-   @Override
-   public DuplicateIDCache resize(int newSize) {
-      return null;
-   }
-
    @Override
    public List<Pair<byte[], Long>> getMap() {
       return Collections.emptyList();
@@ -62,7 +62,7 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
 
    private int pos;
 
-   private int cacheSize;
+   private final int cacheSize;
 
    private final StorageManager storageManager;
 
@@ -173,20 +173,6 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
 
    }
 
-   @Override
-   public int getSize() {
-      return cacheSize;
-   }
-
-   @Override
-   public synchronized DuplicateIDCache resize(int newSize) {
-      // We won't be shrinking items here
-      newSize = Math.max(cache.size(), newSize);
-      this.cacheSize = newSize;
-      logger.trace("newSize = {} after math.min check", newSize);
-      return this;
-   }
-
    private static String describeID(byte[] duplicateID) {
       return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
    }
@@ -1464,10 +1464,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          }
       }
 
-      if (cache.getSize() != cacheSizeToUse) {
-         cache.resize(cacheSizeToUse);
-      }
-
       return cache;
    }
 
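For callers, the practical effect of removing this block is that PostOfficeImpl.getDuplicateIDCache no longer adjusts an existing cache to a newly requested size. A rough illustration, assuming the two-argument PostOffice.getDuplicateIDCache(SimpleString, int) lookup that the deleted ResizeDuplicateCacheTest below relies on; the class and method here are illustrative, not part of this commit.

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;

// Illustrative sketch, not part of this commit.
final class DuplicateCacheLookupSketch {

   static DuplicateIDCache lookup(PostOffice postOffice, SimpleString address) throws Exception {
      // First lookup creates the cache with the requested size.
      DuplicateIDCache cache = postOffice.getDuplicateIDCache(address, 20);
      // Before this revert, a later lookup with a different id-cache-size grew the
      // existing cache via getSize()/resize(); with the block above removed, the
      // cache created first is simply returned unchanged.
      return postOffice.getDuplicateIDCache(address, 200);
   }
}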
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.persistence;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ResizeDuplicateCacheTest extends ActiveMQTestBase {
-
-   @Test
-   public void testResizeCache() throws Exception {
-      int duplicateSize = 30;
-      SimpleString randomString = RandomUtil.randomSimpleString();
-
-      ActiveMQServer server = createServer(true, false);
-      server.start();
-
-      DuplicateIDCache duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
-
-      for (int i = 0; i < duplicateSize * 2; i++) {
-         duplicateIDCache.addToCache(("a" + i).getBytes(StandardCharsets.UTF_8));
-      }
-
-      server.stop();
-      server.start();
-
-      duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);
-
-      Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());
-
-      server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
-      HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
-
-      AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
-      Assert.assertNotNull(duplicateRecordsCount);
-      Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());
-
-      server.stop();
-   }
-}
@@ -27,18 +27,14 @@ import javax.jms.Topic;
 import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.util.ServerUtil;
@@ -66,13 +62,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
    private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
    private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
-   private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
-
-   /*
-    * Time each consumer takes to process a message received to allow some messages accumulating.
-    * This sleep happens right before the commit.
-    */
-   private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
 
    private static final String TOPIC_NAME = "topicTest";
 
@@ -93,16 +82,12 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    volatile Process processDC2;
 
    @After
-   public void destroyServers() throws Exception {
+   public void destroyServers() {
      if (processDC1 != null) {
         processDC1.destroyForcibly();
-         processDC1.waitFor(1, TimeUnit.MINUTES);
-         processDC1 = null;
      }
      if (processDC2 != null) {
         processDC2.destroyForcibly();
-         processDC2.waitFor(1, TimeUnit.MINUTES);
-         processDC2 = null;
      }
 
   }
@@ -136,15 +121,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       brokerProperties.put("largeMessageSync", "false");
       brokerProperties.put("mirrorAckManagerPageAttempts", "10");
       brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
-
-      if (paging) {
-         brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
-         brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
-         brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
-         brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
-         // un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl
-         // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
-      }
       // if we don't use pageTransactions we may eventually get a few duplicates
       brokerProperties.put("mirrorPageTransaction", "true");
       File brokerPropertiesFile = new File(serverLocation, "broker.properties");
@@ -154,6 +130,11 @@ public class SingleMirrorSoakTest extends SoakTestBase {
       Assert.assertTrue(brokerXml.exists());
       // Adding redistribution delay to broker configuration
       Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
+      if (paging) {
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-bytes>20M</max-read-page-bytes>", "<max-read-page-bytes>-1</max-read-page-bytes>"));
+         Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-messages>-1</max-read-page-messages>", "<max-read-page-messages>100000</max-read-page-messages>\n" + " <prefetch-page-messages>10000</prefetch-page-messages>"));
+      }
 
       if (TRACE_LOGS) {
          File log4j = new File(serverLocation, "/etc/log4j2.properties");
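For reference, once the findReplace calls above have run against a freshly generated broker.xml, the address-setting block ends up looking roughly like the fragment below. This is a sketch that assumes the stock element values the replacements search for; the paging elements only apply when paging is true, and unrelated settings are left out.

<address-setting match="#">

   <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java -->
   <!-- the following only when paging is enabled -->
   <max-size-messages>1</max-size-messages>
   <max-read-page-bytes>-1</max-read-page-bytes>
   <max-read-page-messages>100000</max-read-page-messages>
   <prefetch-page-messages>10000</prefetch-page-messages>
   <!-- other stock settings unchanged -->
</address-setting>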
@@ -267,27 +248,10 @@ public class SingleMirrorSoakTest extends SoakTestBase {
 
       Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
       Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
-      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
-
-      destroyServers();
-
-      // counting the number of records on duplicate cache
-      // to validate if ARTEMIS-4765 is fixed
-      ActiveMQServer server = createServer(true, false);
-      server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal");
-      server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings");
-      server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging");
-      server.start();
-      server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
-      HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
-      AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
-      Assert.assertNotNull(duplicateRecordsCount);
-      // 1000 credits by default
-      Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
-
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000);
+      Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000);
    }
 
    private static void consume(ConnectionFactory factory,
@@ -319,9 +283,6 @@ public class SingleMirrorSoakTest extends SoakTestBase {
             logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
             pendingCommit++;
             if (pendingCommit >= batchCommit) {
-               if (CONSUMER_PROCESSING_TIME > 0) {
-                  Thread.sleep(CONSUMER_PROCESSING_TIME);
-               }
                logger.info("received {}", i);
                session.commit();
                pendingCommit = 0;
@@ -340,6 +301,7 @@ public class SingleMirrorSoakTest extends SoakTestBase {
    public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
       try {
          long value = simpleManagement.getMessageCountOnQueue(queue);
+         logger.debug("count on queue {} is {}", queue, value);
          return value;
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);