HBASE-23202 ExportSnapshot (import) will fail if copying files to root directory takes longer than cleaner TTL (#1791)

Co-authored-by: Huaxiang Sun <huaxiangsun@apache.org>

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Zach York <zyork@apache.org>
This commit is contained in:
huaxiangsun 2020-06-08 14:48:21 -07:00 committed by GitHub
parent 05ed4cd133
commit f862f3d9b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 328 additions and 103 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -40,6 +41,7 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/** /**
@ -77,17 +79,19 @@ public class SnapshotFileCache implements Stoppable {
interface SnapshotFileInspector { interface SnapshotFileInspector {
/** /**
* Returns a collection of file names needed by the snapshot. * Returns a collection of file names needed by the snapshot.
* @param fs {@link FileSystem} where snapshot manifest files are stored
* @param snapshotDir {@link Path} to the snapshot directory to scan. * @param snapshotDir {@link Path} to the snapshot directory to scan.
* @return the collection of file names needed by the snapshot. * @return the collection of file names needed by the snapshot.
*/ */
Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException; Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
throws IOException;
} }
private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class); private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
private volatile boolean stop = false; private volatile boolean stop = false;
private final FileSystem fs; private final FileSystem fs, workingFs;
private final SnapshotFileInspector fileInspector; private final SnapshotFileInspector fileInspector;
private final Path snapshotDir; private final Path snapshotDir, workingSnapshotDir;
private final Set<String> cache = new HashSet<>(); private final Set<String> cache = new HashSet<>();
/** /**
* This is a helper map of information about the snapshot directories so we don't need to rescan * This is a helper map of information about the snapshot directories so we don't need to rescan
@ -104,14 +108,18 @@ public class SnapshotFileCache implements Stoppable {
* @param conf to extract the configured {@link FileSystem} where the snapshots are stored and * @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
* hbase root directory * hbase root directory
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed * @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread * @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files. * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded * @throws IOException if the {@link FileSystem} or root directory cannot be loaded
*/ */
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName, public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
SnapshotFileInspector inspectSnapshotFiles) throws IOException { String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf), 0, this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
cacheRefreshPeriod, refreshThreadName, inspectSnapshotFiles); SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
getFileSystem(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
} }
/** /**
@ -119,15 +127,19 @@ public class SnapshotFileCache implements Stoppable {
* filesystem * filesystem
* @param fs {@link FileSystem} where the snapshots are stored * @param fs {@link FileSystem} where the snapshots are stored
* @param rootDir hbase root directory * @param rootDir hbase root directory
* @param workingFs {@link FileSystem} where ongoing snapshot manifest files are stored
* @param workingDir Location to store ongoing snapshot manifest files
* @param cacheRefreshPeriod period (ms) with which the cache should be refreshed * @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread * @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files. * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
*/ */
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod, public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
long cacheRefreshDelay, String refreshThreadName, long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) { SnapshotFileInspector inspectSnapshotFiles) {
this.fs = fs; this.fs = fs;
this.workingFs = workingFs;
this.workingSnapshotDir = workingDir;
this.fileInspector = inspectSnapshotFiles; this.fileInspector = inspectSnapshotFiles;
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
// periodically refresh the file cache to make sure we aren't superfluously saving files. // periodically refresh the file cache to make sure we aren't superfluously saving files.
@ -176,6 +188,7 @@ public class SnapshotFileCache implements Stoppable {
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files, public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager) throws IOException { final SnapshotManager snapshotManager) throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList(); List<FileStatus> unReferencedFiles = Lists.newArrayList();
List<String> snapshotsInProgress = null;
boolean refreshed = false; boolean refreshed = false;
Lock lock = null; Lock lock = null;
if (snapshotManager != null) { if (snapshotManager != null) {
@ -197,6 +210,12 @@ public class SnapshotFileCache implements Stoppable {
if (cache.contains(fileName)) { if (cache.contains(fileName)) {
continue; continue;
} }
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file); unReferencedFiles.add(file);
} }
} finally { } finally {
@ -239,7 +258,8 @@ public class SnapshotFileCache implements Stoppable {
// that new snapshot, even though it has the same name as the files referenced have // that new snapshot, even though it has the same name as the files referenced have
// probably changed. // probably changed.
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) { if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath()); Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
snapshotDir.getPath());
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles); files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
} }
// add all the files to cache // add all the files to cache
@ -251,6 +271,26 @@ public class SnapshotFileCache implements Stoppable {
this.snapshots.putAll(newSnapshots); this.snapshots.putAll(newSnapshots);
} }
/**
 * Gathers the file names referenced by the snapshots currently found under the working
 * snapshot directory. The result is returned to the caller; this method does not touch the
 * cache or the map of known snapshots.
 * @return the names reported by the file inspector for every entry of the working directory,
 *         or an empty list when the directory listing is null or empty
 * @throws IOException propagated from listing the directory or inspecting an entry, except
 *           for {@link CorruptedSnapshotException}, which is logged and skipped
 */
