HBASE-4219 Per Column Family Metrics

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1185835 13f79535-47bb-0310-9956-ffa450edef68
Nicolas Spiegelberg 2011-10-18 20:23:28 +00:00
parent 9e071332b2
commit a1a1b1b781
24 changed files with 686 additions and 61 deletions

View File

@ -696,6 +696,7 @@ Release 0.92.0 - Unreleased
HBASE-4292 Add a debugging dump servlet to the master and regionserver
(todd)
HBASE-4057 Implement HBase version of "show processlist" (Riley Patterson)
HBASE-4219 Per Column Family Metrics
Release 0.90.5 - Unreleased

View File

@ -78,8 +78,9 @@ public class HalfStoreFileReader extends StoreFile.Reader {
}
@Override
public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread) {
final HFileScanner s = super.getScanner(cacheBlocks, pread);
public HFileScanner getScanner(final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
return new HFileScanner() {
final HFileScanner delegate = s;
public boolean atEnd = false;

View File

@ -95,6 +95,19 @@ public abstract class AbstractHFileReader implements HFile.Reader {
/** Prefix of the form cf.<column_family_name> for statistics counters. */
private final String cfStatsPrefix;
// various metrics that we want to track on a per-cf basis
public String fsReadTimeNanoMetric = "";
public String compactionReadTimeNanoMetric = "";
public String fsBlockReadCntMetric = "";
public String compactionBlockReadCntMetric = "";
public String fsBlockReadCacheHitCntMetric = "";
public String compactionBlockReadCacheHitCntMetric = "";
public String fsMetaBlockReadCntMetric = "";
public String fsMetaBlockReadCacheHitCntMetric = "";
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long fileSize,
final boolean closeIStream,
@ -108,6 +121,20 @@ public abstract class AbstractHFileReader implements HFile.Reader {
this.path = path;
this.name = path.getName();
cfStatsPrefix = "cf." + parseCfNameFromPath(path.toString());
fsReadTimeNanoMetric = cfStatsPrefix + ".fsReadNano";
compactionReadTimeNanoMetric = cfStatsPrefix + ".compactionReadNano";
fsBlockReadCntMetric = cfStatsPrefix + ".fsBlockReadCnt";
fsBlockReadCacheHitCntMetric = cfStatsPrefix + ".fsBlockReadCacheHitCnt";
compactionBlockReadCntMetric = cfStatsPrefix + ".compactionBlockReadCnt";
compactionBlockReadCacheHitCntMetric = cfStatsPrefix
+ ".compactionBlockReadCacheHitCnt";
fsMetaBlockReadCntMetric = cfStatsPrefix + ".fsMetaBlockReadCnt";
fsMetaBlockReadCacheHitCntMetric = cfStatsPrefix
+ ".fsMetaBlockReadCacheHitCnt";
}
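For illustration only, assuming a hypothetical column family named "MyFamily", the assignments above yield per-CF metric keys such as the following:

// cf.MyFamily.fsReadNano               cf.MyFamily.compactionReadNano
// cf.MyFamily.fsBlockReadCnt           cf.MyFamily.fsBlockReadCacheHitCnt
// cf.MyFamily.compactionBlockReadCnt   cf.MyFamily.compactionBlockReadCacheHitCnt
// cf.MyFamily.fsMetaBlockReadCnt       cf.MyFamily.fsMetaBlockReadCacheHitCnt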
@SuppressWarnings("serial")

View File

@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -42,6 +44,8 @@ import org.apache.hadoop.io.Writable;
*/
public abstract class AbstractHFileWriter implements HFile.Writer {
private static final Log LOG = LogFactory.getLog(AbstractHFileWriter.class);
/** Key previously appended. Becomes the last key in the file. */
protected byte[] lastKeyBuffer = null;
@ -90,6 +94,11 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
/** May be null if we were passed a stream. */
protected final Path path;
/** Prefix of the form cf.<column_family_name> for statistics counters. */
// Note that this is derived from the path, which can be null, so it may
// remain unknown.
public String cfStatsPrefix = "cf.unknown";
/** Cache configuration for caching data on write. */
protected final CacheConfig cacheConf;
@ -113,6 +122,27 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
closeOutputStream = path != null;
this.cacheConf = cacheConf;
if (path != null)
cfStatsPrefix = "cf." + parseCfNameFromPath(path.toString());
}
/**
* Parse the HFile path to figure out which table and column family it belongs
* to. This is used to maintain read statistics on a per-column-family basis.
*
* @param path
* HFile path name
* @return the column family name, or "unknown" if it cannot be determined
*/
public static String parseCfNameFromPath(String path) {
String splits[] = path.split("/");
if (splits.length < 2) {
LOG.warn("Could not determine the table and column family of the "
+ "HFile path " + path);
return "unknown";
}
return splits[splits.length - 2];
}
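As a sketch of the layout this parsing assumes (the HFile sits directly inside its column family directory), using a hypothetical path:

// Hypothetical store file path: .../<table>/<encoded region name>/<family>/<hfile>
String path = "/hbase/MyTable/0123456789abcdef/MyFamily/9876543210";
// splits[splits.length - 2] is the directory containing the file,
// i.e. the column family name:
String cf = parseCfNameFromPath(path); // returns "MyFamily"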
/**
@ -219,6 +249,11 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
return path;
}
@Override
public String getColumnFamilyName() {
return cfStatsPrefix;
}
@Override
public String toString() {
return "writer=" + (path != null ? path.toString() : null) + ", name="

View File

@ -37,49 +37,55 @@ public enum BlockType {
// Scanned block section
/** Data block, both versions */
DATA("DATABLK*"),
DATA("DATABLK*", BlockCategory.DATA),
/** Version 2 leaf index block. Appears in the data block section */
LEAF_INDEX("IDXLEAF2"),
LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX),
/** Bloom filter block, version 2 */
BLOOM_CHUNK("BLMFBLK2"),
BLOOM_CHUNK("BLMFBLK2", BlockCategory.BLOOM),
// Non-scanned block section
/** Meta blocks */
META("METABLKc"),
META("METABLKc", BlockCategory.META),
/** Intermediate-level version 2 index in the non-data block section */
INTERMEDIATE_INDEX("IDXINTE2"),
INTERMEDIATE_INDEX("IDXINTE2", BlockCategory.INDEX),
// Load-on-open section.
/** Root index block, also used for the single-level meta index, version 2 */
ROOT_INDEX("IDXROOT2"),
ROOT_INDEX("IDXROOT2", BlockCategory.INDEX),
/** File info, version 2 */
FILE_INFO("FILEINF2"),
FILE_INFO("FILEINF2", BlockCategory.META),
/** Bloom filter metadata, version 2 */
BLOOM_META("BLMFMET2"),
BLOOM_META("BLMFMET2", BlockCategory.BLOOM),
// Trailer
/** Fixed file trailer, both versions (always just a magic string) */
TRAILER("TRABLK\"$"),
TRAILER("TRABLK\"$", BlockCategory.META),
// Legacy blocks
/** Block index magic string in version 1 */
INDEX_V1("IDXBLK)+");
INDEX_V1("IDXBLK)+", BlockCategory.INDEX);
public enum BlockCategory {
DATA, META, INDEX, BLOOM
}
public static final int MAGIC_LENGTH = 8;
private final byte[] magic;
private final BlockCategory metricCat;
private BlockType(String magicStr) {
private BlockType(String magicStr, BlockCategory metricCat) {
magic = Bytes.toBytes(magicStr);
this.metricCat = metricCat;
assert magic.length == MAGIC_LENGTH;
}
@ -95,6 +101,10 @@ public enum BlockType {
buf.put(magic);
}
public String getMetricName() {
return metricCat.toString();
}
public static BlockType parse(byte[] buf, int offset, int length)
throws IOException {
if (length != MAGIC_LENGTH) {

View File

@ -67,7 +67,13 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
this.blockName = blockName;
this.buf = buf;
this.accessTime = accessTime;
this.size = ClassSize.align(blockName.length())
// We approximate the size of this class by the size of its name string
// plus the size of its byte buffer plus the overhead associated with all
// the base classes. Java strings store two bytes per character (UTF-16),
// hence the times 2. We also include the base class sizes in the
// PER_BLOCK_OVERHEAD variable rather than align()ing them with their
// buffer lengths. This variable is used elsewhere in unit tests.
this.size = ClassSize.align(2 * blockName.length())
+ ClassSize.align(buf.heapSize()) + PER_BLOCK_OVERHEAD;
if(inMemory) {
this.priority = BlockPriority.MEMORY;

View File

@ -176,6 +176,8 @@ public class HFile {
/** @return the path to this {@link HFile} */
Path getPath();
String getColumnFamilyName();
void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);
/**

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.hfile.HFileBlockInfo;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -73,7 +74,7 @@ import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
* The version 2 block representation in the block cache is the same as above,
* except that the data section is always uncompressed in the cache.
*/
public class HFileBlock implements Cacheable {
public class HFileBlock implements Cacheable, HFileBlockInfo {
/** The size of a version 2 {@link HFile} block header */
public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
@ -156,6 +157,16 @@ public class HFileBlock implements Cacheable {
this.offset = offset;
}
private String cfStatsPrefix = "cf.unknown";
public String getColumnFamilyName() {
return this.cfStatsPrefix;
}
public void setColumnFamilyName(String cfName) {
this.cfStatsPrefix = cfName;
}
/**
* Creates a block from an existing buffer starting with a header. Rewinds
* and takes ownership of the buffer. By definition of rewind, ignores the
@ -423,8 +434,8 @@ public class HFileBlock implements Cacheable {
// If we are on heap, then we add the capacity of buf.
if (buf != null) {
return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
* Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE)
+ ClassSize.align(buf.capacity());
* Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG)
+ ClassSize.align(BYTE_BUFFER_HEAP_SIZE + buf.capacity());
} else {
return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3

View File

@ -0,0 +1,44 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer;
import org.apache.hadoop.hbase.io.HeapSize;
/**
* An interface that exposes methods to retrieve the column family name and
* BlockType of a particular cached block. This is more information than most
* cache implementations require, but it is used, for example, for more
* specific metrics. Used by implementations of HeapSize, such as
* {@link HFileBlock}.
*/
public interface HFileBlockInfo {
/**
* @return Column family name of this cached item.
*/
public String getColumnFamilyName();
/**
* @return BlockType descriptor of this cached item. Indicates the type of
* block, such as a data or index block.
*/
public BlockType getBlockType();
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
@ -216,6 +217,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
// Per meta key from any given file, synchronize reads for said block
synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
metaLoads.incrementAndGet();
HRegion.incrNumericMetric(this.fsMetaBlockReadCntMetric, 1);
// Check cache for block. If found return.
if (cacheConf.isBlockCacheEnabled()) {
HFileBlock cachedBlock =
@ -223,6 +225,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
cacheConf.shouldCacheDataOnRead());
if (cachedBlock != null) {
cacheHits.incrementAndGet();
HRegion.incrNumericMetric(this.fsMetaBlockReadCacheHitCntMetric, 1);
return cachedBlock.getBufferWithoutHeader();
}
// Cache Miss, please load.
@ -231,9 +234,12 @@ public class HFileReaderV1 extends AbstractHFileReader {
HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
true);
hfileBlock.setColumnFamilyName(this.getColumnFamilyName());
hfileBlock.expectType(BlockType.META);
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
long delta = System.nanoTime() - startTimeNs;
HRegion.incrTimeVaryingMetric(fsReadTimeNanoMetric, delta);
HFile.readTimeNano.addAndGet(delta);
HFile.readOps.incrementAndGet();
// Cache the block
@ -276,6 +282,12 @@ public class HFileReaderV1 extends AbstractHFileReader {
synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
blockLoads.incrementAndGet();
if (isCompaction) {
HRegion.incrNumericMetric(this.compactionBlockReadCntMetric, 1);
} else {
HRegion.incrNumericMetric(this.fsBlockReadCntMetric, 1);
}
// Check cache for block. If found return.
if (cacheConf.isBlockCacheEnabled()) {
HFileBlock cachedBlock =
@ -283,6 +295,15 @@ public class HFileReaderV1 extends AbstractHFileReader {
cacheConf.shouldCacheDataOnRead());
if (cachedBlock != null) {
cacheHits.incrementAndGet();
if (isCompaction) {
HRegion.incrNumericMetric(
this.compactionBlockReadCacheHitCntMetric, 1);
} else {
HRegion.incrNumericMetric(
this.fsBlockReadCacheHitCntMetric, 1);
}
return cachedBlock.getBufferWithoutHeader();
}
// Carry on, please load.
@ -304,11 +325,18 @@ public class HFileReaderV1 extends AbstractHFileReader {
HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
- offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
hfileBlock.setColumnFamilyName(this.getColumnFamilyName());
hfileBlock.expectType(BlockType.DATA);
ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
long delta = System.nanoTime() - startTimeNs;
HFile.readTimeNano.addAndGet(delta);
HFile.readOps.incrementAndGet();
if (isCompaction) {
HRegion.incrTimeVaryingMetric(this.compactionReadTimeNanoMetric, delta);
} else {
HRegion.incrTimeVaryingMetric(this.fsReadTimeNanoMetric, delta);
}
// Cache the block
if (cacheConf.shouldCacheDataOnRead() && cacheBlock) {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
@ -169,6 +170,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
// single-level.
synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
metaLoads.incrementAndGet();
HRegion.incrNumericMetric(fsMetaBlockReadCntMetric, 1);
// Check cache for block. If found return.
long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
@ -182,6 +184,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
// Return a distinct 'shallow copy' of the block,
// so pos does not get messed by the scanner
cacheHits.incrementAndGet();
HRegion.incrNumericMetric(fsMetaBlockReadCacheHitCntMetric, 1);
return cachedBlock.getBufferWithoutHeader();
}
// Cache Miss, please load.
@ -189,8 +192,11 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
blockSize, -1, true);
metaBlock.setColumnFamilyName(this.getColumnFamilyName());
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
long delta = System.nanoTime() - startTimeNs;
HRegion.incrTimeVaryingMetric(fsReadTimeNanoMetric, delta);
HFile.readTimeNano.addAndGet(delta);
HFile.readOps.incrementAndGet();
// Cache the block
@ -246,6 +252,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
if (cachedBlock != null) {
cacheHits.incrementAndGet();
if (isCompaction) {
HRegion.incrNumericMetric(
this.compactionBlockReadCacheHitCntMetric, 1);
} else {
HRegion.incrNumericMetric(this.fsBlockReadCacheHitCntMetric, 1);
}
return cachedBlock;
}
// Carry on, please load.
@ -255,9 +268,16 @@ public class HFileReaderV2 extends AbstractHFileReader {
long startTimeNs = System.nanoTime();
HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset,
onDiskBlockSize, -1, pread);
dataBlock.setColumnFamilyName(this.getColumnFamilyName());
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
long delta = System.nanoTime() - startTimeNs;
HFile.readTimeNano.addAndGet(delta);
HFile.readOps.incrementAndGet();
if (isCompaction) {
HRegion.incrTimeVaryingMetric(this.compactionReadTimeNanoMetric, delta);
} else {
HRegion.incrTimeVaryingMetric(this.fsReadTimeNanoMetric, delta);
}
// Cache the block
if (cacheBlock) {

View File

@ -201,11 +201,12 @@ public class HFileWriterV1 extends AbstractHFileWriter {
if (cacheConf.shouldCacheDataOnWrite()) {
baosDos.flush();
byte[] bytes = baos.toByteArray();
HFileBlock cBlock = new HFileBlock(BlockType.DATA,
(int) (outputStream.getPos() - blockBegin), bytes.length, -1,
ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
cBlock.setColumnFamilyName(this.getColumnFamilyName());
cacheConf.getBlockCache().cacheBlock(
HFile.getBlockCacheKey(name, blockBegin),
new HFileBlock(BlockType.DATA,
(int) (outputStream.getPos() - blockBegin), bytes.length, -1,
ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin));
HFile.getBlockCacheKey(name, blockBegin), cBlock);
baosDos.close();
}
blockNumber++;
@ -485,4 +486,4 @@ public class HFileWriterV1 extends AbstractHFileWriter {
return pos;
}
}
}

View File

@ -220,9 +220,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
HFile.writeOps.incrementAndGet();
if (cacheConf.shouldCacheDataOnWrite()) {
HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching();
blockForCaching.setColumnFamilyName(this.getColumnFamilyName());
cacheConf.getBlockCache().cacheBlock(
HFile.getBlockCacheKey(name, lastDataBlockOffset),
fsBlockWriter.getBlockForCaching());
HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
}
}
@ -240,9 +241,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
if (cacheThisBlock) {
// Cache this block on write.
HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
cBlock.setColumnFamilyName(this.getColumnFamilyName());
cacheConf.getBlockCache().cacheBlock(
HFile.getBlockCacheKey(name, offset),
fsBlockWriter.getBlockForCaching());
HFile.getBlockCacheKey(name, offset), cBlock);
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@ -266,7 +267,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
throw new RuntimeException("Cached an already cached block");
}
cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
long newSize = size.addAndGet(cb.heapSize());
long newSize = updateSizeMetrics(cb, false);
map.put(blockName, cb);
elements.incrementAndGet();
if(newSize > acceptableSize() && !evictionInProgress) {
@ -288,6 +289,30 @@ public class LruBlockCache implements BlockCache, HeapSize {
cacheBlock(blockName, buf, false);
}
/**
* Helper function that updates the local size counter and also updates any
* per-cf or per-blocktype metrics it can discern from the given
* {@link CachedBlock}.
*
* @param cb the cached block whose heap size is being added or removed
* @param evict true if the block is being evicted, so its size is subtracted
*/
protected long updateSizeMetrics(CachedBlock cb, boolean evict) {
long heapsize = cb.heapSize();
if (evict) {
heapsize *= -1;
}
if (cb.getBuffer() instanceof HFileBlockInfo) {
HFileBlockInfo cb_hfbi = (HFileBlockInfo) cb.getBuffer();
HRegion.incrNumericPersistentMetric(cb_hfbi.getColumnFamilyName()
+ ".blockCacheSize", heapsize);
HRegion.incrNumericPersistentMetric("bt."
+ cb_hfbi.getBlockType().getMetricName() + ".blockCacheSize",
heapsize);
}
return size.addAndGet(heapsize);
}
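A rough sketch of the bookkeeping above for a cached DATA block from a hypothetical family "MyFamily":

// Caching the block adds its heap size to both counters:
//   cf.MyFamily.blockCacheSize += cb.heapSize()
//   bt.DATA.blockCacheSize     += cb.heapSize()
// Evicting the same block later passes evict = true and subtracts the same
// amount, so each counter reflects the bytes currently cached for that
// column family and block category.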
/**
* Get the buffer of the block with the specified name.
* @param blockName block name
@ -340,7 +365,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
protected long evictBlock(CachedBlock block) {
map.remove(block.getName());
size.addAndGet(-1 * block.heapSize());
updateSizeMetrics(block, true);
elements.decrementAndGet();
stats.evicted();
return block.heapSize();

View File

@ -395,7 +395,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
halfWriter = new StoreFile.Writer(
fs, outFile, blocksize, compression, conf, cacheConf,
KeyValue.COMPARATOR, bloomFilterType, 0);
HFileScanner scanner = halfReader.getScanner(false, false);
HFileScanner scanner = halfReader.getScanner(false, false, false);
scanner.seekTo();
do {
KeyValue kv = scanner.getKeyValue();

View File

@ -41,6 +41,7 @@ import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -274,6 +275,55 @@ public class HRegion implements HeapSize { // , Writable{
private HTableDescriptor htableDescriptor = null;
private RegionSplitPolicy splitPolicy;
// for simple numeric metrics (# of blocks read from block cache)
public static final ConcurrentMap<String, AtomicLong> numericMetrics = new ConcurrentHashMap<String, AtomicLong>();
// for simple numeric metrics (current block cache size)
// Unlike the map above, these are not reset to zero when queried.
public static final ConcurrentMap<String, AtomicLong> numericPersistentMetrics = new ConcurrentHashMap<String, AtomicLong>();
// Used for metrics where we want to track a metric (such as latency)
// over a number of operations.
public static final ConcurrentMap<String, Pair<AtomicLong, AtomicInteger>> timeVaryingMetrics = new ConcurrentHashMap<String, Pair<AtomicLong, AtomicInteger>>();
public static void incrNumericMetric(String key, long amount) {
AtomicLong oldVal = numericMetrics.get(key);
if (oldVal == null) {
oldVal = numericMetrics.putIfAbsent(key, new AtomicLong(amount));
if (oldVal == null)
return;
}
oldVal.addAndGet(amount);
}
public static void setNumericMetric(String key, long amount) {
numericMetrics.put(key, new AtomicLong(amount));
}
public static void incrTimeVaryingMetric(String key, long amount) {
Pair<AtomicLong, AtomicInteger> oldVal = timeVaryingMetrics.get(key);
if (oldVal == null) {
oldVal = timeVaryingMetrics.putIfAbsent(key,
new Pair<AtomicLong, AtomicInteger>(new AtomicLong(amount),
new AtomicInteger(1)));
if (oldVal == null)
return;
}
oldVal.getFirst().addAndGet(amount); // total time
oldVal.getSecond().incrementAndGet(); // increment ops by 1
}
public static void incrNumericPersistentMetric(String key, long amount) {
AtomicLong oldVal = numericPersistentMetrics.get(key);
if (oldVal == null) {
oldVal = numericPersistentMetrics
.putIfAbsent(key, new AtomicLong(amount));
if (oldVal == null)
return;
}
oldVal.addAndGet(amount);
}
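A minimal usage sketch of these helpers, as it would appear inside a read-path method elsewhere in this patch; the family and metric names are hypothetical but follow the cf.<family>.<metric> convention used here:

// A read path records a block read and its latency:
HRegion.incrNumericMetric("cf.MyFamily.fsBlockReadCnt", 1);
long startNs = System.nanoTime();
// ... perform the filesystem read here ...
HRegion.incrTimeVaryingMetric("cf.MyFamily.fsReadNano", System.nanoTime() - startNs);
// Cumulative values, such as the current block cache size, go into the
// persistent map, which is not reset to zero when published:
long heapSizeDelta = 4096; // placeholder for the cached block's heap size
HRegion.incrNumericPersistentMetric("cf.MyFamily.blockCacheSize", heapSizeDelta);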
/**
* Should only be used for testing purposes
*/

View File

@ -42,12 +42,14 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -122,6 +124,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -241,6 +244,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
private RegionServerMetrics metrics;
private RegionServerDynamicMetrics dynamicMetrics;
// Compactions
public CompactSplitThread compactSplitThread;
@ -914,6 +918,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.hlog = setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
startServiceThreads();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
@ -1236,6 +1241,24 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
}
/**
* Helper function for metrics() that increments a map value, creating the entry if it does not exist.
*
* @param map
* The map to work with
* @param key
* the string key
* @param val
* the value to add or set the map key to
*/
protected void incrMap(Map<String, MutableDouble> map, String key, double val) {
if (map.get(key) != null) {
map.get(key).add(val);
} else {
map.put(key, new MutableDouble(val));
}
}
protected void metrics() {
this.metrics.regions.set(this.onlineRegions.size());
this.metrics.incrementRequests(this.requestCount.get());
@ -1252,24 +1275,62 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
new HDFSBlocksDistribution();
long totalStaticIndexSize = 0;
long totalStaticBloomSize = 0;
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
HRegion r = e.getValue();
memstoreSize += r.memstoreSize.get();
readRequestsCount += r.readRequestsCount.get();
writeRequestsCount += r.writeRequestsCount.get();
synchronized (r.stores) {
stores += r.stores.size();
for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
Store store = ee.getValue();
storefiles += store.getStorefilesCount();
storefileIndexSize += store.getStorefilesIndexSize();
totalStaticIndexSize += store.getTotalStaticIndexSize();
totalStaticBloomSize += store.getTotalStaticBloomSize();
}
}
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
long tmpfiles;
long tmpindex;
long tmpfilesize;
long tmpbloomsize;
long tmpstaticsize;
String cfname;
// Note that this is a map of Doubles instead of Longs. Each per-store value
// is divided (e.g. bytes to MB) before it is added to its sum; with integer
// division every term would be truncated separately, compounding the error
// (for example, two stores holding 600KB of index each would report
// 0MB + 0MB = 0MB instead of 1MB). Dividing once at the end is difficult
// because the proper factor is not known there, so each term is kept exact
// as a double and the sum is truncated only once when it is exported.
Map<String, MutableDouble> tempVals = new HashMap<String, MutableDouble>();
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
HRegion r = e.getValue();
memstoreSize += r.memstoreSize.get();
readRequestsCount += r.readRequestsCount.get();
writeRequestsCount += r.writeRequestsCount.get();
synchronized (r.stores) {
stores += r.stores.size();
for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
Store store = ee.getValue();
tmpfiles = store.getStorefilesCount();
tmpindex = store.getStorefilesIndexSize();
tmpfilesize = store.getStorefilesSize();
tmpbloomsize = store.getTotalStaticBloomSize();
tmpstaticsize = store.getTotalStaticIndexSize();
// Note that there is only one store per CF so setting is safe
cfname = "cf." + store.toString();
this.incrMap(tempVals, cfname + ".storeFileCount", tmpfiles);
this.incrMap(tempVals, cfname + ".storeFileIndexSizeMB",
(tmpindex / (1024.0 * 1024)));
this.incrMap(tempVals, cfname + ".storeFileSizeMB",
(tmpfilesize / (1024.0 * 1024)));
this.incrMap(tempVals, cfname + ".staticBloomSizeKB",
(tmpbloomsize / 1024.0));
this.incrMap(tempVals, cfname + ".memstoreSizeMB",
(store.getMemStoreSize() / (1024.0 * 1024)));
this.incrMap(tempVals, cfname + ".staticIndexSizeKB",
tmpstaticsize / 1024.0);
storefiles += tmpfiles;
storefileIndexSize += tmpindex;
totalStaticIndexSize += tmpstaticsize;
totalStaticBloomSize += tmpbloomsize;
}
}
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
}
for (Entry<String, MutableDouble> e : tempVals.entrySet()) {
HRegion.setNumericMetric(e.getKey(), e.getValue().longValue());
}
this.metrics.stores.set(stores);
this.metrics.storefiles.set(storefiles);
this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));

View File

@ -536,6 +536,14 @@ public class Store implements HeapSize {
StoreFile.Reader r = sf.createReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
// This increments the metric tracking the total bytes flushed for this
// family. The overall flush count is kept in the static metrics and is
// derived from HRegion.recentFlushes, which is set within
// HRegion.internalFlushcache; that method indirectly calls this code,
// via the StoreFlusherImpl class, to perform the actual flush.
HRegion.incrNumericPersistentMetric("cf." + this.toString() + ".flushSize",
flushed);
if(LOG.isInfoEnabled()) {
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
", sequenceid=" + logCacheFlushId +
@ -1122,7 +1130,7 @@ public class Store implements HeapSize {
// For each file, obtain a scanner:
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(filesToCompact, false, false);
.getScannersForStoreFiles(filesToCompact, false, false, true);
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
@ -1417,7 +1425,7 @@ public class Store implements HeapSize {
firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
}
// Get a scanner that caches blocks and that uses pread.
HFileScanner scanner = r.getHFileReader().getScanner(true, true);
HFileScanner scanner = r.getHFileReader().getScanner(true, true, false);
// Seek scanner. If can't seek it, return.
if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
// If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
@ -1700,6 +1708,13 @@ public class Store implements HeapSize {
return size;
}
/**
* @return The size of this store's memstore, in bytes
*/
long getMemStoreSize() {
return this.memstore.heapSize();
}
/**
* @return The priority that this store should have in the compaction queue
*/

View File

@ -969,11 +969,21 @@ public class StoreFile {
private final HFile.Reader reader;
protected TimeRangeTracker timeRangeTracker = null;
protected long sequenceID = -1;
private final String bloomAccessedMetric;
private final String bloomSkippedMetric;
private byte[] lastBloomKey;
public Reader(FileSystem fs, Path path, CacheConfig cacheConf)
throws IOException {
reader = HFile.createReader(fs, path, cacheConf);
// prepare the metric names (keys) for this reader's column family
bloomAccessedMetric = reader.getColumnFamilyName() +
".keyMaybeInBloomCnt";
bloomSkippedMetric = reader.getColumnFamilyName() +
".keyNotInBloomCnt";
bloomFilterType = BloomType.NONE;
}
@ -982,6 +992,8 @@ public class StoreFile {
*/
Reader() {
this.reader = null;
bloomAccessedMetric = "";
bloomSkippedMetric = "";
}
public RawComparator<byte []> getComparator() {
@ -989,14 +1001,32 @@ public class StoreFile {
}
/**
* Get a scanner to scan over this StoreFile.
* Get a scanner to scan over this StoreFile. Do not use
* this overload if using this scanner for compactions.
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @return a scanner
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
return new StoreFileScanner(this, getScanner(cacheBlocks, pread));
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread) {
return getStoreFileScanner(cacheBlocks, pread, false);
}
/**
* Get a scanner to scan over this StoreFile.
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @param isCompaction is scanner being used for compaction?
* @return a scanner
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread,
boolean isCompaction) {
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread,
isCompaction));
}
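A brief usage sketch of the two overloads; here reader stands in for a StoreFile.Reader obtained from StoreFile.createReader():

// User-facing gets and scans typically cache blocks and use pread; their
// reads are tallied under the per-CF fs* metrics:
StoreFileScanner readScanner = reader.getStoreFileScanner(true, true);
// Compactions pass isCompaction = true (as Store.compact does), so their
// reads are counted under the per-CF compaction* metrics instead:
StoreFileScanner compactionScanner = reader.getStoreFileScanner(false, false, true);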
/**
@ -1010,7 +1040,26 @@ public class StoreFile {
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
return reader.getScanner(cacheBlocks, pread);
return getScanner(cacheBlocks, pread, false);
}
/**
* Warning: Do not write further code that depends on this call. Instead use
* getStoreFileScanner(), which returns a StoreFileScanner, the preferred
* higher-level way to scan a store.
*
* @param cacheBlocks
* should we cache the blocks?
* @param pread
* use pread (for concurrent small readers)
* @param isCompaction
* is scanner being used for compaction?
* @return the underlying HFileScanner
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
boolean isCompaction) {
return reader.getScanner(cacheBlocks, pread, isCompaction);
}
public void close(boolean evictOnClose) throws IOException {
@ -1175,6 +1224,10 @@ public class StoreFile {
&& this.bloomFilter.contains(key, 0, key.length, bloom);
}
if (exists)
HRegion.incrNumericMetric(bloomAccessedMetric, 1);
else
HRegion.incrNumericMetric(bloomSkippedMetric, 1);
return exists;
}
} catch (IOException e) {
@ -1273,6 +1326,10 @@ public class StoreFile {
return reader.indexSize();
}
public String getColumnFamilyName() {
return reader.getColumnFamilyName();
}
public BloomType getBloomFilterType() {
return this.bloomFilterType;
}

View File

@ -71,14 +71,27 @@ class StoreFileScanner implements KeyValueScanner {
* set of store files.
*/
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> filesToCompact,
Collection<StoreFile> files,
boolean cacheBlocks,
boolean usePread) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks,
usePread, false);
}
/**
* Return an array of scanners corresponding to the given
* set of store files.
*/
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files,
boolean cacheBlocks,
boolean usePread,
boolean isCompaction) throws IOException {
List<StoreFileScanner> scanners =
new ArrayList<StoreFileScanner>(filesToCompact.size());
for (StoreFile file : filesToCompact) {
new ArrayList<StoreFileScanner>(files.size());
for (StoreFile file : files) {
StoreFile.Reader r = file.createReader();
scanners.add(r.getStoreFileScanner(cacheBlocks, usePread));
scanners.add(r.getStoreFileScanner(cacheBlocks, usePread, isCompaction));
}
return scanners;
}

View File

@ -167,7 +167,8 @@ class StoreScanner extends NonLazyKeyValueScanner
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
.getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet,
false);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);

View File

@ -0,0 +1,166 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.metrics;
import java.lang.reflect.Method;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.MetricsBase;
import org.apache.hadoop.metrics.util.MetricsLongValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
*
* This class maintains dynamically created region server statistics (such
* as the per-column-family metrics added by this change) and publishes them
* through the metrics interfaces. It also registers the JMX MBean for these
* dynamic metrics.
* <p>
* Metrics are created on demand by name: they can be updated via
* setNumericMetric() and incrTimeVaryingMetric(), and on each doUpdates()
* call the values accumulated in HRegion's static metric maps are pushed
* to the metrics system.
*
*/
public class RegionServerDynamicMetrics implements Updater {
private MetricsRecord metricsRecord;
private MetricsContext context;
private final RegionServerDynamicStatistics rsDynamicStatistics;
private Method updateMbeanInfoIfMetricsListChanged = null;
private static final Log LOG =
LogFactory.getLog(RegionServerDynamicStatistics.class);
/**
* The metrics variables are public:
* - they can be set directly by calling their set/inc methods
* - they can also be read directly, e.g. JMX does this.
*/
public final MetricsRegistry registry = new MetricsRegistry();
private RegionServerDynamicMetrics() {
this.context = MetricsUtil.getContext("hbase");
this.metricsRecord = MetricsUtil.createRecord(
this.context,
"RegionServerDynamicStatistics");
context.registerUpdater(this);
this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
try {
updateMbeanInfoIfMetricsListChanged =
this.rsDynamicStatistics.getClass().getSuperclass()
.getDeclaredMethod("updateMbeanInfoIfMetricsListChanged",
new Class[]{});
updateMbeanInfoIfMetricsListChanged.setAccessible(true);
} catch (Exception e) {
LOG.error(e);
}
}
public static RegionServerDynamicMetrics newInstance() {
RegionServerDynamicMetrics metrics =
new RegionServerDynamicMetrics();
return metrics;
}
public synchronized void setNumericMetric(String name, long amt) {
MetricsLongValue m = (MetricsLongValue)registry.get(name);
if (m == null) {
m = new MetricsLongValue(name, this.registry);
try {
if (updateMbeanInfoIfMetricsListChanged != null) {
updateMbeanInfoIfMetricsListChanged.invoke(this.rsDynamicStatistics,
new Object[]{});
}
} catch (Exception e) {
LOG.error(e);
}
}
m.set(amt);
}
public synchronized void incrTimeVaryingMetric(
String name,
long amt,
int numOps) {
MetricsTimeVaryingRate m = (MetricsTimeVaryingRate)registry.get(name);
if (m == null) {
m = new MetricsTimeVaryingRate(name, this.registry);
try {
if (updateMbeanInfoIfMetricsListChanged != null) {
updateMbeanInfoIfMetricsListChanged.invoke(this.rsDynamicStatistics,
new Object[]{});
}
} catch (Exception e) {
LOG.error(e);
}
}
if (numOps > 0) {
m.inc(numOps, amt);
}
}
/**
* Push the metrics to the monitoring subsystem on each doUpdates() call.
* @param context the metrics context
*/
public void doUpdates(MetricsContext context) {
/* get dynamically created numeric metrics, and push the metrics */
for (Entry<String, AtomicLong> entry : HRegion.numericMetrics.entrySet()) {
this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
}
/* get dynamically created persistent numeric metrics, and push them.
* These are not reset; they are cumulative. */
for (Entry<String, AtomicLong> entry : HRegion.numericPersistentMetrics.entrySet()) {
this.setNumericMetric(entry.getKey(), entry.getValue().get());
}
/* get dynamically created time varying metrics, and push the metrics */
for (Entry<String, Pair<AtomicLong, AtomicInteger>> entry :
HRegion.timeVaryingMetrics.entrySet()) {
Pair<AtomicLong, AtomicInteger> value = entry.getValue();
this.incrTimeVaryingMetric(entry.getKey(),
value.getFirst().getAndSet(0),
value.getSecond().getAndSet(0));
}
synchronized (registry) {
// Iterate through the registry to propagate the different dynamic metrics.
for (String metricName : registry.getKeyList() ) {
MetricsBase value = registry.get(metricName);
value.pushMetric(metricsRecord);
}
}
metricsRecord.update();
}
public void shutdown() {
if (rsDynamicStatistics != null)
rsDynamicStatistics.shutdown();
}
}
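A rough end-to-end sketch of the flow these classes complete; the family and metric names below are hypothetical:

// 1. Somewhere on the read path a dynamically named counter is bumped:
HRegion.incrNumericMetric("cf.MyFamily.fsBlockReadCnt", 1);
// 2. On the next metrics period doUpdates() drains that entry with
//    getAndSet(0) and publishes it via setNumericMetric() as a
//    MetricsLongValue under the "RegionServerDynamicStatistics" record,
//    where it is also visible through the registered JMX MBean.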

View File

@ -0,0 +1,49 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.metrics;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import javax.management.ObjectName;
/**
* Exports the dynamic region server metrics recorded in
* {@link RegionServerDynamicMetrics} as an MBean
* for JMX monitoring.
*/
public class RegionServerDynamicStatistics extends MetricsDynamicMBeanBase {
private final ObjectName mbeanName;
public RegionServerDynamicStatistics(MetricsRegistry registry) {
super(registry, "RegionServerDynamicStatistics");
mbeanName = MBeanUtil.registerMBean("RegionServerDynamic",
"RegionServerDynamicStatistics",
this);
}
public void shutdown() {
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
}
}

View File

@ -529,7 +529,7 @@ public class TestLruBlockCache extends TestCase {
/** Size of the cache block holding this item. Used for verification. */
public long cacheBlockHeapSize() {
return CachedBlock.PER_BLOCK_OVERHEAD
+ ClassSize.align(blockName.length())
+ ClassSize.align(2 * blockName.length())
+ ClassSize.align(size);
}