diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java index 2a198d61bc5..b3a556e3d9f 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java @@ -53,6 +53,10 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> { String COPROCESSOR_EXECUTION_STATISTICS_DESC = "Statistics for coprocessor execution times"; String REPLICA_ID = "replicaid"; String REPLICA_ID_DESC = "The replica ID of a region. 0 is primary, otherwise is secondary"; + String ROW_READS_ONLY_ON_MEMSTORE = "memstoreOnlyRowReadsCount"; + String ROW_READS_ONLY_ON_MEMSTORE_DESC = "Row reads happening completely out of memstore"; + String MIXED_ROW_READS = "mixedRowReadsCount"; + String MIXED_ROW_READS_ON_STORE_DESC = "Row reads happening out of files and memstore on store"; /** * Close the region's metrics as this region is closing. diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java index 34552edb0c0..28f8832e749 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; + import org.apache.yetus.audience.InterfaceAudience; /** @@ -163,4 +165,15 @@ public interface MetricsRegionWrapper { * all compacted store files that belong to this region */ long getMaxCompactedStoreFileRefCount(); + + /** + * @return the number of row reads completely on memstore per store + */ + Map<String, Long> getMemstoreOnlyRowReadsCount(); + + /** + * @return the number of row reads on memstore and file per store + */ + Map<String, Long> getMixedRowReadsCount(); + } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java index bf8b4c9ed0a..1db3c006438 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; + import org.apache.yetus.audience.InterfaceAudience; /** @@ -26,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface MetricsTableWrapperAggregate { - + public String UNDERSCORE = "_"; /** * Get the number of read requests that have been issued against this table */ @@ -102,6 +104,13 @@ public interface MetricsTableWrapperAggregate { */ long getNumReferenceFiles(String table); + /** + * @return number of row reads completely from memstore per store for this table + */ + Map<String, Long> getMemstoreOnlyRowReadsCount(String table); - + /** + * @return number of row reads from file and memstore per store for this table + */ + Map<String, Long> getMixedRowReadsCount(String table); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java 
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index 6198717d3ba..5397496bf83 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.metrics.Interns; @@ -33,6 +35,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private static final Logger LOG = LoggerFactory.getLogger(MetricsRegionSourceImpl.class); + private static final String _STORE = "_store_"; + private AtomicBoolean closed = new AtomicBoolean(false); // Non-final so that we can null out the wrapper @@ -45,6 +49,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private final DynamicMetricsRegistry registry; private final String regionNamePrefix; + private final String regionNamePrefix1; + private final String regionNamePrefix2; private final String regionPutKey; private final String regionDeleteKey; private final String regionGetKey; @@ -77,10 +83,10 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { registry = agg.getMetricsRegistry(); - regionNamePrefix = "Namespace_" + regionWrapper.getNamespace() + - "_table_" + regionWrapper.getTableName() + - "_region_" + regionWrapper.getRegionName() + - "_metric_"; + regionNamePrefix1 = "Namespace_" + regionWrapper.getNamespace() + "_table_" + + regionWrapper.getTableName() + "_region_" + regionWrapper.getRegionName(); + regionNamePrefix2 = "_metric_"; + regionNamePrefix = regionNamePrefix1 + regionNamePrefix2; String suffix = "Count"; @@ -298,6 +304,24 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE, MetricsRegionSource.MAX_FLUSH_QUEUE_DESC), this.regionWrapper.getMaxFlushQueueSize()); + addCounter(mrb, this.regionWrapper.getMemstoreOnlyRowReadsCount(), + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE, + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC); + addCounter(mrb, this.regionWrapper.getMixedRowReadsCount(), + MetricsRegionSource.MIXED_ROW_READS, + MetricsRegionSource.MIXED_ROW_READS_ON_STORE_DESC); + } + } + + private void addCounter(MetricsRecordBuilder mrb, Map<String, Long> metricMap, String metricName, + String metricDesc) { + if (metricMap != null) { + for (Entry<String, Long> entry : metricMap.entrySet()) { + // append 'store' and its name to the metric + mrb.addCounter(Interns.info( + this.regionNamePrefix1 + _STORE + entry.getKey() + this.regionNamePrefix2 + metricName, + metricDesc), entry.getValue()); + } } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java index 5133a96db10..0b13e5c8dfe 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java @@ -92,7 +92,6 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl @Override public void getMetrics(MetricsCollector collector, boolean all) { MetricsRecordBuilder mrb = collector.addRecord(metricsName); - if 
(tableSources != null) { for (MetricsTableSource tableMetricSource : tableSources.values()) { if (tableMetricSource instanceof MetricsTableSourceImpl) { diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java index d65c446b3ef..84fc86eef98 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java @@ -61,7 +61,10 @@ import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPL import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.metrics.Interns; import org.apache.hadoop.metrics2.MetricHistogram; @@ -75,6 +78,8 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class MetricsTableSourceImpl implements MetricsTableSource { + private static final String _COLUMNFAMILY = "_columnfamily_"; + private static final Logger LOG = LoggerFactory.getLogger(MetricsTableSourceImpl.class); private AtomicBoolean closed = new AtomicBoolean(false); @@ -87,6 +92,8 @@ public class MetricsTableSourceImpl implements MetricsTableSource { private final MetricsTableAggregateSourceImpl agg; private final DynamicMetricsRegistry registry; private final String tableNamePrefix; + private final String tableNamePrefixPart1; + private final String tableNamePrefixPart2; private final TableName tableName; private final int hashCode; @@ -127,8 +134,11 @@ public class MetricsTableSourceImpl implements MetricsTableSource { this.tableWrapperAgg = tblWrapperAgg; this.registry = agg.getMetricsRegistry(); - this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() + - "_table_" + this.tableName.getQualifierAsString() + "_metric_"; + this.tableNamePrefixPart1 = "Namespace_" + this.tableName.getNamespaceAsString() + + "_table_" + this.tableName.getQualifierAsString(); + this.tableNamePrefixPart2 = "_metric_"; + this.tableNamePrefix = tableNamePrefixPart1 + + tableNamePrefixPart2; this.hashCode = this.tableName.hashCode(); } @@ -308,6 +318,25 @@ public class MetricsTableSourceImpl implements MetricsTableSource { mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES, MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC), tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString())); + addGauge(mrb, tableWrapperAgg.getMemstoreOnlyRowReadsCount(tableName.getNameAsString()), + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE, + MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC); + addGauge(mrb, tableWrapperAgg.getMixedRowReadsCount(tableName.getNameAsString()), + MetricsRegionSource.MIXED_ROW_READS, + MetricsRegionSource.MIXED_ROW_READS_ON_STORE_DESC); + } + } + } + + private void addGauge(MetricsRecordBuilder mrb, Map<String, Long> metricMap, String metricName, + String metricDesc) { + if (metricMap != null) { + for (Entry<String, Long> entry : metricMap.entrySet()) { + // append 'columnfamily' and its name to the metric + mrb.addGauge(Interns.info(this.tableNamePrefixPart1 + _COLUMNFAMILY + + entry.getKey().split(MetricsTableWrapperAggregate.UNDERSCORE)[1] 
+ + this.tableNamePrefixPart2 + metricName, + metricDesc), entry.getValue()); } } } diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java index b9d3d1f125a..b810a136026 100644 --- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.HashMap; +import java.util.Map; + public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate { private String tableName; @@ -104,4 +107,18 @@ public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate { public long getAvgRegionSize(String table) { return 88; } + + @Override + public Map<String, Long> getMemstoreOnlyRowReadsCount(String table) { + Map<String, Long> map = new HashMap<String, Long>(); + map.put("table_info", 3L); + return map; + } + + @Override + public Map<String, Long> getMixedRowReadsCount(String table) { + Map<String, Long> map = new HashMap<String, Long>(); + map.put("table_info", 3L); + return map; + } } diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java index 837e7da2719..a802e8321c3 100644 --- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java +++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MetricsTests; @@ -211,5 +214,19 @@ public class TestMetricsRegionSourceImpl { public long getTotalRequestCount() { return 0; } + + @Override + public Map<String, Long> getMemstoreOnlyRowReadsCount() { + Map<String, Long> map = new HashMap<String, Long>(); + map.put("info", 0L); + return map; + } + + @Override + public Map<String, Long> getMixedRowReadsCount() { + Map<String, Long> map = new HashMap<String, Long>(); + map.put("info", 0L); + return map; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index b94de5a84fb..f193d8b0840 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -46,12 +46,14 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -116,7 +118,6 @@ import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -164,6 +165,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, static int closeCheckInterval = 0; private AtomicLong storeSize = new AtomicLong(); private AtomicLong totalUncompressedBytes = new AtomicLong(); + private LongAdder memstoreOnlyRowReadsCount = new LongAdder(); + // rows that has cells from both memstore and files (or only files) + private LongAdder mixedRowReadsCount = new LongAdder(); private boolean cacheOnWriteLogged; @@ -338,7 +342,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, confPrintThreshold = 10; } this.parallelPutCountPrintThreshold = confPrintThreshold; - LOG.info("{} created, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " + + LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " + "parallelPutCountPrintThreshold={}, encoding={}, compression={}", this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads, parallelPutCountPrintThreshold, family.getDataBlockEncoding(), @@ -2560,7 +2565,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (29 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2873,8 +2878,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, } /** - * @return get maximum ref count of storeFile among all compacted HStore Files - * for the HStore + * @return get maximum ref count of storeFile among all compacted HStore Files for the HStore */ public int getMaxCompactedStoreFileRefCount() { OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager() @@ -2897,7 +2901,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, if (rss == null) { return; } - List> filesWithSizes = new ArrayList<>(archivedFiles.size()); + List> filesWithSizes = new ArrayList<>(archivedFiles.size()); Iterator fileSizeIter = fileSizes.iterator(); for (StoreFile storeFile : archivedFiles) { final long fileSize = fileSizeIter.next(); @@ -2914,4 +2918,21 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, LOG.warn("Failed to report archival of files: " + filesWithSizes); } } + @Override + public long getMemstoreOnlyRowReadsCount() { + return memstoreOnlyRowReadsCount.sum(); + } + + @Override + public long getMixedRowReadsCount() { + return mixedRowReadsCount.sum(); + } + + void updateMetricsStore(boolean memstoreRead) { + if (memstoreRead) { + memstoreOnlyRowReadsCount.increment(); + } else { + mixedRowReadsCount.increment(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index c629f7a7f8f..f28e28e31cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java 
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -112,6 +112,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return this.current.peek(); } + boolean isLatestCellFromMemstore() { + return !this.current.isFileScanner(); + } + @Override public Cell next() throws IOException { if(this.current == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 41b515a6197..c83a49bb384 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.OptionalDouble; import java.util.OptionalLong; @@ -58,6 +59,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable private long numReferenceFiles; private long maxFlushQueueSize; private long maxCompactionQueueSize; + private Map<String, Long> readsOnlyFromMemstore; + private Map<String, Long> mixedReadsOnStore; private ScheduledFuture<?> regionMetricsUpdateTask; @@ -228,6 +231,16 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable return this.region.hashCode(); } + @Override + public Map<String, Long> getMemstoreOnlyRowReadsCount() { + return readsOnlyFromMemstore; + } + + @Override + public Map<String, Long> getMixedRowReadsCount() { + return mixedReadsOnStore; + } + public class HRegionMetricsWrapperRunnable implements Runnable { @Override @@ -274,6 +287,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable if (storeAvgStoreFileAge.isPresent()) { avgAgeNumerator += (long) storeAvgStoreFileAge.getAsDouble() * storeHFiles; } + if(mixedReadsOnStore == null) { + mixedReadsOnStore = new HashMap<String, Long>(); + } + Long tempVal = mixedReadsOnStore.get(store.getColumnFamilyName()); + if (tempVal == null) { + tempVal = 0L; + } else { + tempVal += store.getMixedRowReadsCount(); + } + mixedReadsOnStore.put(store.getColumnFamilyName(), tempVal); + if (readsOnlyFromMemstore == null) { + readsOnlyFromMemstore = new HashMap<String, Long>(); + } + tempVal = readsOnlyFromMemstore.get(store.getColumnFamilyName()); + if (tempVal == null) { + tempVal = 0L; + } else { + tempVal += store.getMemstoreOnlyRowReadsCount(); + } + readsOnlyFromMemstore.put(store.getColumnFamilyName(), tempVal); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java index f727decbc63..142d2ccac9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java @@ -53,7 +53,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor(); this.runnable = new TableMetricsWrapperRunnable(); this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, - this.period, TimeUnit.MILLISECONDS); + period, TimeUnit.MILLISECONDS); } public class TableMetricsWrapperRunnable implements Runnable { 
@@ -61,7 +61,6 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr @Override public void run() { Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>(); - for (Region r : regionServer.getOnlineRegionsLocalContext()) { TableName tbl = r.getTableDescriptor().getTableName(); MetricsTableValues mt = localMetricsTableMap.get(tbl); @@ -69,11 +68,17 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr mt = new MetricsTableValues(); localMetricsTableMap.put(tbl, mt); } + long memstoreReadCount = 0L; + long mixedReadCount = 0L; + String tempKey = null; if (r.getStores() != null) { + String familyName = null; for (Store store : r.getStores()) { + familyName = store.getColumnFamilyName(); + mt.storeFileCount += store.getStorefilesCount(); - mt.memstoreSize += (store.getMemStoreSize().getDataSize() + - store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize()); + mt.memstoreSize += (store.getMemStoreSize().getDataSize() + + store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize()); mt.storeFileSize += store.getStorefilesSize(); mt.referenceFileCount += store.getNumReferenceFiles(); if (store.getMaxStoreFileAge().isPresent()) { @@ -89,13 +94,27 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr (long) store.getAvgStoreFileAge().getAsDouble() * store.getStorefilesCount(); } mt.storeCount += 1; + tempKey = tbl.getNameAsString() + UNDERSCORE + familyName; + Long tempVal = mt.perStoreMemstoreOnlyReadCount.get(tempKey); + if (tempVal == null) { + tempVal = 0L; + } + memstoreReadCount = store.getMemstoreOnlyRowReadsCount() + tempVal; + tempVal = mt.perStoreMixedReadCount.get(tempKey); + if (tempVal == null) { + tempVal = 0L; + } + mixedReadCount = store.getMixedRowReadsCount() + tempVal; + // accumulate the count + mt.perStoreMemstoreOnlyReadCount.put(tempKey, memstoreReadCount); + mt.perStoreMixedReadCount.put(tempKey, mixedReadCount); } + mt.regionCount += 1; mt.readRequestCount += r.getReadRequestsCount(); - mt.filteredReadRequestCount += getFilteredReadRequestCount(tbl.getNameAsString()); + mt.filteredReadRequestCount += r.getFilteredReadRequestsCount(); mt.writeRequestCount += r.getWriteRequestsCount(); - } } @@ -133,6 +152,35 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr } } + @Override + public Map<String, Long> getMemstoreOnlyRowReadsCount(String table) { + MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); + if (metricsTable == null) { + return null; + } else { + return metricsTable.perStoreMemstoreOnlyReadCount; + } + } + + @Override + public Map<String, Long> getMixedRowReadsCount(String table) { + MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); + if (metricsTable == null) { + return null; + } else { + return metricsTable.perStoreMixedReadCount; + } + } + + public long getCpRequestsCount(String table) { + MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); + if (metricsTable == null) { + return 0; + } else { + return metricsTable.cpRequestCount; + } + } + public long getFilteredReadRequestCount(String table) { MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table)); if (metricsTable == null) { @@ -294,6 +342,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr long totalStoreFileAge; long referenceFileCount; long cpRequestCount; + Map<String, Long> perStoreMemstoreOnlyReadCount = new HashMap<>(); + Map<String, Long> perStoreMixedReadCount = 
new HashMap<>(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 6eb9f18b703..6ec9c51930c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -282,4 +282,14 @@ public interface Store { boolean isSloppyMemStore(); int getCurrentParallelPutCount(); + + /** + * @return the number of read requests purely from the memstore. + */ + long getMemstoreOnlyRowReadsCount(); + + /** + * @return the number of read requests from the files under this store. + */ + long getMixedRowReadsCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 654158e6379..3ad3be86149 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -93,6 +93,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private final int minVersions; private final long maxRowSize; private final long cellsPerHeartbeatCheck; + @VisibleForTesting + long memstoreOnlyReads; + @VisibleForTesting + long mixedReads; // 1) Collects all the KVHeap that are eagerly getting closed during the // course of a scan @@ -350,6 +354,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner seekAllScanner(scanInfo, scanners); } + // Used to instantiate a scanner for user scan in test + @VisibleForTesting + StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet columns, + List scanners, ScanType scanType) throws IOException { + // 0 is passed as readpoint because the test bypasses Store + this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), + scanType); + if (scanType == ScanType.USER_SCAN) { + this.matcher = + UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); + } else { + this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, + HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); + } + seekAllScanner(scanInfo, scanners); + } + // Used to instantiate a scanner for compaction in test @VisibleForTesting StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, @@ -565,173 +586,202 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int count = 0; long totalBytesRead = 0; - - LOOP: do { - // Update and check the time limit based on the configured value of cellsPerTimeoutCheck - // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream in - // the shipped method below. - if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread && - readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) { - if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { - return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); - } - } - // Do object compare - we set prevKV from the same heap. 
- if (prevCell != cell) { - ++kvsScanned; - } - checkScanOrder(prevCell, cell, comparator); - int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); - bytesRead += cellSize; - if (scanUsePread && readType == Scan.ReadType.DEFAULT && - bytesRead > preadMaxBytes) { - // return immediately if we want to switch from pread to stream. We need this because we can - // only switch in the shipped method, if user use a filter to filter out everything and rpc - // timeout is very large then the shipped method will never be called until the whole scan - // is finished, but at that time we have already scan all the data... - // See HBASE-20457 for more details. - // And there is still a scenario that can not be handled. If we have a very large row, which - // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag - // here, we still need to scan all the qualifiers before returning... - scannerContext.returnImmediately(); - } - prevCell = cell; - scannerContext.setLastPeekedCell(cell); - topChanged = false; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); - switch (qcode) { - case INCLUDE: - case INCLUDE_AND_SEEK_NEXT_ROW: - case INCLUDE_AND_SEEK_NEXT_COL: - - Filter f = matcher.getFilter(); - if (f != null) { - cell = f.transformCell(cell); + boolean onlyFromMemstore = matcher.isUserScan(); + try { + LOOP: do { + // Update and check the time limit based on the configured value of cellsPerTimeoutCheck + // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream + // in + // the shipped method below. + if (kvsScanned % cellsPerHeartbeatCheck == 0 + || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) { + if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } + } + // Do object compare - we set prevKV from the same heap. + if (prevCell != cell) { + ++kvsScanned; + } + checkScanOrder(prevCell, cell, comparator); + int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); + bytesRead += cellSize; + if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { + // return immediately if we want to switch from pread to stream. We need this because we + // can + // only switch in the shipped method, if user use a filter to filter out everything and + // rpc + // timeout is very large then the shipped method will never be called until the whole scan + // is finished, but at that time we have already scan all the data... + // See HBASE-20457 for more details. + // And there is still a scenario that can not be handled. If we have a very large row, + // which + // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag + // here, we still need to scan all the qualifiers before returning... + scannerContext.returnImmediately(); + } + prevCell = cell; + scannerContext.setLastPeekedCell(cell); + topChanged = false; + ScanQueryMatcher.MatchCode qcode = matcher.match(cell); + switch (qcode) { + case INCLUDE: + case INCLUDE_AND_SEEK_NEXT_ROW: + case INCLUDE_AND_SEEK_NEXT_COL: - this.countPerRow++; - if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { - // do what SEEK_NEXT_ROW does. 
- if (!matcher.moreRowsMayExistAfter(cell)) { + Filter f = matcher.getFilter(); + if (f != null) { + cell = f.transformCell(cell); + } + this.countPerRow++; + if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { + // do what SEEK_NEXT_ROW does. + if (!matcher.moreRowsMayExistAfter(cell)) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + seekToNextRow(cell); + break LOOP; + } + + // add to results only if we have skipped #storeOffset kvs + // also update metric accordingly + if (this.countPerRow > storeOffset) { + outResult.add(cell); + + // Update local tracking information + count++; + totalBytesRead += cellSize; + + /** + * Increment the metric if all the cells are from memstore. + * If not we will account it for mixed reads + */ + onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); + // Update the progress of the scanner context + scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); + scannerContext.incrementBatchProgress(1); + + if (matcher.isUserScan() && totalBytesRead > maxRowSize) { + String message = "Max row size allowed: " + maxRowSize + + ", but the row is bigger than that, the row info: " + + CellUtil.toString(cell, false) + ", already have process row cells = " + + outResult.size() + ", it belong to region = " + + store.getHRegion().getRegionInfo().getRegionNameAsString(); + LOG.warn(message); + throw new RowTooBigException(message); + } + } + + if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + if (!matcher.moreRowsMayExistAfter(cell)) { + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + matcher.clearCurrentRow(); + seekOrSkipToNextRow(cell); + } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { + seekOrSkipToNextColumn(cell); + } else { + this.heap.next(); + } + + if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + break LOOP; + } + continue; + + case DONE: + // Optimization for Gets! If DONE, no more to get on this row, early exit! + if (get) { + // Then no more to this row... exit. 
close(false);// Do all cleanup except heap.close() + // update metric return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } matcher.clearCurrentRow(); - seekToNextRow(cell); - break LOOP; - } + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - // add to results only if we have skipped #storeOffset kvs - // also update metric accordingly - if (this.countPerRow > storeOffset) { - outResult.add(cell); + case DONE_SCAN: + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - // Update local tracking information - count++; - totalBytesRead += cellSize; - - // Update the progress of the scanner context - scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); - scannerContext.incrementBatchProgress(1); - - if (matcher.isUserScan() && totalBytesRead > maxRowSize) { - String message = "Max row size allowed: " + maxRowSize - + ", but the row is bigger than that, the row info: " + CellUtil - .toString(cell, false) + ", already have process row cells = " + outResult.size() - + ", it belong to region = " + store.getHRegion().getRegionInfo() - .getRegionNameAsString(); - LOG.warn(message); - throw new RowTooBigException(message); - } - } - - if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { + case SEEK_NEXT_ROW: + // This is just a relatively simple end of scan fix, to short-cut end + // us if there is an endKey in the scan. if (!matcher.moreRowsMayExistAfter(cell)) { close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } matcher.clearCurrentRow(); seekOrSkipToNextRow(cell); - } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - seekOrSkipToNextColumn(cell); - } else { - this.heap.next(); - } - - if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { - break LOOP; - } - continue; - - case DONE: - // Optimization for Gets! If DONE, no more to get on this row, early exit! - if (get) { - // Then no more to this row... exit. - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - matcher.clearCurrentRow(); - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - - case DONE_SCAN: - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - - case SEEK_NEXT_ROW: - // This is just a relatively simple end of scan fix, to short-cut end - // us if there is an endKey in the scan. 
- if (!matcher.moreRowsMayExistAfter(cell)) { - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - matcher.clearCurrentRow(); - seekOrSkipToNextRow(cell); - NextState stateAfterSeekNextRow = needToReturn(outResult); - if (stateAfterSeekNextRow != null) { - return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); - } - break; - - case SEEK_NEXT_COL: - seekOrSkipToNextColumn(cell); - NextState stateAfterSeekNextColumn = needToReturn(outResult); - if (stateAfterSeekNextColumn != null) { - return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); - } - break; - - case SKIP: - this.heap.next(); - break; - - case SEEK_NEXT_USING_HINT: - Cell nextKV = matcher.getNextKeyHint(cell); - if (nextKV != null && comparator.compare(nextKV, cell) > 0) { - seekAsDirection(nextKV); - NextState stateAfterSeekByHint = needToReturn(outResult); - if (stateAfterSeekByHint != null) { - return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); + NextState stateAfterSeekNextRow = needToReturn(outResult); + if (stateAfterSeekNextRow != null) { + return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); } - } else { - heap.next(); - } - break; + break; - default: - throw new RuntimeException("UNEXPECTED"); + case SEEK_NEXT_COL: + seekOrSkipToNextColumn(cell); + NextState stateAfterSeekNextColumn = needToReturn(outResult); + if (stateAfterSeekNextColumn != null) { + return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); + } + break; + + case SKIP: + this.heap.next(); + break; + + case SEEK_NEXT_USING_HINT: + Cell nextKV = matcher.getNextKeyHint(cell); + if (nextKV != null && comparator.compare(nextKV, cell) > 0) { + seekAsDirection(nextKV); + NextState stateAfterSeekByHint = needToReturn(outResult); + if (stateAfterSeekByHint != null) { + return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); + } + } else { + heap.next(); + } + break; + + default: + throw new RuntimeException("UNEXPECTED"); + } + } while ((cell = this.heap.peek()) != null); + + if (count > 0) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); } - } while ((cell = this.heap.peek()) != null); - if (count > 0) { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + // No more keys + close(false);// Do all cleanup except heap.close() + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } finally { + // increment only if we have some result + if (count > 0 && matcher.isUserScan()) { + // if true increment memstore metrics, if not the mixed one + updateMetricsStore(onlyFromMemstore); + } } + } - // No more keys - close(false);// Do all cleanup except heap.close() - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + private void updateMetricsStore(boolean memstoreRead) { + if (store != null) { + store.updateMetricsStore(memstoreRead); + } else { + // for testing. 
+ if (memstoreRead) { + memstoreOnlyReads++; + } else { + mixedReads++; + } + } } /** @@ -1208,4 +1258,3 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java index b6d714dbaa8..05ac2718afa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.HashMap; +import java.util.Map; + public class MetricsRegionWrapperStub implements MetricsRegionWrapper { int replicaid = 0; @@ -177,4 +180,18 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper { public long getTotalRequestCount() { return 0; } + + @Override + public Map getMemstoreOnlyRowReadsCount() { + Map map = new HashMap<>(); + map.put("info", 0L); + return map; + } + + @Override + public Map getMixedRowReadsCount() { + Map map = new HashMap<>(); + map.put("info", 0L); + return map; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 4c9531f1bf1..207d1584e7d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +33,7 @@ import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -350,6 +352,7 @@ public class TestStoreScanner { // We should have gone the optimize route 5 times totally... an INCLUDE for the four cells // in the row plus the DONE on the end. assertEquals(5, scanner.count.get()); + assertEquals(1, scanner.memstoreOnlyReads); // For a full row Get, there should be no opportunity for scanner optimization. assertEquals(0, scanner.optimization.get()); } @@ -424,6 +427,8 @@ public class TestStoreScanner { // And we should have gone through optimize twice only. 
assertEquals("First qcode is SEEK_NEXT_COL and second INCLUDE_AND_SEEK_NEXT_ROW", 3, scanner.count.get()); + assertEquals("Memstore Read count should be", 1, + scanner.memstoreOnlyReads); } } @@ -523,6 +528,29 @@ public class TestStoreScanner { List results = new ArrayList<>(); assertEquals(true, scan.next(results)); assertEquals(1, results.size()); + assertEquals(1, scan.memstoreOnlyReads); + assertEquals(kvs[0], results.get(0)); + } + } + + @Test + public void testNonUserScan() throws IOException { + // returns only 1 of these 2 even though same timestamp + KeyValue [] kvs = new KeyValue[] { + create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"), + }; + List scanners = Arrays.asList( + new KeyValueScanner[] {new KeyValueScanFixture(CellComparator.getInstance(), kvs)}); + + Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1")); + // this only uses maxVersions (default=1) and TimeRange (default=all) + try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners, + ScanType.COMPACT_RETAIN_DELETES)) { + List results = new ArrayList<>(); + assertEquals(true, scan.next(results)); + assertEquals(1, results.size()); + assertEquals(0, scan.memstoreOnlyReads); assertEquals(kvs[0], results.get(0)); } }