HBASE-22190 SnapshotFileCache may fail to load the correct snapshot file list when there is an on-going snapshot operation
This commit is contained in:
parent
4379fe4ad3
commit
4489598a8e
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.snapshot;
|
package org.apache.hadoop.hbase.master.snapshot;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -28,7 +27,7 @@ import java.util.Set;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -97,8 +96,6 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<>();
|
private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<>();
|
||||||
private final Timer refreshTimer;
|
private final Timer refreshTimer;
|
||||||
|
|
||||||
private long lastModifiedTime = Long.MIN_VALUE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
|
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
|
||||||
* filesystem.
|
* filesystem.
|
||||||
|
@ -128,7 +125,8 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
|
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
|
||||||
*/
|
*/
|
||||||
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
|
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
|
||||||
long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) {
|
long cacheRefreshDelay, String refreshThreadName,
|
||||||
|
SnapshotFileInspector inspectSnapshotFiles) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.fileInspector = inspectSnapshotFiles;
|
this.fileInspector = inspectSnapshotFiles;
|
||||||
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
|
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
|
||||||
|
@ -141,14 +139,14 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
/**
|
/**
|
||||||
* Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
|
* Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
|
||||||
* cache refreshes.
|
* cache refreshes.
|
||||||
* <p>
|
* <p/>
|
||||||
* Blocks until the cache is refreshed.
|
* Blocks until the cache is refreshed.
|
||||||
* <p>
|
* <p/>
|
||||||
* Exposed for TESTING.
|
* Exposed for TESTING.
|
||||||
*/
|
*/
|
||||||
public void triggerCacheRefreshForTesting() {
|
public synchronized void triggerCacheRefreshForTesting() {
|
||||||
try {
|
try {
|
||||||
SnapshotFileCache.this.refreshCache();
|
refreshCache();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to refresh snapshot hfile cache!", e);
|
LOG.warn("Failed to refresh snapshot hfile cache!", e);
|
||||||
}
|
}
|
||||||
|
@ -156,10 +154,9 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check to see if any of the passed file names is contained in any of the snapshots.
|
* Check to see if any of the passed file names is contained in any of the snapshots. First checks
|
||||||
* First checks an in-memory cache of the files to keep. If its not in the cache, then the cache
|
* an in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed
|
||||||
* is refreshed and the cache checked again for that file.
|
* and the cache checked again for that file. This ensures that we never return files that exist.
|
||||||
* This ensures that we never return files that exist.
|
|
||||||
* <p>
|
* <p>
|
||||||
* Note this may lead to periodic false positives for the file being referenced. Periodically, the
|
* Note this may lead to periodic false positives for the file being referenced. Periodically, the
|
||||||
* cache is refreshed even if there are no requests to ensure that the false negatives get removed
|
* cache is refreshed even if there are no requests to ensure that the false negatives get removed
|
||||||
|
@ -168,8 +165,8 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
* at that point, cache will still think the file system contains that file and return
|
* at that point, cache will still think the file system contains that file and return
|
||||||
* <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
|
* <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
|
||||||
* on the filesystem, we will never find it and always return <tt>false</tt>.
|
* on the filesystem, we will never find it and always return <tt>false</tt>.
|
||||||
* @param files file to check, NOTE: Relies that files are loaded from hdfs before method
|
* @param files file to check, NOTE: Relies that files are loaded from hdfs before method is
|
||||||
* is called (NOT LAZY)
|
* called (NOT LAZY)
|
||||||
* @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
|
* @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
|
||||||
* @throws IOException if there is an unexpected error reaching the filesystem.
|
* @throws IOException if there is an unexpected error reaching the filesystem.
|
||||||
*/
|
*/
|
||||||
|
@ -177,8 +174,7 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
// is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
|
// is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
|
||||||
// cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
|
// cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
|
||||||
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
|
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
|
||||||
final SnapshotManager snapshotManager)
|
final SnapshotManager snapshotManager) throws IOException {
|
||||||
throws IOException {
|
|
||||||
List<FileStatus> unReferencedFiles = Lists.newArrayList();
|
List<FileStatus> unReferencedFiles = Lists.newArrayList();
|
||||||
boolean refreshed = false;
|
boolean refreshed = false;
|
||||||
Lock lock = null;
|
Lock lock = null;
|
||||||
|
@ -188,8 +184,8 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
if (lock == null || lock.tryLock()) {
|
if (lock == null || lock.tryLock()) {
|
||||||
try {
|
try {
|
||||||
if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
|
if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
|
||||||
LOG.warn("Not checking unreferenced files since snapshot is running, it will "
|
LOG.warn("Not checking unreferenced files since snapshot is running, it will " +
|
||||||
+ "skip to clean the HFiles this time");
|
"skip to clean the HFiles this time");
|
||||||
return unReferencedFiles;
|
return unReferencedFiles;
|
||||||
}
|
}
|
||||||
for (FileStatus file : files) {
|
for (FileStatus file : files) {
|
||||||
|
@ -212,69 +208,47 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
return unReferencedFiles;
|
return unReferencedFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void refreshCache() throws IOException {
|
private void refreshCache() throws IOException {
|
||||||
// get the status of the snapshots directory and check if it is has changes
|
// just list the snapshot directory directly, do not check the modification time for the root
|
||||||
FileStatus dirStatus;
|
// snapshot directory, as some file system implementations do not modify the parent directory's
|
||||||
try {
|
// modTime when there are new sub items, for example, S3.
|
||||||
dirStatus = fs.getFileStatus(snapshotDir);
|
FileStatus[] snapshotDirs = FSUtils.listStatus(fs, snapshotDir,
|
||||||
} catch (FileNotFoundException e) {
|
p -> !p.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME));
|
||||||
if (this.cache.size() > 0) {
|
|
||||||
LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the snapshot directory wasn't modified since we last check, we are done
|
// clear the cache, as in the below code, either we will also clear the snapshots, or we will
|
||||||
if (dirStatus.getModificationTime() <= this.lastModifiedTime) return;
|
// refill the file name cache again.
|
||||||
|
|
||||||
// directory was modified, so we need to reload our cache
|
|
||||||
// there could be a slight race here where we miss the cache, check the directory modification
|
|
||||||
// time, then someone updates the directory, causing us to not scan the directory again.
|
|
||||||
// However, snapshot directories are only created once, so this isn't an issue.
|
|
||||||
|
|
||||||
// 1. update the modified time
|
|
||||||
this.lastModifiedTime = dirStatus.getModificationTime();
|
|
||||||
|
|
||||||
// 2.clear the cache
|
|
||||||
this.cache.clear();
|
this.cache.clear();
|
||||||
Map<String, SnapshotDirectoryInfo> known = new HashMap<>();
|
if (ArrayUtils.isEmpty(snapshotDirs)) {
|
||||||
|
|
||||||
// 3. check each of the snapshot directories
|
|
||||||
FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir);
|
|
||||||
if (snapshots == null) {
|
|
||||||
// remove all the remembered snapshots because we don't have any left
|
// remove all the remembered snapshots because we don't have any left
|
||||||
if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
|
if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
|
||||||
LOG.debug("No snapshots on-disk, cache empty");
|
LOG.debug("No snapshots on-disk, clear cache");
|
||||||
}
|
}
|
||||||
this.snapshots.clear();
|
this.snapshots.clear();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.1 iterate through the on-disk snapshots
|
// iterate over all the cached snapshots and see if we need to update some, it is not an
|
||||||
for (FileStatus snapshot : snapshots) {
|
// expensive operation if we do not reload the manifest of snapshots.
|
||||||
String name = snapshot.getPath().getName();
|
Map<String, SnapshotDirectoryInfo> newSnapshots = new HashMap<>();
|
||||||
// its not the tmp dir,
|
for (FileStatus snapshotDir : snapshotDirs) {
|
||||||
if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
|
String name = snapshotDir.getPath().getName();
|
||||||
SnapshotDirectoryInfo files = this.snapshots.remove(name);
|
SnapshotDirectoryInfo files = this.snapshots.remove(name);
|
||||||
// 3.1.1 if we don't know about the snapshot or its been modified, we need to update the
|
// if we don't know about the snapshot or its been modified, we need to update the
|
||||||
// files the latter could occur where I create a snapshot, then delete it, and then make a
|
// files the latter could occur where I create a snapshot, then delete it, and then make a
|
||||||
// new snapshot with the same name. We will need to update the cache the information from
|
// new snapshot with the same name. We will need to update the cache the information from
|
||||||
// that new snapshot, even though it has the same name as the files referenced have
|
// that new snapshot, even though it has the same name as the files referenced have
|
||||||
// probably changed.
|
// probably changed.
|
||||||
if (files == null || files.hasBeenModified(snapshot.getModificationTime())) {
|
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
|
||||||
// get all files for the snapshot and create a new info
|
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
|
||||||
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath());
|
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
|
||||||
files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles);
|
|
||||||
}
|
}
|
||||||
// 3.2 add all the files to cache
|
// add all the files to cache
|
||||||
this.cache.addAll(files.getFiles());
|
this.cache.addAll(files.getFiles());
|
||||||
known.put(name, files);
|
newSnapshots.put(name, files);
|
||||||
}
|
}
|
||||||
}
|
// set the snapshots we are tracking
|
||||||
|
|
||||||
// 4. set the snapshots we are tracking
|
|
||||||
this.snapshots.clear();
|
this.snapshots.clear();
|
||||||
this.snapshots.putAll(known);
|
this.snapshots.putAll(newSnapshots);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -283,12 +257,18 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
public class RefreshCacheTask extends TimerTask {
|
public class RefreshCacheTask extends TimerTask {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
synchronized (SnapshotFileCache.this) {
|
||||||
try {
|
try {
|
||||||
SnapshotFileCache.this.refreshCache();
|
SnapshotFileCache.this.refreshCache();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to refresh snapshot hfile cache!", e);
|
LOG.warn("Failed to refresh snapshot hfile cache!", e);
|
||||||
|
// clear all the cached entries if we meet an error
|
||||||
|
cache.clear();
|
||||||
|
snapshots.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue