HBASE-25756 Support alternate compression for major and minor compactions (#3142)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java hbase-shell/src/main/ruby/hbase/admin.rb
This commit is contained in:
parent
f374357cc3
commit
01ce44abc4
|
@ -45,6 +45,8 @@ public class HColumnDescriptor implements ColumnFamilyDescriptor, Comparable<HCo
|
||||||
public static final String IN_MEMORY_COMPACTION = ColumnFamilyDescriptorBuilder.IN_MEMORY_COMPACTION;
|
public static final String IN_MEMORY_COMPACTION = ColumnFamilyDescriptorBuilder.IN_MEMORY_COMPACTION;
|
||||||
public static final String COMPRESSION = ColumnFamilyDescriptorBuilder.COMPRESSION;
|
public static final String COMPRESSION = ColumnFamilyDescriptorBuilder.COMPRESSION;
|
||||||
public static final String COMPRESSION_COMPACT = ColumnFamilyDescriptorBuilder.COMPRESSION_COMPACT;
|
public static final String COMPRESSION_COMPACT = ColumnFamilyDescriptorBuilder.COMPRESSION_COMPACT;
|
||||||
|
public static final String COMPRESSION_COMPACT_MAJOR = ColumnFamilyDescriptorBuilder.COMPRESSION_COMPACT_MAJOR;
|
||||||
|
public static final String COMPRESSION_COMPACT_MINOR = ColumnFamilyDescriptorBuilder.COMPRESSION_COMPACT_MINOR;
|
||||||
public static final String ENCODE_ON_DISK = "ENCODE_ON_DISK";
|
public static final String ENCODE_ON_DISK = "ENCODE_ON_DISK";
|
||||||
public static final String DATA_BLOCK_ENCODING = ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING;
|
public static final String DATA_BLOCK_ENCODING = ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING;
|
||||||
public static final String BLOCKCACHE = ColumnFamilyDescriptorBuilder.BLOCKCACHE;
|
public static final String BLOCKCACHE = ColumnFamilyDescriptorBuilder.BLOCKCACHE;
|
||||||
|
@ -373,6 +375,16 @@ public class HColumnDescriptor implements ColumnFamilyDescriptor, Comparable<HCo
|
||||||
return delegatee.getCompactionCompressionType();
|
return delegatee.getCompactionCompressionType();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Compression.Algorithm getMajorCompactionCompressionType() {
|
||||||
|
return delegatee.getMajorCompactionCompressionType();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Compression.Algorithm getMinorCompactionCompressionType() {
|
||||||
|
return delegatee.getMinorCompactionCompressionType();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compression types supported in hbase.
|
* Compression types supported in hbase.
|
||||||
* LZO is not bundled as part of the hbase distribution.
|
* LZO is not bundled as part of the hbase distribution.
|
||||||
|
@ -386,6 +398,16 @@ public class HColumnDescriptor implements ColumnFamilyDescriptor, Comparable<HCo
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HColumnDescriptor setMajorCompactionCompressionType(Compression.Algorithm value) {
|
||||||
|
getDelegateeForModification().setMajorCompactionCompressionType(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HColumnDescriptor setMinorCompactionCompressionType(Compression.Algorithm value) {
|
||||||
|
getDelegateeForModification().setMinorCompactionCompressionType(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isInMemory() {
|
public boolean isInMemory() {
|
||||||
return delegatee.isInMemory();
|
return delegatee.isInMemory();
|
||||||
|
|
|
@ -94,10 +94,22 @@ public interface ColumnFamilyDescriptor {
|
||||||
* @return Compression type setting.
|
* @return Compression type setting.
|
||||||
*/
|
*/
|
||||||
Compression.Algorithm getCompactionCompressionType();
|
Compression.Algorithm getCompactionCompressionType();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Compression type setting for major compactions.
|
||||||
|
*/
|
||||||
|
Compression.Algorithm getMajorCompactionCompressionType();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Compression type setting for minor compactions.
|
||||||
|
*/
|
||||||
|
Compression.Algorithm getMinorCompactionCompressionType();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Compression type setting.
|
* @return Compression type setting.
|
||||||
*/
|
*/
|
||||||
Compression.Algorithm getCompressionType();
|
Compression.Algorithm getCompressionType();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return an unmodifiable map.
|
* @return an unmodifiable map.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -75,6 +75,10 @@ public class ColumnFamilyDescriptorBuilder {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT";
|
public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT";
|
||||||
private static final Bytes COMPRESSION_COMPACT_BYTES = new Bytes(Bytes.toBytes(COMPRESSION_COMPACT));
|
private static final Bytes COMPRESSION_COMPACT_BYTES = new Bytes(Bytes.toBytes(COMPRESSION_COMPACT));
|
||||||
|
public static final String COMPRESSION_COMPACT_MAJOR = "COMPRESSION_COMPACT_MAJOR";
|
||||||
|
private static final Bytes COMPRESSION_COMPACT_MAJOR_BYTES = new Bytes(Bytes.toBytes(COMPRESSION_COMPACT_MAJOR));
|
||||||
|
public static final String COMPRESSION_COMPACT_MINOR = "COMPRESSION_COMPACT_MINOR";
|
||||||
|
private static final Bytes COMPRESSION_COMPACT_MINOR_BYTES = new Bytes(Bytes.toBytes(COMPRESSION_COMPACT_MINOR));
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static final String DATA_BLOCK_ENCODING = "DATA_BLOCK_ENCODING";
|
public static final String DATA_BLOCK_ENCODING = "DATA_BLOCK_ENCODING";
|
||||||
private static final Bytes DATA_BLOCK_ENCODING_BYTES = new Bytes(Bytes.toBytes(DATA_BLOCK_ENCODING));
|
private static final Bytes DATA_BLOCK_ENCODING_BYTES = new Bytes(Bytes.toBytes(DATA_BLOCK_ENCODING));
|
||||||
|
@ -449,6 +453,16 @@ public class ColumnFamilyDescriptorBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ColumnFamilyDescriptorBuilder setMajorCompactionCompressionType(Compression.Algorithm value) {
|
||||||
|
desc.setMajorCompactionCompressionType(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ColumnFamilyDescriptorBuilder setMinorCompactionCompressionType(Compression.Algorithm value) {
|
||||||
|
desc.setMinorCompactionCompressionType(value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public ColumnFamilyDescriptorBuilder setCompressTags(boolean value) {
|
public ColumnFamilyDescriptorBuilder setCompressTags(boolean value) {
|
||||||
desc.setCompressTags(value);
|
desc.setCompressTags(value);
|
||||||
return this;
|
return this;
|
||||||
|
@ -851,6 +865,18 @@ public class ColumnFamilyDescriptorBuilder {
|
||||||
n -> Compression.Algorithm.valueOf(n.toUpperCase()), getCompressionType());
|
n -> Compression.Algorithm.valueOf(n.toUpperCase()), getCompressionType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Compression.Algorithm getMajorCompactionCompressionType() {
|
||||||
|
return getStringOrDefault(COMPRESSION_COMPACT_MAJOR_BYTES,
|
||||||
|
n -> Compression.Algorithm.valueOf(n.toUpperCase()), getCompactionCompressionType());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Compression.Algorithm getMinorCompactionCompressionType() {
|
||||||
|
return getStringOrDefault(COMPRESSION_COMPACT_MINOR_BYTES,
|
||||||
|
n -> Compression.Algorithm.valueOf(n.toUpperCase()), getCompactionCompressionType());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compression types supported in hbase. LZO is not bundled as part of the
|
* Compression types supported in hbase. LZO is not bundled as part of the
|
||||||
* hbase distribution. See
|
* hbase distribution. See
|
||||||
|
@ -866,6 +892,16 @@ public class ColumnFamilyDescriptorBuilder {
|
||||||
return setValue(COMPRESSION_COMPACT_BYTES, type.name());
|
return setValue(COMPRESSION_COMPACT_BYTES, type.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ModifyableColumnFamilyDescriptor setMajorCompactionCompressionType(
|
||||||
|
Compression.Algorithm type) {
|
||||||
|
return setValue(COMPRESSION_COMPACT_MAJOR_BYTES, type.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ModifyableColumnFamilyDescriptor setMinorCompactionCompressionType(
|
||||||
|
Compression.Algorithm type) {
|
||||||
|
return setValue(COMPRESSION_COMPACT_MINOR_BYTES, type.name());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isInMemory() {
|
public boolean isInMemory() {
|
||||||
return getStringOrDefault(IN_MEMORY_BYTES, Boolean::valueOf, DEFAULT_IN_MEMORY);
|
return getStringOrDefault(IN_MEMORY_BYTES, Boolean::valueOf, DEFAULT_IN_MEMORY);
|
||||||
|
|
|
@ -83,9 +83,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
@Override
|
@Override
|
||||||
public StoreFileWriter createWriter(InternalScanner scanner,
|
public StoreFileWriter createWriter(InternalScanner scanner,
|
||||||
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
|
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
|
||||||
boolean shouldDropBehind) throws IOException {
|
boolean shouldDropBehind, boolean major) throws IOException {
|
||||||
// make this writer with tags always because of possible new cells with tags.
|
// make this writer with tags always because of possible new cells with tags.
|
||||||
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
|
return store.createWriterInTmp(fd.maxKeyCount,
|
||||||
|
major ? majorCompactionCompression : minorCompactionCompression, true, true, true,
|
||||||
shouldDropBehind);
|
shouldDropBehind);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -197,7 +198,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
try {
|
try {
|
||||||
// If the mob file writer could not be created, directly write the cell to the store file.
|
// If the mob file writer could not be created, directly write the cell to the store file.
|
||||||
mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
|
mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
|
||||||
compactionCompression, store.getRegionInfo().getStartKey(), true);
|
major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
|
store.getRegionInfo().getStartKey(), true);
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to create mob writer, "
|
LOG.warn("Failed to create mob writer, "
|
||||||
|
@ -206,7 +208,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
if (major) {
|
if (major) {
|
||||||
try {
|
try {
|
||||||
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
|
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
|
||||||
fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
|
fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
|
store.getRegionInfo().getStartKey());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Failed to create del writer, "
|
"Failed to create del writer, "
|
||||||
|
|
|
@ -47,17 +47,17 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
|
protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
|
||||||
final FileDetails fd, final boolean shouldDropBehind) {
|
final FileDetails fd, final boolean shouldDropBehind, boolean major) {
|
||||||
WriterFactory writerFactory = new WriterFactory() {
|
WriterFactory writerFactory = new WriterFactory() {
|
||||||
@Override
|
@Override
|
||||||
public StoreFileWriter createWriter() throws IOException {
|
public StoreFileWriter createWriter() throws IOException {
|
||||||
return createTmpWriter(fd, shouldDropBehind);
|
return createTmpWriter(fd, shouldDropBehind, major);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
|
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy);
|
return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Prepare multi-writer, and perform the compaction using scanner and writer.
|
// Prepare multi-writer, and perform the compaction using scanner and writer.
|
||||||
|
|
|
@ -75,7 +75,8 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
protected final HStore store;
|
protected final HStore store;
|
||||||
|
|
||||||
protected final int compactionKVMax;
|
protected final int compactionKVMax;
|
||||||
protected final Compression.Algorithm compactionCompression;
|
protected final Compression.Algorithm majorCompactionCompression;
|
||||||
|
protected final Compression.Algorithm minorCompactionCompression;
|
||||||
|
|
||||||
/** specify how many days to keep MVCC values during major compaction **/
|
/** specify how many days to keep MVCC values during major compaction **/
|
||||||
protected int keepSeqIdPeriod;
|
protected int keepSeqIdPeriod;
|
||||||
|
@ -95,8 +96,10 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.compactionKVMax =
|
this.compactionKVMax =
|
||||||
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
||||||
this.compactionCompression = (this.store.getColumnFamilyDescriptor() == null) ?
|
this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null) ?
|
||||||
Compression.Algorithm.NONE : this.store.getColumnFamilyDescriptor().getCompactionCompressionType();
|
Compression.Algorithm.NONE : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
|
||||||
|
this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null) ?
|
||||||
|
Compression.Algorithm.NONE : store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
|
||||||
this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
|
this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
|
||||||
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
|
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
|
||||||
this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
|
this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
|
||||||
|
@ -106,7 +109,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
|
|
||||||
|
|
||||||
protected interface CellSinkFactory<S> {
|
protected interface CellSinkFactory<S> {
|
||||||
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
|
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,10 +141,11 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
* Extracts some details about the files to compact that are commonly needed by compactors.
|
* Extracts some details about the files to compact that are commonly needed by compactors.
|
||||||
* @param filesToCompact Files.
|
* @param filesToCompact Files.
|
||||||
* @param allFiles Whether all files are included for compaction
|
* @param allFiles Whether all files are included for compaction
|
||||||
|
* @parma major If major compaction
|
||||||
* @return The result.
|
* @return The result.
|
||||||
*/
|
*/
|
||||||
private FileDetails getFileDetails(
|
private FileDetails getFileDetails(
|
||||||
Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
|
Collection<HStoreFile> filesToCompact, boolean allFiles, boolean major) throws IOException {
|
||||||
FileDetails fd = new FileDetails();
|
FileDetails fd = new FileDetails();
|
||||||
long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
|
long oldestHFileTimestampToKeepMVCC = System.currentTimeMillis() -
|
||||||
(1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
|
(1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
|
||||||
|
@ -211,7 +215,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
r.getBloomFilterType().toString(),
|
r.getBloomFilterType().toString(),
|
||||||
TraditionalBinaryPrefix.long2String(r.length(), "", 1),
|
TraditionalBinaryPrefix.long2String(r.length(), "", 1),
|
||||||
r.getHFileReader().getDataBlockEncoding(),
|
r.getHFileReader().getDataBlockEncoding(),
|
||||||
compactionCompression,
|
major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
seqNum,
|
seqNum,
|
||||||
(allFiles? ", earliestPutTs=" + earliestPutTs: ""));
|
(allFiles? ", earliestPutTs=" + earliestPutTs: ""));
|
||||||
}
|
}
|
||||||
|
@ -262,21 +266,23 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
* @return Writer for a new StoreFile in the tmp dir.
|
* @return Writer for a new StoreFile in the tmp dir.
|
||||||
* @throws IOException if creation failed
|
* @throws IOException if creation failed
|
||||||
*/
|
*/
|
||||||
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
|
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// When all MVCC readpoints are 0, don't write them.
|
// When all MVCC readpoints are 0, don't write them.
|
||||||
// See HBASE-8166, HBASE-12600, and HBASE-13389.
|
// See HBASE-8166, HBASE-12600, and HBASE-13389.
|
||||||
return store
|
return store.createWriterInTmp(fd.maxKeyCount,
|
||||||
.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
|
major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
|
true, fd.maxMVCCReadpoint > 0,
|
||||||
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
|
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
|
||||||
HConstants.EMPTY_STRING);
|
HConstants.EMPTY_STRING);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
|
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
|
||||||
String fileStoragePolicy) throws IOException {
|
String fileStoragePolicy, boolean major) throws IOException {
|
||||||
return store
|
return store.createWriterInTmp(fd.maxKeyCount,
|
||||||
.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
|
major ? majorCompactionCompression : minorCompactionCompression,
|
||||||
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
|
true, fd.maxMVCCReadpoint > 0,
|
||||||
|
fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
|
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
|
||||||
|
@ -307,7 +313,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
protected final List<Path> compact(final CompactionRequestImpl request,
|
protected final List<Path> compact(final CompactionRequestImpl request,
|
||||||
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
|
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
|
||||||
ThroughputController throughputController, User user) throws IOException {
|
ThroughputController throughputController, User user) throws IOException {
|
||||||
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
|
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
|
||||||
this.progress = new CompactionProgress(fd.maxKeyCount);
|
this.progress = new CompactionProgress(fd.maxKeyCount);
|
||||||
|
|
||||||
// Find the smallest read point across all the Scanners.
|
// Find the smallest read point across all the Scanners.
|
||||||
|
@ -337,7 +343,7 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
|
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
|
||||||
cleanSeqId = true;
|
cleanSeqId = true;
|
||||||
}
|
}
|
||||||
writer = sinkFactory.createWriter(scanner, fd, dropCache);
|
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
|
||||||
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
|
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
|
||||||
throughputController, request.isAllFiles(), request.getFiles().size());
|
throughputController, request.isAllFiles(), request.getFiles().size());
|
||||||
if (!finished) {
|
if (!finished) {
|
||||||
|
|
|
@ -68,11 +68,11 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
|
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
|
||||||
boolean shouldDropBehind) throws IOException {
|
boolean shouldDropBehind, boolean major) throws IOException {
|
||||||
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
|
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
|
||||||
lowerBoundariesPolicies,
|
lowerBoundariesPolicies,
|
||||||
needEmptyFile(request));
|
needEmptyFile(request));
|
||||||
initMultiWriter(writer, scanner, fd, shouldDropBehind);
|
initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
}, throughputController, user);
|
}, throughputController, user);
|
||||||
|
|
|
@ -52,8 +52,8 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
|
||||||
@Override
|
@Override
|
||||||
public StoreFileWriter createWriter(InternalScanner scanner,
|
public StoreFileWriter createWriter(InternalScanner scanner,
|
||||||
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
|
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
|
||||||
boolean shouldDropBehind) throws IOException {
|
boolean shouldDropBehind, boolean major) throws IOException {
|
||||||
return createTmpWriter(fd, shouldDropBehind);
|
return createTmpWriter(fd, shouldDropBehind, major);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -93,10 +93,10 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
|
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
|
||||||
boolean shouldDropBehind) throws IOException {
|
boolean shouldDropBehind, boolean major) throws IOException {
|
||||||
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
|
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
|
||||||
store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
|
store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow);
|
||||||
initMultiWriter(writer, scanner, fd, shouldDropBehind);
|
initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
}, throughputController, user);
|
}, throughputController, user);
|
||||||
|
@ -115,10 +115,10 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
|
public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
|
||||||
boolean shouldDropBehind) throws IOException {
|
boolean shouldDropBehind, boolean major) throws IOException {
|
||||||
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
|
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
|
||||||
store.getComparator(), targetCount, targetSize, left, right);
|
store.getComparator(), targetCount, targetSize, left, right);
|
||||||
initMultiWriter(writer, scanner, fd, shouldDropBehind);
|
initMultiWriter(writer, scanner, fd, shouldDropBehind, major);
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
}, throughputController, user);
|
}, throughputController, user);
|
||||||
|
|
|
@ -293,6 +293,8 @@ public final class TableDescriptorChecker {
|
||||||
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
CompressionTest.testCompression(cfd.getCompressionType());
|
CompressionTest.testCompression(cfd.getCompressionType());
|
||||||
CompressionTest.testCompression(cfd.getCompactionCompressionType());
|
CompressionTest.testCompression(cfd.getCompactionCompressionType());
|
||||||
|
CompressionTest.testCompression(cfd.getMajorCompactionCompressionType());
|
||||||
|
CompressionTest.testCompression(cfd.getMinorCompactionCompressionType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1069,6 +1069,22 @@ module Hbase
|
||||||
raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
|
raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT_MAJOR)
|
||||||
|
compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT_MAJPR).upcase.to_sym
|
||||||
|
if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
|
||||||
|
family.setMajorCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
|
||||||
|
else
|
||||||
|
raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT_MINOR)
|
||||||
|
compression = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION_COMPACT_MINOR).upcase.to_sym
|
||||||
|
if org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.include?(compression)
|
||||||
|
family.setMinorCompactionCompressionType(org.apache.hadoop.hbase.io.compress.Compression::Algorithm.valueOf(compression))
|
||||||
|
else
|
||||||
|
raise(ArgumentError, "Compression #{compression} is not supported. Use one of " + org.apache.hadoop.hbase.io.compress.Compression::Algorithm.constants.join(' '))
|
||||||
|
end
|
||||||
|
end
|
||||||
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
|
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
|
||||||
storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
|
storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
|
||||||
family.setStoragePolicy(storage_policy)
|
family.setStoragePolicy(storage_policy)
|
||||||
|
|
Loading…
Reference in New Issue