HBASE-5663 HBASE-5636 MultithreadedTableMapper doesn't work (Takuya Ueshin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1308353 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2012-04-02 13:50:03 +00:00
parent 86f8282bef
commit 80722e7290
4 changed files with 77 additions and 40 deletions

src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,11 +32,14 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -239,15 +243,17 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
           context.getConfiguration());
       try {
         Constructor c = context.getClass().getConstructor(
+          Mapper.class,
           Configuration.class,
-          outer.getTaskAttemptID().getClass(),
-          SubMapRecordReader.class,
-          SubMapRecordWriter.class,
-          context.getOutputCommitter().getClass(),
-          SubMapStatusReporter.class,
-          outer.getInputSplit().getClass());
+          TaskAttemptID.class,
+          RecordReader.class,
+          RecordWriter.class,
+          OutputCommitter.class,
+          StatusReporter.class,
+          InputSplit.class);
         c.setAccessible(true);
         subcontext = (Context) c.newInstance(
+          mapper,
           outer.getConfiguration(),
           outer.getTaskAttemptID(),
           new SubMapRecordReader(),
@@ -256,8 +262,31 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
           new SubMapStatusReporter(),
           outer.getInputSplit());
       } catch (Exception e) {
-        // rethrow as IOE
-        throw new IOException(e);
+        try {
+          Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
+            Configuration.class,
+            TaskAttemptID.class,
+            RecordReader.class,
+            RecordWriter.class,
+            OutputCommitter.class,
+            StatusReporter.class,
+            InputSplit.class);
+          c.setAccessible(true);
+          MapContext mc = (MapContext) c.newInstance(
+            outer.getConfiguration(),
+            outer.getTaskAttemptID(),
+            new SubMapRecordReader(),
+            new SubMapRecordWriter(),
+            context.getOutputCommitter(),
+            new SubMapStatusReporter(),
+            outer.getInputSplit());
+          Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
+          Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
+          subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
+        } catch (Exception ee) {
+          // rethrow as IOE
+          throw new IOException(e);
+        }
       }
     }
@@ -270,4 +299,4 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
       }
     }
   }
 }
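The heart of the fix is in the hunks above. MultithreadedTableMapper builds a private Mapper.Context per worker thread via reflection, and the constructor it needs differs across Hadoop versions. On Hadoop 0.20/1.x, Mapper.Context is a concrete inner class, so getConstructor succeeds once it is given the declared parameter types (RecordReader.class rather than SubMapRecordReader.class; getConstructor matches parameter types exactly, which is why the concrete subclass types failed before) plus the enclosing Mapper instance as the implicit first constructor argument. On Hadoop 0.23+, that constructor is gone because Context became abstract, so the new fallback builds an org.apache.hadoop.mapreduce.task.MapContextImpl and wraps it through WrappedMapper.getMapContext. Below is a minimal standalone sketch of the same two-step pattern; the helper class SubContextFactory and its signature are hypothetical, while the reflective lookups mirror the patch:

// Hypothetical helper (not part of the patch) sketching the version-bridging
// pattern above: try the Hadoop 0.20/1.x inner-class constructor of
// Mapper.Context, then fall back to the Hadoop 0.23+ path through
// MapContextImpl and WrappedMapper.
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

final class SubContextFactory {

  @SuppressWarnings({ "unchecked", "rawtypes" })
  static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context createSubContext(
      Mapper<K1, V1, K2, V2> mapper,
      Mapper<K1, V1, K2, V2>.Context outer,
      RecordReader<K1, V1> reader,
      RecordWriter<K2, V2> writer,
      StatusReporter reporter) throws Exception {
    try {
      // Hadoop 0.20/1.x: Context is a concrete inner class of Mapper, so its
      // constructor exists and takes the enclosing Mapper as first argument.
      Constructor<?> c = outer.getClass().getConstructor(
          Mapper.class, Configuration.class, TaskAttemptID.class,
          RecordReader.class, RecordWriter.class, OutputCommitter.class,
          StatusReporter.class, InputSplit.class);
      c.setAccessible(true);
      return (Mapper<K1, V1, K2, V2>.Context) c.newInstance(
          mapper, outer.getConfiguration(), outer.getTaskAttemptID(),
          reader, writer, outer.getOutputCommitter(), reporter,
          outer.getInputSplit());
    } catch (Exception e) {
      // Hadoop 0.23+: Context is abstract; build a MapContextImpl and wrap it
      // via WrappedMapper.getMapContext, all through reflection so the code
      // still compiles against 1.x.
      Constructor<?> c = Class
          .forName("org.apache.hadoop.mapreduce.task.MapContextImpl")
          .getConstructor(Configuration.class, TaskAttemptID.class,
              RecordReader.class, RecordWriter.class, OutputCommitter.class,
              StatusReporter.class, InputSplit.class);
      c.setAccessible(true);
      Object mapContext = c.newInstance(
          outer.getConfiguration(), outer.getTaskAttemptID(), reader, writer,
          outer.getOutputCommitter(), reporter, outer.getInputSplit());
      Class<?> wrappedMapper =
          Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
      Method getMapContext =
          wrappedMapper.getMethod("getMapContext", MapContext.class);
      return (Mapper<K1, V1, K2, V2>.Context) getMapContext
          .invoke(wrappedMapper.newInstance(), mapContext);
    }
  }
}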

src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapred;
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -28,7 +29,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -42,11 +42,15 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 /**
  * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
  * on our tables is simple - take every row in the table, reverse the value of
@@ -58,7 +62,7 @@ public class TestTableMapReduce {
     LogFactory.getLog(TestTableMapReduce.class.getName());
   private static final HBaseTestingUtility UTIL =
     new HBaseTestingUtility();
-  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
   static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
@@ -69,12 +73,10 @@ public class TestTableMapReduce {
   @BeforeClass
   public static void beforeClass() throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
-    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
-    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
     UTIL.startMiniCluster();
-    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
-    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
     UTIL.startMiniMapReduceCluster();
   }
@@ -150,7 +152,8 @@ public class TestTableMapReduce {
       IdentityTableReduce.class, jobConf);
     LOG.info("Started " + Bytes.toString(table.getTableName()));
-    JobClient.runJob(jobConf);
+    RunningJob job = JobClient.runJob(jobConf);
+    assertTrue(job.isSuccessful());
     LOG.info("After map/reduce completion");
     // verify map-reduce results
@@ -184,7 +187,7 @@ public class TestTableMapReduce {
         // continue
       }
     }
-    org.junit.Assert.assertTrue(verified);
+    assertTrue(verified);
   }
   /**
@@ -199,7 +202,10 @@ public class TestTableMapReduce {
     TableInputFormat.addColumns(scan, columns);
     ResultScanner scanner = table.getScanner(scan);
     try {
-      for (Result r : scanner) {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
         if (LOG.isDebugEnabled()) {
           if (r.size() > 2 ) {
             throw new IOException("Too many results, expected 2 got " +
@@ -247,7 +253,7 @@ public class TestTableMapReduce {
             r.getRow() + ", first value=" + first + ", second value=" +
             second);
         }
-        org.junit.Assert.fail();
+        fail();
       }
     }
   } finally {
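The three test files change along the same lines, and the pieces reinforce each other: the table is now created through HBaseTestingUtility and, crucially, populated via UTIL.loadTable before the job runs; job completion is asserted (assertTrue(job.isSuccessful()) for the mapred API, assertTrue(job.waitForCompletion(true)) for mapreduce) instead of being discarded; and the verification scan iterates explicitly with assertTrue(itr.hasNext()), so an empty scan now fails the test, where the old for-each loop would simply skip its body and pass vacuously. A condensed view of the new setup, assembled only from calls that appear in this patch:

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    // Replaces the manual HTableDescriptor/HBaseAdmin setup and, via
    // loadTable, guarantees the MapReduce job actually has input rows.
    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME,
        new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
    UTIL.createMultiRegions(table, INPUT_FAMILY);
    UTIL.loadTable(table, INPUT_FAMILY);
    UTIL.startMiniMapReduceCluster();
  }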

src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java → src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -28,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -52,23 +52,21 @@ import static org.junit.Assert.assertTrue;
  * a particular cell, and write it back to the table.
  */
 @Category(LargeTests.class)
-public class TestMulitthreadedTableMapper {
-  private static final Log LOG = LogFactory.getLog(TestMulitthreadedTableMapper.class);
+public class TestMultithreadedTableMapper {
+  private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class);
   private static final HBaseTestingUtility UTIL =
     new HBaseTestingUtility();
-  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
   static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
   static final int NUMBER_OF_THREADS = 10;
   @BeforeClass
   public static void beforeClass() throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
-    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
-    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
     UTIL.startMiniCluster();
-    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
-    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
     UTIL.startMiniMapReduceCluster();
   }
@@ -149,7 +147,7 @@ public class TestMulitthreadedTableMapper {
       IdentityTableReducer.class, job);
     FileOutputFormat.setOutputPath(job, new Path("test"));
     LOG.info("Started " + Bytes.toString(table.getTableName()));
-    job.waitForCompletion(true);
+    assertTrue(job.waitForCompletion(true));
     LOG.info("After map/reduce completion");
     // verify map-reduce results
     verify(Bytes.toString(table.getTableName()));
@@ -203,7 +201,10 @@ public class TestMulitthreadedTableMapper {
     scan.addFamily(OUTPUT_FAMILY);
     ResultScanner scanner = table.getScanner(scan);
     try {
-      for (Result r : scanner) {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
         if (LOG.isDebugEnabled()) {
           if (r.size() > 2 ) {
             throw new IOException("Too many results, expected 2 got " +

src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -30,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -59,18 +59,16 @@ public class TestTableMapReduce {
   private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
   private static final HBaseTestingUtility UTIL =
     new HBaseTestingUtility();
-  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
   static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
   static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
   @BeforeClass
   public static void beforeClass() throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
-    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
-    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
     UTIL.startMiniCluster();
-    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
-    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+    UTIL.createMultiRegions(table, INPUT_FAMILY);
+    UTIL.loadTable(table, INPUT_FAMILY);
     UTIL.startMiniMapReduceCluster();
   }
@@ -150,7 +148,7 @@ public class TestTableMapReduce {
       IdentityTableReducer.class, job);
     FileOutputFormat.setOutputPath(job, new Path("test"));
     LOG.info("Started " + Bytes.toString(table.getTableName()));
-    job.waitForCompletion(true);
+    assertTrue(job.waitForCompletion(true));
     LOG.info("After map/reduce completion");
     // verify map-reduce results
@@ -204,7 +202,10 @@ public class TestTableMapReduce {
     scan.addFamily(OUTPUT_FAMILY);
     ResultScanner scanner = table.getScanner(scan);
     try {
-      for (Result r : scanner) {
+      Iterator<Result> itr = scanner.iterator();
+      assertTrue(itr.hasNext());
+      while(itr.hasNext()) {
+        Result r = itr.next();
         if (LOG.isDebugEnabled()) {
           if (r.size() > 2 ) {
             throw new IOException("Too many results, expected 2 got " +