ARTEMIS-3261 Fixing tests and allowing configuration to reload data files on start
This commit is contained in:
parent
792fc7f80e
commit
62395dcd44
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||||
|
|
||||||
@Command(name = "compact", description = "Compacts the journal of a non running server")
|
@Command(name = "compact", description = "Compacts the journal of a non running server")
|
||||||
public final class CompactJournal extends LockAbstract {
|
public final class CompactJournal extends LockAbstract {
|
||||||
|
@ -34,10 +35,7 @@ public final class CompactJournal extends LockAbstract {
|
||||||
super.execute(context);
|
super.execute(context);
|
||||||
try {
|
try {
|
||||||
Configuration configuration = getFileConfiguration();
|
Configuration configuration = getFileConfiguration();
|
||||||
compactJournal(new File(getJournal()), "activemq-data", "amq", configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null);
|
compactJournals(configuration);
|
||||||
System.out.println("Compactation succeeded for " + getJournal());
|
|
||||||
compactJournal(new File(getBinding()), "activemq-bindings", "bindings", 2, 2, 1048576, null);
|
|
||||||
System.out.println("Compactation succeeded for " + getBinding());
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
treatError(e, "data", "compact");
|
treatError(e, "data", "compact");
|
||||||
|
@ -45,16 +43,30 @@ public final class CompactJournal extends LockAbstract {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void compactJournal(final File directory,
|
public static void compactJournals(Configuration configuration) throws Exception {
|
||||||
|
compactJournal(configuration.getJournalLocation(), "activemq-data", "amq", configuration.getJournalMinFiles(),
|
||||||
|
configuration.getJournalPoolFiles(), configuration.getJournalFileSize(), null, JournalRecordIds.UPDATE_DELIVERY_COUNT,
|
||||||
|
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME);
|
||||||
|
System.out.println("Compactation succeeded for " + configuration.getJournalLocation().getAbsolutePath());
|
||||||
|
compactJournal(configuration.getBindingsLocation(), "activemq-bindings", "bindings", 2, 2, 1048576, null);
|
||||||
|
System.out.println("Compactation succeeded for " + configuration.getBindingsLocation());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void compactJournal(final File directory,
|
||||||
final String journalPrefix,
|
final String journalPrefix,
|
||||||
final String journalSuffix,
|
final String journalSuffix,
|
||||||
final int minFiles,
|
final int minFiles,
|
||||||
final int poolFiles,
|
final int poolFiles,
|
||||||
final int fileSize,
|
final int fileSize,
|
||||||
final IOCriticalErrorListener listener) throws Exception {
|
final IOCriticalErrorListener listener,
|
||||||
|
int... replaceableRecords) throws Exception {
|
||||||
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
|
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
|
||||||
|
|
||||||
JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
|
JournalImpl journal = new JournalImpl(fileSize, minFiles, poolFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
|
||||||
|
for (int i : replaceableRecords) {
|
||||||
|
journal.replaceableRecord(i);
|
||||||
|
}
|
||||||
|
journal.setRemoveExtraFilesOnLoad(true);
|
||||||
|
|
||||||
journal.start();
|
journal.start();
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
// Sync Delay in ms
|
// Sync Delay in ms
|
||||||
//private static final int SYNC_DELAY = 5;
|
//private static final int SYNC_DELAY = 5;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) {
|
||||||
|
// no op on JDBC
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRemoveExtraFilesOnLoad() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private long syncDelay;
|
private long syncDelay;
|
||||||
|
|
||||||
private static int USER_VERSION = 1;
|
private static int USER_VERSION = 1;
|
||||||
|
|
|
@ -58,6 +58,10 @@ public interface Journal extends ActiveMQComponent {
|
||||||
LOADED;
|
LOADED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad);
|
||||||
|
|
||||||
|
boolean isRemoveExtraFilesOnLoad();
|
||||||
|
|
||||||
// Non transactional operations
|
// Non transactional operations
|
||||||
|
|
||||||
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
|
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
|
||||||
|
|
|
@ -28,6 +28,17 @@ abstract class JournalBase implements Journal {
|
||||||
|
|
||||||
protected final int fileSize;
|
protected final int fileSize;
|
||||||
private final boolean supportsCallback;
|
private final boolean supportsCallback;
|
||||||
|
protected boolean removeExtraFilesOnLoad = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRemoveExtraFilesOnLoad(boolean setting) {
|
||||||
|
this.removeExtraFilesOnLoad = setting;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRemoveExtraFilesOnLoad() {
|
||||||
|
return removeExtraFilesOnLoad;
|
||||||
|
}
|
||||||
|
|
||||||
JournalBase(boolean supportsCallback, int fileSize) {
|
JournalBase(boolean supportsCallback, int fileSize) {
|
||||||
if (fileSize < JournalImpl.MIN_FILE_SIZE) {
|
if (fileSize < JournalImpl.MIN_FILE_SIZE) {
|
||||||
|
|
|
@ -2195,7 +2195,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
} else {
|
} else {
|
||||||
if (changeData) {
|
if (changeData) {
|
||||||
// Empty dataFiles with no data
|
// Empty dataFiles with no data
|
||||||
filesRepository.addFreeFile(file, false, true);
|
filesRepository.addFreeFile(file, false, isRemoveExtraFilesOnLoad());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -957,6 +957,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
messageJournal.setRemoveExtraFilesOnLoad(true);
|
||||||
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
|
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
|
||||||
|
|
||||||
ArrayList<LargeServerMessage> largeMessages = new ArrayList<>();
|
ArrayList<LargeServerMessage> largeMessages = new ArrayList<>();
|
||||||
|
@ -1606,6 +1607,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
|
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
|
||||||
|
|
||||||
|
bindingsJournal.setRemoveExtraFilesOnLoad(true);
|
||||||
|
|
||||||
JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
|
JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
|
||||||
|
|
||||||
HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
|
HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
|
||||||
|
|
|
@ -54,6 +54,16 @@ public class ReplicatedJournal implements Journal {
|
||||||
|
|
||||||
private final byte journalID;
|
private final byte journalID;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) {
|
||||||
|
this.localJournal.setRemoveExtraFilesOnLoad(removeExtraFilesOnLoad);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRemoveExtraFilesOnLoad() {
|
||||||
|
return localJournal.isRemoveExtraFilesOnLoad();
|
||||||
|
}
|
||||||
|
|
||||||
public ReplicatedJournal(final byte journalID,
|
public ReplicatedJournal(final byte journalID,
|
||||||
final Journal localJournal,
|
final Journal localJournal,
|
||||||
final ReplicationManager replicationManager) {
|
final ReplicationManager replicationManager) {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.tools.journal.CompactJournal;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||||
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
|
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
|
||||||
|
@ -57,17 +58,19 @@ public class InfiniteRedeliveryTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(InfiniteRedeliveryTest.class);
|
private static final Logger logger = Logger.getLogger(InfiniteRedeliveryTest.class);
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "protocol={0}")
|
@Parameterized.Parameters(name = "protocol={0}, useCLI={1}")
|
||||||
public static Collection getParameters() {
|
public static Collection getParameters() {
|
||||||
return Arrays.asList(new Object[][]{{"CORE"}, {"AMQP"}, {"OPENWIRE"}});
|
return Arrays.asList(new Object[][]{{"CORE", true}, {"AMQP", false}, {"OPENWIRE", false}});
|
||||||
}
|
}
|
||||||
|
|
||||||
public InfiniteRedeliveryTest(String protocol) {
|
public InfiniteRedeliveryTest(String protocol, boolean useCLI) {
|
||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
|
this.useCLI = useCLI;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
String protocol;
|
String protocol;
|
||||||
|
boolean useCLI;
|
||||||
|
|
||||||
TestableServer liveServer;
|
TestableServer liveServer;
|
||||||
TestableServer backupServer;
|
TestableServer backupServer;
|
||||||
|
@ -171,15 +174,23 @@ public class InfiniteRedeliveryTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
||||||
liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
|
if (!useCLI) {
|
||||||
backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
|
liveServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
|
||||||
|
backupServer.getServer().getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
liveServer.stop();
|
||||||
|
backupServer.stop();
|
||||||
|
|
||||||
|
if (useCLI) {
|
||||||
|
CompactJournal.compactJournals(backupServer.getServer().getConfiguration());
|
||||||
|
CompactJournal.compactJournals(liveServer.getServer().getConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
HashMap<Integer, AtomicInteger> counts = countJournal(liveServer.getServer().getConfiguration());
|
HashMap<Integer, AtomicInteger> counts = countJournal(liveServer.getServer().getConfiguration());
|
||||||
counts.forEach((k, v) -> logger.debug(k + "=" + v));
|
counts.forEach((k, v) -> logger.debug(k + "=" + v));
|
||||||
counts.forEach((k, v) -> Assert.assertTrue("Record type " + k + " has a lot of records:" + v, v.intValue() < 20));
|
counts.forEach((k, v) -> Assert.assertTrue("Record type " + k + " has a lot of records:" + v, v.intValue() < 20));
|
||||||
|
|
||||||
backupServer.stop();
|
|
||||||
|
|
||||||
HashMap<Integer, AtomicInteger> backupCounts = countJournal(backupServer.getServer().getConfiguration());
|
HashMap<Integer, AtomicInteger> backupCounts = countJournal(backupServer.getServer().getConfiguration());
|
||||||
Assert.assertTrue(backupCounts.size() > 0);
|
Assert.assertTrue(backupCounts.size() > 0);
|
||||||
backupCounts.forEach((k, v) -> logger.debug("On Backup:" + k + "=" + v));
|
backupCounts.forEach((k, v) -> logger.debug("On Backup:" + k + "=" + v));
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.journal;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.Wait;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ShrinkDataOnStartTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shrinkDataOnStart() throws Exception {
|
||||||
|
|
||||||
|
ActiveMQServer server = addServer(createServer(true));
|
||||||
|
server.getConfiguration().setJournalMinFiles(10);
|
||||||
|
server.getConfiguration().setJournalPoolFiles(2);
|
||||||
|
server.start();
|
||||||
|
Wait.waitFor(server::isActive);
|
||||||
|
Assert.assertEquals(10, server.getStorageManager().getMessageJournal().getFileFactory().listFiles("amq").size());
|
||||||
|
server.stop();
|
||||||
|
server.getConfiguration().setJournalMinFiles(2);
|
||||||
|
server.getConfiguration().setJournalPoolFiles(2);
|
||||||
|
server.start();
|
||||||
|
Assert.assertEquals(2, server.getStorageManager().getMessageJournal().getFileFactory().listFiles("amq").size());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -647,6 +647,16 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
static final class FakeJournal implements Journal {
|
static final class FakeJournal implements Journal {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRemoveExtraFilesOnLoad(boolean removeExtraFilesOnLoad) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRemoveExtraFilesOnLoad() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendAddRecord(long id,
|
public void appendAddRecord(long id,
|
||||||
byte recordType,
|
byte recordType,
|
||||||
|
|
|
@ -459,6 +459,33 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
||||||
stopJournal();
|
stopJournal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReduceFreeFilesWithCleanup() throws Exception {
|
||||||
|
setup(10, 10 * 1024, true);
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
load();
|
||||||
|
|
||||||
|
List<String> files1 = fileFactory.listFiles(fileExtension);
|
||||||
|
|
||||||
|
Assert.assertEquals(10, files1.size());
|
||||||
|
|
||||||
|
stopJournal();
|
||||||
|
|
||||||
|
setup(5, 10 * 1024, true);
|
||||||
|
createJournal();
|
||||||
|
journal.setRemoveExtraFilesOnLoad(true);
|
||||||
|
startJournal();
|
||||||
|
load();
|
||||||
|
|
||||||
|
List<String> files2 = fileFactory.listFiles(fileExtension);
|
||||||
|
|
||||||
|
Assert.assertEquals(5, files2.size());
|
||||||
|
|
||||||
|
stopJournal();
|
||||||
|
}
|
||||||
|
|
||||||
private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize) {
|
private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize) {
|
||||||
recordSize = calculateRecordSize(recordSize, alignment);
|
recordSize = calculateRecordSize(recordSize, alignment);
|
||||||
int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
|
int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
|
||||||
|
|
Loading…
Reference in New Issue