Revert "HBASE-23202 ExportSnapshot (import) will fail if copying files to root directory takes longer than cleaner TTL (#1791)" (#1873)
This reverts commit f862f3d9b5
.
This commit is contained in:
parent
f862f3d9b5
commit
5042b591f8
|
@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -41,7 +40,6 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
|
@ -79,19 +77,17 @@ public class SnapshotFileCache implements Stoppable {
|
|||
interface SnapshotFileInspector {
|
||||
/**
|
||||
* Returns a collection of file names needed by the snapshot.
|
||||
* @param fs {@link FileSystem} where snapshot mainifest files are stored
|
||||
* @param snapshotDir {@link Path} to the snapshot directory to scan.
|
||||
* @return the collection of file names needed by the snapshot.
|
||||
*/
|
||||
Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
|
||||
throws IOException;
|
||||
Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
|
||||
}
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
|
||||
private volatile boolean stop = false;
|
||||
private final FileSystem fs, workingFs;
|
||||
private final FileSystem fs;
|
||||
private final SnapshotFileInspector fileInspector;
|
||||
private final Path snapshotDir, workingSnapshotDir;
|
||||
private final Path snapshotDir;
|
||||
private final Set<String> cache = new HashSet<>();
|
||||
/**
|
||||
* This is a helper map of information about the snapshot directories so we don't need to rescan
|
||||
|
@ -108,18 +104,14 @@ public class SnapshotFileCache implements Stoppable {
|
|||
* @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
|
||||
* hbase root directory
|
||||
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
|
||||
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
|
||||
* @param refreshThreadName name of the cache refresh thread
|
||||
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
|
||||
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded
|
||||
*/
|
||||
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
|
||||
String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
|
||||
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
|
||||
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
|
||||
getFileSystem(conf),
|
||||
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
|
||||
cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
|
||||
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
|
||||
SnapshotFileInspector inspectSnapshotFiles) throws IOException {
|
||||
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf), 0,
|
||||
cacheRefreshPeriod, refreshThreadName, inspectSnapshotFiles);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -127,19 +119,15 @@ public class SnapshotFileCache implements Stoppable {
|
|||
* filesystem
|
||||
* @param fs {@link FileSystem} where the snapshots are stored
|
||||
* @param rootDir hbase root directory
|
||||
* @param workingFs {@link FileSystem} where ongoing snapshot mainifest files are stored
|
||||
* @param workingDir Location to store ongoing snapshot manifest files
|
||||
* @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
|
||||
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
|
||||
* @param refreshThreadName name of the cache refresh thread
|
||||
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
|
||||
*/
|
||||
public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
|
||||
long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
|
||||
SnapshotFileInspector inspectSnapshotFiles) {
|
||||
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
|
||||
long cacheRefreshDelay, String refreshThreadName,
|
||||
SnapshotFileInspector inspectSnapshotFiles) {
|
||||
this.fs = fs;
|
||||
this.workingFs = workingFs;
|
||||
this.workingSnapshotDir = workingDir;
|
||||
this.fileInspector = inspectSnapshotFiles;
|
||||
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
|
||||
// periodically refresh the file cache to make sure we aren't superfluously saving files.
|
||||
|
@ -188,7 +176,6 @@ public class SnapshotFileCache implements Stoppable {
|
|||
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
|
||||
final SnapshotManager snapshotManager) throws IOException {
|
||||
List<FileStatus> unReferencedFiles = Lists.newArrayList();
|
||||
List<String> snapshotsInProgress = null;
|
||||
boolean refreshed = false;
|
||||
Lock lock = null;
|
||||
if (snapshotManager != null) {
|
||||
|
@ -210,12 +197,6 @@ public class SnapshotFileCache implements Stoppable {
|
|||
if (cache.contains(fileName)) {
|
||||
continue;
|
||||
}
|
||||
if (snapshotsInProgress == null) {
|
||||
snapshotsInProgress = getSnapshotsInProgress();
|
||||
}
|
||||
if (snapshotsInProgress.contains(fileName)) {
|
||||
continue;
|
||||
}
|
||||
unReferencedFiles.add(file);
|
||||
}
|
||||
} finally {
|
||||
|
@ -258,8 +239,7 @@ public class SnapshotFileCache implements Stoppable {
|
|||
// that new snapshot, even though it has the same name as the files referenced have
|
||||
// probably changed.
|
||||
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
|
||||
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
|
||||
snapshotDir.getPath());
|
||||
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
|
||||
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
|
||||
}
|
||||
// add all the files to cache
|
||||
|
@ -271,26 +251,6 @@ public class SnapshotFileCache implements Stoppable {
|
|||
this.snapshots.putAll(newSnapshots);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<String> getSnapshotsInProgress() throws IOException {
|
||||
List<String> snapshotInProgress = Lists.newArrayList();
|
||||
// only add those files to the cache, but not to the known snapshots
|
||||
|
||||
FileStatus[] snapshotsInProgress = CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
|
||||
|
||||
if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
|
||||
for (FileStatus snapshot : snapshotsInProgress) {
|
||||
try {
|
||||
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(workingFs,
|
||||
snapshot.getPath()));
|
||||
} catch (CorruptedSnapshotException cse) {
|
||||
LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
|
||||
}
|
||||
}
|
||||
}
|
||||
return snapshotInProgress;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple helper task that just periodically attempts to refresh the cache
|
||||
*/
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.HMaster;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
|
||||
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -94,15 +93,10 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
|
|||
DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
|
||||
final FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
|
||||
Path rootDir = CommonFSUtils.getRootDir(conf);
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
|
||||
FileSystem workingFs = workingDir.getFileSystem(conf);
|
||||
|
||||
cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, cacheRefreshPeriod,
|
||||
cacheRefreshPeriod, "snapshot-hfile-cleaner-cache-refresher",
|
||||
new SnapshotFileCache.SnapshotFileInspector() {
|
||||
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
|
||||
"snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
|
||||
@Override
|
||||
public Collection<String> filesUnderSnapshot(final FileSystem fs,
|
||||
final Path snapshotDir)
|
||||
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
|
||||
throws IOException {
|
||||
return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);
|
||||
}
|
||||
|
|
|
@ -224,9 +224,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
|||
verifier.verifySnapshot(this.workingDir, serverNames);
|
||||
|
||||
// complete the snapshot, atomically moving from tmp to .snapshot dir.
|
||||
SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs,
|
||||
this.workingDirFs, this.conf);
|
||||
finished = true;
|
||||
completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, this.workingDirFs);
|
||||
msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed";
|
||||
status.markComplete(msg);
|
||||
LOG.info(msg);
|
||||
|
@ -260,6 +258,42 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the manager to allow another snapshot to proceed.
|
||||
* Commits the snapshot process by moving the working snapshot
|
||||
* to the finalized filepath
|
||||
*
|
||||
* @param snapshotDir The file path of the completed snapshots
|
||||
* @param workingDir The file path of the in progress snapshots
|
||||
* @param fs The file system of the completed snapshots
|
||||
* @param workingDirFs The file system of the in progress snapshots
|
||||
*
|
||||
* @throws SnapshotCreationException if the snapshot could not be moved
|
||||
* @throws IOException the filesystem could not be reached
|
||||
*/
|
||||
public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
|
||||
FileSystem workingDirFs) throws SnapshotCreationException, IOException {
|
||||
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
|
||||
+ snapshotDir);
|
||||
// If the working and completed snapshot directory are on the same file system, attempt
|
||||
// to rename the working snapshot directory to the completed location. If that fails,
|
||||
// or the file systems differ, attempt to copy the directory over, throwing an exception
|
||||
// if this fails
|
||||
URI workingURI = workingDirFs.getUri();
|
||||
URI rootURI = fs.getUri();
|
||||
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
|
||||
workingURI.getAuthority() == null ||
|
||||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
|
||||
workingURI.getUserInfo() == null ||
|
||||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
|
||||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
|
||||
snapshotDir, true, true, this.conf)) {
|
||||
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
|
||||
+ ") to completed directory(" + snapshotDir + ").");
|
||||
}
|
||||
finished = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* When taking snapshot, first we must acquire the exclusive table lock to confirm that there are
|
||||
* no ongoing merge/split procedures. But later, we should try our best to release the exclusive
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -26,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -385,38 +383,25 @@ public final class SnapshotDescriptionUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Commits the snapshot process by moving the working snapshot
|
||||
* to the finalized filepath
|
||||
*
|
||||
* @param snapshotDir The file path of the completed snapshots
|
||||
* @param workingDir The file path of the in progress snapshots
|
||||
* @param fs The file system of the completed snapshots
|
||||
* @param workingDirFs The file system of the in progress snapshots
|
||||
* @param conf Configuration
|
||||
*
|
||||
* @throws SnapshotCreationException if the snapshot could not be moved
|
||||
* Move the finished snapshot to its final, publicly visible directory - this marks the snapshot
|
||||
* as 'complete'.
|
||||
* @param snapshot description of the snapshot being tabken
|
||||
* @param rootdir root directory of the hbase installation
|
||||
* @param workingDir directory where the in progress snapshot was built
|
||||
* @param fs {@link FileSystem} where the snapshot was built
|
||||
* @throws org.apache.hadoop.hbase.snapshot.SnapshotCreationException if the
|
||||
* snapshot could not be moved
|
||||
* @throws IOException the filesystem could not be reached
|
||||
*/
|
||||
public static void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
|
||||
FileSystem workingDirFs, final Configuration conf)
|
||||
throws SnapshotCreationException, IOException {
|
||||
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
|
||||
+ snapshotDir);
|
||||
// If the working and completed snapshot directory are on the same file system, attempt
|
||||
// to rename the working snapshot directory to the completed location. If that fails,
|
||||
// or the file systems differ, attempt to copy the directory over, throwing an exception
|
||||
// if this fails
|
||||
URI workingURI = workingDirFs.getUri();
|
||||
URI rootURI = fs.getUri();
|
||||
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
|
||||
workingURI.getAuthority() == null ||
|
||||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
|
||||
workingURI.getUserInfo() == null ||
|
||||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
|
||||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
|
||||
snapshotDir, true, true, conf)) {
|
||||
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
|
||||
+ ") to completed directory(" + snapshotDir + ").");
|
||||
public static void completeSnapshot(SnapshotDescription snapshot, Path rootdir, Path workingDir,
|
||||
FileSystem fs) throws SnapshotCreationException, IOException {
|
||||
Path finishedDir = getCompletedSnapshotDir(snapshot, rootdir);
|
||||
LOG.debug("Snapshot is done, just moving the snapshot from " + workingDir + " to "
|
||||
+ finishedDir);
|
||||
if (!fs.rename(workingDir, finishedDir)) {
|
||||
throw new SnapshotCreationException(
|
||||
"Failed to move working directory(" + workingDir + ") to completed directory("
|
||||
+ finishedDir + ").", ProtobufUtil.createSnapshotDesc(snapshot));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,11 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -29,14 +26,11 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
|
||||
|
@ -52,11 +46,6 @@ import org.junit.experimental.categories.Category;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
|
||||
/**
|
||||
* Test that we correctly reload the cache, filter directories, etc.
|
||||
*/
|
||||
|
@ -67,30 +56,20 @@ public class TestSnapshotFileCache {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSnapshotFileCache.class);
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFileCache.class);
|
||||
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotFileCache.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
// don't refresh the cache unless we tell it to
|
||||
protected static final long PERIOD = Long.MAX_VALUE;
|
||||
protected static FileSystem fs;
|
||||
protected static Path rootDir;
|
||||
protected static Path snapshotDir;
|
||||
protected static Configuration conf;
|
||||
protected static FileSystem workingFs;
|
||||
protected static Path workingDir;
|
||||
private static final long PERIOD = Long.MAX_VALUE;
|
||||
private static FileSystem fs;
|
||||
private static Path rootDir;
|
||||
private static Path snapshotDir;
|
||||
|
||||
protected static void initCommon() throws Exception {
|
||||
@BeforeClass
|
||||
public static void startCluster() throws Exception {
|
||||
UTIL.startMiniDFSCluster(1);
|
||||
fs = UTIL.getDFSCluster().getFileSystem();
|
||||
rootDir = UTIL.getDefaultRootDirPath();
|
||||
snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
|
||||
conf = UTIL.getConfiguration();
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startCluster() throws Exception {
|
||||
initCommon();
|
||||
workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
|
||||
workingFs = workingDir.getFileSystem(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -106,20 +85,18 @@ public class TestSnapshotFileCache {
|
|||
|
||||
@Test
|
||||
public void testLoadAndDelete() throws IOException {
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
|
||||
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
|
||||
createAndTestSnapshotV1(cache, "snapshot1a", false, true, false);
|
||||
createAndTestSnapshotV1(cache, "snapshot1b", true, true, false);
|
||||
|
||||
createAndTestSnapshotV2(cache, "snapshot2a", false, true, false);
|
||||
createAndTestSnapshotV2(cache, "snapshot2b", true, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReloadModifiedDirectory() throws IOException {
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
|
||||
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
|
||||
createAndTestSnapshotV1(cache, "snapshot1", false, true, false);
|
||||
// now delete the snapshot and add a file with a different name
|
||||
|
@ -132,8 +109,8 @@ public class TestSnapshotFileCache {
|
|||
|
||||
@Test
|
||||
public void testSnapshotTempDirReload() throws IOException {
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
|
||||
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
|
||||
// Add a new non-tmp snapshot
|
||||
createAndTestSnapshotV1(cache, "snapshot0v1", false, false, false);
|
||||
|
@ -142,8 +119,8 @@ public class TestSnapshotFileCache {
|
|||
|
||||
@Test
|
||||
public void testCacheUpdatedWhenLastModifiedOfSnapDirNotUpdated() throws IOException {
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, PERIOD,
|
||||
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, PERIOD, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
|
||||
// Add a new non-tmp snapshot
|
||||
createAndTestSnapshotV1(cache, "snapshot1v1", false, false, true);
|
||||
|
@ -156,77 +133,11 @@ public class TestSnapshotFileCache {
|
|||
createAndTestSnapshotV2(cache, "snapshot2v2", true, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWeNeverCacheTmpDirAndLoadIt() throws Exception {
|
||||
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
// don't refresh the cache unless we tell it to
|
||||
long period = Long.MAX_VALUE;
|
||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, period,
|
||||
10000000, "test-snapshot-file-cache-refresh", new SnapshotFiles()) {
|
||||
@Override
|
||||
List<String> getSnapshotsInProgress()
|
||||
throws IOException {
|
||||
List<String> result = super.getSnapshotsInProgress();
|
||||
count.incrementAndGet();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override public void triggerCacheRefreshForTesting() {
|
||||
super.triggerCacheRefreshForTesting();
|
||||
}
|
||||
};
|
||||
|
||||
SnapshotMock.SnapshotBuilder complete =
|
||||
createAndTestSnapshotV1(cache, "snapshot", false, false, false);
|
||||
|
||||
int countBeforeCheck = count.get();
|
||||
|
||||
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
|
||||
|
||||
List<FileStatus> allStoreFiles = getStoreFilesForSnapshot(complete);
|
||||
Iterable<FileStatus> deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
|
||||
assertTrue(Iterables.isEmpty(deletableFiles));
|
||||
// no need for tmp dir check as all files are accounted for.
|
||||
assertEquals(0, count.get() - countBeforeCheck);
|
||||
|
||||
// add a random file to make sure we refresh
|
||||
FileStatus randomFile = mockStoreFile(UTIL.getRandomUUID().toString());
|
||||
allStoreFiles.add(randomFile);
|
||||
deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
|
||||
assertEquals(randomFile, Iterables.getOnlyElement(deletableFiles));
|
||||
assertEquals(1, count.get() - countBeforeCheck); // we check the tmp directory
|
||||
}
|
||||
|
||||
private List<FileStatus> getStoreFilesForSnapshot(SnapshotMock.SnapshotBuilder builder)
|
||||
throws IOException {
|
||||
final List<FileStatus> allStoreFiles = Lists.newArrayList();
|
||||
SnapshotReferenceUtil
|
||||
.visitReferencedFiles(conf, fs, builder.getSnapshotsDir(),
|
||||
new SnapshotReferenceUtil.SnapshotVisitor() {
|
||||
@Override public void storeFile(RegionInfo regionInfo, String familyName,
|
||||
SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
FileStatus status = mockStoreFile(storeFile.getName());
|
||||
allStoreFiles.add(status);
|
||||
}
|
||||
});
|
||||
return allStoreFiles;
|
||||
}
|
||||
|
||||
private FileStatus mockStoreFile(String storeFileName) {
|
||||
FileStatus status = mock(FileStatus.class);
|
||||
Path path = mock(Path.class);
|
||||
when(path.getName()).thenReturn(storeFileName);
|
||||
when(status.getPath()).thenReturn(path);
|
||||
return status;
|
||||
}
|
||||
|
||||
class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
|
||||
@Override
|
||||
public Collection<String> filesUnderSnapshot(final FileSystem workingFs,
|
||||
final Path snapshotDir) throws IOException {
|
||||
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
|
||||
Collection<String> files = new HashSet<>();
|
||||
files.addAll(SnapshotReferenceUtil.getHFileNames(conf, workingFs, snapshotDir));
|
||||
files.addAll(SnapshotReferenceUtil.getHFileNames(UTIL.getConfiguration(), fs, snapshotDir));
|
||||
return files;
|
||||
}
|
||||
}
|
||||
|
@ -234,7 +145,7 @@ public class TestSnapshotFileCache {
|
|||
private SnapshotMock.SnapshotBuilder createAndTestSnapshotV1(final SnapshotFileCache cache,
|
||||
final String name, final boolean tmp, final boolean removeOnExit, boolean setFolderTime)
|
||||
throws IOException {
|
||||
SnapshotMock snapshotMock = new SnapshotMock(conf, fs, rootDir);
|
||||
SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir);
|
||||
SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV1(name, name);
|
||||
createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime);
|
||||
return builder;
|
||||
|
@ -242,7 +153,7 @@ public class TestSnapshotFileCache {
|
|||
|
||||
private void createAndTestSnapshotV2(final SnapshotFileCache cache, final String name,
|
||||
final boolean tmp, final boolean removeOnExit, boolean setFolderTime) throws IOException {
|
||||
SnapshotMock snapshotMock = new SnapshotMock(conf, fs, rootDir);
|
||||
SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir);
|
||||
SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(name, name);
|
||||
createAndTestSnapshot(cache, builder, tmp, removeOnExit, setFolderTime);
|
||||
}
|
||||
|
@ -253,20 +164,12 @@ public class TestSnapshotFileCache {
|
|||
List<Path> files = new ArrayList<>();
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
for (Path filePath: builder.addRegion()) {
|
||||
if (tmp) {
|
||||
// We should be able to find all the files while the snapshot creation is in-progress
|
||||
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
|
||||
assertFalse("Cache didn't find " + filePath,
|
||||
contains(getNonSnapshotFiles(cache, filePath), filePath));
|
||||
}
|
||||
files.add(filePath);
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize the snapshot
|
||||
if (!tmp) {
|
||||
builder.commit();
|
||||
}
|
||||
builder.commit();
|
||||
|
||||
if (setFolderTime) {
|
||||
fs.setTimes(snapshotDir, 0, -1);
|
||||
|
@ -280,7 +183,7 @@ public class TestSnapshotFileCache {
|
|||
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
|
||||
if (removeOnExit) {
|
||||
LOG.debug("Deleting snapshot.");
|
||||
builder.getSnapshotsDir().getFileSystem(conf).delete(builder.getSnapshotsDir(), true);
|
||||
fs.delete(builder.getSnapshotsDir(), true);
|
||||
CommonFSUtils.logFileSystemState(fs, rootDir, LOG);
|
||||
|
||||
// then trigger a refresh
|
||||
|
|
|
@ -1,63 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.UUID;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test that we correctly reload the cache, filter directories, etc.
|
||||
* while the temporary directory is on a different file system than the root directory
|
||||
*/
|
||||
@Category({MasterTests.class, LargeTests.class})
|
||||
public class TestSnapshotFileCacheWithDifferentWorkingDir extends TestSnapshotFileCache {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSnapshotFileCacheWithDifferentWorkingDir.class);
|
||||
|
||||
protected static String TEMP_DIR = Paths.get(".", UUID.randomUUID().toString()).toAbsolutePath().toString();
|
||||
|
||||
@BeforeClass
|
||||
public static void startCluster() throws Exception {
|
||||
initCommon();
|
||||
|
||||
// Set the snapshot working directory to be on another filesystem.
|
||||
conf.set(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR,
|
||||
"file://" + new Path(TEMP_DIR, ".tmpDir").toUri());
|
||||
workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
|
||||
workingFs = workingDir.getFileSystem(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopCluster() throws Exception {
|
||||
UTIL.shutdownMiniDFSCluster();
|
||||
FileUtils.deleteDirectory(new File(TEMP_DIR));
|
||||
}
|
||||
}
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
@ -61,7 +60,6 @@ public class TestSnapshotHFileCleaner {
|
|||
private static final String SNAPSHOT_NAME_STR = "testSnapshotManifest-snapshot";
|
||||
private static Path rootDir;
|
||||
private static FileSystem fs;
|
||||
private static Configuration conf;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
@ -71,7 +69,7 @@ public class TestSnapshotHFileCleaner {
|
|||
*/
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
rootDir = CommonFSUtils.getRootDir(conf);
|
||||
fs = FileSystem.get(conf);
|
||||
}
|
||||
|
@ -85,6 +83,7 @@ public class TestSnapshotHFileCleaner {
|
|||
|
||||
@Test
|
||||
public void testFindsSnapshotFilesWhenCleaning() throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
|
||||
Path rootDir = CommonFSUtils.getRootDir(conf);
|
||||
Path archivedHfileDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
|
||||
|
@ -117,10 +116,9 @@ public class TestSnapshotHFileCleaner {
|
|||
|
||||
static class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
|
||||
@Override
|
||||
public Collection<String> filesUnderSnapshot(final FileSystem workingFs,
|
||||
final Path snapshotDir) throws IOException {
|
||||
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
|
||||
Collection<String> files = new HashSet<>();
|
||||
files.addAll(SnapshotReferenceUtil.getHFileNames(conf, workingFs, snapshotDir));
|
||||
files.addAll(SnapshotReferenceUtil.getHFileNames(TEST_UTIL.getConfiguration(), fs, snapshotDir));
|
||||
return files;
|
||||
}
|
||||
}
|
||||
|
@ -132,20 +130,14 @@ public class TestSnapshotHFileCleaner {
|
|||
@Test
|
||||
public void testCorruptedRegionManifest() throws IOException {
|
||||
SnapshotTestingUtils.SnapshotMock
|
||||
snapshotMock = new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
|
||||
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir);
|
||||
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
|
||||
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
|
||||
builder.addRegionV2();
|
||||
builder.corruptOneRegionManifest();
|
||||
|
||||
long period = Long.MAX_VALUE;
|
||||
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
try {
|
||||
cache.getSnapshotsInProgress();
|
||||
} finally {
|
||||
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf), true);
|
||||
}
|
||||
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, TEST_UTIL.getConfiguration()),
|
||||
true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -155,7 +147,7 @@ public class TestSnapshotHFileCleaner {
|
|||
@Test
|
||||
public void testCorruptedDataManifest() throws IOException {
|
||||
SnapshotTestingUtils.SnapshotMock
|
||||
snapshotMock = new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
|
||||
snapshotMock = new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir);
|
||||
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
|
||||
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
|
||||
builder.addRegionV2();
|
||||
|
@ -163,29 +155,7 @@ public class TestSnapshotHFileCleaner {
|
|||
builder.consolidate();
|
||||
builder.corruptDataManifest();
|
||||
|
||||
long period = Long.MAX_VALUE;
|
||||
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
try {
|
||||
cache.getSnapshotsInProgress();
|
||||
} finally {
|
||||
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
|
||||
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
|
||||
TEST_UTIL.getConfiguration()), true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissedTmpSnapshot() throws IOException {
|
||||
SnapshotTestingUtils.SnapshotMock snapshotMock =
|
||||
new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
|
||||
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
|
||||
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
|
||||
builder.addRegionV2();
|
||||
builder.missOneRegionSnapshotFile();
|
||||
long period = Long.MAX_VALUE;
|
||||
SnapshotFileCache cache = new SnapshotFileCache(conf, period, 10000000,
|
||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||
cache.getSnapshotsInProgress();
|
||||
assertTrue(fs.exists(builder.getSnapshotsDir()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -481,8 +481,7 @@ public final class SnapshotTestingUtils {
|
|||
this.tableRegions = tableRegions;
|
||||
this.snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
|
||||
new FSTableDescriptors(conf)
|
||||
.createTableDescriptorForTableDirectory(this.snapshotDir.getFileSystem(conf),
|
||||
snapshotDir, htd, false);
|
||||
.createTableDescriptorForTableDirectory(snapshotDir, htd, false);
|
||||
}
|
||||
|
||||
public TableDescriptor getTableDescriptor() {
|
||||
|
@ -605,9 +604,7 @@ public final class SnapshotTestingUtils {
|
|||
SnapshotManifest manifest = SnapshotManifest.create(conf, fs, snapshotDir, desc, monitor);
|
||||
manifest.addTableDescriptor(htd);
|
||||
manifest.consolidate();
|
||||
Path finishedDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir);
|
||||
SnapshotDescriptionUtils.completeSnapshot(finishedDir, snapshotDir, fs,
|
||||
snapshotDir.getFileSystem(conf), conf);
|
||||
SnapshotDescriptionUtils.completeSnapshot(desc, rootDir, snapshotDir, fs);
|
||||
snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(desc, rootDir);
|
||||
return snapshotDir;
|
||||
}
|
||||
|
@ -669,8 +666,7 @@ public final class SnapshotTestingUtils {
|
|||
.build();
|
||||
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
|
||||
FileSystem workingFs = workingDir.getFileSystem(conf);
|
||||
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, workingFs);
|
||||
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs);
|
||||
return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
|
||||
}
|
||||
|
||||
|
|
|
@ -97,15 +97,11 @@ public class TestSnapshotDescriptionUtils {
|
|||
Path snapshotDir = new Path(root, HConstants.SNAPSHOT_DIR_NAME);
|
||||
Path tmpDir = new Path(snapshotDir, ".tmp");
|
||||
Path workingDir = new Path(tmpDir, "not_a_snapshot");
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem workingFs = workingDir.getFileSystem(conf);
|
||||
assertFalse("Already have working snapshot dir: " + workingDir
|
||||
+ " but shouldn't. Test file leak?", fs.exists(workingDir));
|
||||
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
|
||||
Path finishedDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, snapshotDir);
|
||||
|
||||
try {
|
||||
SnapshotDescriptionUtils.completeSnapshot(finishedDir, workingDir, fs, workingFs, conf);
|
||||
SnapshotDescriptionUtils.completeSnapshot(snapshot, root, workingDir, fs);
|
||||
fail("Shouldn't successfully complete move of a non-existent directory.");
|
||||
} catch (IOException e) {
|
||||
LOG.info("Correctly failed to move non-existant directory: " + e.getMessage());
|
||||
|
|
Loading…
Reference in New Issue