HBASE-23202 ExportSnapshot (import) will fail if copying files to root directory takes longer than cleaner TTL (#1874) (#1875)

Co-authored-by: Guangxu Cheng <guangxucheng@gmail.com>

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Zach York <zyork@apache.org>
This commit is contained in:
huaxiangsun 2020-06-08 22:51:48 -07:00 committed by GitHub
parent c5dacfbbea
commit 02e7beaca1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 328 additions and 103 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -40,6 +41,7 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@ -77,17 +79,19 @@ public class SnapshotFileCache implements Stoppable {
interface SnapshotFileInspector {
/**
* Returns a collection of file names needed by the snapshot.
* @param fs {@link FileSystem} where snapshot mainifest files are stored
* @param snapshotDir {@link Path} to the snapshot directory to scan.
* @return the collection of file names needed by the snapshot.
*/
Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
throws IOException;
}
private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
private volatile boolean stop = false;
private final FileSystem fs;
private final FileSystem fs, workingFs;
private final SnapshotFileInspector fileInspector;
private final Path snapshotDir;
private final Path snapshotDir, workingSnapshotDir;
private final Set<String> cache = new HashSet<>();
/**
* This is a helper map of information about the snapshot directories so we don't need to rescan
@ -104,14 +108,18 @@ public class SnapshotFileCache implements Stoppable {
* @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
* hbase root directory
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded
*/
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf), 0,
cacheRefreshPeriod, refreshThreadName, inspectSnapshotFiles);
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
getFileSystem(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
}
/**
@ -119,15 +127,19 @@ public class SnapshotFileCache implements Stoppable {
* filesystem
* @param fs {@link FileSystem} where the snapshots are stored
* @param rootDir hbase root directory
* @param workingFs {@link FileSystem} where ongoing snapshot mainifest files are stored
* @param workingDir Location to store ongoing snapshot manifest files
* @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
*/
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
this.fs = fs;
this.workingFs = workingFs;
this.workingSnapshotDir = workingDir;
this.fileInspector = inspectSnapshotFiles;
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
// periodically refresh the file cache to make sure we aren't superfluously saving files.
@ -176,6 +188,7 @@ public class SnapshotFileCache implements Stoppable {
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager) throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList();
List<String> snapshotsInProgress = null;
boolean refreshed = false;
Lock lock = null;
if (snapshotManager != null) {
@ -197,6 +210,12 @@ public class SnapshotFileCache implements Stoppable {
if (cache.contains(fileName)) {
continue;
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file);
}
} finally {
@ -239,7 +258,8 @@ public class SnapshotFileCache implements Stoppable {
// that new snapshot, even though it has the same name as the files referenced have
// probably changed.
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
snapshotDir.getPath());
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
}
// add all the files to cache
@ -251,6 +271,26 @@ public class SnapshotFileCache implements Stoppable {
this.snapshots.putAll(newSnapshots);
}
@VisibleForTesting
List<String> getSnapshotsInProgress() throws IOException {
List<String> snapshotInProgress = Lists.newArrayList();
// only add those files to the cache, but not to the known snapshots
FileStatus[] snapshotsInProgress = CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
for (FileStatus snapshot : snapshotsInProgress) {
try {
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(workingFs,
snapshot.getPath()));
} catch (CorruptedSnapshotException cse) {
LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
}
}
}
return snapshotInProgress;
}
/**
* Simple helper task that just periodically attempts to refresh the cache
*/

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -93,10 +94,15 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
final FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
FileSystem workingFs = workingDir.getFileSystem(conf);
cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, cacheRefreshPeriod,
cacheRefreshPeriod, "snapshot-hfile-cleaner-cache-refresher",
new SnapshotFileCache.SnapshotFileInspector() {
@Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
public Collection<String> filesUnderSnapshot(final FileSystem fs,
final Path snapshotDir)
throws IOException {
return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);
}

View File

@ -224,7 +224,9 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
verifier.verifySnapshot(this.workingDir, serverNames);
// complete the snapshot, atomically moving from tmp to .snapshot dir.
completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, this.workingDirFs);
SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs,
this.workingDirFs, this.conf);
finished = true;
msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed";
status.markComplete(msg);
LOG.info(msg);
@ -258,42 +260,6 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
}
}
/**
* Reset the manager to allow another snapshot to proceed.
* Commits the snapshot process by moving the working snapshot
* to the finalized filepath
*
* @param snapshotDir The file path of the completed snapshots
* @param workingDir The file path of the in progress snapshots
* @param fs The file system of the completed snapshots
* @param workingDirFs The file system of the in progress snapshots
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException the filesystem could not be reached
*/
public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
FileSystem workingDirFs) throws SnapshotCreationException, IOException {
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ snapshotDir);
// If the working and completed snapshot directory are on the same file system, attempt
// to rename the working snapshot directory to the completed location. If that fails,
// or the file systems differ, attempt to copy the directory over, throwing an exception
// if this fails
URI workingURI = workingDirFs.getUri();
URI rootURI = fs.getUri();
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
workingURI.getAuthority() == null ||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
workingURI.getUserInfo() == null ||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
snapshotDir, true, true, this.conf)) {
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
}
finished = true;
}
/**
* When taking snapshot, first we must acquire the exclusive table lock to confirm that there are
* no ongoing merge/split procedures. But later, we should try our best to release the exclusive

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.snapshot;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -25,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
@ -400,25 +402,38 @@ public final class SnapshotDescriptionUtils {
}
/**
* Move the finished snapshot to its final, publicly visible directory - this marks the snapshot
* as 'complete'.
* @param snapshot description of the snapshot being tabken
* @param rootdir root directory of the hbase installation
* @param workingDir directory where the in progress snapshot was built
* @param fs {@link FileSystem} where the snapshot was built
* @throws org.apache.hadoop.hbase.snapshot.SnapshotCreationException if the
* snapshot could not be moved
* Commits the snapshot process by moving the working snapshot
* to the finalized filepath
*
* @param snapshotDir The file path of the completed snapshots
* @param workingDir The file path of the in progress snapshots
* @param fs The file system of the completed snapshots
* @param workingDirFs The file system of the in progress snapshots
* @param conf Configuration
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException the filesystem could not be reached
*/
public static void completeSnapshot(SnapshotDescription snapshot, Path rootdir, Path workingDir,
FileSystem fs) throws SnapshotCreationException, IOException {
Path finishedDir = getCompletedSnapshotDir(snapshot, rootdir);
LOG.debug("Snapshot is done, just moving the snapshot from " + workingDir + " to "
+ finishedDir);
if (!fs.rename(workingDir, finishedDir)) {
throw new SnapshotCreationException(
"Failed to move working directory(" + workingDir + ") to completed directory("
+ finishedDir + ").", ProtobufUtil.createSnapshotDesc(snapshot));
public static void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
FileSystem workingDirFs, final Configuration conf)
throws SnapshotCreationException, IOException {
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ snapshotDir);
// If the working and completed snapshot directory are on the same file system, attempt
// to rename the working snapshot directory to the completed location. If that fails,
// or the file systems differ, attempt to copy the directory over, throwing an exception
// if this fails
URI workingURI = workingDirFs.getUri();
URI rootURI = fs.getUri();
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
workingURI.getAuthority() == null ||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
workingURI.getUserInfo() == null ||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
snapshotDir, true, true, conf)) {
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
}
}

View File