@VisibleForTesting
List<String> getSnapshotsInProgress() throws IOException {
  List<String> referencedFiles = Lists.newArrayList();
  FileStatus[] workingEntries =
    CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
  if (ArrayUtils.isEmpty(workingEntries)) {
    // nothing is being written right now
    return referencedFiles;
  }
  for (FileStatus entry : workingEntries) {
    try {
      Collection<String> names = fileInspector.filesUnderSnapshot(workingFs, entry.getPath());
      referencedFiles.addAll(names);
    } catch (CorruptedSnapshotException cse) {
      // a half-written entry may not be readable yet; skip it and keep going
      LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
    }
  }
  return referencedFiles;
}
/** /**
* Simple helper task that just periodically attempts to refresh the cache * Simple helper task that just periodically attempts to refresh the cache
*/ */

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -93,10 +94,15 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
DEFAULT_HFILE_CACHE_REFRESH_PERIOD); DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
final FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf); final FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf); Path rootDir = CommonFSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod, Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
"snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() { FileSystem workingFs = workingDir.getFileSystem(conf);
cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, cacheRefreshPeriod,
cacheRefreshPeriod, "snapshot-hfile-cleaner-cache-refresher",
new SnapshotFileCache.SnapshotFileInspector() {
@Override @Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir) public Collection<String> filesUnderSnapshot(final FileSystem fs,
final Path snapshotDir)
throws IOException { throws IOException {
return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir); return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);
} }

View File

@ -224,7 +224,9 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
verifier.verifySnapshot(this.workingDir, serverNames); verifier.verifySnapshot(this.workingDir, serverNames);
// complete the snapshot, atomically moving from tmp to .snapshot dir. // complete the snapshot, atomically moving from tmp to .snapshot dir.
completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, this.workingDirFs); SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs,
this.workingDirFs, this.conf);
finished = true;
msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed"; msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed";
status.markComplete(msg); status.markComplete(msg);
LOG.info(msg); LOG.info(msg);
@ -258,42 +260,6 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
} }
} }
/**
 * Commits the snapshot process by moving the working snapshot to the finalized filepath and
 * then marks this handler as finished.
 * <p>
 * A rename is attempted only when the working and completed directories are on the same file
 * system, i.e. the scheme, authority and user info of both file system URIs match. A component
 * that is undefined ({@code null}) on both URIs counts as matching, so URIs without user info
 * (or without an authority) still take the rename path instead of always falling back to a
 * copy. If the file systems differ, or the rename fails, the working directory is copied to
 * the completed location instead.
 *
 * @param snapshotDir The file path of the completed snapshots
 * @param workingDir The file path of the in progress snapshots
 * @param fs The file system of the completed snapshots
 * @param workingDirFs The file system of the in progress snapshots
 *
 * @throws SnapshotCreationException if the snapshot could neither be renamed nor copied
 * @throws IOException the filesystem could not be reached
 */
public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
  FileSystem workingDirFs) throws SnapshotCreationException, IOException {
  LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
    + snapshotDir);
  // Rename is only meaningful within one file system; short-circuit keeps us from calling
  // rename across file systems.
  boolean moved = onSameFileSystem(workingDirFs.getUri(), fs.getUri())
    && fs.rename(workingDir, snapshotDir);
  // Fall back to copying the directory over; only fail when that does not work either.
  if (!moved && !FileUtil.copy(workingDirFs, workingDir, fs, snapshotDir, true, true,
    this.conf)) {
    throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
      + ") to completed directory(" + snapshotDir + ").");
  }
  finished = true;
}

/** Returns true when both URIs agree on scheme, authority and user info. */
private static boolean onSameFileSystem(URI workingURI, URI rootURI) {
  return sameComponent(workingURI.getScheme(), rootURI.getScheme())
    && sameComponent(workingURI.getAuthority(), rootURI.getAuthority())
    && sameComponent(workingURI.getUserInfo(), rootURI.getUserInfo());
}

/** Null-safe equality for URI components; two undefined components are considered equal. */
private static boolean sameComponent(String left, String right) {
  return left == null ? right == null : left.equals(right);
}
/** /**
* When taking snapshot, first we must acquire the exclusive table lock to confirm that there are * When taking snapshot, first we must acquire the exclusive table lock to confirm that there are
* no ongoing merge/split procedures. But later, we should try our best to release the exclusive * no ongoing merge/split procedures. But later, we should try our best to release the exclusive

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.snapshot; package org.apache.hadoop.hbase.snapshot;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -25,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -383,25 +385,38 @@ public final class SnapshotDescriptionUtils {
} }
/** /**
* Move the finished snapshot to its final, publicly visible directory - this marks the snapshot * Commits the snapshot process by moving the working snapshot
* as 'complete'. * to the finalized filepath
* @param snapshot description of the snapshot being tabken *
* @param rootdir root directory of the hbase installation * @param snapshotDir The file path of the completed snapshots
* @param workingDir directory where the in progress snapshot was built * @param workingDir The file path of the in progress snapshots
* @param fs {@link FileSystem} where the snapshot was built * @param fs The file system of the completed snapshots
* @throws org.apache.hadoop.hbase.snapshot.SnapshotCreationException if the * @param workingDirFs The file system of the in progress snapshots
* snapshot could not be moved * @param conf Configuration
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException the filesystem could not be reached * @throws IOException the filesystem could not be reached
*/ */
public static void completeSnapshot(SnapshotDescription snapshot, Path rootdir, Path workingDir, public static void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
FileSystem fs) throws SnapshotCreationException, IOException { FileSystem workingDirFs, final Configuration conf)
Path finishedDir = getCompletedSnapshotDir(snapshot, rootdir); throws SnapshotCreationException, IOException {
LOG.debug("Snapshot is done, just moving the snapshot from " + workingDir + " to " LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ finishedDir); + snapshotDir);
if (!fs.rename(workingDir, finishedDir)) { // If the working and completed snapshot directory are on the same file system, attempt
throw new SnapshotCreationException( // to rename the working snapshot directory to the completed location. If that fails,
"Failed to move working directory(" + workingDir + ") to completed directory(" // or the file systems differ, attempt to copy the directory over, throwing an exception
+ finishedDir + ").", ProtobufUtil.createSnapshotDesc(snapshot)); // if this fails
URI workingURI = workingDirFs.getUri();
URI rootURI = fs.getUri();
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
workingURI.getAuthority() == null ||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
workingURI.getUserInfo() == null ||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
snapshotDir, true, true, conf)) {
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
} }
} }

View File

@ -17,8 +17,11 @@
*/ */
package org.apache.hadoop.hbase.master.snapshot; package org.apache.hadoop.hbase.master.snapshot;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -26,11 +29,14 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
@ -46,6 +52,11 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/** /**
* Test that we correctly reload the cache, filter directories, etc. * Test that we correctly reload the cache, filter directories, etc.
*/ */
@ -56,20 +67,30 @@ public class TestSnapshotFileCache {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSnapshotFileCache.class); HBaseClassTestRule.forClass(TestSnapshotFileCache.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFileCache.class); protected static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFileCache.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
// don't refresh the cache unless we tell it to // don't refresh the cache unless we tell it to
private static final long PERIOD = Long.MAX_VALUE; protected static final long PERIOD = Long.MAX_VALUE;
private static FileSystem fs; protected static FileSystem fs;
private static Path rootDir; protected static Path rootDir;
private static Path snapshotDir; protected static Path snapshotDir;
protected static Configuration conf;
protected static FileSystem workingFs;
protected static Path workingDir;
@BeforeClass protected static void initCommon() throws Exception {
public static void startCluster() throws Exception {
UTIL.startMiniDFSCluster(1); UTIL.startMiniDFSCluster(1);
fs = UTIL.getDFSCluster().getFileSystem(); fs = UTIL.getDFSCluster().getFileSystem();
rootDir = UTIL.getDefaultRootDirPath(); rootDir = UTIL.getDefaultRootDirPath();
snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
conf = UTIL.getConfiguration();
}
@BeforeClass
public static void startCluster() throws Exception {
initCommon();
workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
workingFs = workingDir.getFileSystem(conf);
} }
@AfterClass @AfterClass
@ -85,18 +106,20 @@ public class TestSnapshotFileCache {
@Test @Test
public void testLoadAndDelete() throws IOException { public void testLoadAndDelete() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000, SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
"test-snapshot-file-cache-refresh", new SnapshotFiles()); 10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
createAndTestSnapshotV1(cache, "snapshot1a", false, true, false); createAndTestSnapshotV1(cache, "snapshot1a", false, true, false);
createAndTestSnapshotV1(cache, "snapshot1b", true, true, false);
createAndTestSnapshotV2(cache, "snapshot2a", false, true, false); createAndTestSnapshotV2(cache, "snapshot2a", false, true, false);
createAndTestSnapshotV2(cache, "snapshot2b", true, true, false);
} }
@Test @Test
public void testReloadModifiedDirectory() throws IOException { public void testReloadModifiedDirectory() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000, SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
"test-snapshot-file-cache-refresh", new SnapshotFiles()); 10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
createAndTestSnapshotV1(cache, "snapshot1", false, true, false); createAndTestSnapshotV1(cache, "snapshot1", false, true, false);
// now delete the snapshot and add a file with a different name // now delete the snapshot and add a file with a different name
@ -109,8 +132,8 @@ public class TestSnapshotFileCache {
@Test @Test
public void testSnapshotTempDirReload() throws IOException { public void testSnapshotTempDirReload() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000, SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
"test-snapshot-file-cache-refresh", new SnapshotFiles()); 10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
// Add a new non-tmp snapshot // Add a new non-tmp snapshot
createAndTestSnapshotV1(cache, "snapshot0v1", false, false, false); createAndTestSnapshotV1(cache, "snapshot0v1", false, false, false);
@ -119,8 +142,8 @@ public class TestSnapshotFileCache {
@Test @Test
public void testCacheUpdatedWhenLastModifiedOfSnapDirNotUpdated() throws IOException { public void testCacheUpdatedWhenLastModifiedOfSnapDirNotUpdated() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000, SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
"test-snapshot-file-cache-refresh", new SnapshotFiles()); 10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
// Add a new non-tmp snapshot // Add a new non-tmp snapshot
createAndTestSnapshotV1(cache, "snapshot1v1", false, false, true); createAndTestSnapshotV1(cache, "snapshot1v1", false, false, true);
@ -133,11 +156,77 @@ public class TestSnapshotFileCache {
createAndTestSnapshotV2(cache, "snapshot2v2", true, false, true); createAndTestSnapshotV2(cache, "snapshot2v2", true, false, true);
} }
/**
 * Checks when {@code getSnapshotsInProgress()} is invoked by
 * {@code getUnreferencedFiles}: not at all while every file is already in the cache, and
 * exactly once when a file unknown to the cache is added to the request.
 */
@Test
public void testWeNeverCacheTmpDirAndLoadIt() throws Exception {
  final AtomicInteger count = new AtomicInteger(0);
  // Long.MAX_VALUE period: don't refresh the cache unless we tell it to
  SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir,
    Long.MAX_VALUE, 10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles()) {
    @Override
    List<String> getSnapshotsInProgress() throws IOException {
      // count only the scans that completed successfully
      List<String> result = super.getSnapshotsInProgress();
      count.incrementAndGet();
      return result;
    }
  };

  SnapshotMock.SnapshotBuilder complete =
    createAndTestSnapshotV1(cache, "snapshot", false, false, false);

  int countBeforeCheck = count.get();

  CommonFSUtils.logFileSystemState(fs, rootDir, LOG);

  List<FileStatus> allStoreFiles = getStoreFilesForSnapshot(complete);
  Iterable<FileStatus> deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
  assertTrue(Iterables.isEmpty(deletableFiles));
  // no need for tmp dir check as all files are accounted for.
  assertEquals(0, count.get() - countBeforeCheck);

  // add a random file the cache does not know about, which forces the tmp dir check
  FileStatus randomFile = mockStoreFile(UTIL.getRandomUUID().toString());
  allStoreFiles.add(randomFile);
  deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
  assertEquals(randomFile, Iterables.getOnlyElement(deletableFiles));
  assertEquals(1, count.get() - countBeforeCheck); // we check the tmp directory
}
/**
 * Visits the files referenced by the snapshot behind {@code builder} and returns one mocked
 * {@link FileStatus} per visited store file, named after that store file.
 */
private List<FileStatus> getStoreFilesForSnapshot(SnapshotMock.SnapshotBuilder builder)
  throws IOException {
  final List<FileStatus> storeFiles = Lists.newArrayList();
  SnapshotReferenceUtil.SnapshotVisitor collector = new SnapshotReferenceUtil.SnapshotVisitor() {
    @Override
    public void storeFile(RegionInfo regionInfo, String familyName,
      SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile) throws IOException {
      storeFiles.add(mockStoreFile(storeFile.getName()));
    }
  };
  SnapshotReferenceUtil.visitReferencedFiles(conf, fs, builder.getSnapshotsDir(), collector);
  return storeFiles;
}
/** Creates a mocked {@link FileStatus} whose mocked path reports {@code storeFileName}. */
private FileStatus mockStoreFile(String storeFileName) {
  Path namedPath = mock(Path.class);
  when(namedPath.getName()).thenReturn(storeFileName);

  FileStatus fileStatus = mock(FileStatus.class);
  when(fileStatus.getPath()).thenReturn(namedPath);
  return fileStatus;
}
class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector { class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
@Override @Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException { public Collection<String> filesUnderSnapshot(final FileSystem workingFs,
final Path snapshotDir) throws IOException {
Collection<String> files = new HashSet<>(); Collection<String> files = new HashSet<>();
files.addAll(SnapshotReferenceUtil.getHFileNames(UTIL.getConfiguration(), fs, snapshotDir)); files.addAll(SnapshotReferenceUtil.getHFileNames(conf, workingFs, snapshotDir));
return files; return files;
} }
} }
@ -145,7 +234,7 @@ public class TestSnapshotFileCache {
private SnapshotMock.SnapshotBuilder createAndTestSnapshotV1(final SnapshotFileCache cache, private SnapshotMock.SnapshotBuilder createAndTestSnapshotV1(final SnapshotFileCache cache,
final String name, final boolean tmp, final boolean removeOnExit, boolean setFolderTime) final String name, final boolean tmp, final boolean removeOnExit, boolean setFolderTime)
throws IOException { throws IOException {
SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir); SnapshotMock snapshotMock = new SnapshotMock(conf, fs, rootDir);
SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV1(name, name); SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV1(name, name);
createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime); createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime);
return builder; return builder;
@ -153,7 +242,7 @@ public class TestSnapshotFileCache {
private void createAndTestSnapshotV2(final SnapshotFileCache cache, final String name, private void createAndTestSnapshotV2(final SnapshotFileCache cache, final String name,
final boolean tmp, final boolean removeOnExit, boolean setFolderTime) throws IOException { final boolean tmp, final boolean removeOnExit, boolean setFolderTime) throws IOException {
SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir); SnapshotMock snapshotMock = new SnapshotMock(conf, fs, rootDir);
SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(name, name); SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(name, name);
createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime); createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime);
} }
@ -164,12 +253,20 @@ public class TestSnapshotFileCache {
List<Path> files = new ArrayList<>(); List<Path> files = new ArrayList<>();
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
for (Path filePath: builder.addRegion()) { for (Path filePath: builder.addRegion()) {
if (tmp) {
// We should be able to find all the files while the snapshot creation is in-progress
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
assertFalse("Cache didn't find " + filePath,
contains(getNonSnapshotFiles(cache, filePath), filePath));
}
files.add(filePath); files.add(filePath);
} }
} }
// Finalize the snapshot // Finalize the snapshot
if (!tmp) {
builder.commit(); builder.commit();
}
if (setFolderTime) { if (setFolderTime) {
fs.setTimes(snapshotDir, 0, -1); fs.setTimes(snapshotDir, 0, -1);
@ -183,7 +280,7 @@ public class TestSnapshotFileCache {
CommonFSUtils.logFileSystemState(fs, rootDir, LOG); CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
if (removeOnExit) { if (removeOnExit) {
LOG.debug("Deleting snapshot."); LOG.debug("Deleting snapshot.");
fs.delete(builder.getSnapshotsDir(), true); builder.getSnapshotsDir().getFileSystem(conf).delete(builder.getSnapshotsDir(), true);
CommonFSUtils.logFileSystemState(fs, rootDir, LOG); CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
// then trigger a refresh // then trigger a refresh

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.File;
import java.nio.file.Paths;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
/**
 * Test that we correctly reload the cache, filter directories, etc.
 * while the temporary (snapshot working) directory is on a different file system than the
 * root directory.
 * <p>
 * All test cases come from {@link TestSnapshotFileCache}; this subclass only changes the
 * class-level setup and teardown so that the snapshot working directory is configured through
 * {@link SnapshotDescriptionUtils#SNAPSHOT_WORKING_DIR} as a {@code file://} URI.
 */
@Category({MasterTests.class, LargeTests.class})
public class TestSnapshotFileCacheWithDifferentWorkingDir extends TestSnapshotFileCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSnapshotFileCacheWithDifferentWorkingDir.class);

  /**
   * Absolute path of a randomly (UUID) named directory below the JVM's current working
   * directory. The snapshot working directory is placed inside it and the whole directory is
   * removed again in {@link #stopCluster()}.
   */
  protected static String TEMP_DIR =
    Paths.get(".", UUID.randomUUID().toString()).toAbsolutePath().toString();

  /**
   * Runs the shared initialization and then redirects the snapshot working directory to
   * {@code file://<TEMP_DIR>/.tmpDir}.
   * <p>
   * NOTE(review): {@code initCommon()}, {@code conf}, {@code rootDir}, {@code workingDir} and
   * {@code workingFs} are presumably inherited from {@link TestSnapshotFileCache} - confirm.
   */
  @BeforeClass
  public static void startCluster() throws Exception {
    initCommon();

    // Set the snapshot working directory to be on another filesystem.
    conf.set(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR,
      "file://" + new Path(TEMP_DIR, ".tmpDir").toUri());
    // Re-resolve the working directory and its file system now that the setting has changed.
    workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
    workingFs = workingDir.getFileSystem(conf);
  }

  /**
   * Shuts the mini DFS cluster down and removes the temporary working directory.
   * <p>
   * The directory is deleted in a {@code finally} block so that it does not linger in the
   * current working directory when the cluster shutdown throws.
   */
  @AfterClass
  public static void stopCluster() throws Exception {
    try {
      UTIL.shutdownMiniDFSCluster();
    } finally {
      FileUtils.deleteDirectory(new File(TEMP_DIR));
    }
  }
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.snapshot; package org.apache.hadoop.hbase.master.snapshot;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
@ -60,6 +61,7 @@ public class TestSnapshotHFileCleaner {
private static final String SNAPSHOT_NAME_STR = "testSnapshotManifest-snapshot"; private static final String SNAPSHOT_NAME_STR = "testSnapshotManifest-snapshot";
private static Path rootDir; private static Path rootDir;
private static FileSystem fs; private static FileSystem fs;
private static Configuration conf;
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@ -69,7 +71,7 @@ public class TestSnapshotHFileCleaner {
*/ */
@BeforeClass @BeforeClass
public static void setup() throws Exception { public static void setup() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration(); conf = TEST_UTIL.getConfiguration();
rootDir = CommonFSUtils.getRootDir(conf); rootDir = CommonFSUtils.getRootDir(conf);
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
} }
@ -83,7 +85,6 @@ public class TestSnapshotHFileCleaner {
@Test @Test
public void testFindsSnapshotFilesWhenCleaning() throws IOException { public void testFindsSnapshotFilesWhenCleaning() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir()); CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
Path rootDir = CommonFSUtils.getRootDir(conf); Path rootDir = CommonFSUtils.getRootDir(conf);
Path archivedHfileDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY); Path archivedHfileDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@ -116,9 +117,10 @@ public class TestSnapshotHFileCleaner {
static class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector { static class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
@Override @Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException { public Collection<String> filesUnderSnapshot(final FileSystem workingFs,
final Path snapshotDir) throws IOException {
Collection<String> files = new HashSet<>(); Collection<String> files = new HashSet<>();
files.addAll(SnapshotReferenceUtil.getHFileNames(TEST_UTIL.getConfiguration(), fs, snapshotDir)); files.addAll(SnapshotReferenceUtil.getHFileNames(conf, workingFs, snapshotDir));
return files; return files;
} }
} }
@ -130,14 +132,20 @@ public class TestSnapshotHFileCleaner {
@Test @Test
public void testCorruptedRegionManifest() throws IOException { public void testCorruptedRegionManifest() throws IOException {
SnapshotTestingUtils.SnapshotMock SnapshotTestingUtils.SnapshotMock
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir); snapshotMock = new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2( SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR); SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2(); builder.addRegionV2();
builder.corruptOneRegionManifest(); builder.corruptOneRegionManifest();
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, TEST_UTIL.getConfiguration()), long period = Long.MAX_VALUE;
true); SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
} finally {
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf), true);
}
} }
/** /**
@ -147,7 +155,7 @@ public class TestSnapshotHFileCleaner {
@Test @Test
public void testCorruptedDataManifest() throws IOException { public void testCorruptedDataManifest() throws IOException {
SnapshotTestingUtils.SnapshotMock SnapshotTestingUtils.SnapshotMock
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir); snapshotMock = new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2( SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR); SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2(); builder.addRegionV2();
@ -155,7 +163,29 @@ public class TestSnapshotHFileCleaner {
builder.consolidate(); builder.consolidate();
builder.corruptDataManifest(); builder.corruptDataManifest();
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
} finally {
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
TEST_UTIL.getConfiguration()), true); TEST_UTIL.getConfiguration()), true);
} }
}
/**
 * Builds an in-progress V2 snapshot with one region, removes one of its region snapshot files
 * through {@code missOneRegionSnapshotFile()}, and checks that asking the cache for the
 * snapshots in progress completes without an exception and leaves the builder's snapshot
 * directory in place.
 * <p>
 * The working snapshot directory is deleted afterwards, as the other tests in this class do,
 * so the leftover snapshot (it uses the shared {@code SNAPSHOT_NAME_STR}) cannot influence
 * other tests.
 */
@Test
public void testMissedTmpSnapshot() throws IOException {
  SnapshotTestingUtils.SnapshotMock snapshotMock =
    new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
  SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
    SNAPSHOT_NAME_STR, TABLE_NAME_STR);
  builder.addRegionV2();
  builder.missOneRegionSnapshotFile();

  // Long.MAX_VALUE as the period, same as the sibling tests;
  // presumably this keeps the periodic refresh from running during the test - TODO confirm.
  long period = Long.MAX_VALUE;
  SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
    "test-snapshot-file-cache-refresh", new SnapshotFiles());
  try {
    cache.getSnapshotsInProgress();
    assertTrue(fs.exists(builder.getSnapshotsDir()));
  } finally {
    // Same cleanup as testCorruptedRegionManifest / testCorruptedDataManifest.
    fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf), true);
  }
}
} }

View File

@ -481,7 +481,8 @@ public final class SnapshotTestingUtils {
this.tableRegions = tableRegions; this.tableRegions = tableRegions;
this.snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf); this.snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
new FSTableDescriptors(conf) new FSTableDescriptors(conf)
.createTableDescriptorForTableDirectory(snapshotDir, htd, false); .createTableDescriptorForTableDirectory(this.snapshotDir.getFileSystem(conf),
snapshotDir, htd, false);
} }
public TableDescriptor getTableDescriptor() { public TableDescriptor getTableDescriptor() {
@ -604,7 +605,9 @@ public final class SnapshotTestingUtils {
SnapshotManifest manifest = SnapshotManifest.create(conf, fs, snapshotDir, desc, monitor); SnapshotManifest manifest = SnapshotManifest.create(conf, fs, snapshotDir, desc, monitor);
manifest.addTableDescriptor(htd); manifest.addTableDescriptor(htd);
manifest.consolidate(); manifest.consolidate();
SnapshotDescriptionUtils.completeSnapshot(desc, rootDir, snapshotDir, fs); Path finishedDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir);
SnapshotDescriptionUtils.completeSnapshot(finishedDir, snapshotDir, fs,
snapshotDir.getFileSystem(conf), conf);
snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir); snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir);
return snapshotDir; return snapshotDir;
} }
@ -666,7 +669,8 @@ public final class SnapshotTestingUtils {
.build(); .build();
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf); Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs); FileSystem workingFs = workingDir.getFileSystem(conf);
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, workingFs);
return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions); return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
} }

View File

@ -97,11 +97,15 @@ public class TestSnapshotDescriptionUtils {
Path snapshotDir = new Path(root, HConstants.SNAPSHOT_DIR_NAME); Path snapshotDir = new Path(root, HConstants.SNAPSHOT_DIR_NAME);
Path tmpDir = new Path(snapshotDir, ".tmp"); Path tmpDir = new Path(snapshotDir, ".tmp");
Path workingDir = new Path(tmpDir, "not_a_snapshot"); Path workingDir = new Path(tmpDir, "not_a_snapshot");
Configuration conf = new Configuration();
FileSystem workingFs = workingDir.getFileSystem(conf);
assertFalse("Already have working snapshot dir: " + workingDir assertFalse("Already have working snapshot dir: " + workingDir
+ " but shouldn't. Test file leak?", fs.exists(workingDir)); + " but shouldn't. Test file leak?", fs.exists(workingDir));
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build(); SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
Path finishedDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, snapshotDir);
try { try {
SnapshotDescriptionUtils.completeSnapshot(snapshot, root, workingDir, fs); SnapshotDescriptionUtils.completeSnapshot(finishedDir, workingDir, fs, workingFs, conf);
fail("Shouldn't successfully complete move of a non-existent directory."); fail("Shouldn't successfully complete move of a non-existent directory.");
} catch (IOException e) { } catch (IOException e) {
LOG.info("Correctly failed to move non-existant directory: " + e.getMessage()); LOG.info("Correctly failed to move non-existant directory: " + e.getMessage());