MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611197 13f79535-47bb-0310-9956-ffa450edef68
parent d405271060
commit 4c7db3c6a3
CHANGES.txt

@@ -162,6 +162,9 @@ Release 2.5.0 - UNRELEASED
    resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
    vinodkv)

    MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
    assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)

Release 2.4.1 - 2014-06-23

  INCOMPATIBLE CHANGES
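Note on the failure mode recorded above: when mapreduce.cluster.local.dir lists several directories, map output paths are handed out by a LocalDirAllocator that spreads writes across those directories, so file.out and its file.out.index need not end up under the same parent. A minimal sketch of that behavior (the demo class and /tmp dirs are made up, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;

// Hypothetical demo, not part of this patch.
public class MultiDirSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Two local dirs, as on a node with multiple disks (made-up paths).
    conf.setStrings(MRConfig.LOCAL_DIR, "/tmp/local-0", "/tmp/local-1");
    LocalDirAllocator alloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
    // Each request is allocated independently, so the two paths can diverge.
    Path out = alloc.getLocalPathForWrite("output/file.out", 1024, conf);
    Path idx = alloc.getLocalPathForWrite("output/file.out.index", 1024, conf);
    System.out.println(out);  // e.g. /tmp/local-0/output/file.out
    System.out.println(idx);  // may be /tmp/local-1/output/file.out.index
  }
}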
LocalContainerLauncher.java

@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
@@ -437,43 +438,6 @@ public class LocalContainerLauncher extends AbstractService implements
      }
    }

    /**
     * Within the _local_ filesystem (not HDFS), all activity takes place within
     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
     * and all sub-MapTasks create the same filename ("file.out"). Rename that
     * to something unique (e.g., "map_0.out") to avoid collisions.
     *
     * Longer-term, we'll modify [something] to use TaskAttemptID-based
     * filenames instead of "file.out". (All of this is entirely internal,
     * so there are no particular compatibility issues.)
     */
    private MapOutputFile renameMapOutputForReduce(JobConf conf,
        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
      FileSystem localFs = FileSystem.getLocal(conf);
      // move map output to reduce input
      Path mapOut = subMapOutputFile.getOutputFile();
      FileStatus mStatus = localFs.getFileStatus(mapOut);
      Path reduceIn = subMapOutputFile.getInputFileForWrite(
          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
      Path mapOutIndex = new Path(mapOut.toString() + ".index");
      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
      if (LOG.isDebugEnabled()) {
        LOG.debug("Renaming map output file for task attempt "
            + mapId.toString() + " from original location " + mapOut.toString()
            + " to destination " + reduceIn.toString());
      }
      if (!localFs.mkdirs(reduceIn.getParent())) {
        throw new IOException("Mkdirs failed to create "
            + reduceIn.getParent().toString());
      }
      if (!localFs.rename(mapOut, reduceIn))
        throw new IOException("Couldn't rename " + mapOut);
      if (!localFs.rename(mapOutIndex, reduceInIndex))
        throw new IOException("Couldn't rename " + mapOutIndex);

      return new RenamedMapOutputFile(reduceIn);
    }

    /**
     * Also within the local filesystem, we need to restore the initial state
     * of the directory as much as possible. Compare current contents against
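The significant removal above is the mapOutIndex line: appending ".index" to the data-file path fabricates the index location and silently assumes both files were allocated under the same local dir. A toy illustration of why the later rename then fails (all paths invented, following the ${local.dir}/usercache/... layout from the javadoc):

import org.apache.hadoop.fs.Path;

// Hypothetical demo, not part of this patch.
public class ConcatAssumptionSketch {
  public static void main(String[] args) {
    // Where the allocator actually placed the two files:
    Path mapOut = new Path("/tmp/local-0/usercache/u/appcache/app_1/output/file.out");
    Path actualIdx = new Path("/tmp/local-1/usercache/u/appcache/app_1/output/file.out.index");
    // Where the removed code went looking for the index:
    Path assumedIdx = new Path(mapOut.toString() + ".index");
    System.out.println(assumedIdx.equals(actualIdx)); // false
    // localFs.rename(assumedIdx, reduceInIndex) then fails on the missing
    // source and surfaces as IOException("Couldn't rename .../file.out.index").
  }
}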
@@ -506,7 +470,46 @@ public class LocalContainerLauncher extends AbstractService implements
      }

  } // end EventHandler

  /**
   * Within the _local_ filesystem (not HDFS), all activity takes place within
   * a subdir inside one of the LOCAL_DIRS
   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
   * and all sub-MapTasks create the same filename ("file.out"). Rename that
   * to something unique (e.g., "map_0.out") to avoid possible collisions.
   *
   * Longer-term, we'll modify [something] to use TaskAttemptID-based
   * filenames instead of "file.out". (All of this is entirely internal,
   * so there are no particular compatibility issues.)
   */
  @VisibleForTesting
  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    // move map output to reduce input
    Path mapOut = subMapOutputFile.getOutputFile();
    FileStatus mStatus = localFs.getFileStatus(mapOut);
    Path reduceIn = subMapOutputFile.getInputFileForWrite(
        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
    if (LOG.isDebugEnabled()) {
      LOG.debug("Renaming map output file for task attempt "
          + mapId.toString() + " from original location " + mapOut.toString()
          + " to destination " + reduceIn.toString());
    }
    if (!localFs.mkdirs(reduceIn.getParent())) {
      throw new IOException("Mkdirs failed to create "
          + reduceIn.getParent().toString());
    }
    if (!localFs.rename(mapOut, reduceIn))
      throw new IOException("Couldn't rename " + mapOut);
    if (!localFs.rename(mapOutIndex, reduceInIndex))
      throw new IOException("Couldn't rename " + mapOutIndex);

    return new RenamedMapOutputFile(reduceIn);
  }

  private static class RenamedMapOutputFile extends MapOutputFile {
    private Path path;
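The functional change is a single line: the fixed method asks the MapOutputFile implementation for the index path via getOutputIndexFile(), which consults the allocator, instead of deriving it from the data path. The switch from a private instance method to a protected static one marked @VisibleForTesting evidently exists so the new test below can invoke it directly. The javadoc's file.out-to-map_0.out rename is visible in the paths MROutputFiles produces; a hedged sketch (single made-up local dir, default TaskID):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskID;

// Hypothetical demo, not part of this patch.
public class RenamePathsSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.set(MRConfig.LOCAL_DIR, "/tmp/local-0"); // made-up dir
    MROutputFiles files = new MROutputFiles();
    files.setConf(conf);
    // Source: every sub-map attempt writes the same name, "file.out"...
    System.out.println(files.getOutputFileForWrite(16));
    // Destination: the reduce side expects a per-map name, "map_0.out".
    System.out.println(files.getInputFileForWrite(new TaskID(), 16));
  }
}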
TestLocalContainerLauncher.java

@@ -18,17 +18,26 @@

package org.apache.hadoop.mapred;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
public class TestLocalContainerLauncher {
  private static final Log LOG =
      LogFactory.getLog(TestLocalContainerLauncher.class);
  private static File testWorkDir;
  private static final String[] localDirs = new String[2];

  private static void delete(File dir) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
    fs.delete(p, true);
  }

  @BeforeClass
  public static void setupTestDirs() throws IOException {
    testWorkDir = new File("target",
        TestLocalContainerLauncher.class.getCanonicalName());
    testWorkDir.delete();
    testWorkDir.mkdirs();
    testWorkDir = testWorkDir.getAbsoluteFile();
    for (int i = 0; i < localDirs.length; i++) {
      final File dir = new File(testWorkDir, "local-" + i);
      dir.mkdirs();
      localDirs[i] = dir.toString();
    }
  }

  @AfterClass
  public static void cleanupTestDirs() throws IOException {
    if (testWorkDir != null) {
      delete(testWorkDir);
    }
  }

  @SuppressWarnings("rawtypes")
  @Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher {
    when(container.getNodeId()).thenReturn(nodeId);
    return container;
  }

  @Test
  public void testRenameMapOutputForReduce() throws Exception {
    final JobConf conf = new JobConf();

    final MROutputFiles mrOutputFiles = new MROutputFiles();
    mrOutputFiles.setConf(conf);

    // make sure both dirs are distinct
    //
    conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
    conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
    Assert.assertNotEquals("Paths must be different!",
        mapOut.getParent(), mapOutIdx.getParent());

    // make both dirs part of LOCAL_DIR
    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);

    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
    lfc.create(mapOut, EnumSet.of(CREATE)).close();
    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();

    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);

    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
  }
}
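A note on the test's design: setting LOCAL_DIR to a single directory before each *ForWrite call pins file.out and file.out.index to different dirs (guarded by the assertNotEquals), and restoring both dirs afterwards lets the rename's getOutputIndexFile() lookup find the index again. Against the removed implementation this test would be expected to fail with the "Couldn't rename .../file.out.index" IOException, since no index file exists next to file.out; with the fix the call completes and the test passes.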