HBASE-14636 Clear HFileScannerImpl#prevBlocks in between Compaction flow.
This commit is contained in:
parent
51693b9cde
commit
c9523a569d
|
@ -1937,6 +1937,13 @@ public class HFileBlock implements Cacheable {
|
|||
return this.memType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reports whether this block's memory type is {@code MemoryType.SHARED}.
* @return true if this block is backed by a shared memory area (e.g. that of a BucketCache).
|
||||
*/
|
||||
public boolean usesSharedMemory() {
|
||||
return this.memType == MemoryType.SHARED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the contents of the block header into a human readable string.
|
||||
* This is mostly helpful for debugging. This assumes that the block
|
||||
|
|
|
@ -510,14 +510,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
block.getOffset() == this.curBlock.getOffset()) {
|
||||
return;
|
||||
}
|
||||
if (this.curBlock != null) {
|
||||
// We don't have to keep a reference to a block of EXCLUSIVE memory type
|
||||
if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
|
||||
prevBlocks.add(this.curBlock);
|
||||
}
|
||||
this.curBlock = block;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
if (this.curBlock != null) {
|
||||
// We don't have to keep a reference to a block of EXCLUSIVE memory type
|
||||
if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
|
||||
this.prevBlocks.add(this.curBlock);
|
||||
}
|
||||
this.curBlock = null;
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
|
@ -58,13 +59,14 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
|
|||
@InterfaceAudience.Private
|
||||
public abstract class Compactor {
|
||||
private static final Log LOG = LogFactory.getLog(Compactor.class);
|
||||
private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
|
||||
protected CompactionProgress progress;
|
||||
protected Configuration conf;
|
||||
protected Store store;
|
||||
|
||||
protected int compactionKVMax;
|
||||
protected Compression.Algorithm compactionCompression;
|
||||
|
||||
|
||||
/** specify how many days to keep MVCC values during major compaction **/
|
||||
protected int keepSeqIdPeriod;
|
||||
|
||||
|
@ -272,12 +274,13 @@ public abstract class Compactor {
|
|||
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
|
||||
long smallestReadPoint, boolean cleanSeqId,
|
||||
CompactionThroughputController throughputController, boolean major) throws IOException {
|
||||
long bytesWritten = 0;
|
||||
long bytesWrittenProgress = 0;
|
||||
long bytesWrittenProgressForCloseCheck = 0;
|
||||
long bytesWrittenProgressForLog = 0;
|
||||
long bytesWrittenProgressForShippedCall = 0;
|
||||
// Since scanner.next() can return 'false' but still be delivering data,
|
||||
// we have to use a do/while loop.
|
||||
List<Cell> cells = new ArrayList<Cell>();
|
||||
long closeCheckInterval = HStore.getCloseCheckInterval();
|
||||
long closeCheckSizeLimit = HStore.getCloseCheckInterval();
|
||||
long lastMillis = 0;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
lastMillis = EnvironmentEdgeManager.currentTime();
|
||||
|
@ -289,6 +292,11 @@ public abstract class Compactor {
|
|||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
||||
|
||||
throughputController.start(compactionName);
|
||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
|
||||
int minFilesToCompact = Math.max(2,
|
||||
conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
|
||||
/* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||
long shippedCallSizeLimit = minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
|
||||
try {
|
||||
do {
|
||||
hasMore = scanner.next(cells, scannerContext);
|
||||
|
@ -304,35 +312,46 @@ public abstract class Compactor {
|
|||
int len = KeyValueUtil.length(c);
|
||||
++progress.currentCompactedKVs;
|
||||
progress.totalCompactedSize += len;
|
||||
bytesWrittenProgressForShippedCall += len;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
bytesWrittenProgress += len;
|
||||
bytesWrittenProgressForLog += len;
|
||||
}
|
||||
throughputController.control(compactionName, len);
|
||||
// check periodically to see if a system stop is requested
|
||||
if (closeCheckInterval > 0) {
|
||||
bytesWritten += len;
|
||||
if (bytesWritten > closeCheckInterval) {
|
||||
bytesWritten = 0;
|
||||
if (closeCheckSizeLimit > 0) {
|
||||
bytesWrittenProgressForCloseCheck += len;
|
||||
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
|
||||
bytesWrittenProgressForCloseCheck = 0;
|
||||
if (!store.areWritesEnabled()) {
|
||||
progress.cancel();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
|
||||
// The SHARED block references read during compaction are kept in the prevBlocks
|
||||
// list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells
|
||||
// is returned to the client, we call shipped(), which can clear this list. Here we
|
||||
// do a similar thing: during the compaction (after every N cells written, with a
|
||||
// collective size of 'shippedCallSizeLimit') we call shipped(), which may clear the
|
||||
// prevBlocks list.
|
||||
kvs.shipped();
|
||||
bytesWrittenProgressForShippedCall = 0;
|
||||
}
|
||||
}
|
||||
// Log the progress of long running compactions every minute if
|
||||
// logging at DEBUG level
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if ((now - lastMillis) >= 60 * 1000) {
|
||||
if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
|
||||
LOG.debug("Compaction progress: "
|
||||
+ compactionName
|
||||
+ " "
|
||||
+ progress
|
||||
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
|
||||
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
|
||||
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
|
||||
+ throughputController);
|
||||
lastMillis = now;
|
||||
bytesWrittenProgress = 0;
|
||||
bytesWrittenProgressForLog = 0;
|
||||
}
|
||||
}
|
||||
cells.clear();
|
||||
|
|
Loading…
Reference in New Issue