diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a0fedb9a27b..05d1fcbeed1 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -306,6 +306,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3134. Added documentation the CapacityScheduler. (acmurthy) + MAPREDUCE-3138. Add a utility to help applications bridge changes in + Context Objects APIs due to MAPREDUCE-954. (omalley via acmurthy) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java new file mode 100644 index 00000000000..1b1a85b7af4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A factory to allow applications to deal with inconsistencies between
+ * MapReduce Context Objects API between hadoop-0.20 and later versions.
+ *
+ * <p>The static initializer probes for
+ * {@code org.apache.hadoop.mapreduce.task.JobContextImpl}. When it can be
+ * loaded, the {@code *Impl} context classes and {@code WrappedMapper} are
+ * used; otherwise {@code JobContext}, {@code TaskAttemptContext},
+ * {@code TaskInputOutputContext}, {@code MapContext} and {@code Mapper} in
+ * {@code org.apache.hadoop.mapreduce} are used directly. Constructors and
+ * fields are resolved reflectively once and cached in the constants below.
+ */
+public class ContextFactory {
+
+  /** (Configuration, JobID) constructor of the job context class. */
+  private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+  /** (Configuration, TaskAttemptID) constructor of the task context. */
+  private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+  /** Constructor of the inner mapper context class. */
+  private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+  /** MapContextImpl constructor; null when the Impl classes are absent. */
+  private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+  /** True when task.JobContextImpl could be loaded. */
+  private static final boolean useV21;
+
+  /** The "reporter" field of the task input/output context class. */
+  private static final Field REPORTER_FIELD;
+  /** The "reader" field of the map context class. */
+  private static final Field READER_FIELD;
+  /** The "output" field of the task input/output context class. */
+  private static final Field WRITER_FIELD;
+  /** The this$0 field: the mapper that encloses an inner context. */
+  private static final Field OUTER_MAP_FIELD;
+  /** The mapContext field of WrappedMapper.Context; null without Impl. */
+  private static final Field WRAPPED_CONTEXT_FIELD;
+
+  static {
+    final String PACKAGE = "org.apache.hadoop.mapreduce";
+    // Probe for the Impl flavor once and keep the class, so that it is not
+    // looked up a second time below.
+    Class<?> jobContextImplCls;
+    try {
+      jobContextImplCls = Class.forName(PACKAGE + ".task.JobContextImpl");
+    } catch (ClassNotFoundException cnfe) {
+      jobContextImplCls = null;
+    }
+    useV21 = jobContextImplCls != null;
+    Class<?> jobContextCls;
+    Class<?> taskContextCls;
+    Class<?> taskIOContextCls;
+    Class<?> mapCls;
+    Class<?> mapContextCls;
+    Class<?> innerMapContextCls;
+    try {
+      if (useV21) {
+        jobContextCls = jobContextImplCls;
+        taskContextCls =
+          Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
+        taskIOContextCls =
+          Class.forName(PACKAGE + ".task.TaskInputOutputContextImpl");
+        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+        innerMapContextCls =
+          Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
+      } else {
+        jobContextCls = Class.forName(PACKAGE + ".JobContext");
+        taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
+        taskIOContextCls =
+          Class.forName(PACKAGE + ".TaskInputOutputContext");
+        mapContextCls = Class.forName(PACKAGE + ".MapContext");
+        mapCls = Class.forName(PACKAGE + ".Mapper");
+        innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class", e);
+    }
+    try {
+      JOB_CONTEXT_CONSTRUCTOR =
+        jobContextCls.getConstructor(Configuration.class, JobID.class);
+      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      TASK_CONTEXT_CONSTRUCTOR =
+        taskContextCls.getConstructor(Configuration.class,
+                                      TaskAttemptID.class);
+      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      if (useV21) {
+        // The inner context wraps a MapContext, which is built separately
+        // from the seven individual pieces.
+        MAP_CONTEXT_CONSTRUCTOR =
+          innerMapContextCls.getConstructor(mapCls, MapContext.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR =
+          mapContextCls.getDeclaredConstructor(Configuration.class,
+                                               TaskAttemptID.class,
+                                               RecordReader.class,
+                                               RecordWriter.class,
+                                               OutputCommitter.class,
+                                               StatusReporter.class,
+                                               InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+        WRAPPED_CONTEXT_FIELD =
+          innerMapContextCls.getDeclaredField("mapContext");
+        WRAPPED_CONTEXT_FIELD.setAccessible(true);
+      } else {
+        // The inner context takes its mapper plus the seven pieces directly.
+        MAP_CONTEXT_CONSTRUCTOR =
+          innerMapContextCls.getConstructor(mapCls,
+                                            Configuration.class,
+                                            TaskAttemptID.class,
+                                            RecordReader.class,
+                                            RecordWriter.class,
+                                            OutputCommitter.class,
+                                            StatusReporter.class,
+                                            InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+        WRAPPED_CONTEXT_FIELD = null;
+      }
+      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      REPORTER_FIELD = taskIOContextCls.getDeclaredField("reporter");
+      REPORTER_FIELD.setAccessible(true);
+      READER_FIELD = mapContextCls.getDeclaredField("reader");
+      READER_FIELD.setAccessible(true);
+      WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
+      WRITER_FIELD.setAccessible(true);
+      OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
+      OUTER_MAP_FIELD.setAccessible(true);
+    } catch (SecurityException e) {
+      throw new IllegalArgumentException("Can't run constructor ", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Can't find constructor ", e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalArgumentException("Can't find field ", e);
+    }
+  }
+
+  /**
+   * Clone a job or task attempt context with a new configuration.
+   * A map context is copied with its current reader and writer, any other
+   * task attempt context yields a plain task attempt context, and anything
+   * else yields a plain job context.
+   * @param original the original context
+   * @param conf the new configuration
+   * @return a new context object
+   * @throws IllegalArgumentException if original is a ReduceContext or the
+   *         new context can't be created
+   * @throws IOException declared by the signature
+   * @throws InterruptedException declared by the signature
+   */
+  @SuppressWarnings("unchecked")
+  public static JobContext cloneContext(JobContext original,
+                                        Configuration conf
+                                        ) throws IOException,
+                                                 InterruptedException {
+    try {
+      if (original instanceof MapContext) {
+        return cloneMapContext((Mapper.Context) original, conf, null, null);
+      } else if (original instanceof ReduceContext) {
+        throw new IllegalArgumentException("can't clone ReduceContext");
+      } else if (original instanceof TaskAttemptContext) {
+        TaskAttemptContext spec = (TaskAttemptContext) original;
+        return (JobContext)
+          TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, spec.getTaskAttemptID());
+      } else {
+        return (JobContext)
+          JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, original.getJobID());
+      }
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't clone object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't clone object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't clone object", e);
+    }
+  }
+
+  /**
+   * Copy a mapper context, optionally replacing the input and output.
+   * @param <K1> input key type
+   * @param <V1> input value type
+   * @param <K2> output key type
+   * @param <V2> output value type
+   * @param context the context to clone
+   * @param conf a new configuration
+   * @param reader Reader to read from. Null means to clone from context.
+   * @param writer Writer to write to. Null means to clone from context.
+   * @return a new context bound to the same enclosing mapper as the original
+   * @throws IllegalArgumentException if a reflective field read or
+   *         constructor call fails
+   * @throws IOException declared by the signature
+   * @throws InterruptedException declared by the signature
+   */
+  @SuppressWarnings("unchecked")
+  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context
+       cloneMapContext(MapContext<K1,V1,K2,V2> context,
+                       Configuration conf,
+                       RecordReader<K1,V1> reader,
+                       RecordWriter<K2,V2> writer
+                      ) throws IOException, InterruptedException {
+    try {
+      // get the outer object pointer
+      Object outer = OUTER_MAP_FIELD.get(context);
+      // If it is a wrapped 21 context, unwrap it. Checking against the class
+      // that declares the wrapped field, instead of comparing class names,
+      // also unwraps subclasses of the wrapper; the null check keeps the
+      // non-Impl flavor from dereferencing a field that was never resolved.
+      if (WRAPPED_CONTEXT_FIELD != null
+          && WRAPPED_CONTEXT_FIELD.getDeclaringClass().isInstance(context)) {
+        context =
+          (MapContext<K1,V1,K2,V2>) WRAPPED_CONTEXT_FIELD.get(context);
+      }
+      // if the reader or writer aren't given, use the same ones
+      if (reader == null) {
+        reader = (RecordReader<K1,V1>) READER_FIELD.get(context);
+      }
+      if (writer == null) {
+        writer = (RecordWriter<K2,V2>) WRITER_FIELD.get(context);
+      }
+      if (useV21) {
+        // Build the underlying MapContextImpl first, then wrap it together
+        // with the original enclosing mapper.
+        Object basis =
+          MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(conf,
+                                                   context.getTaskAttemptID(),
+                                                   reader, writer,
+                                                   context.getOutputCommitter(),
+                                                   REPORTER_FIELD.get(context),
+                                                   context.getInputSplit());
+        return (Mapper<K1,V1,K2,V2>.Context)
+          MAP_CONTEXT_CONSTRUCTOR.newInstance(outer, basis);
+      } else {
+        return (Mapper<K1,V1,K2,V2>.Context)
+          MAP_CONTEXT_CONSTRUCTOR.newInstance(outer,
+                                              conf, context.getTaskAttemptID(),
+                                              reader, writer,
+                                              context.getOutputCommitter(),
+                                              REPORTER_FIELD.get(context),
+                                              context.getInputSplit());
+      }
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't access field", e);
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't invoke constructor", e);
+    }
+  }
+}