HBASE-21387 Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files
This commit is contained in:
parent
248b8a6f56
commit
cc26c8716f
|
@ -27,7 +27,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -184,22 +184,39 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
List<FileStatus> unReferencedFiles = Lists.newArrayList();
|
List<FileStatus> unReferencedFiles = Lists.newArrayList();
|
||||||
List<String> snapshotsInProgress = null;
|
List<String> snapshotsInProgress = null;
|
||||||
boolean refreshed = false;
|
boolean refreshed = false;
|
||||||
for (FileStatus file : files) {
|
Lock lock = null;
|
||||||
String fileName = file.getPath().getName();
|
if (snapshotManager != null) {
|
||||||
if (!refreshed && !cache.contains(fileName)) {
|
lock = snapshotManager.getTakingSnapshotLock().writeLock();
|
||||||
refreshCache();
|
}
|
||||||
refreshed = true;
|
if (lock == null || lock.tryLock()) {
|
||||||
|
try {
|
||||||
|
if (snapshotManager == null || snapshotManager.isTakingAnySnapshot()) {
|
||||||
|
LOG.warn("Not checking unreferenced files since snapshot is running, it will "
|
||||||
|
+ "skip to clean the HFiles this time");
|
||||||
|
return unReferencedFiles;
|
||||||
|
}
|
||||||
|
for (FileStatus file : files) {
|
||||||
|
String fileName = file.getPath().getName();
|
||||||
|
if (!refreshed && !cache.contains(fileName)) {
|
||||||
|
refreshCache();
|
||||||
|
refreshed = true;
|
||||||
|
}
|
||||||
|
if (cache.contains(fileName)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (snapshotsInProgress == null) {
|
||||||
|
snapshotsInProgress = getSnapshotsInProgress();
|
||||||
|
}
|
||||||
|
if (snapshotsInProgress.contains(fileName)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
unReferencedFiles.add(file);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (lock != null) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (cache.contains(fileName)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (snapshotsInProgress == null) {
|
|
||||||
snapshotsInProgress = getSnapshotsInProgress(snapshotManager);
|
|
||||||
}
|
|
||||||
if (snapshotsInProgress.contains(fileName)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
unReferencedFiles.add(file);
|
|
||||||
}
|
}
|
||||||
return unReferencedFiles;
|
return unReferencedFiles;
|
||||||
}
|
}
|
||||||
|
@ -269,19 +286,14 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
this.snapshots.putAll(known);
|
this.snapshots.putAll(known);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting List<String> getSnapshotsInProgress(
|
@VisibleForTesting
|
||||||
final SnapshotManager snapshotManager) throws IOException {
|
List<String> getSnapshotsInProgress() throws IOException {
|
||||||
List<String> snapshotInProgress = Lists.newArrayList();
|
List<String> snapshotInProgress = Lists.newArrayList();
|
||||||
// only add those files to the cache, but not to the known snapshots
|
// only add those files to the cache, but not to the known snapshots
|
||||||
Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
|
Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
|
||||||
// only add those files to the cache, but not to the known snapshots
|
|
||||||
FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
|
FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
|
||||||
if (running != null) {
|
if (running != null) {
|
||||||
for (FileStatus run : running) {
|
for (FileStatus run : running) {
|
||||||
ReentrantLock lock = null;
|
|
||||||
if (snapshotManager != null) {
|
|
||||||
lock = snapshotManager.getLocks().acquireLock(run.getPath().getName());
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
|
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
|
||||||
} catch (CorruptedSnapshotException e) {
|
} catch (CorruptedSnapshotException e) {
|
||||||
|
@ -293,10 +305,6 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
if (lock != null) {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
|
||||||
import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
|
import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
|
||||||
import org.apache.hadoop.hbase.util.NonceKey;
|
import org.apache.hadoop.hbase.util.NonceKey;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
@ -164,14 +165,12 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Locks for snapshot operations
|
* Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
|
||||||
* key is snapshot's filename in progress, value is the related lock
|
* check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
|
||||||
* - create snapshot
|
* belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
|
||||||
* - SnapshotCleaner
|
* start to work. (See HBASE-21387)
|
||||||
* */
|
*/
|
||||||
private KeyLocker<String> locks = new KeyLocker<>();
|
private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public SnapshotManager() {}
|
public SnapshotManager() {}
|
||||||
|
|
||||||
|
@ -547,14 +546,38 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ReadWriteLock getTakingSnapshotLock() {
|
||||||
|
return this.takingSnapshotLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The snapshot operation processing as following: <br>
|
||||||
|
* 1. Create a Snapshot Handler, and do some initialization; <br>
|
||||||
|
* 2. Put the handler into snapshotHandlers <br>
|
||||||
|
* So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
|
||||||
|
* and snapshotHandlers;
|
||||||
|
* @return true to indicate that there're some running snapshots.
|
||||||
|
*/
|
||||||
|
public synchronized boolean isTakingAnySnapshot() {
|
||||||
|
return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take a snapshot based on the enabled/disabled state of the table.
|
* Take a snapshot based on the enabled/disabled state of the table.
|
||||||
*
|
|
||||||
* @param snapshot
|
* @param snapshot
|
||||||
* @throws HBaseSnapshotException when a snapshot specific exception occurs.
|
* @throws HBaseSnapshotException when a snapshot specific exception occurs.
|
||||||
* @throws IOException when some sort of generic IO exception occurs.
|
* @throws IOException when some sort of generic IO exception occurs.
|
||||||
*/
|
*/
|
||||||
public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
|
public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
|
||||||
|
this.takingSnapshotLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
takeSnapshotInternal(snapshot);
|
||||||
|
} finally {
|
||||||
|
this.takingSnapshotLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
|
||||||
// check to see if we already completed the snapshot
|
// check to see if we already completed the snapshot
|
||||||
if (isSnapshotCompleted(snapshot)) {
|
if (isSnapshotCompleted(snapshot)) {
|
||||||
throw new SnapshotExistsException(
|
throw new SnapshotExistsException(
|
||||||
|
@ -1189,9 +1212,4 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
||||||
builder.setType(SnapshotDescription.Type.FLUSH);
|
builder.setType(SnapshotDescription.Type.FLUSH);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public KeyLocker<String> getLocks() {
|
|
||||||
return locks;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -174,7 +173,6 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
||||||
String msg = "Running " + snapshot.getType() + " table snapshot " + snapshot.getName() + " "
|
String msg = "Running " + snapshot.getType() + " table snapshot " + snapshot.getName() + " "
|
||||||
+ eventType + " on table " + snapshotTable;
|
+ eventType + " on table " + snapshotTable;
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
ReentrantLock lock = snapshotManager.getLocks().acquireLock(snapshot.getName());
|
|
||||||
MasterLock tableLockToRelease = this.tableLock;
|
MasterLock tableLockToRelease = this.tableLock;
|
||||||
status.setStatus(msg);
|
status.setStatus(msg);
|
||||||
try {
|
try {
|
||||||
|
@ -251,7 +249,6 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
|
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
|
||||||
}
|
}
|
||||||
lock.unlock();
|
|
||||||
tableLockToRelease.release();
|
tableLockToRelease.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,8 +120,6 @@ public final class SnapshotDescriptionUtils {
|
||||||
*/
|
*/
|
||||||
public static final String SNAPSHOT_WORKING_DIR = "hbase.snapshot.working.dir";
|
public static final String SNAPSHOT_WORKING_DIR = "hbase.snapshot.working.dir";
|
||||||
|
|
||||||
/** This tag will be created in in-progess snapshots */
|
|
||||||
public static final String SNAPSHOT_IN_PROGRESS = ".inprogress";
|
|
||||||
// snapshot operation values
|
// snapshot operation values
|
||||||
/** Default value if no start time is specified */
|
/** Default value if no start time is specified */
|
||||||
public static final long NO_SNAPSHOT_START_TIME_SPECIFIED = 0;
|
public static final long NO_SNAPSHOT_START_TIME_SPECIFIED = 0;
|
||||||
|
@ -354,16 +352,6 @@ public final class SnapshotDescriptionUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Create in-progress tag under .tmp of in-progress snapshot
|
|
||||||
* */
|
|
||||||
public static void createInProgressTag(Path workingDir, FileSystem fs) throws IOException {
|
|
||||||
FsPermission perms = FSUtils.getFilePermissions(fs, fs.getConf(),
|
|
||||||
HConstants.DATA_FILE_UMASK_KEY);
|
|
||||||
Path snapshot_in_progress = new Path(workingDir, SnapshotDescriptionUtils.SNAPSHOT_IN_PROGRESS);
|
|
||||||
FSUtils.create(fs, snapshot_in_progress, perms, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read in the {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription} stored for the snapshot in the passed directory
|
* Read in the {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription} stored for the snapshot in the passed directory
|
||||||
* @param fs filesystem where the snapshot was taken
|
* @param fs filesystem where the snapshot was taken
|
||||||
|
|
|
@ -147,9 +147,9 @@ public class TestSnapshotFileCache {
|
||||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
||||||
"test-snapshot-file-cache-refresh", new SnapshotFiles()) {
|
"test-snapshot-file-cache-refresh", new SnapshotFiles()) {
|
||||||
@Override
|
@Override
|
||||||
List<String> getSnapshotsInProgress(final SnapshotManager snapshotManager)
|
List<String> getSnapshotsInProgress()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<String> result = super.getSnapshotsInProgress(snapshotManager);
|
List<String> result = super.getSnapshotsInProgress();
|
||||||
count.incrementAndGet();
|
count.incrementAndGet();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,7 +145,7 @@ public class TestSnapshotHFileCleaner {
|
||||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
||||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||||
try {
|
try {
|
||||||
cache.getSnapshotsInProgress(null);
|
cache.getSnapshotsInProgress();
|
||||||
} catch (CorruptedSnapshotException cse) {
|
} catch (CorruptedSnapshotException cse) {
|
||||||
LOG.info("Expected exception " + cse);
|
LOG.info("Expected exception " + cse);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -173,7 +173,7 @@ public class TestSnapshotHFileCleaner {
|
||||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
||||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||||
try {
|
try {
|
||||||
cache.getSnapshotsInProgress(null);
|
cache.getSnapshotsInProgress();
|
||||||
} catch (CorruptedSnapshotException cse) {
|
} catch (CorruptedSnapshotException cse) {
|
||||||
LOG.info("Expected exception " + cse);
|
LOG.info("Expected exception " + cse);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -197,7 +197,7 @@ public class TestSnapshotHFileCleaner {
|
||||||
long period = Long.MAX_VALUE;
|
long period = Long.MAX_VALUE;
|
||||||
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
|
||||||
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
"test-snapshot-file-cache-refresh", new SnapshotFiles());
|
||||||
cache.getSnapshotsInProgress(null);
|
cache.getSnapshotsInProgress();
|
||||||
assertFalse(fs.exists(builder.getSnapshotsDir()));
|
assertFalse(fs.exists(builder.getSnapshotsDir()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,207 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.snapshot;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TestTableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
|
||||||
|
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.FSVisitor;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Case for HBASE-21387
|
||||||
|
*/
|
||||||
|
@Category({ LargeTests.class })
|
||||||
|
public class TestSnapshotWhenChoreCleaning {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestSnapshotWhenChoreCleaning.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
private static final Configuration CONF = TEST_UTIL.getConfiguration();
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotClientRetries.class);
|
||||||
|
private static final TableName TABLE_NAME = TableName.valueOf("testTable");
|
||||||
|
private static final int MAX_SPLIT_KEYS_NUM = 100;
|
||||||
|
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
|
||||||
|
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||||
|
private static Table TABLE;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestTableName TEST_TABLE = new TestTableName();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
// Set the hbase.snapshot.thread.pool.max to 1;
|
||||||
|
CONF.setInt("hbase.snapshot.thread.pool.max", 1);
|
||||||
|
// Enable snapshot
|
||||||
|
CONF.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
|
||||||
|
// Start MiniCluster.
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
// Create talbe
|
||||||
|
createTable();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] integerToBytes(int i) {
|
||||||
|
return Bytes.toBytes(String.format("%06d", i));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createTable() throws IOException {
|
||||||
|
byte[][] splitKeys = new byte[MAX_SPLIT_KEYS_NUM][];
|
||||||
|
for (int i = 0; i < splitKeys.length; i++) {
|
||||||
|
splitKeys[i] = integerToBytes(i);
|
||||||
|
}
|
||||||
|
TABLE = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void loadDataAndFlush() throws IOException {
|
||||||
|
for (int i = 0; i < MAX_SPLIT_KEYS_NUM; i++) {
|
||||||
|
Put put = new Put(integerToBytes(i)).addColumn(FAMILY, QUALIFIER,
|
||||||
|
Bytes.add(VALUE, Bytes.toBytes(i)));
|
||||||
|
TABLE.put(put);
|
||||||
|
}
|
||||||
|
TEST_UTIL.flush(TABLE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Path> listHFileNames(final FileSystem fs, final Path tableDir)
|
||||||
|
throws IOException {
|
||||||
|
final List<Path> hfiles = new ArrayList<>();
|
||||||
|
FSVisitor.visitTableStoreFiles(fs, tableDir, (region, family, hfileName) -> {
|
||||||
|
hfiles.add(new Path(new Path(new Path(tableDir, region), family), hfileName));
|
||||||
|
});
|
||||||
|
Collections.sort(hfiles);
|
||||||
|
return hfiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isAnySnapshots(FileSystem fs) throws IOException {
|
||||||
|
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(FSUtils.getRootDir(CONF));
|
||||||
|
FileStatus[] snapFiles = fs.listStatus(snapshotDir);
|
||||||
|
if (snapFiles.length == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Path firstPath = snapFiles[0].getPath();
|
||||||
|
LOG.info("firstPath in isAnySnapshots: " + firstPath);
|
||||||
|
if (snapFiles.length == 1 && firstPath.getName().equals(".tmp")) {
|
||||||
|
FileStatus[] tmpSnapFiles = fs.listStatus(firstPath);
|
||||||
|
return tmpSnapFiles != null && tmpSnapFiles.length > 0;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSnapshotWhenSnapshotHFileCleanerRunning() throws Exception {
|
||||||
|
// Load data and flush to generate huge number of HFiles.
|
||||||
|
loadDataAndFlush();
|
||||||
|
|
||||||
|
SnapshotHFileCleaner cleaner = new SnapshotHFileCleaner();
|
||||||
|
cleaner.init(ImmutableMap.of(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster()));
|
||||||
|
cleaner.setConf(CONF);
|
||||||
|
|
||||||
|
FileSystem fs = FSUtils.getCurrentFileSystem(CONF);
|
||||||
|
List<Path> fileNames =
|
||||||
|
listHFileNames(fs, FSUtils.getTableDir(FSUtils.getRootDir(CONF), TABLE_NAME));
|
||||||
|
List<FileStatus> files = new ArrayList<>();
|
||||||
|
for (Path fileName : fileNames) {
|
||||||
|
files.add(fs.getFileStatus(fileName));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_UTIL.getAdmin().snapshot("snapshotName_prev", TABLE_NAME);
|
||||||
|
Assert.assertEquals(Lists.newArrayList(cleaner.getDeletableFiles(files)).size(), 0);
|
||||||
|
TEST_UTIL.getAdmin().deleteSnapshot("snapshotName_prev");
|
||||||
|
cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
|
||||||
|
Assert.assertEquals(Lists.newArrayList(cleaner.getDeletableFiles(files)).size(), 100);
|
||||||
|
|
||||||
|
Runnable snapshotRunnable = () -> {
|
||||||
|
try {
|
||||||
|
// The thread will be busy on taking snapshot;
|
||||||
|
for (int k = 0; k < 5; k++) {
|
||||||
|
TEST_UTIL.getAdmin().snapshot("snapshotName_" + k, TABLE_NAME);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Snapshot failed: ", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final AtomicBoolean success = new AtomicBoolean(true);
|
||||||
|
Runnable cleanerRunnable = () -> {
|
||||||
|
try {
|
||||||
|
while (!isAnySnapshots(fs)) {
|
||||||
|
LOG.info("Not found any snapshot, sleep 100ms");
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
for (int k = 0; k < 5; k++) {
|
||||||
|
cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
|
||||||
|
Iterable<FileStatus> toDeleteFiles = cleaner.getDeletableFiles(files);
|
||||||
|
List<FileStatus> deletableFiles = Lists.newArrayList(toDeleteFiles);
|
||||||
|
LOG.info("Size of deletableFiles is: " + deletableFiles.size());
|
||||||
|
for (int i = 0; i < deletableFiles.size(); i++) {
|
||||||
|
LOG.debug("toDeleteFiles[{}] is: {}", i, deletableFiles.get(i));
|
||||||
|
}
|
||||||
|
if (deletableFiles.size() > 0) {
|
||||||
|
success.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Chore cleaning failed: ", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Thread t1 = new Thread(snapshotRunnable);
|
||||||
|
t1.start();
|
||||||
|
Thread t2 = new Thread(cleanerRunnable);
|
||||||
|
t2.start();
|
||||||
|
t1.join();
|
||||||
|
t2.join();
|
||||||
|
Assert.assertTrue(success.get());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue