From a85cb0f89a01495897e513df37e435d013e152ce Mon Sep 17 00:00:00 2001 From: rahulgidwani Date: Mon, 26 Jan 2015 18:44:25 -0800 Subject: [PATCH] HBASE-12627 Add back snapshot batching facility Signed-off-by: Andrew Purtell --- .../master/snapshot/SnapshotFileCache.java | 80 +++++++---- .../master/snapshot/SnapshotHFileCleaner.java | 14 +- .../master/snapshot/SnapshotLogCleaner.java | 11 +- .../snapshot/TestSnapshotFileCache.java | 130 +++++++++++++++--- 4 files changed, 179 insertions(+), 56 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java index 364855672a9..dfd3cb5e7b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java @@ -22,11 +22,14 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -153,10 +156,10 @@ public class SnapshotFileCache implements Stoppable { } /** - * Check to see if the passed file name is contained in any of the snapshots. First checks an - * in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed and - * the cache checked again for that file. This ensures that we always return true for a - * files that exists. + * Check to see if any of the passed file names is contained in any of the snapshots. + * First checks an in-memory cache of the files to keep. If its not in the cache, then the cache + * is refreshed and the cache checked again for that file. + * This ensures that we never return files that exist. *

* Note this may lead to periodic false positives for the file being referenced. Periodically, the * cache is refreshed even if there are no requests to ensure that the false negatives get removed @@ -165,22 +168,37 @@ public class SnapshotFileCache implements Stoppable { * at that point, cache will still think the file system contains that file and return * true, even if it is no longer present (false positive). However, if the file never was * on the filesystem, we will never find it and always return false. - * @param fileName file to check - * @return false if the file is not referenced in any current or running snapshot, - * true if the file is in the cache. + * @param files file to check, NOTE: Relies that files are loaded from hdfs before method + * is called (NOT LAZY) + * @return unReferencedFiles the collection of files that do not have snapshot references * @throws IOException if there is an unexpected error reaching the filesystem. */ // XXX this is inefficient to synchronize on the method, when what we really need to guard against // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the // cache, but that seems overkill at the moment and isn't necessarily a bottleneck. - public synchronized boolean contains(String fileName) throws IOException { - boolean hasFile = this.cache.contains(fileName); - if (!hasFile) { - refreshCache(); - // then check again - hasFile = this.cache.contains(fileName); + public synchronized Iterable getUnreferencedFiles(Iterable files) + throws IOException { + List unReferencedFiles = Lists.newArrayList(); + List snapshotsInProgress = null; + boolean refreshed = false; + for (FileStatus file : files) { + String fileName = file.getPath().getName(); + if (!refreshed && !cache.contains(fileName)) { + refreshCache(); + refreshed = true; + } + if (cache.contains(fileName)) { + continue; + } + if (snapshotsInProgress == null) { + snapshotsInProgress = getSnapshotsInProgress(); + } + if (snapshotsInProgress.contains(fileName)) { + continue; + } + unReferencedFiles.add(file); } - return hasFile; + return unReferencedFiles; } private synchronized void refreshCache() throws IOException { @@ -250,20 +268,14 @@ public class SnapshotFileCache implements Stoppable { // 3.1 iterate through the on-disk snapshots for (FileStatus snapshot : snapshots) { String name = snapshot.getPath().getName(); - // its the tmp dir - if (name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) { - // only add those files to the cache, but not to the known snapshots - FileStatus[] running = FSUtils.listStatus(fs, snapshot.getPath()); - if (running == null) continue; - for (FileStatus run : running) { - this.cache.addAll(fileInspector.filesUnderSnapshot(run.getPath())); - } - } else { + // its not the tmp dir, + if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) { SnapshotDirectoryInfo files = this.snapshots.remove(name); - // 3.1.1 if we don't know about the snapshot or its been modified, we need to update the files - // the latter could occur where I create a snapshot, then delete it, and then make a new - // snapshot with the same name. We will need to update the cache the information from that new - // snapshot, even though it has the same name as the files referenced have probably changed. + // 3.1.1 if we don't know about the snapshot or its been modified, we need to update the + // files the latter could occur where I create a snapshot, then delete it, and then make a + // new snapshot with the same name. We will need to update the cache the information from + // that new snapshot, even though it has the same name as the files referenced have + // probably changed. if (files == null || files.hasBeenModified(snapshot.getModificationTime())) { // get all files for the snapshot and create a new info Collection storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath()); @@ -279,6 +291,20 @@ public class SnapshotFileCache implements Stoppable { this.snapshots.clear(); this.snapshots.putAll(known); } + + @VisibleForTesting List getSnapshotsInProgress() throws IOException { + List snapshotInProgress = Lists.newArrayList(); + // only add those files to the cache, but not to the known snapshots + Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME); + // only add those files to the cache, but not to the known snapshots + FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir); + if (running != null) { + for (FileStatus run : running) { + snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath())); + } + } + return snapshotInProgress; + } /** * Simple helper task that just periodically attempts to refresh the cache diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java index 4bbc1b62ebf..09b0c94b365 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.snapshot; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,17 +57,20 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate { private SnapshotFileCache cache; @Override - public synchronized boolean isFileDeletable(FileStatus fStat) { + public synchronized Iterable getDeletableFiles(Iterable files) { try { - return !cache.contains(fStat.getPath().getName()); + return cache.getUnreferencedFiles(files); } catch (IOException e) { - LOG.error("Exception while checking if:" + fStat.getPath() - + " was valid, keeping it just in case.", e); - return false; + LOG.error("Exception while checking if files were valid, keeping them just in case.", e); + return Collections.emptyList(); } } @Override + protected boolean isFileDeletable(FileStatus fStat) { + return false; + } + public void setConf(final Configuration conf) { super.setConf(conf); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java index a927db314cd..28aa800d17e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotLogCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.snapshot; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,14 +56,12 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate { private SnapshotFileCache cache; @Override - public synchronized boolean isFileDeletable(FileStatus fStat) { + public synchronized Iterable getDeletableFiles(Iterable files) { try { - if (null == cache) return false; - return !cache.contains(fStat.getPath().getName()); + return cache.getUnreferencedFiles(files); } catch (IOException e) { - LOG.error("Exception while checking if:" + fStat.getPath() - + " was valid, keeping it just in case.", e); - return false; + LOG.error("Exception while checking if files were valid, keeping them just in case.", e); + return Collections.emptyList(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java index efaef9de6c4..1da38b85a4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java @@ -17,20 +17,26 @@ */ package org.apache.hadoop.hbase.master.snapshot; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.Collection; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -117,9 +123,11 @@ public class TestSnapshotFileCache { FSUtils.logFileSystemState(fs, rootDir, LOG); // then make sure the cache only finds the log files + Iterable notSnapshot = getNonSnapshotFiles(cache, file1); assertFalse("Cache found '" + file1 + "', but it shouldn't have.", - cache.contains(file1.getName())); - assertTrue("Cache didn't find:" + log, cache.contains(log.getName())); + Iterables.contains(notSnapshot, file1.getName())); + notSnapshot = getNonSnapshotFiles(cache, log); + assertTrue("Cache didn't find:" + log, !Iterables.contains(notSnapshot, log)); } @Test @@ -156,6 +164,78 @@ public class TestSnapshotFileCache { createAndTestSnapshotV2(cache, "snapshot2", true, false); } + @Test + public void testWeNeverCacheTmpDirAndLoadIt() throws Exception { + + final AtomicInteger count = new AtomicInteger(0); + // don't refresh the cache unless we tell it to + long period = Long.MAX_VALUE; + SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000, + "test-snapshot-file-cache-refresh", new SnapshotFiles()) { + @Override + List getSnapshotsInProgress() throws IOException { + List result = super.getSnapshotsInProgress(); + count.incrementAndGet(); + return result; + } + + @Override public void triggerCacheRefreshForTesting() { + super.triggerCacheRefreshForTesting(); + } + }; + + SnapshotMock.SnapshotBuilder complete = + createAndTestSnapshotV1(cache, "snapshot", false, false); + + SnapshotMock.SnapshotBuilder inProgress = + createAndTestSnapshotV1(cache, "snapshotInProgress", true, false); + + int countBeforeCheck = count.get(); + + FSUtils.logFileSystemState(fs, rootDir, LOG); + + List allStoreFiles = getStoreFilesForSnapshot(complete); + Iterable deletableFiles = cache.getUnreferencedFiles(allStoreFiles); + assertTrue(Iterables.isEmpty(deletableFiles)); + // no need for tmp dir check as all files are accounted for. + assertEquals(0, count.get() - countBeforeCheck); + + + // add a random file to make sure we refresh + FileStatus randomFile = mockStoreFile(UUID.randomUUID().toString()); + allStoreFiles.add(randomFile); + deletableFiles = cache.getUnreferencedFiles(allStoreFiles); + assertEquals(randomFile, Iterables.getOnlyElement(deletableFiles)); + assertEquals(1, count.get() - countBeforeCheck); // we check the tmp directory + } + + private List getStoreFilesForSnapshot(SnapshotMock.SnapshotBuilder builder) + throws IOException { + final List allStoreFiles = Lists.newArrayList(); + SnapshotReferenceUtil + .visitReferencedFiles(UTIL.getConfiguration(), fs, builder.getSnapshotsDir(), + new SnapshotReferenceUtil.SnapshotVisitor() { + @Override public void logFile(String server, String logfile) throws IOException { + // do nothing. + } + + @Override public void storeFile(HRegionInfo regionInfo, String familyName, + SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile) throws IOException { + FileStatus status = mockStoreFile(storeFile.getName()); + allStoreFiles.add(status); + } + }); + return allStoreFiles; + } + + private FileStatus mockStoreFile(String storeFileName) { + FileStatus status = mock(FileStatus.class); + Path path = mock(Path.class); + when(path.getName()).thenReturn(storeFileName); + when(status.getPath()).thenReturn(path); + return status; + } + class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector { public Collection filesUnderSnapshot(final Path snapshotDir) throws IOException { Collection files = new HashSet(); @@ -165,11 +245,12 @@ public class TestSnapshotFileCache { } }; - private void createAndTestSnapshotV1(final SnapshotFileCache cache, final String name, - final boolean tmp, final boolean removeOnExit) throws IOException { + private SnapshotMock.SnapshotBuilder createAndTestSnapshotV1(final SnapshotFileCache cache, + final String name, final boolean tmp, final boolean removeOnExit) throws IOException { SnapshotMock snapshotMock = new SnapshotMock(UTIL.getConfiguration(), fs, rootDir); SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV1(name); createAndTestSnapshot(cache, builder, tmp, removeOnExit); + return builder; } private void createAndTestSnapshotV2(final SnapshotFileCache cache, final String name, @@ -182,16 +263,17 @@ public class TestSnapshotFileCache { private void createAndTestSnapshot(final SnapshotFileCache cache, final SnapshotMock.SnapshotBuilder builder, final boolean tmp, final boolean removeOnExit) throws IOException { - List files = new ArrayList(); + List files = new ArrayList(); for (int i = 0; i < 3; ++i) { for (Path filePath: builder.addRegion()) { String fileName = filePath.getName(); if (tmp) { // We should be able to find all the files while the snapshot creation is in-progress FSUtils.logFileSystemState(fs, rootDir, LOG); - assertTrue("Cache didn't find " + fileName, cache.contains(fileName)); + Iterable nonSnapshot = getNonSnapshotFiles(cache, filePath); + assertFalse("Cache didn't find " + fileName, Iterables.contains(nonSnapshot, fileName)); } - files.add(fileName); + files.add(filePath); } } @@ -201,8 +283,10 @@ public class TestSnapshotFileCache { } // Make sure that all files are still present - for (String fileName: files) { - assertTrue("Cache didn't find " + fileName, cache.contains(fileName)); + for (Path path: files) { + Iterable nonSnapshotFiles = getNonSnapshotFiles(cache, path); + assertFalse("Cache didn't find " + path.getName(), + Iterables.contains(nonSnapshotFiles, path.getName())); } FSUtils.logFileSystemState(fs, rootDir, LOG); @@ -212,17 +296,27 @@ public class TestSnapshotFileCache { FSUtils.logFileSystemState(fs, rootDir, LOG); // The files should be in cache until next refresh - for (String fileName: files) { - assertTrue("Cache didn't find " + fileName, cache.contains(fileName)); + for (Path filePath: files) { + Iterable nonSnapshotFiles = getNonSnapshotFiles(cache, filePath); + assertFalse("Cache didn't find " + filePath.getName(), Iterables.contains(nonSnapshotFiles, + filePath.getName())); } // then trigger a refresh cache.triggerCacheRefreshForTesting(); // and not it shouldn't find those files - for (String fileName: files) { - assertFalse("Cache found '" + fileName + "', but it shouldn't have.", - cache.contains(fileName)); + for (Path filePath: files) { + Iterable nonSnapshotFiles = getNonSnapshotFiles(cache, filePath); + assertTrue("Cache found '" + filePath.getName() + "', but it shouldn't have.", + !Iterables.contains(nonSnapshotFiles, filePath.getName())); } } } + + private Iterable getNonSnapshotFiles(SnapshotFileCache cache, Path storeFile) + throws IOException { + return cache.getUnreferencedFiles( + Arrays.asList(FSUtils.listStatus(fs, storeFile.getParent())) + ); + } }