diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java index 2a198d61bc5..b3a556e3d9f 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java @@ -53,6 +53,10 @@ public interface MetricsRegionSource extends Comparable { String COPROCESSOR_EXECUTION_STATISTICS_DESC = "Statistics for coprocessor execution times"; String REPLICA_ID = "replicaid"; String REPLICA_ID_DESC = "The replica ID of a region. 0 is primary, otherwise is secondary"; + String ROW_READS_ONLY_ON_MEMSTORE = "memstoreOnlyRowReadsCount"; + String ROW_READS_ONLY_ON_MEMSTORE_DESC = "Row reads happening completely out of memstore"; + String MIXED_ROW_READS = "mixedRowReadsCount"; + String MIXED_ROW_READS_ON_STORE_DESC = "Row reads happening out of files and memstore on store"; /** * Close the region's metrics as this region is closing. diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index f2eabf886b5..40df2eef63d 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.metrics.Interns; @@ -33,6 +35,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private static final Logger LOG = LoggerFactory.getLogger(MetricsRegionSourceImpl.class); + private static final String _STORE = "_store_"; + private AtomicBoolean closed = new AtomicBoolean(false); // Non-final so that we can null out the wrapper @@ -45,6 +49,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private final DynamicMetricsRegistry registry; private final String regionNamePrefix; + private final String regionNamePrefix1; + private final String regionNamePrefix2; private final String regionPutKey; private final String regionDeleteKey; private final String regionGetKey; @@ -77,10 +83,10 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { registry = agg.getMetricsRegistry(); - regionNamePrefix = "Namespace_" + regionWrapper.getNamespace() + - "_table_" + regionWrapper.getTableName() + - "_region_" + regionWrapper.getRegionName() + - "_metric_"; + regionNamePrefix1 = "Namespace_" + regionWrapper.getNamespace() + "_table_" + + regionWrapper.getTableName() + "_region_" + regionWrapper.getRegionName(); + regionNamePrefix2 = "_metric_"; + regionNamePrefix = regionNamePrefix1 + regionNamePrefix2; String suffix = "Count"; @@ -302,6 +308,24 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE, MetricsRegionSource.MAX_FLUSH_QUEUE_DESC), this.regionWrapper.getMaxFlushQueueSize()); + addCounter(mrb, this.regionWrapper.getMemstoreOnlyRowReadsCount(), + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE, + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC); + addCounter(mrb, this.regionWrapper.getMixedRowReadsCount(), + MetricsRegionSource.MIXED_ROW_READS, + MetricsRegionSource.MIXED_ROW_READS_ON_STORE_DESC); + } + } + + private void addCounter(MetricsRecordBuilder mrb, Map metricMap, String metricName, + String metricDesc) { + if (metricMap != null) { + for (Entry entry : metricMap.entrySet()) { + // append 'store' and its name to the metric + mrb.addCounter(Interns.info( + this.regionNamePrefix1 + _STORE + entry.getKey() + this.regionNamePrefix2 + metricName, + metricDesc), entry.getValue()); + } } } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java index e63a424137d..6bf010ce91b 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; + import org.apache.yetus.audience.InterfaceAudience; /** @@ -170,4 +172,15 @@ public interface MetricsRegionWrapper { * all compacted store files that belong to this region */ long getMaxCompactedStoreFileRefCount(); + + /** + * @return the number of row reads completely on memstore per store + */ + Map getMemstoreOnlyRowReadsCount(); + + /** + * @return the number of row reads on memstore and file per store + */ + Map getMixedRowReadsCount(); + } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java index 5133a96db10..0b13e5c8dfe 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java @@ -92,7 +92,6 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl @Override public void getMetrics(MetricsCollector collector, boolean all) { MetricsRecordBuilder mrb = collector.addRecord(metricsName); - if (tableSources != null) { for (MetricsTableSource tableMetricSource : tableSources.values()) { if (tableMetricSource instanceof MetricsTableSourceImpl) { diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java index 5e789d50625..b39e1444dd2 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java @@ -61,7 +61,10 @@ import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPL import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.metrics.Interns; import org.apache.hadoop.metrics2.MetricHistogram; @@ -75,6 +78,8 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class MetricsTableSourceImpl implements MetricsTableSource { + private static final String _COLUMNFAMILY = "_columnfamily_"; + private static final Logger LOG = LoggerFactory.getLogger(MetricsTableSourceImpl.class); private AtomicBoolean closed = new AtomicBoolean(false); @@ -87,6 +92,8 @@ public class MetricsTableSourceImpl implements MetricsTableSource { private final MetricsTableAggregateSourceImpl agg; private final DynamicMetricsRegistry registry; private final String tableNamePrefix; + private final String tableNamePrefixPart1; + private final String tableNamePrefixPart2; private final TableName tableName; private final int hashCode; @@ -127,8 +134,11 @@ public class MetricsTableSourceImpl implements MetricsTableSource { this.tableWrapperAgg = tblWrapperAgg; this.registry = agg.getMetricsRegistry(); - this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() + - "_table_" + this.tableName.getQualifierAsString() + "_metric_"; + this.tableNamePrefixPart1 = "Namespace_" + this.tableName.getNamespaceAsString() + + "_table_" + this.tableName.getQualifierAsString(); + this.tableNamePrefixPart2 = "_metric_"; + this.tableNamePrefix = tableNamePrefixPart1 + + tableNamePrefixPart2; this.hashCode = this.tableName.hashCode(); } @@ -311,6 +321,25 @@ public class MetricsTableSourceImpl implements MetricsTableSource { mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES, MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC), tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString())); + addGauge(mrb, tableWrapperAgg.getMemstoreOnlyRowReadsCount(tableName.getNameAsString()), + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE, + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC); + addGauge(mrb, tableWrapperAgg.getMixedRowReadsCount(tableName.getNameAsString()), + MetricsRegionSource.MIXED_ROW_READS, + MetricsRegionSource.MIXED_ROW_READS_ON_STORE_DESC); + } + } + } + + private void addGauge(MetricsRecordBuilder mrb, Map metricMap, String metricName, + String metricDesc) { + if (metricMap != null) { + for (Entry entry : metricMap.entrySet()) { + // append 'store' and its name to the metric + mrb.addGauge(Interns.info(this.tableNamePrefixPart1 + _COLUMNFAMILY + + entry.getKey().split(MetricsTableWrapperAggregate.UNDERSCORE)[1] + + this.tableNamePrefixPart2 + metricName, + metricDesc), entry.getValue()); } } } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java index 48b3a2c8252..4b8c46af2c0 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; + import org.apache.yetus.audience.InterfaceAudience; /** @@ -26,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface MetricsTableWrapperAggregate { - + public String UNDERSCORE = "_"; /** * Get the number of read requests that have been issued against this table */ @@ -107,6 +109,13 @@ public interface MetricsTableWrapperAggregate { */ long getNumReferenceFiles(String table); + /** + * @return number of row reads completely from memstore per store for this table + */ + Map getMemstoreOnlyRowReadsCount(String table); - + /** + * @return number of row reads from file and memstore per store for this table + */ + Map getMixedRowReadsCount(String table); } diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java index 01d914e38ed..9a860a041d4 100644 --- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java +++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.HashMap; +import java.util.Map; + public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate { private String tableName; @@ -109,4 +112,18 @@ public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate { public long getCpRequestsCount(String table) { return 99; } + + @Override + public Map getMemstoreOnlyRowReadsCount(String table) { + Map map = new HashMap(); + map.put("table_info", 3L); + return map; + } + + @Override + public Map getMixedRowReadsCount(String table) { + Map map = new HashMap(); + map.put("table_info", 3L); + return map; + } } diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java index 4bd05661053..598658a56cc 100644 --- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java +++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MetricsTests; @@ -216,5 +219,19 @@ public class TestMetricsRegionSourceImpl { public long getTotalRequestCount() { return 0; } + + @Override + public Map getMemstoreOnlyRowReadsCount() { + Map map = new HashMap(); + map.put("info", 0L); + return map; + } + + @Override + public Map getMixedRowReadsCount() { + Map map = new HashMap(); + map.put("info", 0L); + return map; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 3fbad6b7e25..f9b7e1c0069 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -46,12 +46,14 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -94,6 +96,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -114,9 +118,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -162,6 +163,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, volatile boolean forceMajor = false; private AtomicLong storeSize = new AtomicLong(); private AtomicLong totalUncompressedBytes = new AtomicLong(); + private LongAdder memstoreOnlyRowReadsCount = new LongAdder(); + // rows that has cells from both memstore and files (or only files) + private LongAdder mixedRowReadsCount = new LongAdder(); private boolean cacheOnWriteLogged; @@ -331,7 +335,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, confPrintThreshold = 10; } this.parallelPutCountPrintThreshold = confPrintThreshold; - LOG.info("{} created, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " + + LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, parallelPutCountPrintThreshold, family.getDataBlockEncoding(), @@ -2546,7 +2551,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (29 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2886,8 +2891,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } /** - * @return get maximum ref count of storeFile among all compacted HStore Files - * for the HStore + * @return get maximum ref count of storeFile among all compacted HStore Files for the HStore */ public int getMaxCompactedStoreFileRefCount() { OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager() @@ -2901,4 +2905,21 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, ? maxCompactedStoreFileRefCount.getAsInt() : 0; } + @Override + public long getMemstoreOnlyRowReadsCount() { + return memstoreOnlyRowReadsCount.sum(); + } + + @Override + public long getMixedRowReadsCount() { + return mixedRowReadsCount.sum(); + } + + void updateMetricsStore(boolean memstoreRead) { + if (memstoreRead) { + memstoreOnlyRowReadsCount.increment(); + } else { + mixedRowReadsCount.increment(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index c629f7a7f8f..f28e28e31cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -112,6 +112,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return this.current.peek(); } + boolean isLatestCellFromMemstore() { + return !this.current.isFileScanner(); + } + @Override public Cell next() throws IOException { if(this.current == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 1c75137f753..1a266f76abb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.OptionalDouble; import java.util.OptionalLong; @@ -58,6 +59,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable private long numReferenceFiles; private long maxFlushQueueSize; private long maxCompactionQueueSize; + private Map readsOnlyFromMemstore; + private Map mixedReadsOnStore; private ScheduledFuture regionMetricsUpdateTask; @@ -233,6 +236,16 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable return this.region.hashCode(); } + @Override + public Map getMemstoreOnlyRowReadsCount() { + return readsOnlyFromMemstore; + } + + @Override + public Map getMixedRowReadsCount() { + return mixedReadsOnStore; + } + public class HRegionMetricsWrapperRunnable implements Runnable { @Override @@ -279,6 +292,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable if (storeAvgStoreFileAge.isPresent()) { avgAgeNumerator += (long) storeAvgStoreFileAge.getAsDouble() * storeHFiles; } + if(mixedReadsOnStore == null) { + mixedReadsOnStore = new HashMap(); + } + Long tempVal = mixedReadsOnStore.get(store.getColumnFamilyName()); + if (tempVal == null) { + tempVal = 0L; + } else { + tempVal += store.getMixedRowReadsCount(); + } + mixedReadsOnStore.put(store.getColumnFamilyName(), tempVal); + if (readsOnlyFromMemstore == null) { + readsOnlyFromMemstore = new HashMap(); + } + tempVal = readsOnlyFromMemstore.get(store.getColumnFamilyName()); + if (tempVal == null) { + tempVal = 0L; + } else { + tempVal += store.getMemstoreOnlyRowReadsCount(); + } + readsOnlyFromMemstore.put(store.getColumnFamilyName(), tempVal); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java index d283b2b754c..7b5c6ef9701 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java @@ -53,7 +53,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor(); this.runnable = new TableMetricsWrapperRunnable(); this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, - this.period, TimeUnit.MILLISECONDS); + period, TimeUnit.MILLISECONDS); } public class TableMetricsWrapperRunnable implements Runnable { @@ -61,7 +61,6 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr @Override public void run() { Map localMetricsTableMap = new HashMap<>(); - for (Region r : regionServer.getOnlineRegionsLocalContext()) { TableName tbl = r.getTableDescriptor().getTableName(); MetricsTableValues mt = localMetricsTableMap.get(tbl); @@ -69,11 +68,17 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr mt = new MetricsTableValues(); localMetricsTableMap.put(tbl, mt); } + long memstoreReadCount = 0L; + long mixedReadCount = 0L; + String tempKey = null; if (r.getStores() != null) { + String familyName = null; for (Store store : r.getStores()) { + familyName = store.getColumnFamilyName(); + mt.storeFileCount += store.getStorefilesCount(); - mt.memstoreSize += (store.getMemStoreSize().getDataSize() + - store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize()); + mt.memstoreSize += (store.getMemStoreSize().getDataSize() + + store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize()); mt.storeFileSize += store.getStorefilesSize(); mt.referenceFileCount += store.getNumReferenceFiles(); if (store.getMaxStoreFileAge().isPresent()) { @@ -89,13 +94,27 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr (long) store.getAvgStoreFileAge().getAsDouble() * store.getStorefilesCount(); } mt.storeCount += 1; + tempKey = tbl.getNameAsString() + UNDERSCORE + familyName; + Long tempVal = mt.perStoreMemstoreOnlyReadCount.get(tempKey); + if (tempVal == null) { + tempVal = 0L; + } + memstoreReadCount = store.getMemstoreOnlyRowReadsCount() + tempVal; + tempVal = mt.perStoreMixedReadCount.get(tempKey); + if (tempVal == null) { + tempVal = 0L; + } + mixedReadCount = store.getMixedRowReadsCount() + tempVal; + // accumulate the count + mt.perStoreMemstoreOnlyReadCount.put(tempKey, memstoreReadCount); + mt.perStoreMixedReadCount.put(tempKey, mixedReadCount); } + mt.regionCount += 1; mt.readRequestCount += r.getReadRequestsCount(); - mt.filteredReadRequestCount += getFilteredReadRequestCount(tbl.getNameAsString()); + mt.filteredReadRequestCount += r.getFilteredReadRequestsCount(); mt.writeRequestCount += r.getWriteRequestsCount(); - } } @@ -133,6 +152,26 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr } } + @Override + public Map getMemstoreOnlyRowReadsCount(String table) { + MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); + if (metricsTable == null) { + return null; + } else { + return metricsTable.perStoreMemstoreOnlyReadCount; + } + } + + @Override + public Map getMixedRowReadsCount(String table) { + MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); + if (metricsTable == null) { + return null; + } else { + return metricsTable.perStoreMixedReadCount; + } + } + @Override public long getCpRequestsCount(String table) { MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); @@ -304,6 +343,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr long totalStoreFileAge; long referenceFileCount; long cpRequestCount; + Map perStoreMemstoreOnlyReadCount = new HashMap<>(); + Map perStoreMixedReadCount = new HashMap<>(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 6eb9f18b703..6ec9c51930c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -282,4 +282,14 @@ public interface Store { boolean isSloppyMemStore(); int getCurrentParallelPutCount(); + + /** + * @return the number of read requests purely from the memstore. + */ + long getMemstoreOnlyRowReadsCount(); + + /** + * @return the number of read requests from the files under this store. + */ + long getMixedRowReadsCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 654158e6379..021d09b829a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -93,6 +93,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private final int minVersions; private final long maxRowSize; private final long cellsPerHeartbeatCheck; + @VisibleForTesting + long memstoreOnlyReads; + @VisibleForTesting + long mixedReads; // 1) Collects all the KVHeap that are eagerly getting closed during the // course of a scan @@ -565,173 +569,202 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int count = 0; long totalBytesRead = 0; - - LOOP: do { - // Update and check the time limit based on the configured value of cellsPerTimeoutCheck - // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream in - // the shipped method below. - if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread && - readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) { - if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { - return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); - } - } - // Do object compare - we set prevKV from the same heap. - if (prevCell != cell) { - ++kvsScanned; - } - checkScanOrder(prevCell, cell, comparator); - int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); - bytesRead += cellSize; - if (scanUsePread && readType == Scan.ReadType.DEFAULT && - bytesRead > preadMaxBytes) { - // return immediately if we want to switch from pread to stream. We need this because we can - // only switch in the shipped method, if user use a filter to filter out everything and rpc - // timeout is very large then the shipped method will never be called until the whole scan - // is finished, but at that time we have already scan all the data... - // See HBASE-20457 for more details. - // And there is still a scenario that can not be handled. If we have a very large row, which - // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag - // here, we still need to scan all the qualifiers before returning... - scannerContext.returnImmediately(); - } - prevCell = cell; - scannerContext.setLastPeekedCell(cell); - topChanged = false; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); - switch (qcode) { - case INCLUDE: - case INCLUDE_AND_SEEK_NEXT_ROW: - case INCLUDE_AND_SEEK_NEXT_COL: - - Filter f = matcher.getFilter(); - if (f != null) { - cell = f.transformCell(cell); + boolean onlyFromMemstore = true; + try { + LOOP: do { + // Update and check the time limit based on the configured value of cellsPerTimeoutCheck + // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream + // in + // the shipped method below. + if (kvsScanned % cellsPerHeartbeatCheck == 0 + || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) { + if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } + } + // Do object compare - we set prevKV from the same heap. + if (prevCell != cell) { + ++kvsScanned; + } + checkScanOrder(prevCell, cell, comparator); + int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); + bytesRead += cellSize; + if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { + // return immediately if we want to switch from pread to stream. We need this because we + // can + // only switch in the shipped method, if user use a filter to filter out everything and + // rpc + // timeout is very large then the shipped method will never be called until the whole scan + // is finished, but at that time we have already scan all the data... + // See HBASE-20457 for more details. + // And there is still a scenario that can not be handled. If we have a very large row, + // which + // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag + // here, we still need to scan all the qualifiers before returning... + scannerContext.returnImmediately(); + } + prevCell = cell; + scannerContext.setLastPeekedCell(cell); + topChanged = false; + ScanQueryMatcher.MatchCode qcode = matcher.match(cell); + switch (qcode) { + case INCLUDE: + case INCLUDE_AND_SEEK_NEXT_ROW: + case INCLUDE_AND_SEEK_NEXT_COL: - this.countPerRow++; - if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { - // do what SEEK_NEXT_ROW does. - if (!matcher.moreRowsMayExistAfter(cell)) { + Filter f = matcher.getFilter(); + if (f != null) { + cell = f.transformCell(cell); + } + this.countPerRow++; + if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { + // do what SEEK_NEXT_ROW does. + if (!matcher.moreRowsMayExistAfter(cell)) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + seekToNextRow(cell); + break LOOP; + } + + // add to results only if we have skipped #storeOffset kvs + // also update metric accordingly + if (this.countPerRow > storeOffset) { + outResult.add(cell); + + // Update local tracking information + count++; + totalBytesRead += cellSize; + + /** + * Increment the metric if all the cells are from memstore. + * If not we will account it for mixed reads + */ + onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); + // Update the progress of the scanner context + scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); + scannerContext.incrementBatchProgress(1); + + if (matcher.isUserScan() && totalBytesRead > maxRowSize) { + String message = "Max row size allowed: " + maxRowSize + + ", but the row is bigger than that, the row info: " + + CellUtil.toString(cell, false) + ", already have process row cells = " + + outResult.size() + ", it belong to region = " + + store.getHRegion().getRegionInfo().getRegionNameAsString(); + LOG.warn(message); + throw new RowTooBigException(message); + } + } + + if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + if (!matcher.moreRowsMayExistAfter(cell)) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + seekOrSkipToNextRow(cell); + } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + seekOrSkipToNextColumn(cell); + } else { + this.heap.next(); + } + + if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + continue; + + case DONE: + // Optimization for Gets! If DONE, no more to get on this row, early exit! + if (get) { + // Then no more to this row... exit. close(false);// Do all cleanup except heap.close() + // update metric return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } matcher.clearCurrentRow(); - seekToNextRow(cell); - break LOOP; - } + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - // add to results only if we have skipped #storeOffset kvs - // also update metric accordingly - if (this.countPerRow > storeOffset) { - outResult.add(cell); + case DONE_SCAN: + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - // Update local tracking information - count++; - totalBytesRead += cellSize; - - // Update the progress of the scanner context - scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); - scannerContext.incrementBatchProgress(1); - - if (matcher.isUserScan() && totalBytesRead > maxRowSize) { - String message = "Max row size allowed: " + maxRowSize - + ", but the row is bigger than that, the row info: " + CellUtil - .toString(cell, false) + ", already have process row cells = " + outResult.size() - + ", it belong to region = " + store.getHRegion().getRegionInfo() - .getRegionNameAsString(); - LOG.warn(message); - throw new RowTooBigException(message); - } - } - - if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + case SEEK_NEXT_ROW: + // This is just a relatively simple end of scan fix, to short-cut end + // us if there is an endKey in the scan. if (!matcher.moreRowsMayExistAfter(cell)) { close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } matcher.clearCurrentRow(); seekOrSkipToNextRow(cell); - } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - seekOrSkipToNextColumn(cell); - } else { - this.heap.next(); - } - - if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - continue; - - case DONE: - // Optimization for Gets! If DONE, no more to get on this row, early exit! - if (get) { - // Then no more to this row... exit. - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - matcher.clearCurrentRow(); - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - - case DONE_SCAN: - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - - case SEEK_NEXT_ROW: - // This is just a relatively simple end of scan fix, to short-cut end - // us if there is an endKey in the scan. - if (!matcher.moreRowsMayExistAfter(cell)) { - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - matcher.clearCurrentRow(); - seekOrSkipToNextRow(cell); - NextState stateAfterSeekNextRow = needToReturn(outResult); - if (stateAfterSeekNextRow != null) { - return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); - } - break; - - case SEEK_NEXT_COL: - seekOrSkipToNextColumn(cell); - NextState stateAfterSeekNextColumn = needToReturn(outResult); - if (stateAfterSeekNextColumn != null) { - return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); - } - break; - - case SKIP: - this.heap.next(); - break; - - case SEEK_NEXT_USING_HINT: - Cell nextKV = matcher.getNextKeyHint(cell); - if (nextKV != null && comparator.compare(nextKV, cell) > 0) { - seekAsDirection(nextKV); - NextState stateAfterSeekByHint = needToReturn(outResult); - if (stateAfterSeekByHint != null) { - return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); + NextState stateAfterSeekNextRow = needToReturn(outResult); + if (stateAfterSeekNextRow != null) { + return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); } - } else { - heap.next(); - } - break; + break; - default: - throw new RuntimeException("UNEXPECTED"); + case SEEK_NEXT_COL: + seekOrSkipToNextColumn(cell); + NextState stateAfterSeekNextColumn = needToReturn(outResult); + if (stateAfterSeekNextColumn != null) { + return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); + } + break; + + case SKIP: + this.heap.next(); + break; + + case SEEK_NEXT_USING_HINT: + Cell nextKV = matcher.getNextKeyHint(cell); + if (nextKV != null && comparator.compare(nextKV, cell) > 0) { + seekAsDirection(nextKV); + NextState stateAfterSeekByHint = needToReturn(outResult); + if (stateAfterSeekByHint != null) { + return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); + } + } else { + heap.next(); + } + break; + + default: + throw new RuntimeException("UNEXPECTED"); + } + } while ((cell = this.heap.peek()) != null); + + if (count > 0) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); } - } while ((cell = this.heap.peek()) != null); - if (count > 0) { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + // No more keys + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } finally { + // increment only if we have some result + if (count > 0) { + // if true increment memstore metrics, if not the mixed one + updateMetricsStore(onlyFromMemstore); + } } + } - // No more keys - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + private void updateMetricsStore(boolean memstoreRead) { + if (store != null) { + store.updateMetricsStore(memstoreRead); + } else { + // for testing. + if (memstoreRead) { + memstoreOnlyReads++; + } else { + mixedReads++; + } + } } /** @@ -1208,4 +1241,3 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java index b1b8405bdd2..4f40f6289cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.HashMap; +import java.util.Map; + public class MetricsRegionWrapperStub implements MetricsRegionWrapper { int replicaid = 0; @@ -182,4 +185,18 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper { public long getTotalRequestCount() { return 0; } + + @Override + public Map getMemstoreOnlyRowReadsCount() { + Map map = new HashMap<>(); + map.put("info", 0L); + return map; + } + + @Override + public Map getMixedRowReadsCount() { + Map map = new HashMap<>(); + map.put("info", 0L); + return map; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 995ef746268..885e7d2006b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -31,6 +32,7 @@ import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; @@ -405,6 +407,7 @@ public class TestStoreScanner { // We should have gone the optimize route 5 times totally... an INCLUDE for the four cells // in the row plus the DONE on the end. assertEquals(5, scanner.count.get()); + assertEquals(1, scanner.memstoreOnlyReads); // For a full row Get, there should be no opportunity for scanner optimization. assertEquals(0, scanner.optimization.get()); } @@ -479,6 +482,8 @@ public class TestStoreScanner { // And we should have gone through optimize twice only. assertEquals("First qcode is SEEK_NEXT_COL and second INCLUDE_AND_SEEK_NEXT_ROW", 3, scanner.count.get()); + assertEquals("Memstore Read count should be", 1, + scanner.memstoreOnlyReads); } } @@ -578,6 +583,7 @@ public class TestStoreScanner { List results = new ArrayList<>(); assertEquals(true, scan.next(results)); assertEquals(1, results.size()); + assertEquals(1, scan.memstoreOnlyReads); assertEquals(kvs[0], results.get(0)); } }