From edc6696bc5a611a20fb1e713eb00b48d2cb7dd87 Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Thu, 8 Mar 2012 02:28:39 +0000 Subject: [PATCH] HBASE-5480. Fixups to MultithreadedTableMapper for Hadoop 0.23 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1298259 13f79535-47bb-0310-9956-ffa450edef68 --- .../mapreduce/MultithreadedTableMapper.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java index 1bc1abc5a9c..eb96ac68445 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -221,6 +222,10 @@ public class MultithreadedTableMapper extends TableMapper { public void setStatus(String status) { outer.setStatus(status); } + + public float getProgress() { + return 0; + } } private class MapRunner implements Runnable { @@ -228,16 +233,32 @@ public class MultithreadedTableMapper extends TableMapper { private Context subcontext; private Throwable throwable; + @SuppressWarnings({ "rawtypes", "unchecked" }) MapRunner(Context context) throws IOException, InterruptedException { mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration()); - subcontext = new Context(outer.getConfiguration(), + try { + Constructor c = context.getClass().getConstructor( + Configuration.class, + outer.getTaskAttemptID().getClass(), + SubMapRecordReader.class, + SubMapRecordWriter.class, + context.getOutputCommitter().getClass(), + SubMapStatusReporter.class, + outer.getInputSplit().getClass()); + c.setAccessible(true); + subcontext = (Context) c.newInstance( + outer.getConfiguration(), outer.getTaskAttemptID(), new SubMapRecordReader(), new SubMapRecordWriter(), context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit()); + } catch (Exception e) { + // rethrow as IOE + throw new IOException(e); + } } @Override