HBASE-13030 [1.0.0 polish] Make ScanMetrics public again and align Put 'add' with Get, Delete, etc., addColumn
This commit is contained in:
parent
76a3b50f1f
commit
3babad30e6
|
@ -23,14 +23,12 @@ import java.util.Iterator;
|
|||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* Helper class for custom client scanners.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class AbstractClientScanner implements ResultScanner {
|
||||
|
||||
protected ScanMetrics scanMetrics;
|
||||
|
||||
/**
|
||||
|
@ -38,14 +36,19 @@ public abstract class AbstractClientScanner implements ResultScanner {
|
|||
*/
|
||||
protected void initScanMetrics(Scan scan) {
|
||||
// check if application wants to collect scan metrics
|
||||
byte[] enableMetrics = scan.getAttribute(
|
||||
Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
||||
if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
|
||||
if (scan.isScanMetricsEnabled()) {
|
||||
scanMetrics = new ScanMetrics();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should this be at ResultScanner? ScanMetrics is not public API it seems.
|
||||
/**
|
||||
* Used internally accumulating metrics on scan. To
|
||||
* enable collection of metrics on a Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}.
|
||||
* These metrics are cleared at key transition points. Metrics are accumulated in the
|
||||
* {@link Scan} object itself.
|
||||
* @see Scan#getScanMetrics()
|
||||
* @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled.
|
||||
*/
|
||||
public ScanMetrics getScanMetrics() {
|
||||
return scanMetrics;
|
||||
}
|
||||
|
|
|
@ -317,9 +317,9 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
* machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
|
||||
*
|
||||
* By default, scan metrics are disabled; if the application wants to collect them, this
|
||||
* behavior can be turned on by calling calling:
|
||||
* behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
|
||||
*
|
||||
* scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
|
||||
* <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
|
||||
*/
|
||||
protected void writeScanMetrics() {
|
||||
if (this.scanMetrics == null || scanMetricsPublished) {
|
||||
|
|
|
@ -137,9 +137,22 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
|||
* @param qualifier column qualifier
|
||||
* @param value column value
|
||||
* @return this
|
||||
* @deprecated Since 1.0.0. Use {@link #addColumn(byte[], byte[], byte[])}
|
||||
*/
|
||||
@Deprecated
|
||||
public Put add(byte [] family, byte [] qualifier, byte [] value) {
|
||||
return add(family, qualifier, this.ts, value);
|
||||
return addColumn(family, qualifier, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the specified column and value to this Put operation.
|
||||
* @param family family name
|
||||
* @param qualifier column qualifier
|
||||
* @param value column value
|
||||
* @return this
|
||||
*/
|
||||
public Put addColumn(byte [] family, byte [] qualifier, byte [] value) {
|
||||
return addColumn(family, qualifier, this.ts, value);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -167,8 +180,23 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
|||
* @param ts version timestamp
|
||||
* @param value column value
|
||||
* @return this
|
||||
* @deprecated Since 1.0.0. Use {@link #addColumn(byte[], byte[], long, byte[])}
|
||||
*/
|
||||
@Deprecated
|
||||
public Put add(byte [] family, byte [] qualifier, long ts, byte [] value) {
|
||||
return addColumn(family, qualifier, ts, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the specified column and value, with the specified timestamp as
|
||||
* its version to this Put operation.
|
||||
* @param family family name
|
||||
* @param qualifier column qualifier
|
||||
* @param ts version timestamp
|
||||
* @param value column value
|
||||
* @return this
|
||||
*/
|
||||
public Put addColumn(byte [] family, byte [] qualifier, long ts, byte [] value) {
|
||||
if (ts < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
|
||||
}
|
||||
|
@ -199,7 +227,6 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
|||
* This expects that the underlying arrays won't change. It's intended
|
||||
* for usage internal HBase to and for advanced client applications.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Put addImmutable(byte[] family, byte[] qualifier, long ts, byte[] value, Tag[] tag) {
|
||||
List<Cell> list = getCellList(family);
|
||||
KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
|
||||
|
@ -233,8 +260,23 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
|||
* @param ts version timestamp
|
||||
* @param value column value
|
||||
* @return this
|
||||
* @deprecated Since 1.0.0. Use {@link Put#addColumn(byte[], ByteBuffer, long, ByteBuffer)}
|
||||
*/
|
||||
@Deprecated
|
||||
public Put add(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value) {
|
||||
return addColumn(family, qualifier, ts, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the specified column and value, with the specified timestamp as
|
||||
* its version to this Put operation.
|
||||
* @param family family name
|
||||
* @param qualifier column qualifier
|
||||
* @param ts version timestamp
|
||||
* @param value column value
|
||||
* @return this
|
||||
*/
|
||||
public Put addColumn(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value) {
|
||||
if (ts < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
|
||||
}
|
||||
|
|
|
@ -361,6 +361,9 @@ public class Result implements CellScannable, CellScanner {
|
|||
|
||||
/**
|
||||
* Get the latest version of the specified column.
|
||||
* Note: this call clones the value content of the hosting Cell. See
|
||||
* {@link #getValueAsByteBuffer(byte[], byte[])}, etc., or {@link #listCells()} if you would
|
||||
* avoid the cloning.
|
||||
* @param family family name
|
||||
* @param qualifier column qualifier
|
||||
* @return value of latest version of column, null if none found
|
||||
|
@ -388,7 +391,8 @@ public class Result implements CellScannable, CellScanner {
|
|||
if (kv == null) {
|
||||
return null;
|
||||
}
|
||||
return ByteBuffer.wrap(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
|
||||
return ByteBuffer.wrap(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()).
|
||||
asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -411,7 +415,8 @@ public class Result implements CellScannable, CellScanner {
|
|||
if (kv == null) {
|
||||
return null;
|
||||
}
|
||||
return ByteBuffer.wrap(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
|
||||
return ByteBuffer.wrap(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()).
|
||||
asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,9 +34,11 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.security.access.Permission;
|
||||
import org.apache.hadoop.hbase.security.visibility.Authorizations;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -117,9 +119,18 @@ public class Scan extends Query {
|
|||
private int storeOffset = 0;
|
||||
private boolean getScan;
|
||||
|
||||
// If application wants to collect scan metrics, it needs to
|
||||
// call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
|
||||
/**
|
||||
* @deprecated since 1.0.0. Use {@link #setScanMetricsEnabled(boolean)}
|
||||
*/
|
||||
// Make private or remove.
|
||||
@Deprecated
|
||||
static public final String SCAN_ATTRIBUTES_METRICS_ENABLE = "scan.attributes.metrics.enable";
|
||||
|
||||
/**
|
||||
* Use {@link #getScanMetrics()}
|
||||
*/
|
||||
// Make this private or remove.
|
||||
@Deprecated
|
||||
static public final String SCAN_ATTRIBUTES_METRICS_DATA = "scan.attributes.metrics.data";
|
||||
|
||||
// If an application wants to use multiple scans over different tables each scan must
|
||||
|
@ -916,4 +927,31 @@ public class Scan extends Query {
|
|||
scan.setCaching(1);
|
||||
return scan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable collection of {@link ScanMetrics}. For advanced users.
|
||||
* @param enabled Set to true to enable accumulating scan metrics
|
||||
*/
|
||||
public Scan setScanMetricsEnabled(final boolean enabled) {
|
||||
setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.valueOf(enabled)));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if collection of scan metrics is enabled. For advanced users.
|
||||
*/
|
||||
public boolean isScanMetricsEnabled() {
|
||||
byte[] attr = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
||||
return attr == null ? false : Bytes.toBoolean(attr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Metrics on this Scan, if metrics were enabled.
|
||||
* @see #setScanMetricsEnabled(boolean)
|
||||
*/
|
||||
public ScanMetrics getScanMetrics() {
|
||||
byte [] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
||||
if (bytes == null) return null;
|
||||
return ProtobufUtil.toScanMetrics(bytes);
|
||||
}
|
||||
}
|
|
@ -22,15 +22,14 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
|
||||
/**
|
||||
* Provides client-side metrics related to scan operations
|
||||
* Provides client-side metrics related to scan operations.
|
||||
* The data can be passed to mapreduce framework or other systems.
|
||||
* We use atomic longs so that one thread can increment,
|
||||
* while another atomically resets to zero after the values are reported
|
||||
|
@ -40,12 +39,10 @@ import com.google.common.collect.ImmutableMap;
|
|||
* However, there is no need for this. So they are defined under scan operation
|
||||
* for now.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class ScanMetrics {
|
||||
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ScanMetrics.class);
|
||||
|
||||
/**
|
||||
* Hash to hold the String -> Atomic Long mappings.
|
||||
*/
|
||||
|
|
|
@ -61,6 +61,10 @@ public final class CellUtil {
|
|||
cell.getQualifierLength());
|
||||
}
|
||||
|
||||
public static ByteRange fillValueRange(Cell cell, ByteRange range) {
|
||||
return range.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||
}
|
||||
|
||||
public static ByteRange fillTagRange(Cell cell, ByteRange range) {
|
||||
return range.set(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
|
||||
}
|
||||
|
|
|
@ -250,10 +250,12 @@ public class RemoteHTable implements Table {
|
|||
return TableName.valueOf(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append('/');
|
||||
|
@ -282,10 +284,12 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("schema request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
client.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result get(Get get) throws IOException {
|
||||
TimeRange range = get.getTimeRange();
|
||||
String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
|
||||
|
@ -304,6 +308,7 @@ public class RemoteHTable implements Table {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result[] get(List<Get> gets) throws IOException {
|
||||
byte[][] rows = new byte[gets.size()][];
|
||||
int maxVersions = 1;
|
||||
|
@ -360,6 +365,7 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("get request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists(Get get) throws IOException {
|
||||
LOG.warn("exists() is really get(), just use get()");
|
||||
Result result = get(get);
|
||||
|
@ -370,6 +376,7 @@ public class RemoteHTable implements Table {
|
|||
* exists(List) is really a list of get() calls. Just use get().
|
||||
* @param gets list of Get to test for the existence
|
||||
*/
|
||||
@Override
|
||||
public boolean[] existsAll(List<Get> gets) throws IOException {
|
||||
LOG.warn("exists(List<Get>) is really list of get() calls, just use get()");
|
||||
boolean[] results = new boolean[gets.size()];
|
||||
|
@ -389,6 +396,7 @@ public class RemoteHTable implements Table {
|
|||
return objectResults;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(Put put) throws IOException {
|
||||
CellSetModel model = buildModelFromPut(put);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -417,6 +425,7 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("put request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(List<Put> puts) throws IOException {
|
||||
// this is a trick: The gateway accepts multiple rows in a cell set and
|
||||
// ignores the row specification in the URI
|
||||
|
@ -472,6 +481,7 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("multiput request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Delete delete) throws IOException {
|
||||
String spec = buildRowSpec(delete.getRow(), delete.getFamilyCellMap(),
|
||||
delete.getTimeStamp(), delete.getTimeStamp(), 1);
|
||||
|
@ -495,6 +505,7 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("delete request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(List<Delete> deletes) throws IOException {
|
||||
for (Delete delete: deletes) {
|
||||
delete(delete);
|
||||
|
@ -632,19 +643,21 @@ public class RemoteHTable implements Table {
|
|||
LOG.warn(StringUtils.stringifyException(e));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(Scan scan) throws IOException {
|
||||
return new Scanner(scan);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(byte[] family) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(family);
|
||||
return new Scanner(scan);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(byte[] family, byte[] qualifier)
|
||||
throws IOException {
|
||||
Scan scan = new Scan();
|
||||
|
@ -660,6 +673,7 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("getRowOrBefore not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) throws IOException {
|
||||
// column to check-the-value
|
||||
|
@ -696,11 +710,13 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("checkAndPut request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put) throws IOException {
|
||||
throw new IOException("checkAndPut for non-equal comparison not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) throws IOException {
|
||||
Put put = new Put(row);
|
||||
|
@ -737,24 +753,29 @@ public class RemoteHTable implements Table {
|
|||
throw new IOException("checkAndDelete request timed out");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete) throws IOException {
|
||||
throw new IOException("checkAndDelete for non-equal comparison not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(Increment increment) throws IOException {
|
||||
throw new IOException("Increment not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result append(Append append) throws IOException {
|
||||
throw new IOException("Append not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount) throws IOException {
|
||||
throw new IOException("incrementColumnValue not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount, Durability durability) throws IOException {
|
||||
throw new IOException("incrementColumnValue not supported");
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.client.ScannerCallable;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
|
@ -80,8 +79,7 @@ public class TableRecordReaderImpl {
|
|||
public void restart(byte[] firstRow) throws IOException {
|
||||
currentScan = new Scan(scan);
|
||||
currentScan.setStartRow(firstRow);
|
||||
currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
|
||||
Bytes.toBytes(Boolean.TRUE));
|
||||
currentScan.setScanMetricsEnabled(true);
|
||||
if (this.scanner != null) {
|
||||
if (logScannerActivity) {
|
||||
LOG.info("Closing the previously opened scanner object.");
|
||||
|
@ -265,14 +263,11 @@ public class TableRecordReaderImpl {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void updateCounters() throws IOException {
|
||||
byte[] serializedMetrics = currentScan.getAttribute(
|
||||
Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
||||
if (serializedMetrics == null || serializedMetrics.length == 0 ) {
|
||||
ScanMetrics scanMetrics = this.scan.getScanMetrics();
|
||||
if (scanMetrics == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);
|
||||
|
||||
updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
|
||||
}
|
||||
|
||||
|
|
|
@ -2073,7 +2073,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (!isLoadingCfsOnDemandSet) {
|
||||
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
|
||||
}
|
||||
scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
||||
region.prepareScanner(scan);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
scanner = region.getCoprocessorHost().preScannerOpen(scan);
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -39,9 +38,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
|||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
|
@ -137,7 +134,7 @@ public class ScanPerformanceEvaluation extends AbstractHBaseTool {
|
|||
Scan scan = new Scan(); // default scan settings
|
||||
scan.setCacheBlocks(false);
|
||||
scan.setMaxVersions(1);
|
||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||
scan.setScanMetricsEnabled(true);
|
||||
if (caching != null) {
|
||||
scan.setCaching(Integer.parseInt(caching));
|
||||
}
|
||||
|
@ -177,7 +174,7 @@ public class ScanPerformanceEvaluation extends AbstractHBaseTool {
|
|||
table.close();
|
||||
connection.close();
|
||||
|
||||
ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
|
||||
ScanMetrics metrics = scan.getScanMetrics();
|
||||
long totalBytes = metrics.countOfBytesInResults.get();
|
||||
double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
|
||||
double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
|
||||
|
|
|
@ -5003,37 +5003,39 @@ public class TestFromClientSide {
|
|||
|
||||
Scan scan1 = new Scan();
|
||||
int numRecords = 0;
|
||||
for(Result result : ht.getScanner(scan1)) {
|
||||
ResultScanner scanner = ht.getScanner(scan1);
|
||||
for(Result result : scanner) {
|
||||
numRecords++;
|
||||
}
|
||||
scanner.close();
|
||||
LOG.info("test data has " + numRecords + " records.");
|
||||
|
||||
// by default, scan metrics collection is turned off
|
||||
assertEquals(null, scan1.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
|
||||
assertEquals(null, scan1.getScanMetrics());
|
||||
|
||||
// turn on scan metrics
|
||||
Scan scan = new Scan();
|
||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||
scan.setCaching(numRecords+1);
|
||||
ResultScanner scanner = ht.getScanner(scan);
|
||||
Scan scan2 = new Scan();
|
||||
scan2.setScanMetricsEnabled(true);
|
||||
scan2.setCaching(numRecords+1);
|
||||
scanner = ht.getScanner(scan2);
|
||||
for (Result result : scanner.next(numRecords - 1)) {
|
||||
}
|
||||
scanner.close();
|
||||
// closing the scanner will set the metrics.
|
||||
assertNotNull(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
|
||||
assertNotNull(scan2.getScanMetrics());
|
||||
|
||||
// set caching to 1, becasue metrics are collected in each roundtrip only
|
||||
scan = new Scan();
|
||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||
scan.setCaching(1);
|
||||
scanner = ht.getScanner(scan);
|
||||
// set caching to 1, because metrics are collected in each roundtrip only
|
||||
scan2 = new Scan();
|
||||
scan2.setScanMetricsEnabled(true);
|
||||
scan2.setCaching(1);
|
||||
scanner = ht.getScanner(scan2);
|
||||
// per HBASE-5717, this should still collect even if you don't run all the way to
|
||||
// the end of the scanner. So this is asking for 2 of the 3 rows we inserted.
|
||||
for (Result result : scanner.next(numRecords - 1)) {
|
||||
}
|
||||
scanner.close();
|
||||
|
||||
ScanMetrics scanMetrics = getScanMetrics(scan);
|
||||
ScanMetrics scanMetrics = scan2.getScanMetrics();
|
||||
assertEquals("Did not access all the regions in the table", numOfRegions,
|
||||
scanMetrics.countOfRegions.get());
|
||||
|
||||
|
@ -5041,7 +5043,7 @@ public class TestFromClientSide {
|
|||
// run past the end of all the records
|
||||
Scan scanWithoutClose = new Scan();
|
||||
scanWithoutClose.setCaching(1);
|
||||
scanWithoutClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||
scanWithoutClose.setScanMetricsEnabled(true);
|
||||
ResultScanner scannerWithoutClose = ht.getScanner(scanWithoutClose);
|
||||
for (Result result : scannerWithoutClose.next(numRecords + 1)) {
|
||||
}
|
||||
|
@ -5054,7 +5056,7 @@ public class TestFromClientSide {
|
|||
Scan scanWithClose = new Scan();
|
||||
// make sure we can set caching up to the number of a scanned values
|
||||
scanWithClose.setCaching(numRecords);
|
||||
scanWithClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||
scan2.setScanMetricsEnabled(true);
|
||||
ResultScanner scannerWithClose = ht.getScanner(scanWithClose);
|
||||
for (Result result : scannerWithClose.next(numRecords + 1)) {
|
||||
}
|
||||
|
@ -5068,7 +5070,6 @@ public class TestFromClientSide {
|
|||
byte[] serializedMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
||||
assertTrue("Serialized metrics were not found.", serializedMetrics != null);
|
||||
|
||||
|
||||
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);
|
||||
|
||||
return scanMetrics;
|
||||
|
|
|
@ -112,7 +112,7 @@ public class TestRegionObserverInterface {
|
|||
util.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testRegionObserver() throws IOException {
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRegionObserver");
|
||||
// recreate table every time in order to reset the status of the
|
||||
|
@ -176,7 +176,7 @@ public class TestRegionObserverInterface {
|
|||
new Integer[] {1, 1, 1, 1});
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testRowMutation() throws IOException {
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRowMutation");
|
||||
Table table = util.createTable(tableName, new byte[][] {A, B, C});
|
||||
|
@ -213,7 +213,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testIncrementHook() throws IOException {
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook");
|
||||
Table table = util.createTable(tableName, new byte[][] {A, B, C});
|
||||
|
@ -240,7 +240,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testCheckAndPutHooks() throws IOException {
|
||||
TableName tableName =
|
||||
TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndPutHooks");
|
||||
|
@ -268,7 +268,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testCheckAndDeleteHooks() throws IOException {
|
||||
TableName tableName =
|
||||
TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndDeleteHooks");
|
||||
|
@ -298,7 +298,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testAppendHook() throws IOException {
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testAppendHook");
|
||||
Table table = util.createTable(tableName, new byte[][] {A, B, C});
|
||||
|
@ -325,7 +325,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
// HBase-3583
|
||||
public void testHBase3583() throws IOException {
|
||||
TableName tableName =
|
||||
|
@ -377,7 +377,7 @@ public class TestRegionObserverInterface {
|
|||
table.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
// HBase-3758
|
||||
public void testHBase3758() throws IOException {
|
||||
TableName tableName =
|
||||
|
@ -483,7 +483,7 @@ public class TestRegionObserverInterface {
|
|||
* Tests overriding compaction handling via coprocessor hooks
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testCompactionOverride() throws Exception {
|
||||
TableName compactTable = TableName.valueOf("TestCompactionOverride");
|
||||
Admin admin = util.getHBaseAdmin();
|
||||
|
@ -554,7 +554,7 @@ public class TestRegionObserverInterface {
|
|||
table.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void bulkLoadHFileTest() throws Exception {
|
||||
String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest");
|
||||
|
@ -587,7 +587,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testRecovery() throws Exception {
|
||||
LOG.info(TestRegionObserverInterface.class.getName() +".testRecovery");
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRecovery");
|
||||
|
@ -637,7 +637,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testLegacyRecovery() throws Exception {
|
||||
LOG.info(TestRegionObserverInterface.class.getName() +".testLegacyRecovery");
|
||||
TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testLegacyRecovery");
|
||||
|
@ -687,7 +687,7 @@ public class TestRegionObserverInterface {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testPreWALRestoreSkip() throws Exception {
|
||||
LOG.info(TestRegionObserverInterface.class.getName() + ".testPreWALRestoreSkip");
|
||||
TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
|
||||
|
@ -772,5 +772,4 @@ public class TestRegionObserverInterface {
|
|||
writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue