diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java index 746bb5ff82c..00ad54b394a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -80,14 +80,12 @@ public class CompactionTool extends Configured implements Tool { private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once"; private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major"; private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete"; - private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete"; /** * Class responsible to execute the Compaction on the specified path. * The path can be a table, region or family directory. */ private static class CompactionWorker { - private final boolean keepCompactedFiles; private final boolean deleteCompacted; private final Configuration conf; private final FileSystem fs; @@ -95,7 +93,6 @@ public class CompactionTool extends Configured implements Tool { public CompactionWorker(final FileSystem fs, final Configuration conf) { this.conf = conf; - this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true); this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false); this.tmpDir = new Path(conf.get(CONF_TMP_DIR)); this.fs = fs; @@ -169,7 +166,7 @@ public class CompactionTool extends Configured implements Tool { List storeFiles = store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null); if (storeFiles != null && !storeFiles.isEmpty()) { - if (keepCompactedFiles && deleteCompacted) { + if (deleteCompacted) { for (HStoreFile storeFile: storeFiles) { fs.delete(storeFile.getPath(), false); } @@ -458,7 +455,6 @@ public class CompactionTool extends Configured implements Tool { System.err.println(); System.err.println("Note: -D properties will be applied to the conf used. "); System.err.println("For example: "); - System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false"); System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false"); System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR"); System.err.println(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 269ecbde922..8daaa780567 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -654,7 +654,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat completeCompaction(toBeRemovedStoreFiles); } - private HStoreFile createStoreFileAndReader(final Path p) throws IOException { + @VisibleForTesting + protected HStoreFile createStoreFileAndReader(final Path p) throws IOException { StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p); return createStoreFileAndReader(info); } @@ -1353,54 +1354,45 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat " into tmpdir=" + fs.getTempDir() + ", totalSize=" + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); - // Commence the compaction. - List newFiles = compaction.compact(throughputController, user); - - // TODO: get rid of this! - if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { - LOG.warn("hbase.hstore.compaction.complete is set to false"); - sfs = new ArrayList<>(newFiles.size()); - final boolean evictOnClose = - cacheConf != null? cacheConf.shouldEvictOnClose(): true; - for (Path newFile : newFiles) { - // Create storefile around what we wrote with a reader on it. - HStoreFile sf = createStoreFileAndReader(newFile); - sf.closeStoreFile(evictOnClose); - sfs.add(sf); - } - return sfs; - } - // Do the steps necessary to complete the compaction. - sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); - writeCompactionWalRecord(filesToCompact, sfs); - replaceStoreFiles(filesToCompact, sfs); - if (cr.isMajor()) { - majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; - majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; - } else { - compactedCellsCount += getCompactionProgress().totalCompactingKVs; - compactedCellsSize += getCompactionProgress().totalCompactedSize; - } - long outputBytes = getTotalSize(sfs); - - // At this point the store will use new files for all new scanners. - completeCompaction(filesToCompact); // update store size. - - long now = EnvironmentEdgeManager.currentTime(); - if (region.getRegionServerServices() != null - && region.getRegionServerServices().getMetrics() != null) { - region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(), - now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), - outputBytes); - } - - logCompactionEndMessage(cr, sfs, now, compactionStartTime); - return sfs; + return doCompaction(cr, filesToCompact, user, compactionStartTime, + compaction.compact(throughputController, user)); } finally { finishCompactionRequest(cr); } } + @VisibleForTesting + protected List doCompaction(CompactionRequestImpl cr, + Collection filesToCompact, User user, long compactionStartTime, + List newFiles) throws IOException { + // Do the steps necessary to complete the compaction. + List sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); + writeCompactionWalRecord(filesToCompact, sfs); + replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + compactedCellsSize += getCompactionProgress().totalCompactedSize; + } + long outputBytes = getTotalSize(sfs); + + // At this point the store will use new files for all new scanners. + completeCompaction(filesToCompact); // update store size. + + long now = EnvironmentEdgeManager.currentTime(); + if (region.getRegionServerServices() != null + && region.getRegionServerServices().getMetrics() != null) { + region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(), + now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(), + outputBytes); + } + + logCompactionEndMessage(cr, sfs, now, compactionStartTime); + return sfs; + } + private List moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List newFiles, User user) throws IOException { List sfs = new ArrayList<>(newFiles.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ef5f3a5248e..5ddd4dfb120 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -136,6 +137,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; @@ -843,6 +845,7 @@ public class TestHRegion { } public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception { + CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class); byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); final WALFactory wals = new WALFactory(CONF, null, method); @@ -945,6 +948,7 @@ public class TestHRegion { HBaseTestingUtility.closeRegionAndWAL(this.region); this.region = null; wals.close(); + CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class); } } @@ -6487,4 +6491,80 @@ public class TestHRegion { this.region = null; } } + + /** + * The same as HRegion class, the only difference is that instantiateHStore will + * create a different HStore - HStoreForTesting. [HBASE-8518] + */ + public static class HRegionForTesting extends HRegion { + + public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs, + final Configuration confParam, final RegionInfo regionInfo, + final TableDescriptor htd, final RegionServerServices rsServices) { + this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), + wal, confParam, htd, rsServices); + } + + public HRegionForTesting(HRegionFileSystem fs, WAL wal, + Configuration confParam, TableDescriptor htd, + RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + /** + * Create HStore instance. + * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting. + */ + @Override + protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException { + if (family.isMobEnabled()) { + if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) { + throw new IOException("A minimum HFile version of " + + HFile.MIN_FORMAT_VERSION_WITH_TAGS + + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY + + " accordingly."); + } + return new HMobStore(this, family, this.conf); + } + return new HStoreForTesting(this, family, this.conf); + } + } + + /** + * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method + * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which + * doesn't let hstore compaction complete. In the former edition, this config is set in + * HStore class inside compact method, though this is just for testing, otherwise it + * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete" + * config (except for testing code). + */ + public static class HStoreForTesting extends HStore { + + protected HStoreForTesting(final HRegion region, + final ColumnFamilyDescriptor family, + final Configuration confParam) throws IOException { + super(region, family, confParam); + } + + @Override + protected List doCompaction(CompactionRequestImpl cr, + Collection filesToCompact, User user, long compactionStartTime, + List newFiles) throws IOException { + // let compaction incomplete. + if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { + LOG.warn("hbase.hstore.compaction.complete is set to false"); + List sfs = new ArrayList<>(newFiles.size()); + final boolean evictOnClose = + cacheConf != null? cacheConf.shouldEvictOnClose(): true; + for (Path newFile : newFiles) { + // Create storefile around what we wrote with a reader on it. + HStoreFile sf = createStoreFileAndReader(newFile); + sf.closeStoreFile(evictOnClose); + sfs.add(sf); + } + return sfs; + } + return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles); + } + } }