HBASE-16454 Compactor's shipping logic decision should be based on the
current store's block size (Ram)
This commit is contained in:
parent
2aae923c32
commit
ff6182b020
|
@ -160,12 +160,13 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
|||
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
|
||||
* @param throughputController The compaction throughput controller.
|
||||
* @param major Is a major compaction.
|
||||
* @param numofFilesToCompact the number of files to compact
|
||||
* @return Whether compaction ended; false if it was interrupted for any reason.
|
||||
*/
|
||||
@Override
|
||||
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
||||
long smallestReadPoint, boolean cleanSeqId,
|
||||
ThroughputController throughputController, boolean major) throws IOException {
|
||||
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
||||
boolean major, int numofFilesToCompact) throws IOException {
|
||||
if (!(scanner instanceof MobCompactionStoreScanner)) {
|
||||
throw new IllegalArgumentException(
|
||||
"The scanner should be an instance of MobCompactionStoreScanner");
|
||||
|
|
|
@ -310,7 +310,7 @@ public abstract class Compactor<T extends CellSink> {
|
|||
}
|
||||
writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
|
||||
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
|
||||
throughputController, request.isAllFiles());
|
||||
throughputController, request.isAllFiles(), request.getFiles().size());
|
||||
if (!finished) {
|
||||
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
|
||||
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
|
||||
|
@ -385,11 +385,12 @@ public abstract class Compactor<T extends CellSink> {
|
|||
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
|
||||
* smallestReadPoint
|
||||
* @param major Is a major compaction.
|
||||
* @param numofFilesToCompact the number of files to compact
|
||||
* @return Whether compaction ended; false if it was interrupted for some reason.
|
||||
*/
|
||||
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
||||
long smallestReadPoint, boolean cleanSeqId,
|
||||
ThroughputController throughputController, boolean major) throws IOException {
|
||||
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
|
||||
boolean major, int numofFilesToCompact) throws IOException {
|
||||
long bytesWrittenProgressForCloseCheck = 0;
|
||||
long bytesWrittenProgressForLog = 0;
|
||||
long bytesWrittenProgressForShippedCall = 0;
|
||||
|
@ -409,10 +410,7 @@ public abstract class Compactor<T extends CellSink> {
|
|||
|
||||
throughputController.start(compactionName);
|
||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
|
||||
long minFilesToCompact = Math.max(2L,
|
||||
conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
|
||||
/* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||
long shippedCallSizeLimit = (long) minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
|
||||
long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getFamily().getBlocksize();
|
||||
try {
|
||||
do {
|
||||
hasMore = scanner.next(cells, scannerContext);
|
||||
|
|
Loading…
Reference in New Issue