From b6177569a1e4eb9cc24207465c7db448dc739972 Mon Sep 17 00:00:00 2001 From: Peter Somogyi Date: Mon, 6 Apr 2020 11:54:19 +0200 Subject: [PATCH] HBASE-24111 Enable CompactionTool executions on non-HDFS filesystems (#1427) Signed-off-by: Josh Elser --- .../hadoop/hbase/mapreduce/JobUtil.java | 16 +++++ .../hbase/regionserver/CompactionTool.java | 58 +++++++++---------- 2 files changed, 42 insertions(+), 32 deletions(-) diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java index b6d96426bfd..b4f62b3970b 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/mapreduce/JobUtil.java @@ -53,4 +53,20 @@ public abstract class JobUtil { throws IOException, InterruptedException { return JobSubmissionFiles.getStagingDir(new Cluster(conf), conf); } + + /** + * Initializes the staging directory and returns the qualified path. + * + * @param conf conf system configuration + * @return qualified staging directory path + * @throws IOException if the ownership on the staging directory is not as expected + * @throws InterruptedException if the thread getting the staging directory is interrupted + */ + public static Path getQualifiedStagingDir(Configuration conf) + throws IOException, InterruptedException { + Cluster cluster = new Cluster(conf); + Path stagingDir = JobSubmissionFiles.getStagingDir(cluster, conf); + return cluster.getFileSystem().makeQualified(stagingDir); + } + } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java index da06ffffdfb..f01e3d90224 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; @@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -76,7 +78,6 @@ import org.slf4j.LoggerFactory; public class CompactionTool extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(CompactionTool.class); - private final static String CONF_TMP_DIR = "hbase.tmp.dir"; private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once"; private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major"; private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete"; @@ -89,12 +90,10 @@ public class CompactionTool extends Configured implements Tool { private final boolean deleteCompacted; private final Configuration conf; private final FileSystem fs; - private final Path tmpDir; public CompactionWorker(final FileSystem fs, final Configuration conf) { this.conf = conf; this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false); - this.tmpDir = new Path(conf.get(CONF_TMP_DIR)); this.fs = fs; } @@ -105,7 +104,8 @@ public class CompactionTool extends Configured implements Tool { * @param compactOnce Execute just a single step of compaction. * @param major Request major compaction. */ - public void compact(final Path path, final boolean compactOnce, final boolean major) throws IOException { + public void compact(final Path path, final boolean compactOnce, final boolean major) + throws IOException { if (isFamilyDir(fs, path)) { Path regionDir = path.getParent(); Path tableDir = regionDir.getParent(); @@ -150,7 +150,7 @@ public class CompactionTool extends Configured implements Tool { private void compactStoreFiles(final Path tableDir, final TableDescriptor htd, final RegionInfo hri, final String familyName, final boolean compactOnce, final boolean major) throws IOException { - HStore store = getStore(conf, fs, tableDir, htd, hri, familyName, tmpDir); + HStore store = getStore(conf, fs, tableDir, htd, hri, familyName); LOG.info("Compact table=" + htd.getTableName() + " region=" + hri.getRegionNameAsString() + " family=" + familyName); @@ -177,19 +177,10 @@ public class CompactionTool extends Configured implements Tool { store.close(); } - /** - * Create a "mock" HStore that uses the tmpDir specified by the user and - * the store dir to compact as source. - */ private static HStore getStore(final Configuration conf, final FileSystem fs, final Path tableDir, final TableDescriptor htd, final RegionInfo hri, - final String familyName, final Path tempDir) throws IOException { - HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri) { - @Override - public Path getTempDir() { - return tempDir; - } - }; + final String familyName) throws IOException { + HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, hri); HRegion region = new HRegion(regionFs, null, conf, htd, null); return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false); } @@ -221,7 +212,7 @@ public class CompactionTool extends Configured implements Tool { major = conf.getBoolean(CONF_COMPACT_MAJOR, false); try { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf); this.compactor = new CompactionWorker(fs, conf); } catch (IOException e) { throw new RuntimeException("Could not get the input FileSystem", e); @@ -301,23 +292,19 @@ public class CompactionTool extends Configured implements Tool { * The file is a TextFile with each line corrisponding to a * store files directory to compact. */ - public static void createInputFile(final FileSystem fs, final Path path, - final Set toCompactDirs) throws IOException { + public static List createInputFile(final FileSystem fs, final FileSystem stagingFs, + final Path path, final Set toCompactDirs) throws IOException { // Extract the list of store dirs List storeDirs = new LinkedList<>(); for (Path compactDir: toCompactDirs) { if (isFamilyDir(fs, compactDir)) { storeDirs.add(compactDir); } else if (isRegionDir(fs, compactDir)) { - for (Path familyDir: FSUtils.getFamilyDirs(fs, compactDir)) { - storeDirs.add(familyDir); - } + storeDirs.addAll(FSUtils.getFamilyDirs(fs, compactDir)); } else if (isTableDir(fs, compactDir)) { // Lookup regions for (Path regionDir: FSUtils.getRegionDirs(fs, compactDir)) { - for (Path familyDir: FSUtils.getFamilyDirs(fs, regionDir)) { - storeDirs.add(familyDir); - } + storeDirs.addAll(FSUtils.getFamilyDirs(fs, regionDir)); } } else { throw new IOException( @@ -326,7 +313,7 @@ public class CompactionTool extends Configured implements Tool { } // Write Input File - FSDataOutputStream stream = fs.create(path); + FSDataOutputStream stream = stagingFs.create(path); LOG.info("Create input file=" + path + " with " + storeDirs.size() + " dirs to compact."); try { final byte[] newLine = Bytes.toBytes("\n"); @@ -337,6 +324,7 @@ public class CompactionTool extends Configured implements Tool { } finally { stream.close(); } + return storeDirs; } } @@ -361,15 +349,20 @@ public class CompactionTool extends Configured implements Tool { // add dependencies (including HBase ones) TableMapReduceUtil.addDependencyJars(job); - Path stagingDir = JobUtil.getStagingDir(conf); + Path stagingDir = JobUtil.getQualifiedStagingDir(conf); + FileSystem stagingFs = stagingDir.getFileSystem(conf); try { // Create input file with the store dirs Path inputPath = new Path(stagingDir, "compact-"+ EnvironmentEdgeManager.currentTime()); - CompactionInputFormat.createInputFile(fs, inputPath, toCompactDirs); + List storeDirs = CompactionInputFormat.createInputFile(fs, stagingFs, + inputPath, toCompactDirs); CompactionInputFormat.addInputPath(job, inputPath); // Initialize credential for secure cluster TableMapReduceUtil.initCredentials(job); + // Despite the method name this will get delegation token for the filesystem + TokenCache.obtainTokensForNamenodes(job.getCredentials(), + storeDirs.toArray(new Path[0]), conf); // Start the MR Job and wait return job.waitForCompletion(true) ? 0 : 1; @@ -398,7 +391,7 @@ public class CompactionTool extends Configured implements Tool { boolean mapred = false; Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); + FileSystem fs = CommonFSUtils.getRootDirFileSystem(conf); try { for (int i = 0; i < args.length; ++i) { @@ -458,14 +451,15 @@ public class CompactionTool extends Configured implements Tool { System.err.println("Note: -D properties will be applied to the conf used. "); System.err.println("For example: "); System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false"); - System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR"); System.err.println(); System.err.println("Examples:"); System.err.println(" To compact the full 'TestTable' using MapReduce:"); - System.err.println(" $ hbase " + this.getClass().getName() + " -mapred hdfs://hbase/data/default/TestTable"); + System.err.println(" $ hbase " + this.getClass().getName() + + " -mapred hdfs://hbase/data/default/TestTable"); System.err.println(); System.err.println(" To compact column family 'x' of the table 'TestTable' region 'abc':"); - System.err.println(" $ hbase " + this.getClass().getName() + " hdfs://hbase/data/default/TestTable/abc/x"); + System.err.println(" $ hbase " + this.getClass().getName() + + " hdfs://hbase/data/default/TestTable/abc/x"); } public static void main(String[] args) throws Exception {