HBASE-16678 MapReduce jobs do not update counters from ScanMetrics

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
This commit is contained in:
Enis Soztutar 2016-09-29 15:34:03 -07:00
parent bf5a7aba5c
commit 911f9b9eb7
2 changed files with 23 additions and 3 deletions

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Iterate over an HBase table data, return (ImmutableBytesWritable, Result) * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
* pairs. * pairs.
@ -53,8 +55,8 @@ public class TableRecordReaderImpl {
private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class); private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
// HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
private static final String HBASE_COUNTER_GROUP_NAME = @VisibleForTesting
"HBase Counters"; static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
private ResultScanner scanner = null; private ResultScanner scanner = null;
private Scan scan = null; private Scan scan = null;
private Scan currentScan = null; private Scan currentScan = null;
@ -269,7 +271,7 @@ public class TableRecordReaderImpl {
* @throws IOException * @throws IOException
*/ */
private void updateCounters() throws IOException { private void updateCounters() throws IOException {
ScanMetrics scanMetrics = this.scan.getScanMetrics(); ScanMetrics scanMetrics = currentScan.getScanMetrics();
if (scanMetrics == null) { if (scanMetrics == null) {
return; return;
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.mapreduce; package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -42,6 +43,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test; import org.junit.Test;
@ -119,6 +122,8 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
// verify map-reduce results // verify map-reduce results
verify(table.getName()); verify(table.getName());
verifyJobCountersAreEmitted(job);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException(e); throw new IOException(e);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
@ -132,6 +137,19 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
} }
} }
/**
* Verify scan counters are emitted from the job
* @param job
* @throws IOException
*/
private void verifyJobCountersAreEmitted(Job job) throws IOException {
Counters counters = job.getCounters();
Counter counter
= counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
}
@Test(expected = TableNotEnabledException.class) @Test(expected = TableNotEnabledException.class)
public void testWritingToDisabledTable() throws IOException { public void testWritingToDisabledTable() throws IOException {