HBASE-17072 CPU usage starts to climb up to 90-100% when using G1GC

Removes ThreadLocal. Uses AtomicReference instead (based on patch
posted up in HBASE-10676 "Removing ThreadLocal of PrefetchedHeader in
HFileBlock.FSReaderV2 make higher perforamce of scan")

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Michael Stack 2016-11-28 11:27:30 -08:00
parent 10c070825f
commit 987205caf9
2 changed files with 71 additions and 46 deletions

View File

@ -24,6 +24,7 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -1393,13 +1394,17 @@ public class HFileBlock implements Cacheable {
final FSReader owner = this; // handle for inner class final FSReader owner = this; // handle for inner class
return new BlockIterator() { return new BlockIterator() {
private long offset = startOffset; private long offset = startOffset;
// Cache length of next block. Current block has the length of next block in it.
private long length = -1;
@Override @Override
public HFileBlock nextBlock() throws IOException { public HFileBlock nextBlock() throws IOException {
if (offset >= endOffset) if (offset >= endOffset) {
return null; return null;
HFileBlock b = readBlockData(offset, -1, false); }
HFileBlock b = readBlockData(offset, length, false);
offset += b.getOnDiskSizeWithHeader(); offset += b.getOnDiskSizeWithHeader();
length = b.getNextBlockOnDiskSize();
return b.unpack(fileContext, owner); return b.unpack(fileContext, owner);
} }
@ -1481,13 +1486,20 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* We always prefetch the header of the next block, so that we know its * Data-structure to use caching the header of the NEXT block. Only works if next read
* on-disk size in advance and can read it in one operation. * that comes in here is next in sequence in this block.
*/ *
* When we read, we read current block and the next blocks' header. We do this so we have
* the length of the next block to read if the hfile index is not available (rare).
* TODO: Review!! This trick of reading next blocks header is a pain, complicates our
* read path and I don't think it needed given it rare we don't have the block index
* (it is 'normally' present, gotten from the hfile index). FIX!!!
*/
private static class PrefetchedHeader { private static class PrefetchedHeader {
long offset = -1; long offset = -1;
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
@Override @Override
public String toString() { public String toString() {
return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
@ -1508,17 +1520,13 @@ public class HFileBlock implements Cacheable {
private final HFileBlockDefaultDecodingContext defaultDecodingCtx; private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
/** /**
* When we read a block, we overread and pull in the next blocks header too. We will save it * Cache of the NEXT header after this. Check it is indeed next blocks header
* here. If moving serially through the file, we will trip over this caching of the next blocks * before using it. TODO: Review. This overread into next block to fetch
* header so we won't have to do explicit seek to find next blocks lengths, etc. * next blocks header seems unnecessary given we usually get the block size
* from the hfile index. Review!
*/ */
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread = private AtomicReference<PrefetchedHeader> prefetchedHeader =
new ThreadLocal<PrefetchedHeader>() { new AtomicReference<PrefetchedHeader>(new PrefetchedHeader());
@Override
public PrefetchedHeader initialValue() {
return new PrefetchedHeader();
}
};
public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
HFileContext fileContext) throws IOException { HFileContext fileContext) throws IOException {
@ -1545,7 +1553,8 @@ public class HFileBlock implements Cacheable {
* *
* @param offset the offset in the stream to read at * @param offset the offset in the stream to read at
* @param onDiskSizeWithHeaderL the on-disk size of the block, including * @param onDiskSizeWithHeaderL the on-disk size of the block, including
* the header, or -1 if unknown * the header, or -1 if unknown; i.e. when iterating over blocks reading
* in the file metadata info.
* @param pread whether to use a positional read * @param pread whether to use a positional read
*/ */
@Override @Override
@ -1630,28 +1639,31 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* Check threadlocal cache for this block's header; we usually read it on the tail of reading * Check atomic reference cache for this block's header. Cache only good if next
* the previous block to save a seek. Otherwise, we have to do a seek to read the header before * read coming through is next in sequence in the block. We read next block's
* we can pull in the block. * header on the tail of reading the previous block to save a seek. Otherwise,
* we have to do a seek to read the header before we can pull in the block OR
* we have to backup the stream because we over-read (the next block's header).
* @see PrefetchedHeader
* @return The cached block header or null if not found. * @return The cached block header or null if not found.
* @see #cacheNextBlockHeader(long, byte[], int, int) * @see #cacheNextBlockHeader(long, byte[], int, int)
*/ */
private ByteBuffer getCachedHeader(final long offset) { private ByteBuffer getCachedHeader(final long offset) {
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); PrefetchedHeader ph = this.prefetchedHeader.get();
// PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); return ph != null && ph.offset == offset? ph.buf: null;
return prefetchedHeader != null && prefetchedHeader.offset == offset?
prefetchedHeader.buf: null;
} }
/** /**
* Save away the next blocks header in thread local. * Save away the next blocks header in atomic reference.
* @see #getCachedHeader(long) * @see #getCachedHeader(long)
* @see PrefetchedHeader
*/ */
private void cacheNextBlockHeader(final long nextBlockOffset, private void cacheNextBlockHeader(final long offset,
final byte [] header, final int headerOffset, final int headerLength) { final byte [] header, final int headerOffset, final int headerLength) {
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); PrefetchedHeader ph = new PrefetchedHeader();
prefetchedHeader.offset = nextBlockOffset; ph.offset = offset;
System.arraycopy(header, headerOffset, prefetchedHeader.header, 0, headerLength); System.arraycopy(header, headerOffset, ph.header, 0, headerLength);
this.prefetchedHeader.set(ph);
} }
/** /**
@ -1673,21 +1685,28 @@ public class HFileBlock implements Cacheable {
/** /**
* Reads a version 2 block. * Reads a version 2 block.
* *
* @param offset the offset in the stream to read at * @param offset the offset in the stream to read at. Usually the
* @param onDiskSizeWithHeaderL the on-disk size of the block, including * @param onDiskSizeWithHeaderL the on-disk size of the block, including
* the header and checksums if present or -1 if unknown * the header and checksums if present or -1 if unknown (as a long). Can be -1
* if we are doing raw iteration of blocks as when loading up file metadata; i.e.
* the first read of a new file (TODO: Fix! See HBASE-17072). Usually non-null gotten
* from the file index.
* @param pread whether to use a positional read * @param pread whether to use a positional read
* @param verifyChecksum Whether to use HBase checksums. * @param verifyChecksum Whether to use HBase checksums.
* If HBase checksum is switched off, then use HDFS checksum. * If HBase checksum is switched off, then use HDFS checksum.
* @return the HFileBlock or null if there is a HBase checksum mismatch * @return the HFileBlock or null if there is a HBase checksum mismatch
*/ */
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException { long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
throws IOException {
if (offset < 0) { if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read " throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
} }
int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
// Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
// and will save us having to seek the stream backwards to reread the header we
// read the last time through here.
ByteBuffer headerBuf = getCachedHeader(offset); ByteBuffer headerBuf = getCachedHeader(offset);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset + LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset +
@ -1696,10 +1715,15 @@ public class HFileBlock implements Cacheable {
} }
if (onDiskSizeWithHeader <= 0) { if (onDiskSizeWithHeader <= 0) {
// We were not passed the block size. Need to get it from the header. If header was not in // We were not passed the block size. Need to get it from the header. If header was not in
// cache, need to seek to pull it in. This latter might happen when we are doing the first // cache, need to seek to pull it in. This is costly and should happen very rarely.
// read in a series of reads or a random read, and we don't have access to the block index. // Currently happens on open of a hfile reader where we read the trailer blocks for
// This is costly and should happen very rarely. // indices. Otherwise, we are reading block sizes out of the hfile index. To check,
// enable TRACE in this file and you'll get an exception in a LOG every time we seek.
// See HBASE-17072 for more detail.
if (headerBuf == null) { if (headerBuf == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException());
}
headerBuf = ByteBuffer.allocate(hdrSize); headerBuf = ByteBuffer.allocate(hdrSize);
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false,
offset, pread); offset, pread);
@ -1711,9 +1735,12 @@ public class HFileBlock implements Cacheable {
int preReadHeaderSize = headerBuf == null? 0 : hdrSize; int preReadHeaderSize = headerBuf == null? 0 : hdrSize;
// Allocate enough space to fit the next block's header too; saves a seek next time through. // Allocate enough space to fit the next block's header too; saves a seek next time through.
// onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header;
// onDiskSizeWithHeader is header, body, and any checksums if present. // onDiskSizeWithHeader is header, body, and any checksums if present. preReadHeaderSize
// TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap). // says where to start reading. If we have the header cached, then we don't need to read
byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // it again and we can likely read from last place we left off w/o need to backup and reread
// the header we read last time through here. TODO: Review this overread of the header. Is it necessary
// when we get the block size from the hfile index? See note on PrefetchedHeader class above.
byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize,
onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
if (headerBuf != null) { if (headerBuf != null) {

View File

@ -112,7 +112,7 @@ public class HFileBlockIndex {
* blocks at all other levels will be cached in the LRU cache in practice, * blocks at all other levels will be cached in the LRU cache in practice,
* although this API does not enforce that. * although this API does not enforce that.
* *
* All non-root (leaf and intermediate) index blocks contain what we call a * <p>All non-root (leaf and intermediate) index blocks contain what we call a
* "secondary index": an array of offsets to the entries within the block. * "secondary index": an array of offsets to the entries within the block.
* This allows us to do binary search for the entry corresponding to the * This allows us to do binary search for the entry corresponding to the
* given key without having to deserialize the block. * given key without having to deserialize the block.
@ -202,14 +202,12 @@ public class HFileBlockIndex {
} }
/** /**
* Return the BlockWithScanInfo which contains the DataBlock with other scan * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with
* info such as nextIndexedKey. This function will only be called when the * other scan info such as the key that starts the next HFileBlock. This function will only
* HFile version is larger than 1. * be called when the HFile version is larger than 1.
* *
* @param key * @param key the key we are looking for
* the key we are looking for * @param currentBlock the current block, to avoid re-reading the same block
* @param currentBlock
* the current block, to avoid re-reading the same block
* @param cacheBlocks * @param cacheBlocks
* @param pread * @param pread
* @param isCompaction * @param isCompaction