Merge -r 1178649:1178650 from trunk to branch-0.23 to fix MAPREDUCE-3138.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1178651 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9a251d5ac0
commit
4f754bfb15
|
@ -306,6 +306,9 @@ Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-3134. Added documentation the CapacityScheduler. (acmurthy)
|
MAPREDUCE-3134. Added documentation the CapacityScheduler. (acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-3138. Add a utility to help applications bridge changes in
|
||||||
|
Context Objects APIs due to MAPREDUCE-954. (omalley via acmurthy)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||||
|
|
|
@ -0,0 +1,241 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A factory to allow applications to deal with inconsistencies between
|
||||||
|
* MapReduce Context Objects API between hadoop-0.20 and later versions.
|
||||||
|
*/
|
||||||
|
public class ContextFactory {
|
||||||
|
|
||||||
|
private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
|
||||||
|
private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
|
||||||
|
private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
|
||||||
|
private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
|
||||||
|
private static final boolean useV21;
|
||||||
|
|
||||||
|
private static final Field REPORTER_FIELD;
|
||||||
|
private static final Field READER_FIELD;
|
||||||
|
private static final Field WRITER_FIELD;
|
||||||
|
private static final Field OUTER_MAP_FIELD;
|
||||||
|
private static final Field WRAPPED_CONTEXT_FIELD;
|
||||||
|
|
||||||
|
static {
|
||||||
|
boolean v21 = true;
|
||||||
|
final String PACKAGE = "org.apache.hadoop.mapreduce";
|
||||||
|
try {
|
||||||
|
Class.forName(PACKAGE + ".task.JobContextImpl");
|
||||||
|
} catch (ClassNotFoundException cnfe) {
|
||||||
|
v21 = false;
|
||||||
|
}
|
||||||
|
useV21 = v21;
|
||||||
|
Class<?> jobContextCls;
|
||||||
|
Class<?> taskContextCls;
|
||||||
|
Class<?> taskIOContextCls;
|
||||||
|
Class<?> mapCls;
|
||||||
|
Class<?> mapContextCls;
|
||||||
|
Class<?> innerMapContextCls;
|
||||||
|
try {
|
||||||
|
if (v21) {
|
||||||
|
jobContextCls =
|
||||||
|
Class.forName(PACKAGE+".task.JobContextImpl");
|
||||||
|
taskContextCls =
|
||||||
|
Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
|
||||||
|
taskIOContextCls =
|
||||||
|
Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
|
||||||
|
mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
|
||||||
|
mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
|
||||||
|
innerMapContextCls =
|
||||||
|
Class.forName(PACKAGE+".lib.map.WrappedMapper$Context");
|
||||||
|
} else {
|
||||||
|
jobContextCls =
|
||||||
|
Class.forName(PACKAGE+".JobContext");
|
||||||
|
taskContextCls =
|
||||||
|
Class.forName(PACKAGE+".TaskAttemptContext");
|
||||||
|
taskIOContextCls =
|
||||||
|
Class.forName(PACKAGE+".TaskInputOutputContext");
|
||||||
|
mapContextCls = Class.forName(PACKAGE + ".MapContext");
|
||||||
|
mapCls = Class.forName(PACKAGE + ".Mapper");
|
||||||
|
innerMapContextCls =
|
||||||
|
Class.forName(PACKAGE+".Mapper$Context");
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IllegalArgumentException("Can't find class", e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
JOB_CONTEXT_CONSTRUCTOR =
|
||||||
|
jobContextCls.getConstructor(Configuration.class, JobID.class);
|
||||||
|
JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
|
||||||
|
TASK_CONTEXT_CONSTRUCTOR =
|
||||||
|
taskContextCls.getConstructor(Configuration.class,
|
||||||
|
TaskAttemptID.class);
|
||||||
|
TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
|
||||||
|
if (useV21) {
|
||||||
|
MAP_CONTEXT_CONSTRUCTOR =
|
||||||
|
innerMapContextCls.getConstructor(mapCls,
|
||||||
|
MapContext.class);
|
||||||
|
MAP_CONTEXT_IMPL_CONSTRUCTOR =
|
||||||
|
mapContextCls.getDeclaredConstructor(Configuration.class,
|
||||||
|
TaskAttemptID.class,
|
||||||
|
RecordReader.class,
|
||||||
|
RecordWriter.class,
|
||||||
|
OutputCommitter.class,
|
||||||
|
StatusReporter.class,
|
||||||
|
InputSplit.class);
|
||||||
|
MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
|
||||||
|
WRAPPED_CONTEXT_FIELD =
|
||||||
|
innerMapContextCls.getDeclaredField("mapContext");
|
||||||
|
WRAPPED_CONTEXT_FIELD.setAccessible(true);
|
||||||
|
} else {
|
||||||
|
MAP_CONTEXT_CONSTRUCTOR =
|
||||||
|
innerMapContextCls.getConstructor(mapCls,
|
||||||
|
Configuration.class,
|
||||||
|
TaskAttemptID.class,
|
||||||
|
RecordReader.class,
|
||||||
|
RecordWriter.class,
|
||||||
|
OutputCommitter.class,
|
||||||
|
StatusReporter.class,
|
||||||
|
InputSplit.class);
|
||||||
|
MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
|
||||||
|
WRAPPED_CONTEXT_FIELD = null;
|
||||||
|
}
|
||||||
|
MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
|
||||||
|
REPORTER_FIELD = taskIOContextCls.getDeclaredField("reporter");
|
||||||
|
REPORTER_FIELD.setAccessible(true);
|
||||||
|
READER_FIELD = mapContextCls.getDeclaredField("reader");
|
||||||
|
READER_FIELD.setAccessible(true);
|
||||||
|
WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
|
||||||
|
WRITER_FIELD.setAccessible(true);
|
||||||
|
OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
|
||||||
|
OUTER_MAP_FIELD.setAccessible(true);
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
throw new IllegalArgumentException("Can't run constructor ", e);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
throw new IllegalArgumentException("Can't find constructor ", e);
|
||||||
|
} catch (NoSuchFieldException e) {
|
||||||
|
throw new IllegalArgumentException("Can't find field ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clone a job or task attempt context with a new configuration.
|
||||||
|
* @param original the original context
|
||||||
|
* @param conf the new configuration
|
||||||
|
* @return a new context object
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static JobContext cloneContext(JobContext original,
|
||||||
|
Configuration conf
|
||||||
|
) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
try {
|
||||||
|
if (original instanceof MapContext<?,?,?,?>) {
|
||||||
|
return cloneMapContext((Mapper.Context) original, conf, null, null);
|
||||||
|
} else if (original instanceof ReduceContext<?,?,?,?>) {
|
||||||
|
throw new IllegalArgumentException("can't clone ReduceContext");
|
||||||
|
} else if (original instanceof TaskAttemptContext) {
|
||||||
|
TaskAttemptContext spec = (TaskAttemptContext) original;
|
||||||
|
return (JobContext)
|
||||||
|
TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, spec.getTaskAttemptID());
|
||||||
|
} else {
|
||||||
|
return (JobContext)
|
||||||
|
JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, original.getJobID());
|
||||||
|
}
|
||||||
|
} catch (InstantiationException e) {
|
||||||
|
throw new IllegalArgumentException("Can't clone object", e);
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
throw new IllegalArgumentException("Can't clone object", e);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
throw new IllegalArgumentException("Can't clone object", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy a mapper context, optionally replacing the input and output.
|
||||||
|
* @param <K1> input key type
|
||||||
|
* @param <V1> input value type
|
||||||
|
* @param <K2> output key type
|
||||||
|
* @param <V2> output value type
|
||||||
|
* @param context the context to clone
|
||||||
|
* @param conf a new configuration
|
||||||
|
* @param reader Reader to read from. Null means to clone from context.
|
||||||
|
* @param writer Writer to write to. Null means to clone from context.
|
||||||
|
* @return a new context. it will not be the same class as the original.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context
|
||||||
|
cloneMapContext(MapContext<K1,V1,K2,V2> context,
|
||||||
|
Configuration conf,
|
||||||
|
RecordReader<K1,V1> reader,
|
||||||
|
RecordWriter<K2,V2> writer
|
||||||
|
) throws IOException, InterruptedException {
|
||||||
|
try {
|
||||||
|
// get the outer object pointer
|
||||||
|
Object outer = OUTER_MAP_FIELD.get(context);
|
||||||
|
// if it is a wrapped 21 context, unwrap it
|
||||||
|
if ("org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context".equals
|
||||||
|
(context.getClass().getName())) {
|
||||||
|
context = (MapContext<K1,V1,K2,V2>) WRAPPED_CONTEXT_FIELD.get(context);
|
||||||
|
}
|
||||||
|
// if the reader or writer aren't given, use the same ones
|
||||||
|
if (reader == null) {
|
||||||
|
reader = (RecordReader<K1,V1>) READER_FIELD.get(context);
|
||||||
|
}
|
||||||
|
if (writer == null) {
|
||||||
|
writer = (RecordWriter<K2,V2>) WRITER_FIELD.get(context);
|
||||||
|
}
|
||||||
|
if (useV21) {
|
||||||
|
Object basis =
|
||||||
|
MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(conf,
|
||||||
|
context.getTaskAttemptID(),
|
||||||
|
reader, writer,
|
||||||
|
context.getOutputCommitter(),
|
||||||
|
REPORTER_FIELD.get(context),
|
||||||
|
context.getInputSplit());
|
||||||
|
return (Mapper.Context)
|
||||||
|
MAP_CONTEXT_CONSTRUCTOR.newInstance(outer, basis);
|
||||||
|
} else {
|
||||||
|
return (Mapper.Context)
|
||||||
|
MAP_CONTEXT_CONSTRUCTOR.newInstance(outer,
|
||||||
|
conf, context.getTaskAttemptID(),
|
||||||
|
reader, writer,
|
||||||
|
context.getOutputCommitter(),
|
||||||
|
REPORTER_FIELD.get(context),
|
||||||
|
context.getInputSplit());
|
||||||
|
}
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
throw new IllegalArgumentException("Can't access field", e);
|
||||||
|
} catch (InstantiationException e) {
|
||||||
|
throw new IllegalArgumentException("Can't create object", e);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
throw new IllegalArgumentException("Can't invoke constructor", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue