HBASE-24831 Avoid invoke Counter using reflection in SnapshotInputFormat (#2209)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
70c6205bc2
commit
e042cabfb3
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
@ -60,7 +59,6 @@ public class TableRecordReaderImpl {
|
||||||
private ImmutableBytesWritable key = null;
|
private ImmutableBytesWritable key = null;
|
||||||
private Result value = null;
|
private Result value = null;
|
||||||
private TaskAttemptContext context = null;
|
private TaskAttemptContext context = null;
|
||||||
private Method getCounter = null;
|
|
||||||
private long numRestarts = 0;
|
private long numRestarts = 0;
|
||||||
private long numStale = 0;
|
private long numStale = 0;
|
||||||
private long timestamp;
|
private long timestamp;
|
||||||
|
@ -96,25 +94,6 @@ public class TableRecordReaderImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* In new mapreduce APIs, TaskAttemptContext has two getCounter methods
|
|
||||||
* Check if getCounter(String, String) method is available.
|
|
||||||
* @return The getCounter method or null if not available.
|
|
||||||
*/
|
|
||||||
protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
|
|
||||||
throws IOException {
|
|
||||||
Method m = null;
|
|
||||||
try {
|
|
||||||
m = context.getClass().getMethod("getCounter",
|
|
||||||
new Class [] {String.class, String.class});
|
|
||||||
} catch (SecurityException e) {
|
|
||||||
throw new IOException("Failed test for getCounter", e);
|
|
||||||
} catch (NoSuchMethodException e) {
|
|
||||||
// Ignore
|
|
||||||
}
|
|
||||||
return m;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the HBase table.
|
* Sets the HBase table.
|
||||||
*
|
*
|
||||||
|
@ -145,7 +124,6 @@ public class TableRecordReaderImpl {
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
if (context != null) {
|
if (context != null) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
getCounter = retrieveGetCounterWithStringsParams(context);
|
|
||||||
}
|
}
|
||||||
restart(scan.getStartRow());
|
restart(scan.getStartRow());
|
||||||
}
|
}
|
||||||
|
@ -213,8 +191,7 @@ public class TableRecordReaderImpl {
|
||||||
rowcount ++;
|
rowcount ++;
|
||||||
if (rowcount >= logPerRowCount) {
|
if (rowcount >= logPerRowCount) {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
LOG.info("Mapper took " + (now-timestamp)
|
LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
|
||||||
+ "ms to process " + rowcount + " rows");
|
|
||||||
timestamp = now;
|
timestamp = now;
|
||||||
rowcount = 0;
|
rowcount = 0;
|
||||||
}
|
}
|
||||||
|
@ -266,8 +243,7 @@ public class TableRecordReaderImpl {
|
||||||
updateCounters();
|
updateCounters();
|
||||||
if (logScannerActivity) {
|
if (logScannerActivity) {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
LOG.info("Mapper took " + (now-timestamp)
|
LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
|
||||||
+ "ms to process " + rowcount + " rows");
|
|
||||||
LOG.info(ioe.toString(), ioe);
|
LOG.info(ioe.toString(), ioe);
|
||||||
String lastRow = lastSuccessfulRow == null ?
|
String lastRow = lastSuccessfulRow == null ?
|
||||||
"null" : Bytes.toStringBinary(lastSuccessfulRow);
|
"null" : Bytes.toStringBinary(lastSuccessfulRow);
|
||||||
|
@ -283,35 +259,39 @@ public class TableRecordReaderImpl {
|
||||||
* If hbase runs on old version of mapreduce, it won't be able to get
|
* If hbase runs on old version of mapreduce, it won't be able to get
|
||||||
* access to counters and TableRecorderReader can't update counter values.
|
* access to counters and TableRecorderReader can't update counter values.
|
||||||
*/
|
*/
|
||||||
private void updateCounters() throws IOException {
|
private void updateCounters() {
|
||||||
ScanMetrics scanMetrics = scanner.getScanMetrics();
|
ScanMetrics scanMetrics = scanner.getScanMetrics();
|
||||||
if (scanMetrics == null) {
|
if (scanMetrics == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
|
updateCounters(scanMetrics, numRestarts, context, numStale);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
|
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
|
||||||
Method getCounter, TaskAttemptContext context, long numStale) {
|
TaskAttemptContext context, long numStale) {
|
||||||
// we can get access to counters only if hbase uses new mapreduce APIs
|
// we can get access to counters only if hbase uses new mapreduce APIs
|
||||||
if (getCounter == null) {
|
if (context == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
|
||||||
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
|
Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
|
||||||
Counter ct = (Counter)getCounter.invoke(context,
|
if (counter != null) {
|
||||||
HBASE_COUNTER_GROUP_NAME, entry.getKey());
|
counter.increment(entry.getValue());
|
||||||
|
}
|
||||||
ct.increment(entry.getValue());
|
}
|
||||||
|
if (numScannerRestarts != 0L) {
|
||||||
|
Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
|
||||||
|
if (counter != null) {
|
||||||
|
counter.increment(numScannerRestarts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (numStale != 0L) {
|
||||||
|
Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
|
||||||
|
if (counter != null) {
|
||||||
|
counter.increment(numStale);
|
||||||
}
|
}
|
||||||
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
|
|
||||||
"NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
|
|
||||||
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
|
|
||||||
"NUM_SCAN_RESULTS_STALE")).increment(numStale);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce;
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -149,13 +148,11 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
||||||
private TableSnapshotInputFormatImpl.RecordReader delegate =
|
private TableSnapshotInputFormatImpl.RecordReader delegate =
|
||||||
new TableSnapshotInputFormatImpl.RecordReader();
|
new TableSnapshotInputFormatImpl.RecordReader();
|
||||||
private TaskAttemptContext context;
|
private TaskAttemptContext context;
|
||||||
private Method getCounter;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
|
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
|
|
||||||
delegate.initialize(
|
delegate.initialize(
|
||||||
((TableSnapshotRegionSplit) split).delegate,
|
((TableSnapshotRegionSplit) split).delegate,
|
||||||
context.getConfiguration());
|
context.getConfiguration());
|
||||||
|
@ -167,7 +164,7 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
|
||||||
if (result) {
|
if (result) {
|
||||||
ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
|
ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
|
||||||
if (scanMetrics != null && context != null) {
|
if (scanMetrics != null && context != null) {
|
||||||
TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
|
TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
|
Loading…
Reference in New Issue