HBASE-7174 [snapshots] Refactor snapshot file cleaner cache to use Snapshot FileVisitor (Matteo Bertozzi)

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2013-02-13 18:18:35 +00:00
parent 71b0e2c0a8
commit 77d05003e3
7 changed files with 164 additions and 97 deletions

View File

@ -92,6 +92,14 @@ public abstract class Chore extends HasThread {
this.sleeper.skipSleepCycle();
}
/*
* Exposed for TESTING!
* calls directly the chore method, from the current thread.
*/
public void choreForTesting() {
chore();
}
/**
* Override to run a task before we start looping.
* @return true if initial chore was successful

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.snapshot;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -36,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@ -60,25 +58,10 @@ import org.apache.hadoop.hbase.util.FSUtils;
* Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
* also removed from the cache.
* <p>
* A {@link PathFilter} must be passed when creating <tt>this</tt> to allow filtering of children
* A {@link SnapshotFileInspector} must be passed when creating <tt>this</tt> to allow extraction of files
* under the /hbase/.snapshot/[snapshot name] directory, for each snapshot. This allows you to only
* cache files under, for instance, all the logs in the .logs directory or all the files under all
* the regions. The filter is only applied the children, not to the children of children. For
* instance, given the layout:
*
* <pre>
* /hbase/.snapshot/SomeSnapshot/
* .logs/
* server/
* server.1234567
* .regioninfo
* 1234567890/
* family/
* 123456789
* </pre>
*
* would only apply a filter to directories: .logs, .regioninfo and 1234567890, not to their
* children.
* the regions.
* <p>
* <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
* snapshots and will attempt to cache files from those snapshots as well.
@ -88,11 +71,19 @@ import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SnapshotFileCache implements Stoppable {
public interface SnapshotFileInspector {
/**
* Returns a collection of file names needed by the snapshot.
* @param snapshotDir {@link Path} to the snapshot directory to scan.
* @return the collection of file names needed by the snapshot.
*/
Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
}
private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class);
private volatile boolean stop = false;
private final FileSystem fs;
private final PathFilter dirFilter;
private final SnapshotFileInspector fileInspector;
private final Path snapshotDir;
private final Set<String> cache = new HashSet<String>();
/**
@ -113,13 +104,13 @@ public class SnapshotFileCache implements Stoppable {
* hbase root directory
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotDirectory Filter to apply to the directories under each snapshot.
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded
*/
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
PathFilter inspectSnapshotDirectory) throws IOException {
SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod,
refreshThreadName, inspectSnapshotDirectory);
refreshThreadName, inspectSnapshotFiles);
}
/**
@ -130,12 +121,12 @@ public class SnapshotFileCache implements Stoppable {
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotDirectory Filter to apply to the directories under each snapshot.
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
*/
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
long cacheRefreshDelay, String refreshThreadName, PathFilter inspectSnapshotDirectory) {
long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) {
this.fs = fs;
this.dirFilter = inspectSnapshotDirectory;
this.fileInspector = inspectSnapshotFiles;
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
// periodically refresh the file cache to make sure we aren't superfluously saving files.
this.refreshTimer = new Timer(refreshThreadName, true);
@ -229,10 +220,9 @@ public class SnapshotFileCache implements Stoppable {
FileStatus[] running = FSUtils.listStatus(fs, snapshot.getPath());
if (running == null) continue;
for (FileStatus run : running) {
this.cache.addAll(getAllFiles(run, dirFilter));
this.cache.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
}
}
} else {
SnapshotDirectoryInfo files = this.snapshots.remove(name);
// 3.1.1 if we don't know about the snapshot or its been modified, we need to update the files
// the latter could occur where I create a snapshot, then delete it, and then make a new
@ -240,35 +230,20 @@ public class SnapshotFileCache implements Stoppable {
// snapshot, even though it has the same name as the files referenced have probably changed.
if (files == null || files.hasBeenModified(snapshot.getModificationTime())) {
// get all files for the snapshot and create a new info
Collection<String> storedFiles = getAllFiles(snapshot, dirFilter);
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath());
files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles);
}
// 3.2 add all the files to cache
this.cache.addAll(files.getFiles());
known.put(name, files);
}
}
// 4. set the snapshots we are tracking
this.snapshots.clear();
this.snapshots.putAll(known);
}
private Collection<String> getAllFiles(FileStatus file, PathFilter filter) throws IOException {
if (!file.isDir()) return Collections.singleton(file.getPath().getName());
Set<String> ret = new HashSet<String>();
// read all the files/directories below the passed directory
FileStatus[] files = FSUtils.listStatus(fs, file.getPath(), filter);
if (files == null) return ret;
// get all the files for the children
for (FileStatus child : files) {
// children aren't filtered out
ret.addAll(getAllFiles(child, null));
}
return ret;
}
/**
* Simple helper task that just periodically attempts to refresh the cache
*/

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@ -67,11 +69,15 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
try {
long cacheRefreshPeriod = conf.getLong(HFILE_CACHE_REFRESH_PERIOD_CONF_KEY,
DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
Path rootDir = FSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-hfile-cleaner-cache-refresher", new FSUtils.RegionDirFilter(fs)
);
"snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
throws IOException {
return SnapshotReferenceUtil.getHFileNames(fs, snapshotDir);
}
});
} catch (IOException e) {
LOG.error("Failed to create cleaner util", e);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -26,9 +27,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@ -77,20 +78,10 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
Path rootDir = FSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-log-cleaner-cache-refresher", new PathFilter() {
@Override
public boolean accept(Path path) {
if (path.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
try {
if (!fs.isFile(path)) return true;
} catch (IOException e) {
LOG.error("Couldn't reach fs to check:" + path + " is a directory, stopping!");
SnapshotLogCleaner.this.stop("Couldn't reach FS to check if " + path
+ " was a directory");
}
}
return false;
"snapshot-log-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
throws IOException {
return SnapshotReferenceUtil.getHLogNames(fs, snapshotDir);
}
});
} catch (IOException e) {

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.FSUtils;
@ -221,8 +222,12 @@ public final class SnapshotReferenceUtil {
visitTableStoreFiles(fs, snapshotDir, new FSVisitor.StoreFileVisitor() {
public void storeFile (final String region, final String family, final String hfile)
throws IOException {
if (HFileLink.isHFileLink(hfile)) {
names.add(HFileLink.getReferencedHFileName(hfile));
} else {
names.add(hfile);
}
}
});
return names;
}

View File

@ -71,7 +71,11 @@ public class TestRestoreSnapshotFromClient {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set("hbase.master.hfilecleaner.plugins",
"org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner," +
"org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner");
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
@ -125,10 +129,13 @@ public class TestRestoreSnapshotFromClient {
@After
public void tearDown() throws Exception {
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
admin.deleteSnapshot(snapshotName0);
admin.deleteSnapshot(snapshotName1);
waitCleanerRun();
}
@Test
@ -185,6 +192,7 @@ public class TestRestoreSnapshotFromClient {
admin.disableTable(clonedTableName);
admin.snapshot(snapshotName2, clonedTableName);
admin.deleteTable(clonedTableName);
waitCleanerRun();
admin.cloneSnapshot(snapshotName2, clonedTableName);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
@ -193,6 +201,68 @@ public class TestRestoreSnapshotFromClient {
admin.deleteTable(clonedTableName);
}
/**
* Verify that tables created from the snapshot are still alive after source table deletion.
*/
@Test
public void testCloneLinksAfterDelete() throws IOException, InterruptedException {
// Clone a table from the first snapshot
byte[] clonedTableName = Bytes.toBytes("clonedtb1-" + System.currentTimeMillis());
admin.cloneSnapshot(snapshotName0, clonedTableName);
HTable table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
// Take a snapshot of this cloned table.
admin.disableTable(clonedTableName);
admin.snapshot(snapshotName2, clonedTableName);
// Clone the snapshot of the cloned table
byte[] clonedTableName2 = Bytes.toBytes("clonedtb2-" + System.currentTimeMillis());
admin.cloneSnapshot(snapshotName2, clonedTableName2);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName2);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
admin.disableTable(clonedTableName2);
// Remove the original table
admin.disableTable(tableName);
admin.deleteTable(tableName);
waitCleanerRun();
// Verify the first cloned table
admin.enableTable(clonedTableName);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
// Verify the second cloned table
admin.enableTable(clonedTableName2);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName2);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
admin.disableTable(clonedTableName2);
// Delete the first cloned table
admin.disableTable(clonedTableName);
admin.deleteTable(clonedTableName);
waitCleanerRun();
// Verify the second cloned table
admin.enableTable(clonedTableName2);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName2);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
// Clone a new table from cloned
byte[] clonedTableName3 = Bytes.toBytes("clonedtb3-" + System.currentTimeMillis());
admin.cloneSnapshot(snapshotName2, clonedTableName3);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName3);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
// Delete the cloned tables
admin.disableTable(clonedTableName2);
admin.deleteTable(clonedTableName2);
admin.disableTable(clonedTableName3);
admin.deleteTable(clonedTableName3);
admin.deleteSnapshot(snapshotName2);
}
// ==========================================================================
// Helpers
// ==========================================================================
@ -225,4 +295,8 @@ public class TestRestoreSnapshotFromClient {
}
table.flushCommits();
}
private void waitCleanerRun() throws InterruptedException {
TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
}
}