@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.master.snapshot;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -26,11 +29,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
@ -46,6 +52,11 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
* Test that we correctly reload the cache, filter directories, etc.
*/
@ -56,20 +67,30 @@ public class TestSnapshotFileCache {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSnapshotFileCache.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFileCache.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFileCache.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
// don't refresh the cache unless we tell it to
private static final long PERIOD = Long.MAX_VALUE;
private static FileSystem fs;
private static Path rootDir;
private static Path snapshotDir;
protected static final long PERIOD = Long.MAX_VALUE;
protected static FileSystem fs;
protected static Path rootDir;
protected static Path snapshotDir;
protected static Configuration conf;
protected static FileSystem workingFs;
protected static Path workingDir;
@BeforeClass
public static void startCluster() throws Exception {
protected static void initCommon() throws Exception {
UTIL.startMiniDFSCluster(1);
fs = UTIL.getDFSCluster().getFileSystem();
rootDir = UTIL.getDefaultRootDirPath();
snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
conf = UTIL.getConfiguration();
}
@BeforeClass
public static void startCluster() throws Exception {
initCommon();
workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
workingFs = workingDir.getFileSystem(conf);
}
@AfterClass
@ -85,18 +106,20 @@ public class TestSnapshotFileCache {
@Test
public void testLoadAndDelete() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
createAndTestSnapshotV1(cache, "snapshot1a", false, true, false);
createAndTestSnapshotV1(cache, "snapshot1b", true, true, false);
createAndTestSnapshotV2(cache, "snapshot2a", false, true, false);
createAndTestSnapshotV2(cache, "snapshot2b", true, true, false);
}
@Test
public void testReloadModifiedDirectory() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
createAndTestSnapshotV1(cache, "snapshot1", false, true, false);
// now delete the snapshot and add a file with a different name
@ -109,8 +132,8 @@ public class TestSnapshotFileCache {
@Test
public void testSnapshotTempDirReload() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
// Add a new non-tmp snapshot
createAndTestSnapshotV1(cache, "snapshot0v1", false, false, false);
@ -119,8 +142,8 @@ public class TestSnapshotFileCache {
@Test
public void testCacheUpdatedWhenLastModifiedOfSnapDirNotUpdated() throws IOException {
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
// Add a new non-tmp snapshot
createAndTestSnapshotV1(cache, "snapshot1v1", false, false, true);
@ -133,11 +156,77 @@ public class TestSnapshotFileCache {
createAndTestSnapshotV2(cache, "snapshot2v2", true, false, true);
}
@Test
public void testWeNeverCacheTmpDirAndLoadIt() throws Exception {
final AtomicInteger count = new AtomicInteger(0);
// don't refresh the cache unless we tell it to
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, period,
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles()) {
@Override
List<String> getSnapshotsInProgress()
throws IOException {
List<String> result = super.getSnapshotsInProgress();
count.incrementAndGet();
return result;
}
@Override public void triggerCacheRefreshForTesting() {
super.triggerCacheRefreshForTesting();
}
};
SnapshotMock.SnapshotBuilder complete =
createAndTestSnapshotV1(cache, "snapshot", false, false, false);
int countBeforeCheck = count.get();
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
List<FileStatus> allStoreFiles = getStoreFilesForSnapshot(complete);
Iterable<FileStatus> deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
assertTrue(Iterables.isEmpty(deletableFiles));
// no need for tmp dir check as all files are accounted for.
assertEquals(0, count.get() - countBeforeCheck);
// add a random file to make sure we refresh
FileStatus randomFile = mockStoreFile(UTIL.getRandomUUID().toString());
allStoreFiles.add(randomFile);
deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
assertEquals(randomFile, Iterables.getOnlyElement(deletableFiles));
assertEquals(1, count.get() - countBeforeCheck); // we check the tmp directory
}
private List<FileStatus> getStoreFilesForSnapshot(SnapshotMock.SnapshotBuilder builder)
throws IOException {
final List<FileStatus> allStoreFiles = Lists.newArrayList();
SnapshotReferenceUtil
.visitReferencedFiles(conf, fs, builder.getSnapshotsDir(),
new SnapshotReferenceUtil.SnapshotVisitor() {
@Override public void storeFile(RegionInfo regionInfo, String familyName,
SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile) throws IOException {
FileStatus status = mockStoreFile(storeFile.getName());
allStoreFiles.add(status);
}
});
return allStoreFiles;
}
private FileStatus mockStoreFile(String storeFileName) {
FileStatus status = mock(FileStatus.class);
Path path = mock(Path.class);
when(path.getName()).thenReturn(storeFileName);
when(status.getPath()).thenReturn(path);
return status;
}
class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
@Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
public Collection<String> filesUnderSnapshot(final FileSystem workingFs,
final Path snapshotDir) throws IOException {
Collection<String> files = new HashSet<>();
files.addAll(SnapshotReferenceUtil.getHFileNames(UTIL.getConfiguration(), fs, snapshotDir));
files.addAll(SnapshotReferenceUtil.getHFileNames(conf, workingFs, snapshotDir));
return files;
}
};
@ -145,7 +234,7 @@ public class TestSnapshotFileCache {
private SnapshotMock.SnapshotBuilder createAndTestSnapshotV1(final SnapshotFileCache cache,
final String name, final boolean tmp, final boolean removeOnExit, boolean setFolderTime)
throws IOException {
SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir);
SnapshotMock snapshotMock = new SnapshotMock(conf, fs, rootDir);
SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV1(name, name);
createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime);
return builder;
@ -153,7 +242,7 @@ public class TestSnapshotFileCache {
private void createAndTestSnapshotV2(final SnapshotFileCache cache, final String name,
final boolean tmp, final boolean removeOnExit, boolean setFolderTime) throws IOException {
SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir);
SnapshotMock snapshotMock = new SnapshotMock(conf, fs, rootDir);
SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(name, name);
createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime);
}
@ -164,12 +253,20 @@ public class TestSnapshotFileCache {
List<Path> files = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
for (Path filePath: builder.addRegion()) {
if (tmp) {
// We should be able to find all the files while the snapshot creation is in-progress
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
assertFalse("Cache didn't find " + filePath,
contains(getNonSnapshotFiles(cache, filePath), filePath));
}
files.add(filePath);
}
}
// Finalize the snapshot
builder.commit();
if (!tmp) {
builder.commit();
}
if (setFolderTime) {
fs.setTimes(snapshotDir, 0, -1);
@ -183,7 +280,7 @@ public class TestSnapshotFileCache {
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
if (removeOnExit) {
LOG.debug("Deleting snapshot.");
fs.delete(builder.getSnapshotsDir(), true);
builder.getSnapshotsDir().getFileSystem(conf).delete(builder.getSnapshotsDir(), true);
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
// then trigger a refresh

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.File;
import java.nio.file.Paths;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
/**
* Test that we correctly reload the cache, filter directories, etc.
* while the temporary directory is on a different file system than the root directory
*/
@Category({MasterTests.class, LargeTests.class})
public class TestSnapshotFileCacheWithDifferentWorkingDir extends TestSnapshotFileCache {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSnapshotFileCacheWithDifferentWorkingDir.class);
protected static String TEMP_DIR = Paths.get(".", UUID.randomUUID().toString()).toAbsolutePath().toString();
@BeforeClass
public static void startCluster() throws Exception {
initCommon();
// Set the snapshot working directory to be on another filesystem.
conf.set(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR,
"file://" + new Path(TEMP_DIR, ".tmpDir").toUri());
workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
workingFs = workingDir.getFileSystem(conf);
}
@AfterClass
public static void stopCluster() throws Exception {
UTIL.shutdownMiniDFSCluster();
FileUtils.deleteDirectory(new File(TEMP_DIR));
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.snapshot;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
@ -60,6 +61,7 @@ public class TestSnapshotHFileCleaner {
private static final String SNAPSHOT_NAME_STR = "testSnapshotManifest-snapshot";
private static Path rootDir;
private static FileSystem fs;
private static Configuration conf;
@Rule
public TestName name = new TestName();
@ -69,7 +71,7 @@ public class TestSnapshotHFileCleaner {
*/
@BeforeClass
public static void setup() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf = TEST_UTIL.getConfiguration();
rootDir = CommonFSUtils.getRootDir(conf);
fs = FileSystem.get(conf);
}
@ -83,7 +85,6 @@ public class TestSnapshotHFileCleaner {
@Test
public void testFindsSnapshotFilesWhenCleaning() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
Path rootDir = CommonFSUtils.getRootDir(conf);
Path archivedHfileDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@ -117,9 +118,10 @@ public class TestSnapshotHFileCleaner {
static class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
@Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
public Collection<String> filesUnderSnapshot(final FileSystem workingFs,
final Path snapshotDir) throws IOException {
Collection<String> files = new HashSet<>();
files.addAll(SnapshotReferenceUtil.getHFileNames(TEST_UTIL.getConfiguration(), fs, snapshotDir));
files.addAll(SnapshotReferenceUtil.getHFileNames(conf, workingFs, snapshotDir));
return files;
}
}
@ -131,14 +133,20 @@ public class TestSnapshotHFileCleaner {
@Test
public void testCorruptedRegionManifest() throws IOException {
SnapshotTestingUtils.SnapshotMock
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir);
snapshotMock = new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2();
builder.corruptOneRegionManifest();
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, TEST_UTIL.getConfiguration()),
true);
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
} finally {
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf), true);
}
}
/**
@ -148,7 +156,7 @@ public class TestSnapshotHFileCleaner {
@Test
public void testCorruptedDataManifest() throws IOException {
SnapshotTestingUtils.SnapshotMock
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir);
snapshotMock = new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2();
@ -156,7 +164,29 @@ public class TestSnapshotHFileCleaner {
builder.consolidate();
builder.corruptDataManifest();
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
} finally {
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
TEST_UTIL.getConfiguration()), true);
}
}
@Test
public void testMissedTmpSnapshot() throws IOException {
SnapshotTestingUtils.SnapshotMock snapshotMock =
new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2();
builder.missOneRegionSnapshotFile();
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
cache.getSnapshotsInProgress();
assertTrue(fs.exists(builder.getSnapshotsDir()));
}
}

View File

@ -510,7 +510,8 @@ public final class SnapshotTestingUtils {
this.tableRegions = tableRegions;
this.snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
new FSTableDescriptors(conf)
.createTableDescriptorForTableDirectory(snapshotDir, htd, false);
.createTableDescriptorForTableDirectory(this.snapshotDir.getFileSystem(conf),
snapshotDir, htd, false);
}
public TableDescriptor getTableDescriptor() {
@ -633,7 +634,9 @@ public final class SnapshotTestingUtils {
SnapshotManifest manifest = SnapshotManifest.create(conf, fs, snapshotDir, desc, monitor);
manifest.addTableDescriptor(htd);
manifest.consolidate();
SnapshotDescriptionUtils.completeSnapshot(desc, rootDir, snapshotDir, fs);
Path finishedDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir);
SnapshotDescriptionUtils.completeSnapshot(finishedDir, snapshotDir, fs,
snapshotDir.getFileSystem(conf), conf);
snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir);
return snapshotDir;
}
@ -695,7 +698,8 @@ public final class SnapshotTestingUtils {
.build();
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs);
FileSystem workingFs = workingDir.getFileSystem(conf);
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, workingFs);
return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
}

View File

@ -97,11 +97,15 @@ public class TestSnapshotDescriptionUtils {
Path snapshotDir = new Path(root, HConstants.SNAPSHOT_DIR_NAME);
Path tmpDir = new Path(snapshotDir, ".tmp");
Path workingDir = new Path(tmpDir, "not_a_snapshot");
Configuration conf = new Configuration();
FileSystem workingFs = workingDir.getFileSystem(conf);
assertFalse("Already have working snapshot dir: " + workingDir
+ " but shouldn't. Test file leak?", fs.exists(workingDir));
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
Path finishedDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, snapshotDir);
try {
SnapshotDescriptionUtils.completeSnapshot(snapshot, root, workingDir, fs);
SnapshotDescriptionUtils.completeSnapshot(finishedDir, workingDir, fs, workingFs, conf);
fail("Shouldn't successfully complete move of a non-existent directory.");
} catch (IOException e) {
LOG.info("Correctly failed to move non-existant directory: " + e.getMessage());