HBASE-26485 Introduce a method to clean restore directory after Snapshot Scan (#3877)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Ruanhui 2021-11-27 20:35:24 +08:00 committed by GitHub
parent 7b0a61fb0f
commit 32cb580ba2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 0 deletions

View File

@ -224,4 +224,14 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
splitAlgo, numSplitsPerRegion);
}
/**
* clean restore directory after snapshot scan job
* @param job the snapshot scan job
* @param snapshotName the name of the snapshot to read from
* @throws IOException if an error occurs
*/
public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
TableSnapshotInputFormatImpl.cleanRestoreDir(job, snapshotName);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -622,4 +623,24 @@ public class TableSnapshotInputFormatImpl {
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
conf.set(RESTORE_DIR_KEY, restoreDir.toString());
}
/**
* clean restore directory after snapshot scan job
* @param job the snapshot scan job
* @param snapshotName the name of the snapshot to read from
* @throws IOException if an error occurs
*/
public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
Configuration conf = job.getConfiguration();
Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
FileSystem fs = restoreDir.getFileSystem(conf);
if (!fs.exists(restoreDir)) {
LOG.warn("{} doesn't exist on file system, maybe it's already been cleaned", restoreDir);
return;
}
if (!fs.delete(restoreDir, true)) {
LOG.warn("Failed clean restore dir {} for snapshot {}", restoreDir, snapshotName);
}
LOG.debug("Clean restore directory {} for {}", restoreDir, snapshotName);
}
}

View File

@ -575,4 +575,22 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
}
@Test
public void testCleanRestoreDir() throws Exception {
TableName tableName = TableName.valueOf("test_table");
String snapshotName = "test_snapshot";
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
Job job = Job.getInstance(UTIL.getConfiguration());
Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
NullWritable.class, job, false, workingDir);
FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
Path restorePath = new Path(job.getConfiguration()
.get("hbase.TableSnapshotInputFormat.restore.dir"));
Assert.assertTrue(fs.exists(restorePath));
TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
Assert.assertFalse(fs.exists(restorePath));
}
}