HBASE-8518 Get rid of hbase.hstore.compaction.complete setting
Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
e2f07aafb6
commit
9e141d12a4
|
@ -80,14 +80,12 @@ public class CompactionTool extends Configured implements Tool {
|
||||||
private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
|
private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once";
|
||||||
private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
|
private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major";
|
||||||
private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
|
private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete";
|
||||||
private final static String CONF_COMPLETE_COMPACTION = "hbase.hstore.compaction.complete";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class responsible to execute the Compaction on the specified path.
|
* Class responsible to execute the Compaction on the specified path.
|
||||||
* The path can be a table, region or family directory.
|
* The path can be a table, region or family directory.
|
||||||
*/
|
*/
|
||||||
private static class CompactionWorker {
|
private static class CompactionWorker {
|
||||||
private final boolean keepCompactedFiles;
|
|
||||||
private final boolean deleteCompacted;
|
private final boolean deleteCompacted;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final FileSystem fs;
|
private final FileSystem fs;
|
||||||
|
@ -95,7 +93,6 @@ public class CompactionTool extends Configured implements Tool {
|
||||||
|
|
||||||
public CompactionWorker(final FileSystem fs, final Configuration conf) {
|
public CompactionWorker(final FileSystem fs, final Configuration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.keepCompactedFiles = !conf.getBoolean(CONF_COMPLETE_COMPACTION, true);
|
|
||||||
this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
|
this.deleteCompacted = conf.getBoolean(CONF_DELETE_COMPACTED, false);
|
||||||
this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
|
this.tmpDir = new Path(conf.get(CONF_TMP_DIR));
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
|
@ -169,7 +166,7 @@ public class CompactionTool extends Configured implements Tool {
|
||||||
List<HStoreFile> storeFiles =
|
List<HStoreFile> storeFiles =
|
||||||
store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
|
store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
|
||||||
if (storeFiles != null && !storeFiles.isEmpty()) {
|
if (storeFiles != null && !storeFiles.isEmpty()) {
|
||||||
if (keepCompactedFiles && deleteCompacted) {
|
if (deleteCompacted) {
|
||||||
for (HStoreFile storeFile: storeFiles) {
|
for (HStoreFile storeFile: storeFiles) {
|
||||||
fs.delete(storeFile.getPath(), false);
|
fs.delete(storeFile.getPath(), false);
|
||||||
}
|
}
|
||||||
|
@ -458,7 +455,6 @@ public class CompactionTool extends Configured implements Tool {
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Note: -D properties will be applied to the conf used. ");
|
System.err.println("Note: -D properties will be applied to the conf used. ");
|
||||||
System.err.println("For example: ");
|
System.err.println("For example: ");
|
||||||
System.err.println(" To preserve input files, pass -D"+CONF_COMPLETE_COMPACTION+"=false");
|
|
||||||
System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
|
System.err.println(" To stop delete of compacted file, pass -D"+CONF_DELETE_COMPACTED+"=false");
|
||||||
System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
|
System.err.println(" To set tmp dir, pass -D"+CONF_TMP_DIR+"=ALTERNATE_DIR");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
|
|
|
@ -654,7 +654,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
completeCompaction(toBeRemovedStoreFiles);
|
completeCompaction(toBeRemovedStoreFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
private HStoreFile createStoreFileAndReader(final Path p) throws IOException {
|
@VisibleForTesting
|
||||||
|
protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
|
||||||
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
|
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
|
||||||
return createStoreFileAndReader(info);
|
return createStoreFileAndReader(info);
|
||||||
}
|
}
|
||||||
|
@ -1353,54 +1354,45 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
" into tmpdir=" + fs.getTempDir() + ", totalSize=" +
|
" into tmpdir=" + fs.getTempDir() + ", totalSize=" +
|
||||||
TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
|
TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
|
||||||
|
|
||||||
// Commence the compaction.
|
return doCompaction(cr, filesToCompact, user, compactionStartTime,
|
||||||
List<Path> newFiles = compaction.compact(throughputController, user);
|
compaction.compact(throughputController, user));
|
||||||
|
|
||||||
// TODO: get rid of this!
|
|
||||||
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
|
|
||||||
LOG.warn("hbase.hstore.compaction.complete is set to false");
|
|
||||||
sfs = new ArrayList<>(newFiles.size());
|
|
||||||
final boolean evictOnClose =
|
|
||||||
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
|
||||||
for (Path newFile : newFiles) {
|
|
||||||
// Create storefile around what we wrote with a reader on it.
|
|
||||||
HStoreFile sf = createStoreFileAndReader(newFile);
|
|
||||||
sf.closeStoreFile(evictOnClose);
|
|
||||||
sfs.add(sf);
|
|
||||||
}
|
|
||||||
return sfs;
|
|
||||||
}
|
|
||||||
// Do the steps necessary to complete the compaction.
|
|
||||||
sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
|
|
||||||
writeCompactionWalRecord(filesToCompact, sfs);
|
|
||||||
replaceStoreFiles(filesToCompact, sfs);
|
|
||||||
if (cr.isMajor()) {
|
|
||||||
majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
|
|
||||||
majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
|
|
||||||
} else {
|
|
||||||
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
|
|
||||||
compactedCellsSize += getCompactionProgress().totalCompactedSize;
|
|
||||||
}
|
|
||||||
long outputBytes = getTotalSize(sfs);
|
|
||||||
|
|
||||||
// At this point the store will use new files for all new scanners.
|
|
||||||
completeCompaction(filesToCompact); // update store size.
|
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
if (region.getRegionServerServices() != null
|
|
||||||
&& region.getRegionServerServices().getMetrics() != null) {
|
|
||||||
region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
|
|
||||||
now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
|
|
||||||
outputBytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
logCompactionEndMessage(cr, sfs, now, compactionStartTime);
|
|
||||||
return sfs;
|
|
||||||
} finally {
|
} finally {
|
||||||
finishCompactionRequest(cr);
|
finishCompactionRequest(cr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
|
||||||
|
Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
|
||||||
|
List<Path> newFiles) throws IOException {
|
||||||
|
// Do the steps necessary to complete the compaction.
|
||||||
|
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
|
||||||
|
writeCompactionWalRecord(filesToCompact, sfs);
|
||||||
|
replaceStoreFiles(filesToCompact, sfs);
|
||||||
|
if (cr.isMajor()) {
|
||||||
|
majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
|
||||||
|
majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
|
||||||
|
} else {
|
||||||
|
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
|
||||||
|
compactedCellsSize += getCompactionProgress().totalCompactedSize;
|
||||||
|
}
|
||||||
|
long outputBytes = getTotalSize(sfs);
|
||||||
|
|
||||||
|
// At this point the store will use new files for all new scanners.
|
||||||
|
completeCompaction(filesToCompact); // update store size.
|
||||||
|
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
if (region.getRegionServerServices() != null
|
||||||
|
&& region.getRegionServerServices().getMetrics() != null) {
|
||||||
|
region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
|
||||||
|
now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
|
||||||
|
outputBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
logCompactionEndMessage(cr, sfs, now, compactionStartTime);
|
||||||
|
return sfs;
|
||||||
|
}
|
||||||
|
|
||||||
private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles,
|
private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles,
|
||||||
User user) throws IOException {
|
User user) throws IOException {
|
||||||
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
|
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
|
||||||
|
|
|
@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TagType;
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
@ -136,6 +137,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
||||||
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
|
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
|
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
|
||||||
|
@ -843,6 +845,7 @@ public class TestHRegion {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
|
public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
|
||||||
|
CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
|
||||||
byte[] family = Bytes.toBytes("family");
|
byte[] family = Bytes.toBytes("family");
|
||||||
this.region = initHRegion(tableName, method, CONF, family);
|
this.region = initHRegion(tableName, method, CONF, family);
|
||||||
final WALFactory wals = new WALFactory(CONF, null, method);
|
final WALFactory wals = new WALFactory(CONF, null, method);
|
||||||
|
@ -945,6 +948,7 @@ public class TestHRegion {
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
this.region = null;
|
this.region = null;
|
||||||
wals.close();
|
wals.close();
|
||||||
|
CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6487,4 +6491,80 @@ public class TestHRegion {
|
||||||
this.region = null;
|
this.region = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The same as HRegion class, the only difference is that instantiateHStore will
|
||||||
|
* create a different HStore - HStoreForTesting. [HBASE-8518]
|
||||||
|
*/
|
||||||
|
public static class HRegionForTesting extends HRegion {
|
||||||
|
|
||||||
|
public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
|
||||||
|
final Configuration confParam, final RegionInfo regionInfo,
|
||||||
|
final TableDescriptor htd, final RegionServerServices rsServices) {
|
||||||
|
this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
|
||||||
|
wal, confParam, htd, rsServices);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionForTesting(HRegionFileSystem fs, WAL wal,
|
||||||
|
Configuration confParam, TableDescriptor htd,
|
||||||
|
RegionServerServices rsServices) {
|
||||||
|
super(fs, wal, confParam, htd, rsServices);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create HStore instance.
|
||||||
|
* @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
|
||||||
|
if (family.isMobEnabled()) {
|
||||||
|
if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
|
||||||
|
throw new IOException("A minimum HFile version of "
|
||||||
|
+ HFile.MIN_FORMAT_VERSION_WITH_TAGS
|
||||||
|
+ " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
|
||||||
|
+ " accordingly.");
|
||||||
|
}
|
||||||
|
return new HMobStore(this, family, this.conf);
|
||||||
|
}
|
||||||
|
return new HStoreForTesting(this, family, this.conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
|
||||||
|
* of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
|
||||||
|
* doesn't let hstore compaction complete. In the former edition, this config is set in
|
||||||
|
* HStore class inside compact method, though this is just for testing, otherwise it
|
||||||
|
* doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
|
||||||
|
* config (except for testing code).
|
||||||
|
*/
|
||||||
|
public static class HStoreForTesting extends HStore {
|
||||||
|
|
||||||
|
protected HStoreForTesting(final HRegion region,
|
||||||
|
final ColumnFamilyDescriptor family,
|
||||||
|
final Configuration confParam) throws IOException {
|
||||||
|
super(region, family, confParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
|
||||||
|
Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
|
||||||
|
List<Path> newFiles) throws IOException {
|
||||||
|
// let compaction incomplete.
|
||||||
|
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
|
||||||
|
LOG.warn("hbase.hstore.compaction.complete is set to false");
|
||||||
|
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
|
||||||
|
final boolean evictOnClose =
|
||||||
|
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
||||||
|
for (Path newFile : newFiles) {
|
||||||
|
// Create storefile around what we wrote with a reader on it.
|
||||||
|
HStoreFile sf = createStoreFileAndReader(newFile);
|
||||||
|
sf.closeStoreFile(evictOnClose);
|
||||||
|
sfs.add(sf);
|
||||||
|
}
|
||||||
|
return sfs;
|
||||||
|
}
|
||||||
|
return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue