HBASE-24205 Create metric to know the number of reads that happens fr… (#1552)
* HBASE-24205 Create metric to know the number of reads that happens from memstore * Fix checkstyles and whitespaces * Checkstyl, whitespace and javadoc * Fixed review comments * Fix unused imports * Rebase with latest commit * Adding the table vs store metric by consolidating * Combine get and scan metrics and make all relevant changes * Track for full row and then increment either memstore or file read metric * TestMetricsStore test fix * Only increment the memstore metric if all cells are from memstore, if not treat as mixed reads * Remove metricsstore and aggregate at region level * Addresses review comments-metric name updated everywhere * Metric name change * Review comment changes Co-authored-by: Ramkrishna <ramkrishna@apache.org> Signed-off by:Anoop Sam John<anoopsamjohn@gmail.com> Signed-off by:Viraj Jasani<virajjasani@apache.org>
This commit is contained in:
parent
3558ee0c3b
commit
510aad3125
|
@ -53,6 +53,10 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
|
||||||
String COPROCESSOR_EXECUTION_STATISTICS_DESC = "Statistics for coprocessor execution times";
|
String COPROCESSOR_EXECUTION_STATISTICS_DESC = "Statistics for coprocessor execution times";
|
||||||
String REPLICA_ID = "replicaid";
|
String REPLICA_ID = "replicaid";
|
||||||
String REPLICA_ID_DESC = "The replica ID of a region. 0 is primary, otherwise is secondary";
|
String REPLICA_ID_DESC = "The replica ID of a region. 0 is primary, otherwise is secondary";
|
||||||
|
String ROW_READS_ONLY_ON_MEMSTORE = "memstoreOnlyRowReadsCount";
|
||||||
|
String ROW_READS_ONLY_ON_MEMSTORE_DESC = "Row reads happening completely out of memstore";
|
||||||
|
String MIXED_ROW_READS = "mixedRowReadsCount";
|
||||||
|
String MIXED_ROW_READS_ON_STORE_DESC = "Row reads happening out of files and memstore on store";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the region's metrics as this region is closing.
|
* Close the region's metrics as this region is closing.
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.metrics.Interns;
|
import org.apache.hadoop.hbase.metrics.Interns;
|
||||||
|
@ -33,6 +35,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MetricsRegionSourceImpl.class);
|
private static final Logger LOG = LoggerFactory.getLogger(MetricsRegionSourceImpl.class);
|
||||||
|
|
||||||
|
private static final String _STORE = "_store_";
|
||||||
|
|
||||||
private AtomicBoolean closed = new AtomicBoolean(false);
|
private AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
|
|
||||||
// Non-final so that we can null out the wrapper
|
// Non-final so that we can null out the wrapper
|
||||||
|
@ -45,6 +49,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||||
private final DynamicMetricsRegistry registry;
|
private final DynamicMetricsRegistry registry;
|
||||||
|
|
||||||
private final String regionNamePrefix;
|
private final String regionNamePrefix;
|
||||||
|
private final String regionNamePrefix1;
|
||||||
|
private final String regionNamePrefix2;
|
||||||
private final String regionPutKey;
|
private final String regionPutKey;
|
||||||
private final String regionDeleteKey;
|
private final String regionDeleteKey;
|
||||||
private final String regionGetKey;
|
private final String regionGetKey;
|
||||||
|
@ -77,10 +83,10 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||||
|
|
||||||
registry = agg.getMetricsRegistry();
|
registry = agg.getMetricsRegistry();
|
||||||
|
|
||||||
regionNamePrefix = "Namespace_" + regionWrapper.getNamespace() +
|
regionNamePrefix1 = "Namespace_" + regionWrapper.getNamespace() + "_table_"
|
||||||
"_table_" + regionWrapper.getTableName() +
|
+ regionWrapper.getTableName() + "_region_" + regionWrapper.getRegionName();
|
||||||
"_region_" + regionWrapper.getRegionName() +
|
regionNamePrefix2 = "_metric_";
|
||||||
"_metric_";
|
regionNamePrefix = regionNamePrefix1 + regionNamePrefix2;
|
||||||
|
|
||||||
String suffix = "Count";
|
String suffix = "Count";
|
||||||
|
|
||||||
|
@ -302,6 +308,24 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||||
regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE,
|
regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE,
|
||||||
MetricsRegionSource.MAX_FLUSH_QUEUE_DESC),
|
MetricsRegionSource.MAX_FLUSH_QUEUE_DESC),
|
||||||
this.regionWrapper.getMaxFlushQueueSize());
|
this.regionWrapper.getMaxFlushQueueSize());
|
||||||
|
addCounter(mrb, this.regionWrapper.getMemstoreOnlyRowReadsCount(),
|
||||||
|
MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE,
|
||||||
|
MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC);
|
||||||
|
addCounter(mrb, this.regionWrapper.getMixedRowReadsCount(),
|
||||||
|
MetricsRegionSource.MIXED_ROW_READS,
|
||||||
|
MetricsRegionSource.MIXED_ROW_READS_ON_STORE_DESC);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addCounter(MetricsRecordBuilder mrb, Map<String, Long> metricMap, String metricName,
|
||||||
|
String metricDesc) {
|
||||||
|
if (metricMap != null) {
|
||||||
|
for (Entry<String, Long> entry : metricMap.entrySet()) {
|
||||||
|
// append 'store' and its name to the metric
|
||||||
|
mrb.addCounter(Interns.info(
|
||||||
|
this.regionNamePrefix1 + _STORE + entry.getKey() + this.regionNamePrefix2 + metricName,
|
||||||
|
metricDesc), entry.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -170,4 +172,15 @@ public interface MetricsRegionWrapper {
|
||||||
* all compacted store files that belong to this region
|
* all compacted store files that belong to this region
|
||||||
*/
|
*/
|
||||||
long getMaxCompactedStoreFileRefCount();
|
long getMaxCompactedStoreFileRefCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of row reads completely on memstore per store
|
||||||
|
*/
|
||||||
|
Map<String, Long> getMemstoreOnlyRowReadsCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of row reads on memstore and file per store
|
||||||
|
*/
|
||||||
|
Map<String, Long> getMixedRowReadsCount();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,6 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
|
||||||
@Override
|
@Override
|
||||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||||
MetricsRecordBuilder mrb = collector.addRecord(metricsName);
|
MetricsRecordBuilder mrb = collector.addRecord(metricsName);
|
||||||
|
|
||||||
if (tableSources != null) {
|
if (tableSources != null) {
|
||||||
for (MetricsTableSource tableMetricSource : tableSources.values()) {
|
for (MetricsTableSource tableMetricSource : tableSources.values()) {
|
||||||
if (tableMetricSource instanceof MetricsTableSourceImpl) {
|
if (tableMetricSource instanceof MetricsTableSourceImpl) {
|
||||||
|
|
|
@ -61,7 +61,10 @@ import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPL
|
||||||
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
|
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
|
||||||
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
|
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.metrics.Interns;
|
import org.apache.hadoop.hbase.metrics.Interns;
|
||||||
import org.apache.hadoop.metrics2.MetricHistogram;
|
import org.apache.hadoop.metrics2.MetricHistogram;
|
||||||
|
@ -75,6 +78,8 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MetricsTableSourceImpl implements MetricsTableSource {
|
public class MetricsTableSourceImpl implements MetricsTableSource {
|
||||||
|
|
||||||
|
private static final String _COLUMNFAMILY = "_columnfamily_";
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MetricsTableSourceImpl.class);
|
private static final Logger LOG = LoggerFactory.getLogger(MetricsTableSourceImpl.class);
|
||||||
|
|
||||||
private AtomicBoolean closed = new AtomicBoolean(false);
|
private AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
|
@ -87,6 +92,8 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
|
||||||
private final MetricsTableAggregateSourceImpl agg;
|
private final MetricsTableAggregateSourceImpl agg;
|
||||||
private final DynamicMetricsRegistry registry;
|
private final DynamicMetricsRegistry registry;
|
||||||
private final String tableNamePrefix;
|
private final String tableNamePrefix;
|
||||||
|
private final String tableNamePrefixPart1;
|
||||||
|
private final String tableNamePrefixPart2;
|
||||||
private final TableName tableName;
|
private final TableName tableName;
|
||||||
private final int hashCode;
|
private final int hashCode;
|
||||||
|
|
||||||
|
@ -127,8 +134,11 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
|
||||||
|
|
||||||
this.tableWrapperAgg = tblWrapperAgg;
|
this.tableWrapperAgg = tblWrapperAgg;
|
||||||
this.registry = agg.getMetricsRegistry();
|
this.registry = agg.getMetricsRegistry();
|
||||||
this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() +
|
this.tableNamePrefixPart1 = "Namespace_" + this.tableName.getNamespaceAsString() +
|
||||||
"_table_" + this.tableName.getQualifierAsString() + "_metric_";
|
"_table_" + this.tableName.getQualifierAsString();
|
||||||
|
this.tableNamePrefixPart2 = "_metric_";
|
||||||
|
this.tableNamePrefix = tableNamePrefixPart1 +
|
||||||
|
tableNamePrefixPart2;
|
||||||
this.hashCode = this.tableName.hashCode();
|
this.hashCode = this.tableName.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -311,6 +321,25 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
|
||||||
mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES,
|
mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES,
|
||||||
MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC),
|
MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC),
|
||||||
tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString()));
|
tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString()));
|
||||||
|
addGauge(mrb, tableWrapperAgg.getMemstoreOnlyRowReadsCount(tableName.getNameAsString()),
|
||||||
|
MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE,
|
||||||
|
MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC);
|
||||||
|
addGauge(mrb, tableWrapperAgg.getMixedRowReadsCount(tableName.getNameAsString()),
|
||||||
|
MetricsRegionSource.MIXED_ROW_READS,
|
||||||
|
MetricsRegionSource.MIXED_ROW_READS_ON_STORE_DESC);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addGauge(MetricsRecordBuilder mrb, Map<String, Long> metricMap, String metricName,
|
||||||
|
String metricDesc) {
|
||||||
|
if (metricMap != null) {
|
||||||
|
for (Entry<String, Long> entry : metricMap.entrySet()) {
|
||||||
|
// append 'store' and its name to the metric
|
||||||
|
mrb.addGauge(Interns.info(this.tableNamePrefixPart1 + _COLUMNFAMILY
|
||||||
|
+ entry.getKey().split(MetricsTableWrapperAggregate.UNDERSCORE)[1]
|
||||||
|
+ this.tableNamePrefixPart2 + metricName,
|
||||||
|
metricDesc), entry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -26,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface MetricsTableWrapperAggregate {
|
public interface MetricsTableWrapperAggregate {
|
||||||
|
public String UNDERSCORE = "_";
|
||||||
/**
|
/**
|
||||||
* Get the number of read requests that have been issued against this table
|
* Get the number of read requests that have been issued against this table
|
||||||
*/
|
*/
|
||||||
|
@ -107,6 +109,13 @@ public interface MetricsTableWrapperAggregate {
|
||||||
*/
|
*/
|
||||||
long getNumReferenceFiles(String table);
|
long getNumReferenceFiles(String table);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return number of row reads completely from memstore per store for this table
|
||||||
|
*/
|
||||||
|
Map<String, Long> getMemstoreOnlyRowReadsCount(String table);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return number of row reads from file and memstore per store for this table
|
||||||
|
*/
|
||||||
|
Map<String, Long> getMixedRowReadsCount(String table);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
|
public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
|
||||||
|
|
||||||
private String tableName;
|
private String tableName;
|
||||||
|
@ -109,4 +112,18 @@ public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
|
||||||
public long getCpRequestsCount(String table) {
|
public long getCpRequestsCount(String table) {
|
||||||
return 99;
|
return 99;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMemstoreOnlyRowReadsCount(String table) {
|
||||||
|
Map<String, Long> map = new HashMap<String, Long>();
|
||||||
|
map.put("table_info", 3L);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMixedRowReadsCount(String table) {
|
||||||
|
Map<String, Long> map = new HashMap<String, Long>();
|
||||||
|
map.put("table_info", 3L);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.testclassification.MetricsTests;
|
import org.apache.hadoop.hbase.testclassification.MetricsTests;
|
||||||
|
@ -216,5 +219,19 @@ public class TestMetricsRegionSourceImpl {
|
||||||
public long getTotalRequestCount() {
|
public long getTotalRequestCount() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMemstoreOnlyRowReadsCount() {
|
||||||
|
Map<String, Long> map = new HashMap<String, Long>();
|
||||||
|
map.put("info", 0L);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMixedRowReadsCount() {
|
||||||
|
Map<String, Long> map = new HashMap<String, Long>();
|
||||||
|
map.put("info", 0L);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,12 +46,14 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.function.ToLongFunction;
|
import java.util.function.ToLongFunction;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.LongStream;
|
import java.util.stream.LongStream;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -94,6 +96,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
@ -114,9 +118,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
|
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Store holds a column family in a Region. Its a memstore and a set of zero
|
* A Store holds a column family in a Region. Its a memstore and a set of zero
|
||||||
* or more StoreFiles, which stretch backwards over time.
|
* or more StoreFiles, which stretch backwards over time.
|
||||||
|
@ -162,6 +163,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
volatile boolean forceMajor = false;
|
volatile boolean forceMajor = false;
|
||||||
private AtomicLong storeSize = new AtomicLong();
|
private AtomicLong storeSize = new AtomicLong();
|
||||||
private AtomicLong totalUncompressedBytes = new AtomicLong();
|
private AtomicLong totalUncompressedBytes = new AtomicLong();
|
||||||
|
private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
|
||||||
|
// rows that has cells from both memstore and files (or only files)
|
||||||
|
private LongAdder mixedRowReadsCount = new LongAdder();
|
||||||
|
|
||||||
private boolean cacheOnWriteLogged;
|
private boolean cacheOnWriteLogged;
|
||||||
|
|
||||||
|
@ -331,7 +335,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
confPrintThreshold = 10;
|
confPrintThreshold = 10;
|
||||||
}
|
}
|
||||||
this.parallelPutCountPrintThreshold = confPrintThreshold;
|
this.parallelPutCountPrintThreshold = confPrintThreshold;
|
||||||
LOG.info("{} created, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
|
|
||||||
|
LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "
|
||||||
+ "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
|
+ "parallelPutCountPrintThreshold={}, encoding={}, compression={}",
|
||||||
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
|
this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
|
||||||
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
|
parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
|
||||||
|
@ -2546,7 +2551,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD =
|
public static final long FIXED_OVERHEAD =
|
||||||
ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
|
ClassSize.align(ClassSize.OBJECT + (29 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
|
||||||
+ (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
+ (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
||||||
|
@ -2886,8 +2891,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return get maximum ref count of storeFile among all compacted HStore Files
|
* @return get maximum ref count of storeFile among all compacted HStore Files for the HStore
|
||||||
* for the HStore
|
|
||||||
*/
|
*/
|
||||||
public int getMaxCompactedStoreFileRefCount() {
|
public int getMaxCompactedStoreFileRefCount() {
|
||||||
OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
|
OptionalInt maxCompactedStoreFileRefCount = this.storeEngine.getStoreFileManager()
|
||||||
|
@ -2901,4 +2905,21 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
||||||
? maxCompactedStoreFileRefCount.getAsInt() : 0;
|
? maxCompactedStoreFileRefCount.getAsInt() : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMemstoreOnlyRowReadsCount() {
|
||||||
|
return memstoreOnlyRowReadsCount.sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMixedRowReadsCount() {
|
||||||
|
return mixedRowReadsCount.sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateMetricsStore(boolean memstoreRead) {
|
||||||
|
if (memstoreRead) {
|
||||||
|
memstoreOnlyRowReadsCount.increment();
|
||||||
|
} else {
|
||||||
|
mixedRowReadsCount.increment();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,6 +112,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
|
||||||
return this.current.peek();
|
return this.current.peek();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isLatestCellFromMemstore() {
|
||||||
|
return !this.current.isFileScanner();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Cell next() throws IOException {
|
public Cell next() throws IOException {
|
||||||
if(this.current == null) {
|
if(this.current == null) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.OptionalDouble;
|
import java.util.OptionalDouble;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
|
@ -58,6 +59,8 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
||||||
private long numReferenceFiles;
|
private long numReferenceFiles;
|
||||||
private long maxFlushQueueSize;
|
private long maxFlushQueueSize;
|
||||||
private long maxCompactionQueueSize;
|
private long maxCompactionQueueSize;
|
||||||
|
private Map<String, Long> readsOnlyFromMemstore;
|
||||||
|
private Map<String, Long> mixedReadsOnStore;
|
||||||
|
|
||||||
private ScheduledFuture<?> regionMetricsUpdateTask;
|
private ScheduledFuture<?> regionMetricsUpdateTask;
|
||||||
|
|
||||||
|
@ -233,6 +236,16 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
||||||
return this.region.hashCode();
|
return this.region.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMemstoreOnlyRowReadsCount() {
|
||||||
|
return readsOnlyFromMemstore;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMixedRowReadsCount() {
|
||||||
|
return mixedReadsOnStore;
|
||||||
|
}
|
||||||
|
|
||||||
public class HRegionMetricsWrapperRunnable implements Runnable {
|
public class HRegionMetricsWrapperRunnable implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -279,6 +292,26 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
||||||
if (storeAvgStoreFileAge.isPresent()) {
|
if (storeAvgStoreFileAge.isPresent()) {
|
||||||
avgAgeNumerator += (long) storeAvgStoreFileAge.getAsDouble() * storeHFiles;
|
avgAgeNumerator += (long) storeAvgStoreFileAge.getAsDouble() * storeHFiles;
|
||||||
}
|
}
|
||||||
|
if(mixedReadsOnStore == null) {
|
||||||
|
mixedReadsOnStore = new HashMap<String, Long>();
|
||||||
|
}
|
||||||
|
Long tempVal = mixedReadsOnStore.get(store.getColumnFamilyName());
|
||||||
|
if (tempVal == null) {
|
||||||
|
tempVal = 0L;
|
||||||
|
} else {
|
||||||
|
tempVal += store.getMixedRowReadsCount();
|
||||||
|
}
|
||||||
|
mixedReadsOnStore.put(store.getColumnFamilyName(), tempVal);
|
||||||
|
if (readsOnlyFromMemstore == null) {
|
||||||
|
readsOnlyFromMemstore = new HashMap<String, Long>();
|
||||||
|
}
|
||||||
|
tempVal = readsOnlyFromMemstore.get(store.getColumnFamilyName());
|
||||||
|
if (tempVal == null) {
|
||||||
|
tempVal = 0L;
|
||||||
|
} else {
|
||||||
|
tempVal += store.getMemstoreOnlyRowReadsCount();
|
||||||
|
}
|
||||||
|
readsOnlyFromMemstore.put(store.getColumnFamilyName(), tempVal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
|
||||||
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
|
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
|
||||||
this.runnable = new TableMetricsWrapperRunnable();
|
this.runnable = new TableMetricsWrapperRunnable();
|
||||||
this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period,
|
this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period,
|
||||||
this.period, TimeUnit.MILLISECONDS);
|
period, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TableMetricsWrapperRunnable implements Runnable {
|
public class TableMetricsWrapperRunnable implements Runnable {
|
||||||
|
@ -61,7 +61,6 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
|
Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
|
||||||
|
|
||||||
for (Region r : regionServer.getOnlineRegionsLocalContext()) {
|
for (Region r : regionServer.getOnlineRegionsLocalContext()) {
|
||||||
TableName tbl = r.getTableDescriptor().getTableName();
|
TableName tbl = r.getTableDescriptor().getTableName();
|
||||||
MetricsTableValues mt = localMetricsTableMap.get(tbl);
|
MetricsTableValues mt = localMetricsTableMap.get(tbl);
|
||||||
|
@ -69,11 +68,17 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
|
||||||
mt = new MetricsTableValues();
|
mt = new MetricsTableValues();
|
||||||
localMetricsTableMap.put(tbl, mt);
|
localMetricsTableMap.put(tbl, mt);
|
||||||
}
|
}
|
||||||
|
long memstoreReadCount = 0L;
|
||||||
|
long mixedReadCount = 0L;
|
||||||
|
String tempKey = null;
|
||||||
if (r.getStores() != null) {
|
if (r.getStores() != null) {
|
||||||
|
String familyName = null;
|
||||||
for (Store store : r.getStores()) {
|
for (Store store : r.getStores()) {
|
||||||
|
familyName = store.getColumnFamilyName();
|
||||||
|
|
||||||
mt.storeFileCount += store.getStorefilesCount();
|
mt.storeFileCount += store.getStorefilesCount();
|
||||||
mt.memstoreSize += (store.getMemStoreSize().getDataSize() +
|
mt.memstoreSize += (store.getMemStoreSize().getDataSize()
|
||||||
store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize());
|
+ store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize());
|
||||||
mt.storeFileSize += store.getStorefilesSize();
|
mt.storeFileSize += store.getStorefilesSize();
|
||||||
mt.referenceFileCount += store.getNumReferenceFiles();
|
mt.referenceFileCount += store.getNumReferenceFiles();
|
||||||
if (store.getMaxStoreFileAge().isPresent()) {
|
if (store.getMaxStoreFileAge().isPresent()) {
|
||||||
|
@ -89,13 +94,27 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
|
||||||
(long) store.getAvgStoreFileAge().getAsDouble() * store.getStorefilesCount();
|
(long) store.getAvgStoreFileAge().getAsDouble() * store.getStorefilesCount();
|
||||||
}
|
}
|
||||||
mt.storeCount += 1;
|
mt.storeCount += 1;
|
||||||
|
tempKey = tbl.getNameAsString() + UNDERSCORE + familyName;
|
||||||
|
Long tempVal = mt.perStoreMemstoreOnlyReadCount.get(tempKey);
|
||||||
|
if (tempVal == null) {
|
||||||
|
tempVal = 0L;
|
||||||
|
}
|
||||||
|
memstoreReadCount = store.getMemstoreOnlyRowReadsCount() + tempVal;
|
||||||
|
tempVal = mt.perStoreMixedReadCount.get(tempKey);
|
||||||
|
if (tempVal == null) {
|
||||||
|
tempVal = 0L;
|
||||||
|
}
|
||||||
|
mixedReadCount = store.getMixedRowReadsCount() + tempVal;
|
||||||
|
// accumulate the count
|
||||||
|
mt.perStoreMemstoreOnlyReadCount.put(tempKey, memstoreReadCount);
|
||||||
|
mt.perStoreMixedReadCount.put(tempKey, mixedReadCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
mt.regionCount += 1;
|
mt.regionCount += 1;
|
||||||
|
|
||||||
mt.readRequestCount += r.getReadRequestsCount();
|
mt.readRequestCount += r.getReadRequestsCount();
|
||||||
mt.filteredReadRequestCount += getFilteredReadRequestCount(tbl.getNameAsString());
|
mt.filteredReadRequestCount += r.getFilteredReadRequestsCount();
|
||||||
mt.writeRequestCount += r.getWriteRequestsCount();
|
mt.writeRequestCount += r.getWriteRequestsCount();
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,6 +152,26 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMemstoreOnlyRowReadsCount(String table) {
|
||||||
|
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
||||||
|
if (metricsTable == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return metricsTable.perStoreMemstoreOnlyReadCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMixedRowReadsCount(String table) {
|
||||||
|
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
||||||
|
if (metricsTable == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return metricsTable.perStoreMixedReadCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getCpRequestsCount(String table) {
|
public long getCpRequestsCount(String table) {
|
||||||
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
|
||||||
|
@ -304,6 +343,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
|
||||||
long totalStoreFileAge;
|
long totalStoreFileAge;
|
||||||
long referenceFileCount;
|
long referenceFileCount;
|
||||||
long cpRequestCount;
|
long cpRequestCount;
|
||||||
|
Map<String, Long> perStoreMemstoreOnlyReadCount = new HashMap<>();
|
||||||
|
Map<String, Long> perStoreMixedReadCount = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -282,4 +282,14 @@ public interface Store {
|
||||||
boolean isSloppyMemStore();
|
boolean isSloppyMemStore();
|
||||||
|
|
||||||
int getCurrentParallelPutCount();
|
int getCurrentParallelPutCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of read requests purely from the memstore.
|
||||||
|
*/
|
||||||
|
long getMemstoreOnlyRowReadsCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of read requests from the files under this store.
|
||||||
|
*/
|
||||||
|
long getMixedRowReadsCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
private final int minVersions;
|
private final int minVersions;
|
||||||
private final long maxRowSize;
|
private final long maxRowSize;
|
||||||
private final long cellsPerHeartbeatCheck;
|
private final long cellsPerHeartbeatCheck;
|
||||||
|
@VisibleForTesting
|
||||||
|
long memstoreOnlyReads;
|
||||||
|
@VisibleForTesting
|
||||||
|
long mixedReads;
|
||||||
|
|
||||||
// 1) Collects all the KVHeap that are eagerly getting closed during the
|
// 1) Collects all the KVHeap that are eagerly getting closed during the
|
||||||
// course of a scan
|
// course of a scan
|
||||||
|
@ -565,173 +569,202 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
long totalBytesRead = 0;
|
long totalBytesRead = 0;
|
||||||
|
boolean onlyFromMemstore = true;
|
||||||
LOOP: do {
|
try {
|
||||||
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
|
LOOP: do {
|
||||||
// Or if the preadMaxBytes is reached and we may want to return so we can switch to stream in
|
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
|
||||||
// the shipped method below.
|
// Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
|
||||||
if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread &&
|
// in
|
||||||
readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
|
// the shipped method below.
|
||||||
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
|
if (kvsScanned % cellsPerHeartbeatCheck == 0
|
||||||
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
|
|| (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
|
||||||
}
|
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
|
||||||
}
|
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
|
||||||
// Do object compare - we set prevKV from the same heap.
|
|
||||||
if (prevCell != cell) {
|
|
||||||
++kvsScanned;
|
|
||||||
}
|
|
||||||
checkScanOrder(prevCell, cell, comparator);
|
|
||||||
int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
|
|
||||||
bytesRead += cellSize;
|
|
||||||
if (scanUsePread && readType == Scan.ReadType.DEFAULT &&
|
|
||||||
bytesRead > preadMaxBytes) {
|
|
||||||
// return immediately if we want to switch from pread to stream. We need this because we can
|
|
||||||
// only switch in the shipped method, if user use a filter to filter out everything and rpc
|
|
||||||
// timeout is very large then the shipped method will never be called until the whole scan
|
|
||||||
// is finished, but at that time we have already scan all the data...
|
|
||||||
// See HBASE-20457 for more details.
|
|
||||||
// And there is still a scenario that can not be handled. If we have a very large row, which
|
|
||||||
// have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
|
|
||||||
// here, we still need to scan all the qualifiers before returning...
|
|
||||||
scannerContext.returnImmediately();
|
|
||||||
}
|
|
||||||
prevCell = cell;
|
|
||||||
scannerContext.setLastPeekedCell(cell);
|
|
||||||
topChanged = false;
|
|
||||||
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
|
|
||||||
switch (qcode) {
|
|
||||||
case INCLUDE:
|
|
||||||
case INCLUDE_AND_SEEK_NEXT_ROW:
|
|
||||||
case INCLUDE_AND_SEEK_NEXT_COL:
|
|
||||||
|
|
||||||
Filter f = matcher.getFilter();
|
|
||||||
if (f != null) {
|
|
||||||
cell = f.transformCell(cell);
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// Do object compare - we set prevKV from the same heap.
|
||||||
|
if (prevCell != cell) {
|
||||||
|
++kvsScanned;
|
||||||
|
}
|
||||||
|
checkScanOrder(prevCell, cell, comparator);
|
||||||
|
int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
|
||||||
|
bytesRead += cellSize;
|
||||||
|
if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
|
||||||
|
// return immediately if we want to switch from pread to stream. We need this because we
|
||||||
|
// can
|
||||||
|
// only switch in the shipped method, if user use a filter to filter out everything and
|
||||||
|
// rpc
|
||||||
|
// timeout is very large then the shipped method will never be called until the whole scan
|
||||||
|
// is finished, but at that time we have already scan all the data...
|
||||||
|
// See HBASE-20457 for more details.
|
||||||
|
// And there is still a scenario that can not be handled. If we have a very large row,
|
||||||
|
// which
|
||||||
|
// have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
|
||||||
|
// here, we still need to scan all the qualifiers before returning...
|
||||||
|
scannerContext.returnImmediately();
|
||||||
|
}
|
||||||
|
prevCell = cell;
|
||||||
|
scannerContext.setLastPeekedCell(cell);
|
||||||
|
topChanged = false;
|
||||||
|
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
|
||||||
|
switch (qcode) {
|
||||||
|
case INCLUDE:
|
||||||
|
case INCLUDE_AND_SEEK_NEXT_ROW:
|
||||||
|
case INCLUDE_AND_SEEK_NEXT_COL:
|
||||||
|
|
||||||
this.countPerRow++;
|
Filter f = matcher.getFilter();
|
||||||
if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
|
if (f != null) {
|
||||||
// do what SEEK_NEXT_ROW does.
|
cell = f.transformCell(cell);
|
||||||
if (!matcher.moreRowsMayExistAfter(cell)) {
|
}
|
||||||
|
this.countPerRow++;
|
||||||
|
if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
|
||||||
|
// do what SEEK_NEXT_ROW does.
|
||||||
|
if (!matcher.moreRowsMayExistAfter(cell)) {
|
||||||
|
close(false);// Do all cleanup except heap.close()
|
||||||
|
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||||
|
}
|
||||||
|
matcher.clearCurrentRow();
|
||||||
|
seekToNextRow(cell);
|
||||||
|
break LOOP;
|
||||||
|
}
|
||||||
|
|
||||||
|
// add to results only if we have skipped #storeOffset kvs
|
||||||
|
// also update metric accordingly
|
||||||
|
if (this.countPerRow > storeOffset) {
|
||||||
|
outResult.add(cell);
|
||||||
|
|
||||||
|
// Update local tracking information
|
||||||
|
count++;
|
||||||
|
totalBytesRead += cellSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment the metric if all the cells are from memstore.
|
||||||
|
* If not we will account it for mixed reads
|
||||||
|
*/
|
||||||
|
onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
|
||||||
|
// Update the progress of the scanner context
|
||||||
|
scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
|
||||||
|
scannerContext.incrementBatchProgress(1);
|
||||||
|
|
||||||
|
if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
|
||||||
|
String message = "Max row size allowed: " + maxRowSize
|
||||||
|
+ ", but the row is bigger than that, the row info: "
|
||||||
|
+ CellUtil.toString(cell, false) + ", already have process row cells = "
|
||||||
|
+ outResult.size() + ", it belong to region = "
|
||||||
|
+ store.getHRegion().getRegionInfo().getRegionNameAsString();
|
||||||
|
LOG.warn(message);
|
||||||
|
throw new RowTooBigException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
|
||||||
|
if (!matcher.moreRowsMayExistAfter(cell)) {
|
||||||
|
close(false);// Do all cleanup except heap.close()
|
||||||
|
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||||
|
}
|
||||||
|
matcher.clearCurrentRow();
|
||||||
|
seekOrSkipToNextRow(cell);
|
||||||
|
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
|
||||||
|
seekOrSkipToNextColumn(cell);
|
||||||
|
} else {
|
||||||
|
this.heap.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
|
||||||
|
break LOOP;
|
||||||
|
}
|
||||||
|
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
|
||||||
|
break LOOP;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
|
||||||
|
case DONE:
|
||||||
|
// Optimization for Gets! If DONE, no more to get on this row, early exit!
|
||||||
|
if (get) {
|
||||||
|
// Then no more to this row... exit.
|
||||||
close(false);// Do all cleanup except heap.close()
|
close(false);// Do all cleanup except heap.close()
|
||||||
|
// update metric
|
||||||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||||
}
|
}
|
||||||
matcher.clearCurrentRow();
|
matcher.clearCurrentRow();
|
||||||
seekToNextRow(cell);
|
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
||||||
break LOOP;
|
|
||||||
}
|
|
||||||
|
|
||||||
// add to results only if we have skipped #storeOffset kvs
|
case DONE_SCAN:
|
||||||
// also update metric accordingly
|
close(false);// Do all cleanup except heap.close()
|
||||||
if (this.countPerRow > storeOffset) {
|
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||||
outResult.add(cell);
|
|
||||||
|
|
||||||
// Update local tracking information
|
case SEEK_NEXT_ROW:
|
||||||
count++;
|
// This is just a relatively simple end of scan fix, to short-cut end
|
||||||
totalBytesRead += cellSize;
|
// us if there is an endKey in the scan.
|
||||||
|
|
||||||
// Update the progress of the scanner context
|
|
||||||
scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
|
|
||||||
scannerContext.incrementBatchProgress(1);
|
|
||||||
|
|
||||||
if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
|
|
||||||
String message = "Max row size allowed: " + maxRowSize
|
|
||||||
+ ", but the row is bigger than that, the row info: " + CellUtil
|
|
||||||
.toString(cell, false) + ", already have process row cells = " + outResult.size()
|
|
||||||
+ ", it belong to region = " + store.getHRegion().getRegionInfo()
|
|
||||||
.getRegionNameAsString();
|
|
||||||
LOG.warn(message);
|
|
||||||
throw new RowTooBigException(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
|
|
||||||
if (!matcher.moreRowsMayExistAfter(cell)) {
|
if (!matcher.moreRowsMayExistAfter(cell)) {
|
||||||
close(false);// Do all cleanup except heap.close()
|
close(false);// Do all cleanup except heap.close()
|
||||||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||||
}
|
}
|
||||||
matcher.clearCurrentRow();
|
matcher.clearCurrentRow();
|
||||||
seekOrSkipToNextRow(cell);
|
seekOrSkipToNextRow(cell);
|
||||||
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
|
NextState stateAfterSeekNextRow = needToReturn(outResult);
|
||||||
seekOrSkipToNextColumn(cell);
|
if (stateAfterSeekNextRow != null) {
|
||||||
} else {
|
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
|
||||||
this.heap.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
|
|
||||||
break LOOP;
|
|
||||||
}
|
|
||||||
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
|
|
||||||
break LOOP;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
|
|
||||||
case DONE:
|
|
||||||
// Optimization for Gets! If DONE, no more to get on this row, early exit!
|
|
||||||
if (get) {
|
|
||||||
// Then no more to this row... exit.
|
|
||||||
close(false);// Do all cleanup except heap.close()
|
|
||||||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
|
||||||
}
|
|
||||||
matcher.clearCurrentRow();
|
|
||||||
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
|
||||||
|
|
||||||
case DONE_SCAN:
|
|
||||||
close(false);// Do all cleanup except heap.close()
|
|
||||||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
|
||||||
|
|
||||||
case SEEK_NEXT_ROW:
|
|
||||||
// This is just a relatively simple end of scan fix, to short-cut end
|
|
||||||
// us if there is an endKey in the scan.
|
|
||||||
if (!matcher.moreRowsMayExistAfter(cell)) {
|
|
||||||
close(false);// Do all cleanup except heap.close()
|
|
||||||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
|
||||||
}
|
|
||||||
matcher.clearCurrentRow();
|
|
||||||
seekOrSkipToNextRow(cell);
|
|
||||||
NextState stateAfterSeekNextRow = needToReturn(outResult);
|
|
||||||
if (stateAfterSeekNextRow != null) {
|
|
||||||
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case SEEK_NEXT_COL:
|
|
||||||
seekOrSkipToNextColumn(cell);
|
|
||||||
NextState stateAfterSeekNextColumn = needToReturn(outResult);
|
|
||||||
if (stateAfterSeekNextColumn != null) {
|
|
||||||
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case SKIP:
|
|
||||||
this.heap.next();
|
|
||||||
break;
|
|
||||||
|
|
||||||
case SEEK_NEXT_USING_HINT:
|
|
||||||
Cell nextKV = matcher.getNextKeyHint(cell);
|
|
||||||
if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
|
|
||||||
seekAsDirection(nextKV);
|
|
||||||
NextState stateAfterSeekByHint = needToReturn(outResult);
|
|
||||||
if (stateAfterSeekByHint != null) {
|
|
||||||
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
|
|
||||||
}
|
}
|
||||||
} else {
|
break;
|
||||||
heap.next();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
case SEEK_NEXT_COL:
|
||||||
throw new RuntimeException("UNEXPECTED");
|
seekOrSkipToNextColumn(cell);
|
||||||
|
NextState stateAfterSeekNextColumn = needToReturn(outResult);
|
||||||
|
if (stateAfterSeekNextColumn != null) {
|
||||||
|
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case SKIP:
|
||||||
|
this.heap.next();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case SEEK_NEXT_USING_HINT:
|
||||||
|
Cell nextKV = matcher.getNextKeyHint(cell);
|
||||||
|
if (nextKV != null && comparator.compare(nextKV, cell) > 0) {
|
||||||
|
seekAsDirection(nextKV);
|
||||||
|
NextState stateAfterSeekByHint = needToReturn(outResult);
|
||||||
|
if (stateAfterSeekByHint != null) {
|
||||||
|
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
heap.next();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new RuntimeException("UNEXPECTED");
|
||||||
|
}
|
||||||
|
} while ((cell = this.heap.peek()) != null);
|
||||||
|
|
||||||
|
if (count > 0) {
|
||||||
|
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
||||||
}
|
}
|
||||||
} while ((cell = this.heap.peek()) != null);
|
|
||||||
|
|
||||||
if (count > 0) {
|
// No more keys
|
||||||
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
|
close(false);// Do all cleanup except heap.close()
|
||||||
|
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||||
|
} finally {
|
||||||
|
// increment only if we have some result
|
||||||
|
if (count > 0) {
|
||||||
|
// if true increment memstore metrics, if not the mixed one
|
||||||
|
updateMetricsStore(onlyFromMemstore);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// No more keys
|
private void updateMetricsStore(boolean memstoreRead) {
|
||||||
close(false);// Do all cleanup except heap.close()
|
if (store != null) {
|
||||||
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
store.updateMetricsStore(memstoreRead);
|
||||||
|
} else {
|
||||||
|
// for testing.
|
||||||
|
if (memstoreRead) {
|
||||||
|
memstoreOnlyReads++;
|
||||||
|
} else {
|
||||||
|
mixedReads++;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1208,4 +1241,3 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
|
public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
|
||||||
int replicaid = 0;
|
int replicaid = 0;
|
||||||
|
|
||||||
|
@ -182,4 +185,18 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
|
||||||
public long getTotalRequestCount() {
|
public long getTotalRequestCount() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMemstoreOnlyRowReadsCount() {
|
||||||
|
Map<String, Long> map = new HashMap<>();
|
||||||
|
map.put("info", 0L);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Long> getMixedRowReadsCount() {
|
||||||
|
Map<String, Long> map = new HashMap<>();
|
||||||
|
map.put("info", 0L);
|
||||||
|
return map;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -31,6 +32,7 @@ import java.util.List;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellBuilderType;
|
import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
|
@ -405,6 +407,7 @@ public class TestStoreScanner {
|
||||||
// We should have gone the optimize route 5 times totally... an INCLUDE for the four cells
|
// We should have gone the optimize route 5 times totally... an INCLUDE for the four cells
|
||||||
// in the row plus the DONE on the end.
|
// in the row plus the DONE on the end.
|
||||||
assertEquals(5, scanner.count.get());
|
assertEquals(5, scanner.count.get());
|
||||||
|
assertEquals(1, scanner.memstoreOnlyReads);
|
||||||
// For a full row Get, there should be no opportunity for scanner optimization.
|
// For a full row Get, there should be no opportunity for scanner optimization.
|
||||||
assertEquals(0, scanner.optimization.get());
|
assertEquals(0, scanner.optimization.get());
|
||||||
}
|
}
|
||||||
|
@ -479,6 +482,8 @@ public class TestStoreScanner {
|
||||||
// And we should have gone through optimize twice only.
|
// And we should have gone through optimize twice only.
|
||||||
assertEquals("First qcode is SEEK_NEXT_COL and second INCLUDE_AND_SEEK_NEXT_ROW", 3,
|
assertEquals("First qcode is SEEK_NEXT_COL and second INCLUDE_AND_SEEK_NEXT_ROW", 3,
|
||||||
scanner.count.get());
|
scanner.count.get());
|
||||||
|
assertEquals("Memstore Read count should be", 1,
|
||||||
|
scanner.memstoreOnlyReads);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,6 +583,7 @@ public class TestStoreScanner {
|
||||||
List<Cell> results = new ArrayList<>();
|
List<Cell> results = new ArrayList<>();
|
||||||
assertEquals(true, scan.next(results));
|
assertEquals(true, scan.next(results));
|
||||||
assertEquals(1, results.size());
|
assertEquals(1, results.size());
|
||||||
|
assertEquals(1, scan.memstoreOnlyReads);
|
||||||
assertEquals(kvs[0], results.get(0));
|
assertEquals(kvs[0], results.get(0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue