HBASE-26485 Introduce a method to clean restore directory after Snapshot Scan (#3877)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
5437532910
commit
4fb3e304e2
|
@ -235,4 +235,14 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
|||
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
|
||||
splitAlgo, numSplitsPerRegion);
|
||||
}
|
||||
|
||||
/**
|
||||
* clean restore directory after snapshot scan job
|
||||
* @param job the snapshot scan job
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
|
||||
TableSnapshotInputFormatImpl.cleanRestoreDir(job, snapshotName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -622,4 +623,24 @@ public class TableSnapshotInputFormatImpl {
|
|||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
conf.set(RESTORE_DIR_KEY, restoreDir.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* clean restore directory after snapshot scan job
|
||||
* @param job the snapshot scan job
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
|
||||
FileSystem fs = restoreDir.getFileSystem(conf);
|
||||
if (!fs.exists(restoreDir)) {
|
||||
LOG.warn("{} doesn't exist on file system, maybe it's already been cleaned", restoreDir);
|
||||
return;
|
||||
}
|
||||
if (!fs.delete(restoreDir, true)) {
|
||||
LOG.warn("Failed clean restore dir {} for snapshot {}", restoreDir, snapshotName);
|
||||
}
|
||||
LOG.debug("Clean restore directory {} for {}", restoreDir, snapshotName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -575,4 +575,22 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
|||
public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
|
||||
testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanRestoreDir() throws Exception {
|
||||
TableName tableName = TableName.valueOf("test_table");
|
||||
String snapshotName = "test_snapshot";
|
||||
createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
|
||||
Job job = Job.getInstance(UTIL.getConfiguration());
|
||||
Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
|
||||
new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
|
||||
NullWritable.class, job, false, workingDir);
|
||||
FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
|
||||
Path restorePath = new Path(job.getConfiguration()
|
||||
.get("hbase.TableSnapshotInputFormat.restore.dir"));
|
||||
Assert.assertTrue(fs.exists(restorePath));
|
||||
TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
|
||||
Assert.assertFalse(fs.exists(restorePath));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue