HBASE-17072 CPU usage starts to climb up to 90-100% when using G1GC
Removes ThreadLocal. Uses AtomicReference instead (based on patch posted up in HBASE-10676 "Removing ThreadLocal of PrefetchedHeader in HFileBlock.FSReaderV2 make higher perforamce of scan") Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
bbb81a7ac3
commit
3d5e686070
|
@ -23,6 +23,7 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -1366,13 +1367,20 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* We always prefetch the header of the next block, so that we know its
|
||||
* on-disk size in advance and can read it in one operation.
|
||||
* Data-structure to use caching the header of the NEXT block. Only works if next read
|
||||
* that comes in here is next in sequence in this block.
|
||||
*
|
||||
* When we read, we read current block and the next blocks' header. We do this so we have
|
||||
* the length of the next block to read if the hfile index is not available (rare).
|
||||
* TODO: Review!! This trick of reading next blocks header is a pain, complicates our
|
||||
* read path and I don't think it needed given it rare we don't have the block index
|
||||
* (it is 'normally' present, gotten from the hfile index). FIX!!!
|
||||
*/
|
||||
private static class PrefetchedHeader {
|
||||
long offset = -1;
|
||||
byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
|
||||
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
|
||||
|
@ -1393,19 +1401,13 @@ public class HFileBlock implements Cacheable {
|
|||
private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
|
||||
|
||||
/**
|
||||
* When we read a block, we overread and pull in the next blocks header too. We will save it
|
||||
* here. If moving serially through the file, we will trip over this caching of the next blocks
|
||||
* header so we won't have to do explicit seek to find next blocks lengths, etc.
|
||||
* Cache of the NEXT header after this. Check it is indeed next blocks header
|
||||
* before using it. TODO: Review. This overread into next block to fetch
|
||||
* next blocks header seems unnecessary given we usually get the block size
|
||||
* from the hfile index. Review!
|
||||
*/
|
||||
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
|
||||
new ThreadLocal<PrefetchedHeader>() {
|
||||
@Override
|
||||
public PrefetchedHeader initialValue() {
|
||||
return new PrefetchedHeader();
|
||||
}
|
||||
};
|
||||
|
||||
/** Compression algorithm used by the {@link HFile} */
|
||||
private AtomicReference<PrefetchedHeader> prefetchedHeader =
|
||||
new AtomicReference<PrefetchedHeader>(new PrefetchedHeader());
|
||||
|
||||
/** The size of the file we are reading from, or -1 if unknown. */
|
||||
protected long fileSize;
|
||||
|
@ -1455,13 +1457,17 @@ public class HFileBlock implements Cacheable {
|
|||
final FSReader owner = this; // handle for inner class
|
||||
return new BlockIterator() {
|
||||
private long offset = startOffset;
|
||||
// Cache length of next block. Current block has the length of next block in it.
|
||||
private long length = -1;
|
||||
|
||||
@Override
|
||||
public HFileBlock nextBlock() throws IOException {
|
||||
if (offset >= endOffset)
|
||||
if (offset >= endOffset) {
|
||||
return null;
|
||||
HFileBlock b = readBlockData(offset, -1, false);
|
||||
}
|
||||
HFileBlock b = readBlockData(offset, length, false);
|
||||
offset += b.getOnDiskSizeWithHeader();
|
||||
length = b.getNextBlockOnDiskSize();
|
||||
return b.unpack(fileContext, owner);
|
||||
}
|
||||
|
||||
|
@ -1546,7 +1552,8 @@ public class HFileBlock implements Cacheable {
|
|||
*
|
||||
* @param offset the offset in the stream to read at
|
||||
* @param onDiskSizeWithHeaderL the on-disk size of the block, including
|
||||
* the header, or -1 if unknown
|
||||
* the header, or -1 if unknown; i.e. when iterating over blocks reading
|
||||
* in the file metadata info.
|
||||
* @param pread whether to use a positional read
|
||||
*/
|
||||
@Override
|
||||
|
@ -1630,31 +1637,6 @@ public class HFileBlock implements Cacheable {
|
|||
return (int)onDiskSizeWithHeaderL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check threadlocal cache for this block's header; we usually read it on the tail of reading
|
||||
* the previous block to save a seek. Otherwise, we have to do a seek to read the header before
|
||||
* we can pull in the block.
|
||||
* @return The cached block header or null if not found.
|
||||
* @see #cacheNextBlockHeader(long, byte[], int, int)
|
||||
*/
|
||||
private ByteBuffer getCachedHeader(final long offset) {
|
||||
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
|
||||
// PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
|
||||
return prefetchedHeader != null && prefetchedHeader.offset == offset?
|
||||
prefetchedHeader.buf: null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Save away the next blocks header in thread local.
|
||||
* @see #getCachedHeader(long)
|
||||
*/
|
||||
private void cacheNextBlockHeader(final long nextBlockOffset,
|
||||
final byte [] header, final int headerOffset, final int headerLength) {
|
||||
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
|
||||
prefetchedHeader.offset = nextBlockOffset;
|
||||
System.arraycopy(header, headerOffset, prefetchedHeader.header, 0, headerLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something
|
||||
* is not right.
|
||||
|
@ -1671,24 +1653,59 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check atomic reference cache for this block's header. Cache only good if next
|
||||
* read coming through is next in sequence in the block. We read next block's
|
||||
* header on the tail of reading the previous block to save a seek. Otherwise,
|
||||
* we have to do a seek to read the header before we can pull in the block OR
|
||||
* we have to backup the stream because we over-read (the next block's header).
|
||||
* @see PrefetchedHeader
|
||||
* @return The cached block header or null if not found.
|
||||
* @see #cacheNextBlockHeader(long, byte[], int, int)
|
||||
*/
|
||||
private ByteBuffer getCachedHeader(final long offset) {
|
||||
PrefetchedHeader ph = this.prefetchedHeader.get();
|
||||
return ph != null && ph.offset == offset? ph.buf: null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Save away the next blocks header in atomic reference.
|
||||
* @see #getCachedHeader(long)
|
||||
* @see PrefetchedHeader
|
||||
*/
|
||||
private void cacheNextBlockHeader(final long offset,
|
||||
final byte [] header, final int headerOffset, final int headerLength) {
|
||||
PrefetchedHeader ph = new PrefetchedHeader();
|
||||
ph.offset = offset;
|
||||
System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
|
||||
this.prefetchedHeader.set(ph);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a version 2 block.
|
||||
*
|
||||
* @param offset the offset in the stream to read at
|
||||
* @param offset the offset in the stream to read at. Usually the
|
||||
* @param onDiskSizeWithHeaderL the on-disk size of the block, including
|
||||
* the header and checksums if present or -1 if unknown
|
||||
* the header and checksums if present or -1 if unknown (as a long). Can be -1
|
||||
* if we are doing raw iteration of blocks as when loading up file metadata; i.e.
|
||||
* the first read of a new file (TODO: Fix! See HBASE-17072). Usually non-null gotten
|
||||
* from the file index.
|
||||
* @param pread whether to use a positional read
|
||||
* @param verifyChecksum Whether to use HBase checksums.
|
||||
* If HBase checksum is switched off, then use HDFS checksum.
|
||||
* @return the HFileBlock or null if there is a HBase checksum mismatch
|
||||
*/
|
||||
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
|
||||
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException {
|
||||
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
|
||||
throws IOException {
|
||||
if (offset < 0) {
|
||||
throw new IOException("Invalid offset=" + offset + " trying to read "
|
||||
+ "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
|
||||
}
|
||||
int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
|
||||
// Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
|
||||
// and will save us having to seek the stream backwards to reread the header we
|
||||
// read the last time through here.
|
||||
ByteBuffer headerBuf = getCachedHeader(offset);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset +
|
||||
|
@ -1697,10 +1714,15 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
if (onDiskSizeWithHeader <= 0) {
|
||||
// We were not passed the block size. Need to get it from the header. If header was not in
|
||||
// cache, need to seek to pull it in. This latter might happen when we are doing the first
|
||||
// read in a series of reads or a random read, and we don't have access to the block index.
|
||||
// This is costly and should happen very rarely.
|
||||
// cache, need to seek to pull it in. This is costly and should happen very rarely.
|
||||
// Currently happens on open of a hfile reader where we read the trailer blocks for
|
||||
// indices. Otherwise, we are reading block sizes out of the hfile index. To check,
|
||||
// enable TRACE in this file and you'll get an exception in a LOG every time we seek.
|
||||
// See HBASE-17072 for more detail.
|
||||
if (headerBuf == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Extra see to get block size!", new RuntimeException());
|
||||
}
|
||||
headerBuf = ByteBuffer.allocate(hdrSize);
|
||||
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
|
||||
offset, pread);
|
||||
|
@ -1711,7 +1733,11 @@ public class HFileBlock implements Cacheable {
|
|||
int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
|
||||
// Allocate enough space to fit the next block's header too; saves a seek next time through.
|
||||
// onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
|
||||
// onDiskSizeWithHeader is header, body, and any checksums if present.
|
||||
// onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
|
||||
// says where to start reading. If we have the header cached, then we don't need to read
|
||||
// it again and we can likely read from last place we left off w/o need to backup and reread
|
||||
// the header we read last time through here. TODO: Review this overread of the header. Is it necessary
|
||||
// when we get the block size from the hfile index? See note on PrefetchedHeader class above.
|
||||
// TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap).
|
||||
byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
|
||||
int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
|
||||
|
|
|
@ -514,7 +514,7 @@ public class HFileBlockIndex {
|
|||
* blocks at all other levels will be cached in the LRU cache in practice,
|
||||
* although this API does not enforce that.
|
||||
*
|
||||
* All non-root (leaf and intermediate) index blocks contain what we call a
|
||||
* <p>All non-root (leaf and intermediate) index blocks contain what we call a
|
||||
* "secondary index": an array of offsets to the entries within the block.
|
||||
* This allows us to do binary search for the entry corresponding to the
|
||||
* given key without having to deserialize the block.
|
||||
|
@ -583,17 +583,12 @@ public class HFileBlockIndex {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the BlockWithScanInfo which contains the DataBlock with other scan
|
||||
* info such as nextIndexedKey. This function will only be called when the
|
||||
* HFile version is larger than 1.
|
||||
* Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with
|
||||
* other scan info such as the key that starts the next HFileBlock. This function will only
|
||||
* be called when the HFile version is larger than 1.
|
||||
*
|
||||
* @param key
|
||||
* the key we are looking for
|
||||
* @param currentBlock
|
||||
* the current block, to avoid re-reading the same block
|
||||
* @param cacheBlocks
|
||||
* @param pread
|
||||
* @param isCompaction
|
||||
* @param key the key we are looking for
|
||||
* @param currentBlock the current block, to avoid re-reading the same block
|
||||
* @param expectedDataBlockEncoding the data block encoding the caller is
|
||||
* expecting the data block to be in, or null to not perform this
|
||||
* check and return the block irrespective of the encoding.
|
||||
|
|
Loading…
Reference in New Issue