HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427)

Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Peter Somogyi 2020-04-06 11:54:19 +02:00
parent 5a67fb8c83
commit b6177569a1
2 changed files with 42 additions and 32 deletions

View File

@ -53,4 +53,20 @@ public abstract class JobUtil {
throws IOException, InterruptedException { throws IOException, InterruptedException {
return JobSubmissionFiles.getStagingDir(new Cluster(conf), conf); return JobSubmissionFiles.getStagingDir(new Cluster(conf), conf);
} }
/**
* Initializes the staging directory and returns the qualified path.
*
* @param conf conf system configuration
* @return qualified staging directory path
* @throws IOException if the ownership on the staging directory is not as expected
* @throws InterruptedException if the thread getting the staging directory is interrupted
*/
public static Path getQualifiedStagingDir(Configuration conf)
throws IOException, InterruptedException {
Cluster cluster = new Cluster(conf);
Path stagingDir = JobSubmissionFiles.getStagingDir(cluster, conf);
return cluster.getFileSystem().makeQualified(stagingDir);
}
} }

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -76,7 +78,6 @@ import org.slf4j.LoggerFactory;
public class CompactionTool extends Configured implements Tool { public class CompactionTool extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class); private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class);
private final static String CONF_TMP_DIR = "hbase.tmp.dir";
private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once"; private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major"; private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete"; private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
@ -89,12 +90,10 @@ public class CompactionTool extends Configured implements Tool {
private final boolean deleteCompacted; private final boolean deleteCompacted;
private final Configuration conf; private final Configuration conf;
private final FileSystem fs; private final FileSystem fs;
private final Path tmpDir;
public CompactionWorker(final FileSystem fs, final Configuration conf) { public CompactionWorker(final FileSystem fs, final Configuration conf) {
this.conf = conf; this.conf = conf;
this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false); this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
this.fs = fs; this.fs = fs;
} }
@ -105,7 +104,8 @@ public class CompactionTool extends Configured implements Tool {
* @param compactOnce Execute just a single step of compaction. * @param compactOnce Execute just a single step of compaction.
* @param major Request major compaction. * @param major Request major compaction.
*/ */
public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException { public void compact(final Path path, final boolean compactOnce, final boolean major)
throws IOException {
if (isFamilyDir(fs, path)) { if (isFamilyDir(fs, path)) {
Path regionDir = path.getParent(); Path regionDir = path.getParent();
Path tableDir = regionDir.getParent(); Path tableDir = regionDir.getParent();
@ -150,7 +150,7 @@ public class CompactionTool extends Configured implements Tool {
private void compactStoreFiles(final Path tableDir, final TableDescriptor htd, private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
final RegionInfo hri, final String familyName, final boolean compactOnce, final RegionInfo hri, final String familyName, final boolean compactOnce,
final boolean major) throws IOException { final boolean major) throws IOException {
HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir); HStore store = getStore(conf, fs, tableDir, htd, hri, familyName);
LOG.info("Compact table=" + htd.getTableName() + LOG.info("Compact table=" + htd.getTableName() +
" region=" + hri.getRegionNameAsString() + " region=" + hri.getRegionNameAsString() +
" family=" + familyName); " family=" + familyName);
@ -177,19 +177,10 @@ public class CompactionTool extends Configured implements Tool {
store.close(); store.close();
} }
/**
* Create a "mock" HStore that uses the tmpDir specified by the user and
* the store dir to compact as source.
*/
private static HStore getStore(final Configuration conf, final FileSystem fs, private static HStore getStore(final Configuration conf, final FileSystem fs,
final Path tableDir, final TableDescriptor htd, final RegionInfo hri, final Path tableDir, final TableDescriptor htd, final RegionInfo hri,
final String familyName, final Path tempDir) throws IOException { final String familyName) throws IOException {
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) { HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri);
@Override
public Path getTempDir() {
return tempDir;
}
};
HRegion region = new HRegion(regionFs, null, conf, htd, null); HRegion region = new HRegion(regionFs, null, conf, htd, null);
return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false); return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
} }
@ -221,7 +212,7 @@ public class CompactionTool extends Configured implements Tool {
major = conf.getBoolean(CONF_COMPACT_MAJOR, false); major = conf.getBoolean(CONF_COMPACT_MAJOR, false);
try { try {
FileSystem fs = FileSystem.get(conf); FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
this.compactor = new CompactionWorker(fs, conf); this.compactor = new CompactionWorker(fs, conf);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Could not get the input FileSystem", e); throw new RuntimeException("Could not get the input FileSystem", e);
@ -301,23 +292,19 @@ public class CompactionTool extends Configured implements Tool {
* The file is a TextFile with each line corrisponding to a * The file is a TextFile with each line corrisponding to a
* store files directory to compact. * store files directory to compact.
*/ */
public static void createInputFile(final FileSystem fs, final Path path, public static List<Path> createInputFile(final FileSystem fs, final FileSystem stagingFs,
final Set<Path> toCompactDirs) throws IOException { final Path path, final Set<Path> toCompactDirs) throws IOException {
// Extract the list of store dirs // Extract the list of store dirs
List<Path> storeDirs = new LinkedList<>(); List<Path> storeDirs = new LinkedList<>();
for (Path compactDir: toCompactDirs) { for (Path compactDir: toCompactDirs) {
if (isFamilyDir(fs, compactDir)) { if (isFamilyDir(fs, compactDir)) {
storeDirs.add(compactDir); storeDirs.add(compactDir);
} else if (isRegionDir(fs, compactDir)) { } else if (isRegionDir(fs, compactDir)) {
for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) { storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir));
storeDirs.add(familyDir);
}
} else if (isTableDir(fs, compactDir)) { } else if (isTableDir(fs, compactDir)) {
// Lookup regions // Lookup regions
for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) { for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) {
for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir));
storeDirs.add(familyDir);
}
} }
} else { } else {
throw new IOException( throw new IOException(
@ -326,7 +313,7 @@ public class CompactionTool extends Configured implements Tool {
} }
// Write Input File // Write Input File
FSDataOutputStream stream = fs.create(path); FSDataOutputStream stream = stagingFs.create(path);
LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact."); LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact.");
try { try {
final byte[] newLine = Bytes.toBytes("\n"); final byte[] newLine = Bytes.toBytes("\n");
@ -337,6 +324,7 @@ public class CompactionTool extends Configured implements Tool {
} finally { } finally {
stream.close(); stream.close();
} }
return storeDirs;
} }
} }
@ -361,15 +349,20 @@ public class CompactionTool extends Configured implements Tool {
// add dependencies (including HBase ones) // add dependencies (including HBase ones)
TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job);
Path stagingDir = JobUtil.getStagingDir(conf); Path stagingDir = JobUtil.getQualifiedStagingDir(conf);
FileSystem stagingFs = stagingDir.getFileSystem(conf);
try { try {
// Create input file with the store dirs // Create input file with the store dirs
Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime()); Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime());
CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs); List<Path> storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs,
inputPath, toCompactDirs);
CompactionInputFormat.addInputPath(job, inputPath); CompactionInputFormat.addInputPath(job, inputPath);
// Initialize credential for secure cluster // Initialize credential for secure cluster
TableMapReduceUtil.initCredentials(job); TableMapReduceUtil.initCredentials(job);
// Despite the method name this will get delegation token for the filesystem
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
storeDirs.toArray(new Path[0]), conf);
// Start the MR Job and wait // Start the MR Job and wait
return job.waitForCompletion(true) ? 0 : 1; return job.waitForCompletion(true) ? 0 : 1;
@ -398,7 +391,7 @@ public class CompactionTool extends Configured implements Tool {
boolean mapred = false; boolean mapred = false;
Configuration conf = getConf(); Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf); FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf);
try { try {
for (int i = 0; i < args.length; ++i) { for (int i = 0; i < args.length; ++i) {
@ -458,14 +451,15 @@ public class CompactionTool extends Configured implements Tool {
System.err.println("Note: -D properties will be applied to the conf used. "); System.err.println("Note: -D properties will be applied to the conf used. ");
System.err.println("For example: "); System.err.println("For example: ");
System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false"); System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
System.err.println(); System.err.println();
System.err.println("Examples:"); System.err.println("Examples:");
System.err.println(" To compact the full 'TestTable' using MapReduce:"); System.err.println(" To compact the full 'TestTable' using MapReduce:");
System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable"); System.err.println(" $ hbase " + this.getClass().getName() +
" -mapred hdfs://hbase/data/default/TestTable");
System.err.println(); System.err.println();
System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':"); System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':");
System.err.println(" $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x"); System.err.println(" $ hbase " + this.getClass().getName() +
" hdfs://hbase/data/default/TestTable/abc/x");
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {