MAPREDUCE-5176. Add annotation for tagging tasks as responsive to

preemption. Contributed by Carlo Curino, cdouglas


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2013-05-31 06:07:18 +00:00
parent e188287946
commit 0a452a30ab
7 changed files with 299 additions and 4 deletions

View File

@ -249,6 +249,9 @@ Release 2.0.5-beta - UNRELEASED
FileOuputFormat.Counter for binary compatibility with 1.x mapred APIs.
(Mayank Bansal via vinodkv)
MAPREDUCE-5176. Add annotation for tagging tasks as responsive to
preemption. (Carlo Curino, cdouglas)
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -23,8 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
import java.util.Iterator;
@ -119,6 +118,7 @@ import java.util.Iterator;
* @see Mapper
* @see Partitioner
*/
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

View File

@ -154,7 +154,7 @@ public class FileOutputCommitter extends OutputCommitter {
* @param appAttemptId the ID of the application attempt for this job.
* @return the path to store job attempt data.
*/
private Path getJobAttemptPath(int appAttemptId) {
protected Path getJobAttemptPath(int appAttemptId) {
return getJobAttemptPath(appAttemptId, getOutputPath());
}
@ -232,7 +232,7 @@ public class FileOutputCommitter extends OutputCommitter {
* @param context the context of any task.
* @return the path where the output of a committed task is stored.
*/
private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
return new Path(getJobAttemptPath(appAttemptId),
String.valueOf(context.getTaskAttemptID().getTaskID()));
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
import com.google.common.annotations.VisibleForTesting;
/** An {@link OutputCommitter} that commits files specified
* in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
**/
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class PartialFileOutputCommitter
extends FileOutputCommitter implements PartialOutputCommitter {
private static final Log LOG =
LogFactory.getLog(PartialFileOutputCommitter.class);
public PartialFileOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
super(outputPath, context);
}
public PartialFileOutputCommitter(Path outputPath,
JobContext context) throws IOException {
super(outputPath, context);
}
@Override
public Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
return new Path(getJobAttemptPath(appAttemptId),
String.valueOf(context.getTaskAttemptID()));
}
@VisibleForTesting
FileSystem fsFor(Path p, Configuration conf) throws IOException {
return p.getFileSystem(conf);
}
@Override
public void cleanUpPartialOutputForTask(TaskAttemptContext context)
throws IOException {
// we double check this is never invoked from a non-preemptable subclass.
// This should never happen, since the invoking codes is checking it too,
// but it is safer to double check. Errors handling this would produce
// inconsistent output.
if (!this.getClass().isAnnotationPresent(Checkpointable.class)) {
throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " +
"from non @Preemptable class");
}
FileSystem fs =
fsFor(getTaskAttemptPath(context), context.getConfiguration());
LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " +
context.getTaskAttemptID().getTaskID() + " in: " +
getCommittedTaskPath(context).getParent());
final TaskAttemptID taid = context.getTaskAttemptID();
final TaskID tid = taid.getTaskID();
Path pCommit = getCommittedTaskPath(context).getParent();
// remove any committed output
for (int i = 0; i < taid.getId(); ++i) {
TaskAttemptID oldId = new TaskAttemptID(tid, i);
Path pTask = new Path(pCommit, oldId.toString());
if (fs.exists(pTask) && !fs.delete(pTask, true)) {
throw new IOException("Failed to delete " + pTask);
}
}
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* Interface for an {@link org.apache.hadoop.mapreduce.OutputCommitter}
* implementing partial commit of task output, as during preemption.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface PartialOutputCommitter {
/**
* Remove all previously committed outputs from prior executions of this task.
* @param context Context for cleaning up previously promoted output.
* @throws IOException If cleanup fails, then the state of the task my not be
* well defined.
*/
public void cleanUpPartialOutputForTask(TaskAttemptContext context)
throws IOException;
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Contract representing to the framework that the task can be safely preempted
* and restarted between invocations of the user-defined function.
*
* This is often true when the result of a function does not rely on state
* derived from previous elements in the record stream, but the guarantee is
* left as an exercise to the implementor.
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@InterfaceAudience.Public
@InterfaceStability.Evolving
public @interface Checkpointable { }

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
import org.junit.Test;
import static org.mockito.Mockito.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
public class TestPreemptableFileOutputCommitter {
@Test
public void testPartialOutputCleanup()
throws FileNotFoundException, IllegalArgumentException, IOException {
Configuration conf = new Configuration(false);
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
TaskAttemptID tid0 =
new TaskAttemptID("1363718006656", 1, TaskType.REDUCE, 14, 3);
Path p = spy(new Path("/user/hadoop/out"));
Path a = new Path("hdfs://user/hadoop/out");
Path p0 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000014_0");
Path p1 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000014_1");
Path p2 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000013_0");
// (p3 does not exist)
Path p3 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000014_2");
FileStatus[] fsa = new FileStatus[3];
fsa[0] = new FileStatus();
fsa[0].setPath(p0);
fsa[1] = new FileStatus();
fsa[1].setPath(p1);
fsa[2] = new FileStatus();
fsa[2].setPath(p2);
final FileSystem fs = mock(FileSystem.class);
when(fs.exists(eq(p0))).thenReturn(true);
when(fs.exists(eq(p1))).thenReturn(true);
when(fs.exists(eq(p2))).thenReturn(true);
when(fs.exists(eq(p3))).thenReturn(false);
when(fs.delete(eq(p0), eq(true))).thenReturn(true);
when(fs.delete(eq(p1), eq(true))).thenReturn(true);
doReturn(fs).when(p).getFileSystem(any(Configuration.class));
when(fs.makeQualified(eq(p))).thenReturn(a);
TaskAttemptContext context = mock(TaskAttemptContext.class);
when(context.getTaskAttemptID()).thenReturn(tid0);
when(context.getConfiguration()).thenReturn(conf);
PartialFileOutputCommitter foc = new TestPFOC(p, context, fs);
foc.cleanUpPartialOutputForTask(context);
verify(fs).delete(eq(p0), eq(true));
verify(fs).delete(eq(p1), eq(true));
verify(fs, never()).delete(eq(p3), eq(true));
verify(fs, never()).delete(eq(p2), eq(true));
}
@Checkpointable
static class TestPFOC extends PartialFileOutputCommitter {
final FileSystem fs;
TestPFOC(Path outputPath, TaskAttemptContext ctxt, FileSystem fs)
throws IOException {
super(outputPath, ctxt);
this.fs = fs;
}
@Override
FileSystem fsFor(Path p, Configuration conf) {
return fs;
}
}
}