View File

@ -21,18 +21,20 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
@ -76,10 +78,10 @@ public class TestSnapshotFileCache {
long period = Long.MAX_VALUE;
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", null);
"test-snapshot-file-cache-refresh", new SnapshotFiles());
Path snapshot = new Path(snapshotDir, "snapshot");
Path region = new Path(snapshot, "region1");
Path region = new Path(snapshot, "7e91021");
Path family = new Path(region, "fam");
Path file1 = new Path(family, "file1");
Path file2 = new Path(family, "file2");
@ -129,19 +131,19 @@ public class TestSnapshotFileCache {
long period = Long.MAX_VALUE;
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", null);
"test-snapshot-file-cache-refresh", new SnapshotFiles());
// create a file in a 'completed' snapshot
Path snapshot = new Path(snapshotDir, "snapshot");
Path region = new Path(snapshot, "region1");
Path region = new Path(snapshot, "7e91021");
Path family = new Path(region, "fam");
Path file1 = new Path(family, "file1");
fs.create(file1);
// create an 'in progress' snapshot
SnapshotDescription desc = SnapshotDescription.newBuilder().setName("working").build();
snapshot = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, snapshotDir);
region = new Path(snapshot, "region1");
snapshot = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
region = new Path(snapshot, "7e91021");
family = new Path(region, "fam");
Path file2 = new Path(family, "file2");
fs.create(file2);
@ -158,19 +160,17 @@ public class TestSnapshotFileCache {
// don't refresh the cache unless we tell it to
long period = Long.MAX_VALUE;
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
PathFilter filter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().equals(HConstants.HREGION_LOGDIR_NAME);
}
};
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", filter);
"test-snapshot-file-cache-refresh", new SnapshotFileCache.SnapshotFileInspector() {
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
throws IOException {
return SnapshotReferenceUtil.getHLogNames(fs, snapshotDir);
}
});
// create a file in a 'completed' snapshot
Path snapshot = new Path(snapshotDir, "snapshot");
Path region = new Path(snapshot, "region1");
Path region = new Path(snapshot, "7e91021");
Path family = new Path(region, "fam");
Path file1 = new Path(family, "file1");
fs.create(file1);
@ -194,10 +194,10 @@ public class TestSnapshotFileCache {
long period = Long.MAX_VALUE;
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", null);
"test-snapshot-file-cache-refresh", new SnapshotFiles());
Path snapshot = new Path(snapshotDir, "snapshot");
Path region = new Path(snapshot, "region1");
Path region = new Path(snapshot, "7e91021");
Path family = new Path(region, "fam");
Path file1 = new Path(family, "file1");
Path file2 = new Path(family, "file2");
@ -219,4 +219,12 @@ public class TestSnapshotFileCache {
assertTrue("Cache didn't find new file:" + file3, cache.contains(file3.getName()));
}
class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
Collection<String> files = new HashSet<String>();
files.addAll(SnapshotReferenceUtil.getHLogNames(fs, snapshotDir));
files.addAll(SnapshotReferenceUtil.getHFileNames(fs, snapshotDir));
return files;
}
};
}