diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
index 358b4eae513..006ca2a0ca5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -184,22 +184,39 @@ public class SnapshotFileCache implements Stoppable {
     List<FileStatus> unReferencedFiles = Lists.newArrayList();
     List<String> snapshotsInProgress = null;
     boolean refreshed = false;
-    for (FileStatus file : files) {
-      String fileName = file.getPath().getName();
-      if (!refreshed && !cache.contains(fileName)) {
-        refreshCache();
-        refreshed = true;
+    Lock lock = null;
+    if (snapshotManager != null) {
+      lock = snapshotManager.getTakingSnapshotLock().writeLock();
+    }
+    if (lock == null || lock.tryLock()) {
+      try {
+        if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
+          LOG.warn("Not checking unreferenced files since a snapshot is running; HFile "
+              + "cleaning will be skipped this time");
+          return unReferencedFiles;
+        }
+        for (FileStatus file : files) {
+          String fileName = file.getPath().getName();
+          if (!refreshed && !cache.contains(fileName)) {
+            refreshCache();
+            refreshed = true;
+          }
+          if (cache.contains(fileName)) {
+            continue;
+          }
+          if (snapshotsInProgress == null) {
+            snapshotsInProgress = getSnapshotsInProgress();
+          }
+          if (snapshotsInProgress.contains(fileName)) {
+            continue;
+          }
+          unReferencedFiles.add(file);
+        }
+      } finally {
+        if (lock != null) {
+          lock.unlock();
+        }
       }
-      if (cache.contains(fileName)) {
-        continue;
-      }
-      if (snapshotsInProgress == null) {
-        snapshotsInProgress = getSnapshotsInProgress(snapshotManager);
-      }
-      if (snapshotsInProgress.contains(fileName)) {
-        continue;
-      }
-      unReferencedFiles.add(file);
     }
     return unReferencedFiles;
   }
@@ -269,19 +286,14 @@ public class SnapshotFileCache implements Stoppable {
     this.snapshots.putAll(known);
   }
 
-  @VisibleForTesting List<String> getSnapshotsInProgress(
-      final SnapshotManager snapshotManager) throws IOException {
+  @VisibleForTesting
+  List<String> getSnapshotsInProgress() throws IOException {
     List<String> snapshotInProgress = Lists.newArrayList();
     // only add those files to the cache, but not to the known snapshots
     Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
-    // only add those files to the cache, but not to the known snapshots
     FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
     if (running != null) {
       for (FileStatus run : running) {
-        ReentrantLock lock = null;
-        if (snapshotManager != null) {
-          lock = snapshotManager.getLocks().acquireLock(run.getPath().getName());
-        }
         try {
           snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
         } catch (CorruptedSnapshotException e) {
@@ -293,10 +305,6 @@ public class SnapshotFileCache implements Stoppable {
           } else {
             throw e;
           }
-        } finally {
-          if (lock != null) {
-            lock.unlock();
-          }
         }
       }
     }
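The cleaner above deliberately uses tryLock() rather than lock(): a cleaning pass never blocks snapshot creation, it simply gives up the pass whenever a snapshot may be running. A minimal sketch of this guard pattern, assuming only java.util.concurrent; the class and method names (CleanerGuardSketch, runCleanerPass) are illustrative and not part of the patch:

  import java.util.concurrent.locks.Lock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Sketch only: a non-blocking "skip this pass" guard, as the cleaner uses above.
  public class CleanerGuardSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

    public boolean runCleanerPass(Runnable pass) {
      Lock write = rwLock.writeLock();
      if (!write.tryLock()) {
        // A snapshot holds the read lock; back off instead of waiting.
        return false;
      }
      try {
        pass.run(); // safe to compute unreferenced files here
        return true;
      } finally {
        write.unlock();
      }
    }
  }

The trade-off is that a busy snapshot workload can starve cleaning, which the patch accepts: skipped HFiles are simply revisited on the next chore run.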
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 82acc7f6b9f..ae9b6fb0021 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
 import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -164,14 +165,12 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
   private ExecutorService executorService;
 
   /**
-   * Locks for snapshot operations
-   * key is snapshot's filename in progress, value is the related lock
-   * - create snapshot
-   * - SnapshotCleaner
-   * */
-  private KeyLocker<String> locks = new KeyLocker<>();
-
-
+   * Read/write lock between taking snapshots and the snapshot HFile cleaner. The cleaner should
+   * skip checking the HFiles if any snapshot is in progress, otherwise it may delete an HFile
+   * which belongs to the snapshot being taken. So the cleaner must grab the write lock before it
+   * starts to work. (See HBASE-21387)
+   */
+  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
 
   public SnapshotManager() {}
 
@@ -547,14 +546,38 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
     }
   }
 
+  public ReadWriteLock getTakingSnapshotLock() {
+    return this.takingSnapshotLock;
+  }
+
+  /**
+   * A snapshot operation proceeds as follows: <br>
+   * 1. Create a snapshot handler, and do some initialization; <br>
+   * 2. Put the handler into snapshotHandlers. <br>
+   * So when deciding whether any snapshot is in progress, we must consider both the
+   * takingSnapshotLock and snapshotHandlers.
+   * @return true if there are running snapshots.
+   */
+  public synchronized boolean isTakingAnySnapshot() {
+    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0;
+  }
+
   /**
    * Take a snapshot based on the enabled/disabled state of the table.
-   *
    * @param snapshot
    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
    * @throws IOException when some sort of generic IO exception occurs.
    */
   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
+    this.takingSnapshotLock.readLock().lock();
+    try {
+      takeSnapshotInternal(snapshot);
+    } finally {
+      this.takingSnapshotLock.readLock().unlock();
+    }
+  }
+
+  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
     // check to see if we already completed the snapshot
     if (isSnapshotCompleted(snapshot)) {
       throw new SnapshotExistsException(
@@ -1189,9 +1212,4 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
     builder.setType(SnapshotDescription.Type.FLUSH);
     return builder.build();
   }
-
-  public KeyLocker<String> getLocks() {
-    return locks;
-  }
-
 }
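The snapshot side of the same protocol: takeSnapshot() holds the shared read lock, so any number of snapshots can proceed concurrently while collectively excluding the cleaner's write lock. The read lock only covers handler creation and submission, which is why isTakingAnySnapshot() must also consult snapshotHandlers for handlers still running afterwards. A minimal sketch of this two-sided check; the names SnapshotSideSketch and runningHandlers are illustrative and not from the patch:

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Sketch only: snapshots share the read lock; the cleaner needs the write lock.
  public class SnapshotSideSketch {
    private final ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
    private int runningHandlers; // stands in for snapshotHandlers.size()

    public void takeSnapshot(Runnable prepareAndSubmit) {
      takingSnapshotLock.readLock().lock();
      try {
        prepareAndSubmit.run(); // create the handler and hand it off
      } finally {
        takingSnapshotLock.readLock().unlock();
      }
    }

    // Both conditions matter: the read lock covers handler creation, and the
    // handler count covers the time after the read lock has been released.
    public synchronized boolean isTakingAnySnapshot() {
      return takingSnapshotLock.getReadHoldCount() > 0 || runningHandlers > 0;
    }
  }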
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index d44312ab9ac..1dce79fc1db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -174,7 +173,6 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
     String msg = "Running " + snapshot.getType() + " table snapshot " + snapshot.getName() + " "
         + eventType + " on table " + snapshotTable;
     LOG.info(msg);
-    ReentrantLock lock = snapshotManager.getLocks().acquireLock(snapshot.getName());
     MasterLock tableLockToRelease = this.tableLock;
     status.setStatus(msg);
     try {
@@ -251,7 +249,6 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       } catch (IOException e) {
         LOG.error("Couldn't delete snapshot working directory:" + workingDir);
       }
-      lock.unlock();
       tableLockToRelease.release();
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
index 0c1e761cbcf..39202c4b18a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java
@@ -120,8 +120,6 @@ public final class SnapshotDescriptionUtils {
    */
  public static final String SNAPSHOT_WORKING_DIR = "hbase.snapshot.working.dir";
 
-  /** This tag will be created in in-progess snapshots */
-  public static final String SNAPSHOT_IN_PROGRESS = ".inprogress";
   // snapshot operation values
   /** Default value if no start time is specified */
   public static final long NO_SNAPSHOT_START_TIME_SPECIFIED = 0;
@@ -354,16 +352,6 @@ public final class SnapshotDescriptionUtils {
     }
   }
 
-  /**
-   * Create in-progress tag under .tmp of in-progress snapshot
-   * */
-  public static void createInProgressTag(Path workingDir, FileSystem fs) throws IOException {
-    FsPermission perms = FSUtils.getFilePermissions(fs, fs.getConf(),
-        HConstants.DATA_FILE_UMASK_KEY);
-    Path snapshot_in_progress = new Path(workingDir, SnapshotDescriptionUtils.SNAPSHOT_IN_PROGRESS);
-    FSUtils.create(fs, snapshot_in_progress, perms, true);
-  }
-
   /**
    * Read in the {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription} stored for the snapshot in the passed directory
    * @param fs filesystem where the snapshot was taken
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
index 6cb014233a3..192086496fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotFileCache.java
@@ -147,9 +147,9 @@ public class TestSnapshotFileCache {
     SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
         "test-snapshot-file-cache-refresh", new SnapshotFiles()) {
       @Override
-      List<String> getSnapshotsInProgress(final SnapshotManager snapshotManager)
+      List<String> getSnapshotsInProgress()
           throws IOException {
-        List<String> result = super.getSnapshotsInProgress(snapshotManager);
+        List<String> result = super.getSnapshotsInProgress();
         count.incrementAndGet();
         return result;
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotHFileCleaner.java
index 0a14f77eaf2..08a68be4d19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotHFileCleaner.java
@@ -145,7 +145,7 @@ public class TestSnapshotHFileCleaner {
     SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
         "test-snapshot-file-cache-refresh", new SnapshotFiles());
     try {
-      cache.getSnapshotsInProgress(null);
+      cache.getSnapshotsInProgress();
     } catch (CorruptedSnapshotException cse) {
       LOG.info("Expected exception " + cse);
     } finally {
@@ -173,7 +173,7 @@ public class TestSnapshotHFileCleaner {
     SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
         "test-snapshot-file-cache-refresh", new SnapshotFiles());
     try {
-      cache.getSnapshotsInProgress(null);
+      cache.getSnapshotsInProgress();
     } catch (CorruptedSnapshotException cse) {
       LOG.info("Expected exception " + cse);
     } finally {
@@ -197,7 +197,7 @@ public class TestSnapshotHFileCleaner {
     long period = Long.MAX_VALUE;
     SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
         "test-snapshot-file-cache-refresh", new SnapshotFiles());
-    cache.getSnapshotsInProgress(null);
+    cache.getSnapshotsInProgress();
     assertFalse(fs.exists(builder.getSnapshotsDir()));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotWhenChoreCleaning.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotWhenChoreCleaning.java
new file mode 100644
index 00000000000..22591c7bd37
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotWhenChoreCleaning.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TestTableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Test Case for HBASE-21387
+ */
+@Category({ LargeTests.class })
+public class TestSnapshotWhenChoreCleaning {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSnapshotWhenChoreCleaning.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final Configuration CONF = TEST_UTIL.getConfiguration();
+  private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotWhenChoreCleaning.class);
+  private static final TableName TABLE_NAME = TableName.valueOf("testTable");
+  private static final int MAX_SPLIT_KEYS_NUM = 100;
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
+  private static final byte[] VALUE = Bytes.toBytes("value");
+  private static Table TABLE;
+
+  @Rule
+  public TestTableName TEST_TABLE = new TestTableName();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // Set hbase.snapshot.thread.pool.max to 1
+    CONF.setInt("hbase.snapshot.thread.pool.max", 1);
+    // Enable snapshots
+    CONF.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    // Start the MiniCluster
+    TEST_UTIL.startMiniCluster(3);
+    // Create the table
+    createTable();
+  }
+
+  private static byte[] integerToBytes(int i) {
+    return Bytes.toBytes(String.format("%06d", i));
+  }
+
+  private static void createTable() throws IOException {
+    byte[][] splitKeys = new byte[MAX_SPLIT_KEYS_NUM][];
+    for (int i = 0; i < splitKeys.length; i++) {
+      splitKeys[i] = integerToBytes(i);
+    }
+    TABLE = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static void loadDataAndFlush() throws IOException {
+    for (int i = 0; i < MAX_SPLIT_KEYS_NUM; i++) {
+      Put put = new Put(integerToBytes(i)).addColumn(FAMILY, QUALIFIER,
+          Bytes.add(VALUE, Bytes.toBytes(i)));
+      TABLE.put(put);
+    }
+    TEST_UTIL.flush(TABLE_NAME);
+  }
+
+  private static List<Path> listHFileNames(final FileSystem fs, final Path tableDir)
+      throws IOException {
+    final List<Path> hfiles = new ArrayList<>();
+    FSVisitor.visitTableStoreFiles(fs, tableDir, (region, family, hfileName) -> {
+      hfiles.add(new Path(new Path(new Path(tableDir, region), family), hfileName));
+    });
+    Collections.sort(hfiles);
+    return hfiles;
+  }
+
+  private static boolean isAnySnapshots(FileSystem fs) throws IOException {
+    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(FSUtils.getRootDir(CONF));
+    FileStatus[] snapFiles = fs.listStatus(snapshotDir);
+    if (snapFiles.length == 0) {
+      return false;
+    }
+    Path firstPath = snapFiles[0].getPath();
+    LOG.info("firstPath in isAnySnapshots: " + firstPath);
+    if (snapFiles.length == 1 && firstPath.getName().equals(".tmp")) {
+      FileStatus[] tmpSnapFiles = fs.listStatus(firstPath);
+      return tmpSnapFiles != null && tmpSnapFiles.length > 0;
+    }
+    return true;
+  }
+
+  @Test
+  public void testSnapshotWhenSnapshotHFileCleanerRunning() throws Exception {
+    // Load data and flush to generate a large number of HFiles
+    loadDataAndFlush();
+
+    SnapshotHFileCleaner cleaner = new SnapshotHFileCleaner();
+    cleaner.init(ImmutableMap.of(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster()));
+    cleaner.setConf(CONF);
+
+    FileSystem fs = FSUtils.getCurrentFileSystem(CONF);
+    List<Path> fileNames =
+        listHFileNames(fs, FSUtils.getTableDir(FSUtils.getRootDir(CONF), TABLE_NAME));
+    List<FileStatus> files = new ArrayList<>();
+    for (Path fileName : fileNames) {
+      files.add(fs.getFileStatus(fileName));
+    }
+
+    TEST_UTIL.getAdmin().snapshot("snapshotName_prev", TABLE_NAME);
+    Assert.assertEquals(0, Lists.newArrayList(cleaner.getDeletableFiles(files)).size());
+    TEST_UTIL.getAdmin().deleteSnapshot("snapshotName_prev");
+    cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
+    Assert.assertEquals(100, Lists.newArrayList(cleaner.getDeletableFiles(files)).size());
+
+    Runnable snapshotRunnable = () -> {
+      try {
+        // This thread will be busy taking snapshots
+        for (int k = 0; k < 5; k++) {
+          TEST_UTIL.getAdmin().snapshot("snapshotName_" + k, TABLE_NAME);
+        }
+      } catch (Exception e) {
+        LOG.error("Snapshot failed: ", e);
+      }
+    };
+    final AtomicBoolean success = new AtomicBoolean(true);
+    Runnable cleanerRunnable = () -> {
+      try {
+        while (!isAnySnapshots(fs)) {
+          LOG.info("No snapshot found yet, sleeping 100ms");
+          Thread.sleep(100);
+        }
+        for (int k = 0; k < 5; k++) {
+          cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
+          Iterable<FileStatus> toDeleteFiles = cleaner.getDeletableFiles(files);
+          List<FileStatus> deletableFiles = Lists.newArrayList(toDeleteFiles);
+          LOG.info("Size of deletableFiles is: " + deletableFiles.size());
+          for (int i = 0; i < deletableFiles.size(); i++) {
+            LOG.debug("toDeleteFiles[{}] is: {}", i, deletableFiles.get(i));
+          }
+          if (deletableFiles.size() > 0) {
+            success.set(false);
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Chore cleaning failed: ", e);
+      }
+    };
+    Thread t1 = new Thread(snapshotRunnable);
+    t1.start();
+    Thread t2 = new Thread(cleanerRunnable);
+    t2.start();
+    t1.join();
+    t2.join();
+    Assert.assertTrue(success.get());
+  }
+}
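The exclusion this test depends on can also be seen in isolation: while a snapshot-style thread holds the read lock, a cleaner-style write tryLock must fail. A small self-contained demo under the same java.util.concurrent assumptions; ReadWriteExclusionDemo is an illustrative name, not part of the patch:

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Sketch only: a reader (the "snapshot") makes the writer's tryLock (the "cleaner") fail.
  public class ReadWriteExclusionDemo {
    public static void main(String[] args) throws InterruptedException {
      ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
      CountDownLatch readerReady = new CountDownLatch(1);
      CountDownLatch done = new CountDownLatch(1);

      Thread snapshot = new Thread(() -> {
        lock.readLock().lock(); // stands in for a snapshot in progress
        readerReady.countDown();
        try {
          done.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          lock.readLock().unlock();
        }
      });
      snapshot.start();

      readerReady.await();
      boolean acquired = lock.writeLock().tryLock(); // the cleaner's attempt
      System.out.println("cleaner acquired write lock: " + acquired); // prints false
      if (acquired) {
        lock.writeLock().unlock();
      }
      done.countDown();
      snapshot.join();
    }
  }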