ARTEMIS-612 Improving Failback's max replication

The server will always restart now, with older files being removed.
The system now moves the current data into ./oldreplica.#, removing older copies beyond the configured maximum.
All the logic for moving these files is encapsulated in FileMoveManager.
Clebert Suconic 2016-06-27 14:52:59 -04:00 committed by jbertram
parent 62f414fd35
commit 8154120027
14 changed files with 746 additions and 131 deletions
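In effect, fail-back data handling now reduces to handing each data directory to the new FileMoveManager. The sketch below mirrors the moveServerData() change in ActiveMQServerImpl further down; the dataDirs array and the maxSavedReplicated parameter are placeholders for the broker's configured locations and the replica policy's getMaxSavedReplicatedJournalsSize() value.

import java.io.File;
import java.io.IOException;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;

public class MoveServerDataSketch {
   static void moveServerData(File[] dataDirs, int maxSavedReplicated) throws IOException {
      for (File dir : dataDirs) {
         // Current content of each data directory is moved into <dir>/oldreplica.<n>;
         // folders beyond maxSavedReplicated are deleted, oldest first.
         FileMoveManager moveManager = new FileMoveManager(dir, maxSavedReplicated);
         moveManager.doMove();
      }
   }
}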

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -147,7 +148,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
final File addressFile = new File(file, PagingStoreFactoryNIO.ADDRESS_FILE);
if (!addressFile.exists()) {
ActiveMQServerLogger.LOGGER.pageStoreFactoryNoIdFile(file.toString(), PagingStoreFactoryNIO.ADDRESS_FILE);
// This means this folder is from a replication copy; nothing to worry about, we just skip it
if (!file.getName().contains(FileMoveManager.PREFIX)) {
ActiveMQServerLogger.LOGGER.pageStoreFactoryNoIdFile(file.toString(), PagingStoreFactoryNIO.ADDRESS_FILE);
}
continue;
}

View File

@ -94,6 +94,12 @@ public interface ActiveMQServer extends ActiveMQComponent {
NodeManager getNodeManager();
/** It will release the lock held for the activation. */
void unlockActivation();
/** It will hold a lock for the activation. This prevents the activation from proceeding. */
void lockActivation();
/**
* Returns the resource to manage this ActiveMQ Artemis server.
*
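Callers are expected to pair the two new methods around work that must not race with activation, as the ActivationThread added to ActiveMQServerImpl below does. A minimal sketch; the withActivationLock helper is illustrative, not part of the commit.

static void withActivationLock(org.apache.activemq.artemis.core.server.ActiveMQServer server, Runnable work) {
   server.lockActivation();      // blocks a pending activation from proceeding
   try {
      work.run();
   }
   finally {
      server.unlockActivation(); // lets the activation continue
   }
}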

View File

@ -998,7 +998,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222161, value = "Group Handler timed-out waiting for sendCondition", format = Message.Format.MESSAGE_FORMAT)
void groupHandlerSendTimeout();
@LogMessage(level = Logger.Level.WARN)
@LogMessage(level = Logger.Level.INFO)
@Message(id = 222162, value = "Moving data directory {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
void backupMovingDataAway(String oldPath, String newPath);
@ -1219,6 +1219,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void sslHandshakeFailed(String clientAddress, String cause);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 222209, value = "There were too many old replicated folders upon startup, removing {0}",
format = Message.Format.MESSAGE_FORMAT)
void removingBackupData(String path);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
import javax.management.MBeanServer;
import javax.security.cert.X509Certificate;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
@ -39,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -253,6 +254,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<>();
private final Semaphore activationLock = new Semaphore(1);
/**
* This class follows the same principle as CountDownLatch, but its counter can be reused.
* It's based on the same super classes as {@code CountDownLatch}
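The comment above describes a reusable variant of CountDownLatch. The snippet below illustrates the principle only; it is not the Artemis class, and the name and methods are made up for the sketch.

final class ReusableCounterLatch {
   private int count;

   synchronized void countUp() {
      count++;
   }

   synchronized void countDown() {
      if (count > 0 && --count == 0) {
         notifyAll();             // wake waiters once the counter drains to zero
      }
   }

   synchronized void await() throws InterruptedException {
      while (count > 0) {
         wait();                  // unlike CountDownLatch, countUp() can be called again afterwards
      }
   }
}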
@ -436,7 +438,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
}
backupActivationThread = new Thread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
if (logger.isTraceEnabled()) {
logger.trace("starting backupActivation");
}
backupActivationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
backupActivationThread.start();
}
else {
@ -452,6 +457,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
public void unlockActivation() {
activationLock.release();
}
@Override
public void lockActivation() {
try {
activationLock.acquire();
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
@Override
protected final void finalize() throws Throwable {
if (state != SERVER_STATE.STOPPED) {
@ -510,6 +530,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void setHAPolicy(HAPolicy haPolicy) {
if (logger.isTraceEnabled()) {
logger.tracef("XXX @@@ Setting %s, isBackup=%s at %s", haPolicy, haPolicy.isBackup(), this);
}
this.haPolicy = haPolicy;
}
@ -707,6 +730,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* @param criticalIOError whether we have encountered an IO error with the journal etc
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) {
synchronized (this) {
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
return;
@ -2202,7 +2226,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/**
* Check if journal directory exists or create it (if configured to do so)
*/
void checkJournalDirectory() {
public void checkJournalDirectory() {
File journalDir = configuration.getJournalLocation();
if (!journalDir.exists() && configuration.isPersistenceEnabled()) {
@ -2269,86 +2293,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return scaledDownNodeIDs.contains(scaledDownNodeId);
}
int countNumberOfCopiedJournals() {
//will use the main journal to check for how many backups have been kept
File journalDir = new File(configuration.getJournalDirectory());
final String fileName = journalDir.getName();
int numberOfbackupsSaved = 0;
//fine if it doesn't exist, we aren't using file based persistence so it's no issue
if (journalDir.exists()) {
File parentFile = new File(journalDir.getParent());
String[] backupJournals = parentFile.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith(fileName) && !name.matches(fileName);
}
});
numberOfbackupsSaved = backupJournals != null ? backupJournals.length : 0;
}
return numberOfbackupsSaved;
}
/**
* Move data away before starting data synchronization for fail-back.
* <p>
* The use case is a server that, upon restarting, finds a former backup running in its place. It will
* move any older data away and log a warning about it.
*/
void moveServerData() {
void moveServerData(int maxSavedReplicated) throws IOException {
File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
boolean allEmpty = true;
int lowestSuffixForMovedData = 1;
boolean redo = true;
while (redo) {
redo = false;
for (File fDir : dataDirs) {
if (fDir.exists()) {
if (!fDir.isDirectory()) {
throw ActiveMQMessageBundle.BUNDLE.journalDirIsFile(fDir);
}
if (fDir.list().length > 0)
allEmpty = false;
}
String sanitizedPath = fDir.getPath();
while (new File(sanitizedPath + lowestSuffixForMovedData).exists()) {
lowestSuffixForMovedData++;
redo = true;
}
}
}
if (allEmpty)
return;
for (File dir : dataDirs) {
File newPath = new File(dir.getPath() + lowestSuffixForMovedData);
if (dir.exists()) {
if (!dir.renameTo(newPath)) {
throw ActiveMQMessageBundle.BUNDLE.couldNotMoveJournal(dir);
}
ActiveMQServerLogger.LOGGER.backupMovingDataAway(dir.getAbsolutePath(), newPath.getPath());
}
/*
* sometimes the OS can hold on to file handles for a while, so we need to check this actually works and then wait
* a while and try again if it doesn't
* */
int count = 0;
while (!dir.exists() && !dir.mkdir()) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
count++;
if (count == 5) {
throw ActiveMQMessageBundle.BUNDLE.cannotCreateDir(dir.getPath());
}
}
for (File data : dataDirs) {
FileMoveManager moveManager = new FileMoveManager(data, maxSavedReplicated);
moveManager.doMove();
}
}
@ -2371,4 +2327,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new Date().getTime() - startDate.getTime();
}
private final class ActivationThread extends Thread {
final Runnable runnable;
ActivationThread(Runnable runnable, String name) {
super(name);
this.runnable = runnable;
}
public void run() {
lockActivation();
try {
runnable.run();
}
finally {
unlockActivation();
}
}
}
}

View File

@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.jboss.logging.Logger;
/**
* Used to move files away.
* Each time a backup starts, its former data will be moved to a backup folder called oldreplica.1, oldreplica.2, ... etc.
* The maximum number of folders can be controlled so that older ones are removed.
*/
public class FileMoveManager {
private static final Logger logger = Logger.getLogger(FileMoveManager.class);
private final File folder;
private int maxFolders;
public static final String PREFIX = "oldreplica.";
private static final FilenameFilter isPrefix = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
boolean prefixed = name.contains(PREFIX);
if (prefixed) {
try {
Integer.parseInt(name.substring(PREFIX.length()));
}
catch (NumberFormatException e) {
// This filter is not called often, so performance is not a concern here; this is good enough for what we need
prefixed = false;
}
}
return prefixed;
}
};
private static final FilenameFilter notPrefix = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return !isPrefix.accept(dir, name);
}
};
public FileMoveManager(File folder) {
this(folder, -1);
}
public FileMoveManager(File folder, int maxFolders) {
this.folder = folder;
this.maxFolders = maxFolders;
}
public int getMaxFolders() {
return maxFolders;
}
public FileMoveManager setMaxFolders(int maxFolders) {
this.maxFolders = maxFolders;
return this;
}
public void doMove() throws IOException {
String[] files = getFiles();
if (files == null || files.length == 0) {
// if no files, nothing to be done, no backup, no deletes... nothing!
return;
}
// Since we will create one folder, we are already taking that one into consideration
internalCheckOldFolders(1);
int whereToMove = getMaxID() + 1;
File folderTo = getFolder(whereToMove);
folderTo.mkdirs();
ActiveMQServerLogger.LOGGER.backupMovingDataAway(folder.getPath(), folderTo.getPath());
for (String fileMove : files) {
File fileFrom = new File(folder, fileMove);
File fileTo = new File(folderTo, fileMove);
logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
Files.move(fileFrom.toPath(), fileTo.toPath());
}
}
public void checkOldFolders() {
internalCheckOldFolders(0);
}
private void internalCheckOldFolders(int creating) {
if (maxFolders > 0) {
int folders = getNumberOfFolders();
// We are counting the next one to be created
int foldersToDelete = folders + creating - maxFolders;
if (foldersToDelete > 0) {
logger.tracef("There are %d folders to delete", foldersToDelete);
int[] ids = getIDlist();
for (int i = 0; i < foldersToDelete; i++) {
File file = getFolder(ids[i]);
ActiveMQServerLogger.LOGGER.removingBackupData(file.getPath());
deleteTree(file);
}
}
}
}
/**
* It will return the entries that are not backup (oldreplica.*) folders
*/
public String[] getFiles() {
return folder.list(notPrefix);
}
public int getNumberOfFolders() {
return getFolders().length;
}
public String[] getFolders() {
String[] list = folder.list(isPrefix);
if (list == null) {
list = new String[0];
}
return list;
}
public int getMinID() {
int[] list = getIDlist();
if (list.length == 0) {
return 0;
}
return list[0];
}
public int getMaxID() {
int[] list = getIDlist();
if (list.length == 0) {
return 0;
}
return list[list.length - 1];
}
public int[] getIDlist() {
String[] list = getFolders();
int[] ids = new int[list.length];
for (int i = 0; i < ids.length; i++) {
ids[i] = getID(list[i]);
}
Arrays.sort(ids);
return ids;
}
public int getID(String folderName) {
return Integer.parseInt(folderName.substring(PREFIX.length()));
}
public File getFolder(int id) {
return new File(folder, PREFIX + id);
}
private void deleteTree(File file) {
File[] files = file.listFiles();
if (files != null) {
for (File fileDelete : files) {
deleteTree(fileDelete);
}
}
file.delete();
}
}
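A short usage sketch of the class above; the ./data/journal path and the limit of 3 are hypothetical.

import java.io.File;
import java.io.IOException;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;

public class FileMoveManagerUsage {
   public static void main(String[] args) throws IOException {
      File journal = new File("./data/journal");                  // hypothetical data directory
      FileMoveManager manager = new FileMoveManager(journal, 3);  // keep at most 3 old replicas
      manager.doMove();           // current content moves into ./data/journal/oldreplica.<maxID + 1>
      System.out.println("old replicas kept: " + manager.getNumberOfFolders());
   }
}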

View File

@ -104,7 +104,7 @@ public final class SharedNothingBackupActivation extends Activation {
}
// move all data away:
activeMQServer.getNodeManager().stop();
activeMQServer.moveServerData();
activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
activeMQServer.getNodeManager().start();
synchronized (this) {
if (closed)
@ -311,7 +311,7 @@ public final class SharedNothingBackupActivation extends Activation {
}
if (logger.isTraceEnabled()) {
logger.trace("setReplicaPolicy::" + replicaPolicy);
logger.trace("@@@ setReplicaPolicy::" + replicaPolicy);
}
replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);

View File

@ -90,11 +90,16 @@ public class SharedNothingLiveActivation extends LiveActivation {
try {
if (replicatedPolicy.isCheckForLiveServer() && isNodeIdUsed()) {
//set for when we failback
if (logger.isTraceEnabled()) {
logger.tracef("@@@ setting up replicatedPolicy.getReplicaPolicy for back start, replicaPolicy::%s, isBackup=%s, server=%s", replicatedPolicy.getReplicaPolicy(), replicatedPolicy.isBackup(), activeMQServer);
}
replicatedPolicy.getReplicaPolicy().setReplicatedPolicy(replicatedPolicy);
activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
return;
}
logger.trace("@@@ did not do it now");
activeMQServer.initialisePart1(false);
activeMQServer.initialisePart2(false);
@ -175,16 +180,11 @@ public class SharedNothingLiveActivation extends LiveActivation {
clusterConnection.addClusterTopologyListener(listener1);
if (listener1.waitForBackup()) {
//upon failback we now always restart as a backup; FileMoveManager prunes old replica folders up to the configured maximum
if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) {
activeMQServer.stop(true);
ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback();
}
else {
activeMQServer.stop(true);
ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
activeMQServer.start();
}
activeMQServer.stop(true);
ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
// activeMQServer.moveServerData(replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize());
activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
activeMQServer.start();
}
else {
ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement();

View File

@ -0,0 +1,346 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class FileMoveManagerTest {
@Rule
public TemporaryFolder temporaryFolder;
@Rule
public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
private File dataLocation;
private FileMoveManager manager;
@Before
public void setUp() {
dataLocation = new File(temporaryFolder.getRoot(), "data");
dataLocation.mkdirs();
manager = new FileMoveManager(dataLocation, 10);
}
public FileMoveManagerTest() {
File parent = new File("./target/tmp");
parent.mkdirs();
temporaryFolder = new TemporaryFolder(parent);
}
@Test
public void testBackupFiles() {
int[] originalFiles = new int[12];
int count = 0;
// It will fake folder creation
for (int i = 0; i < 12; i++) {
originalFiles[count++] = i;
File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
bkp.mkdirs();
}
Assert.assertEquals(12, manager.getFolders().length);
Assert.assertEquals(12, manager.getNumberOfFolders());
assertIDs(originalFiles, manager.getIDlist());
}
@Test
public void testMinMax() {
int[] originalFiles = new int[12];
int count = 0;
// It will fake folder creation
for (int i = 0; i < 5; i++) {
originalFiles[count++] = i;
File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
bkp.mkdirs();
}
// simulates a hole where someone removed a folder by hand
// It will fake folder creation
for (int i = 7; i < 14; i++) {
originalFiles[count++] = i;
File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
bkp.mkdirs();
}
Assert.assertEquals(12, manager.getFolders().length);
Assert.assertEquals(12, manager.getNumberOfFolders());
int[] ids = manager.getIDlist();
assertIDs(originalFiles, ids);
Assert.assertEquals(0, manager.getMinID());
Assert.assertEquals(13, manager.getMaxID());
manager.setMaxFolders(3).checkOldFolders();
Assert.assertEquals(3, manager.getNumberOfFolders());
Assert.assertEquals(13, manager.getMaxID());
Assert.assertEquals(11, manager.getMinID());
}
@Test
public void testGarbageCreated() {
// I'm pretending an admin created a folder here
File garbage = new File(dataLocation, "bkp.zzz");
garbage.mkdirs();
testMinMax();
resetTmp();
// the admin renamed a folder maybe
garbage = new File(dataLocation, "bkp.001.old");
garbage.mkdirs();
resetTmp();
// the admin renamed a folder maybe
garbage = new File(dataLocation, "bkp.1.5");
garbage.mkdirs();
testMinMax();
}
@Test
public void testNoFolders() {
Assert.assertEquals(0, manager.getFolders().length);
Assert.assertEquals(0, manager.getNumberOfFolders());
Assert.assertTrue(dataLocation.delete());
Assert.assertEquals(0, manager.getFolders().length);
Assert.assertEquals(0, manager.getNumberOfFolders());
}
@Test
public void testNoFiles() throws Exception {
// nothing to be moved, so there is no reason to create a backup folder
manager.doMove();
Assert.assertEquals(0, manager.getNumberOfFolders());
}
@Test
public void testMoveFiles() throws Exception {
manager.setMaxFolders(3);
for (int bkp = 1; bkp <= 10; bkp++) {
for (int i = 0; i < 100; i++) {
createFile(dataLocation, i);
}
manager.doMove();
// We will always have a maximum of 3 folders
Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders());
File bkpFolder = manager.getFolder(bkp);
FileMoveManager bkp1Manager = new FileMoveManager(bkpFolder, 10);
String[] filesAfterMove = bkp1Manager.getFiles();
for (String file : filesAfterMove) {
checkFile(bkpFolder, file);
}
}
Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
manager.setMaxFolders(0).checkOldFolders();
Assert.assertEquals(3, manager.getNumberOfFolders());
manager.setMaxFolders(1).checkOldFolders();
Assert.assertEquals(1, manager.getNumberOfFolders());
Assert.assertEquals(10, manager.getMaxID());
Assert.assertEquals(10, manager.getMinID());
}
@Test
public void testMoveFolders() throws Exception {
manager.setMaxFolders(3);
int NUMBER_OF_FOLDERS = 10;
int FILES_PER_FOLDER = 10;
for (int bkp = 1; bkp <= 10; bkp++) {
for (int f = 0; f < NUMBER_OF_FOLDERS; f++) {
File folderF = new File(dataLocation, "folder" + f);
folderF.mkdirs();
// FILES_PER_FOLDER + f, I'm just creating more files as f grows.
// this is just to make each folder unique somehow
for (int i = 0; i < FILES_PER_FOLDER + f; i++) {
createFile(folderF, i);
}
}
manager.doMove();
// We will always have a maximum of 3 folders
Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders());
File bkpFolder = manager.getFolder(bkp);
for (int f = 0; f < NUMBER_OF_FOLDERS; f++) {
File fileTmp = new File(bkpFolder, "folder" + f);
String[] filesOnFolder = fileTmp.list();
Assert.assertEquals(FILES_PER_FOLDER + f, filesOnFolder.length);
for (String file : filesOnFolder) {
checkFile(fileTmp, file);
}
}
}
Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
manager.setMaxFolders(0).checkOldFolders();
Assert.assertEquals(3, manager.getNumberOfFolders());
manager.setMaxFolders(1).checkOldFolders();
Assert.assertEquals(1, manager.getNumberOfFolders());
Assert.assertEquals(10, manager.getMaxID());
Assert.assertEquals(10, manager.getMinID());
}
@Test
public void testMoveOverPaging() throws Exception {
AssertionLoggerHandler.startCapture();
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
manager.setMaxFolders(3);
for (int i = 1; i <= 10; i++) {
HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<>();
AddressSettings settings = new AddressSettings();
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setDefault(settings);
final StorageManager storageManager = new NullStorageManager();
PagingStoreFactoryNIO storeFactory =
new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null,
new OrderedExecutorFactory(threadPool), true, null);
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings);
managerImpl.start();
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
store.startPaging();
store.stop();
managerImpl.stop();
manager.doMove();
Assert.assertEquals(Math.min(i, manager.getMaxFolders()), manager.getNumberOfFolders());
}
Assert.assertFalse("The loggers are complaining about address.txt", AssertionLoggerHandler.findText("address.txt"));
}
finally {
AssertionLoggerHandler.stopCapture();
threadPool.shutdown();
}
}
private void assertIDs(int[] originalFiles, int[] ids) {
Assert.assertEquals(originalFiles.length, ids.length);
for (int i = 0; i < ids.length; i++) {
Assert.assertEquals(originalFiles[i], ids[i]);
}
}
private void resetTmp() {
temporaryFolder.delete();
temporaryFolder.getRoot().mkdirs();
Assert.assertEquals(0, manager.getNumberOfFolders());
}
private void createFile(File folder, int i) throws FileNotFoundException {
File dataFile = new File(folder, i + ".jrn");
PrintWriter outData = new PrintWriter(new FileOutputStream(dataFile));
outData.print(i);
outData.close();
}
private void checkFile(File bkpFolder, String file) throws IOException {
File fileRead = new File(bkpFolder, file);
InputStreamReader stream = new InputStreamReader(new FileInputStream(fileRead));
BufferedReader reader = new BufferedReader(stream);
String valueRead = reader.readLine();
int id = Integer.parseInt(file.substring(0, file.indexOf('.')));
Assert.assertEquals("content of the file wasn't the expected", id, Integer.parseInt(valueRead));
}
}

View File

@ -1297,6 +1297,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException {
waitForServerToStart(server, true);
}
protected void waitForServerToStart(ActiveMQServer server, boolean activation) throws InterruptedException {
if (server == null)
return;
final long wait = 5000;
@ -1310,9 +1314,12 @@ public abstract class ActiveMQTestBase extends Assert {
fail("server didn't start: " + server);
}
if (!server.getHAPolicy().isBackup()) {
if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS))
fail("Server didn't initialize: " + server);
if (activation) {
if (!server.getHAPolicy().isBackup()) {
if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS))
fail("Server didn't initialize: " + server);
}
}
}

View File

@ -51,7 +51,7 @@ handler.TEST.formatter=PATTERN
# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
# Alternate format useful when debugging
#formatter.PATTERN.pattern=*** [%t] ***\n%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n\n
formatter.PATTERN.pattern=*** [%t] ***\n%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n\n

View File

@ -41,15 +41,20 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BackupSyncJournalTest extends FailoverTestBase {
private static final Logger logger = Logger.getLogger(BackupSyncJournalTest.class);
protected static final int BACKUP_WAIT_TIME = 60;
private ServerLocatorInternal locator;
protected ClientSessionFactoryInternal sessionFactory;
@ -283,17 +288,28 @@ public class BackupSyncJournalTest extends FailoverTestBase {
sendMessages(session, producer, 2 * n_msgs);
assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup());
adaptLiveConfigForReplicatedFailBack(liveServer);
liveServer.start();
FileMoveManager liveMoveManager = new FileMoveManager(liveServer.getServer().getConfiguration().getJournalLocation(), -1);
liveServer.getServer().lockActivation();
try {
liveServer.start();
assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup());
Assert.assertEquals(0, liveMoveManager.getNumberOfFolders());
}
finally {
liveServer.getServer().unlockActivation();
}
waitForServerToStart(liveServer.getServer());
assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup());
liveServer.getServer().waitForActivation(10, TimeUnit.SECONDS);
Assert.assertEquals(1, liveMoveManager.getNumberOfFolders());
assertTrue("must be active now", !liveServer.getServer().getHAPolicy().isBackup());
assertTrue("Fail-back must initialize live!", liveServer.getServer().waitForActivation(15, TimeUnit.SECONDS));
assertFalse("must be LIVE!", liveServer.getServer().getHAPolicy().isBackup());
int i = 0;
while (backupServer.isStarted() && i++ < 100) {
while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
assertFalse("Backup should stop!", backupServer.getServer().isStarted());
assertTrue(backupServer.getServer().isStarted());
assertTrue(liveServer.getServer().isStarted());
receiveMsgsInRange(0, 2 * n_msgs);
assertNoMoreMessages();

View File

@ -44,8 +44,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
@ -53,9 +51,12 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -518,10 +519,12 @@ public class FailoverTest extends FailoverTestBase {
boolean doFailBack = true;
HAPolicy haPolicy = backupServer.getServer().getHAPolicy();
if (haPolicy instanceof ReplicaPolicy) {
((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(0);
((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(1);
}
simpleReplication(doFailBack);
simpleFailover(haPolicy instanceof ReplicaPolicy, doFailBack);
tearDown();
setUp();
}
@Test
@ -571,9 +574,10 @@ public class FailoverTest extends FailoverTestBase {
}
@Test
public void testSimpleReplication() throws Exception {
boolean doFailBack = false;
simpleReplication(doFailBack);
public void testSimpleFailover() throws Exception {
HAPolicy haPolicy = backupServer.getServer().getHAPolicy();
simpleFailover(haPolicy instanceof ReplicaPolicy, false);
}
@Test
@ -628,7 +632,7 @@ public class FailoverTest extends FailoverTestBase {
* @param doFailBack
* @throws Exception
*/
private void simpleReplication(boolean doFailBack) throws Exception {
private void simpleFailover(boolean isReplicated, boolean doFailBack) throws Exception {
locator.setFailoverOnInitialConnection(true);
createSessionFactory();
ClientSession session = createSessionAndQueue();
@ -660,10 +664,16 @@ public class FailoverTest extends FailoverTestBase {
liveServer.start();
Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
int i = 0;
while (backupServer.isStarted() && i++ < 100) {
while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
Assert.assertFalse("Backup should stop!", backupServer.isStarted());
liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
Assert.assertTrue(backupServer.isStarted());
if (isReplicated) {
FileMoveManager moveManager = new FileMoveManager(backupServer.getServer().getConfiguration().getJournalLocation(), 0);
Assert.assertEquals(1, moveManager.getNumberOfFolders());
}
}
else {
backupServer.stop();
@ -886,35 +896,49 @@ public class FailoverTest extends FailoverTestBase {
@Test
public void testTransactedMessagesNotSentSoNoRollback() throws Exception {
createSessionFactory();
try {
createSessionFactory();
ClientSession session = createSessionAndQueue();
ClientSession session = createSessionAndQueue();
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
sendMessagesSomeDurable(session, producer);
sendMessagesSomeDurable(session, producer);
session.commit();
session.commit();
crash(session);
crash(session);
// committing again should work since we didn't send anything since the last commit
// committing again should work since we didn't send anything since the last commit
Assert.assertFalse(session.isRollbackOnly());
Assert.assertFalse(session.isRollbackOnly());
session.commit();
session.commit();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
session.start();
receiveDurableMessages(consumer);
receiveDurableMessages(consumer);
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.commit();
session.close();
session.close();
}
finally {
try {
liveServer.getServer().stop();
}
catch (Throwable ignored) {
}
try {
backupServer.getServer().stop();
}
catch (Throwable ignored) {
}
}
}
@Test

View File

@ -320,7 +320,7 @@ public class LiveToLiveFailoverTest extends FailoverTest {
}
@Override
public void testSimpleReplication() throws Exception {
public void testSimpleFailover() throws Exception {
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
@ -98,22 +100,31 @@ public class ReplicatedFailoverTest extends FailoverTest {
liveServer.start();
waitForRemoteBackupSynchronization(liveServer.getServer());
waitForServerToStart(liveServer.getServer());
//this will give the backup time to stop fully
waitForServerToStop(backupServer.getServer());
backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
assertFalse(backupServer.getServer().isStarted());
waitForRemoteBackupSynchronization(liveServer.getServer());
waitForServerToStart(backupServer.getServer());
assertTrue(backupServer.getServer().isStarted());
//the server wouldn't have reset to backup
assertFalse(backupServer.getServer().getHAPolicy().isBackup());
}
finally {
if (sf != null) {
sf.close();
}
try {
liveServer.getServer().stop();
}
catch (Throwable ignored) {
}
try {
backupServer.getServer().stop();
}
catch (Throwable ignored) {
}
}
}