HBASE-5480. Fixups to MultithreadedTableMapper for Hadoop 0.23
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1298259 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e4f4022493
commit
edc6696bc5
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
@ -221,6 +222,10 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
|
||||||
public void setStatus(String status) {
|
public void setStatus(String status) {
|
||||||
outer.setStatus(status);
|
outer.setStatus(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public float getProgress() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MapRunner implements Runnable {
|
private class MapRunner implements Runnable {
|
||||||
|
@ -228,16 +233,32 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
|
||||||
private Context subcontext;
|
private Context subcontext;
|
||||||
private Throwable throwable;
|
private Throwable throwable;
|
||||||
|
|
||||||
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
MapRunner(Context context) throws IOException, InterruptedException {
|
MapRunner(Context context) throws IOException, InterruptedException {
|
||||||
mapper = ReflectionUtils.newInstance(mapClass,
|
mapper = ReflectionUtils.newInstance(mapClass,
|
||||||
context.getConfiguration());
|
context.getConfiguration());
|
||||||
subcontext = new Context(outer.getConfiguration(),
|
try {
|
||||||
|
Constructor c = context.getClass().getConstructor(
|
||||||
|
Configuration.class,
|
||||||
|
outer.getTaskAttemptID().getClass(),
|
||||||
|
SubMapRecordReader.class,
|
||||||
|
SubMapRecordWriter.class,
|
||||||
|
context.getOutputCommitter().getClass(),
|
||||||
|
SubMapStatusReporter.class,
|
||||||
|
outer.getInputSplit().getClass());
|
||||||
|
c.setAccessible(true);
|
||||||
|
subcontext = (Context) c.newInstance(
|
||||||
|
outer.getConfiguration(),
|
||||||
outer.getTaskAttemptID(),
|
outer.getTaskAttemptID(),
|
||||||
new SubMapRecordReader(),
|
new SubMapRecordReader(),
|
||||||
new SubMapRecordWriter(),
|
new SubMapRecordWriter(),
|
||||||
context.getOutputCommitter(),
|
context.getOutputCommitter(),
|
||||||
new SubMapStatusReporter(),
|
new SubMapStatusReporter(),
|
||||||
outer.getInputSplit());
|
outer.getInputSplit());
|
||||||
|
} catch (Exception e) {
|
||||||
|
// rethrow as IOE
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue