diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 32be987ad75..e9996596f06 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -674,6 +674,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6528. Memory leak for HistoryFileManager.getJobSummary()
     (Junping Du via jlowe)
 
+    MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
+    strategy is dynamic (Kuhu Shukla via kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
index 8482e7df49a..9bf8e479d0a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
@@ -20,14 +20,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.tools.CopyListingFileStatus;
-import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -47,71 +43,27 @@
  */
 class DynamicInputChunk {
   private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
-
-  private static Configuration configuration;
-  private static Path chunkRootPath;
-  private static String chunkFilePrefix;
-  private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
-  private static FileSystem fs;
-
   private Path chunkFilePath;
   private SequenceFileRecordReader reader;
   private SequenceFile.Writer writer;
+  private DynamicInputChunkContext chunkContext;
 
-  private static void initializeChunkInvariants(Configuration config)
-      throws IOException {
-    configuration = config;
-    Path listingFilePath = new Path(getListingFilePath(configuration));
-    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
-    fs = chunkRootPath.getFileSystem(configuration);
-    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
-  }
-
-  private static String getListingFilePath(Configuration configuration) {
-    final String listingFileString = configuration.get(
-        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
-    assert !listingFileString.equals("") : "Listing file not found.";
-    return listingFileString;
-  }
-
-  private static boolean areInvariantsInitialized() {
-    return chunkRootPath != null;
-  }
-
-  private DynamicInputChunk(String chunkId, Configuration configuration)
+  DynamicInputChunk(String chunkId, DynamicInputChunkContext chunkContext)
       throws IOException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(configuration);
-
-    chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+    this.chunkContext = chunkContext;
+    chunkFilePath = new Path(chunkContext.getChunkRootPath(),
+        chunkContext.getChunkFilePrefix() + chunkId);
     openForWrite();
   }
 
-
   private void openForWrite() throws IOException {
     writer = SequenceFile.createWriter(
-        chunkFilePath.getFileSystem(configuration), configuration,
+        chunkContext.getFs(), chunkContext.getConfiguration(),
        chunkFilePath, Text.class, CopyListingFileStatus.class,
        SequenceFile.CompressionType.NONE);
   }
 
-  /**
-   * Factory method to create chunk-files for writing to.
-   * (For instance, when the DynamicInputFormat splits the input-file into
-   * chunks.)
-   * @param chunkId String to identify the chunk.
-   * @param configuration Configuration, describing the location of the listing-
-   * file, file-system for the map-job, etc.
-   * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
-   * incorporating the chunk-id.
-   * @throws IOException Exception on failure to create the chunk.
-   */
-  public static DynamicInputChunk createChunkForWrite(String chunkId,
-      Configuration configuration) throws IOException {
-    return new DynamicInputChunk(chunkId, configuration);
-  }
-
   /**
    * Method to write records into a chunk.
    * @param key Key from the listing file.
@@ -135,19 +87,19 @@ public void close() {
    * @throws IOException Exception on failure to reassign.
    */
   public void assignTo(TaskID taskId) throws IOException {
-    Path newPath = new Path(chunkRootPath, taskId.toString());
-    if (!fs.rename(chunkFilePath, newPath)) {
+    Path newPath = new Path(chunkContext.getChunkRootPath(), taskId.toString());
+    if (!chunkContext.getFs().rename(chunkFilePath, newPath)) {
       LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
     }
   }
 
-  private DynamicInputChunk(Path chunkFilePath,
-      TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
+  public DynamicInputChunk(Path chunkFilePath,
+      TaskAttemptContext taskAttemptContext,
+      DynamicInputChunkContext chunkContext) throws IOException,
+      InterruptedException {
     this.chunkFilePath = chunkFilePath;
+    this.chunkContext = chunkContext;
     openForRead(taskAttemptContext);
   }
 
@@ -155,45 +107,8 @@ private void openForRead(TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
     reader = new SequenceFileRecordReader();
     reader.initialize(new FileSplit(chunkFilePath, 0,
-        DistCpUtils.getFileSize(chunkFilePath, configuration), null),
-        taskAttemptContext);
-  }
-
-  /**
-   * Factory method that
-   * 1. acquires a chunk for the specified map-task attempt
-   * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
-   * @param taskAttemptContext The attempt-context for the map task that's
-   * trying to acquire a chunk.
-   * @return The acquired dynamic-chunk. The chunk-file is renamed to the
-   * attempt-id (from the attempt-context.)
-   * @throws IOException Exception on failure.
-   * @throws InterruptedException Exception on failure.
-   */
-  public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
-
-    String taskId
-        = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
-    Path acquiredFilePath = new Path(chunkRootPath, taskId);
-
-    if (fs.exists(acquiredFilePath)) {
-      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
-      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-    }
-
-    for (FileStatus chunkFile : getListOfChunkFiles()) {
-      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
-        LOG.info(taskId + " acquired " + chunkFile.getPath());
-        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-      }
-      else
-        LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
-    }
-
-    return null;
+        DistCpUtils.getFileSize(chunkFilePath,
+            chunkContext.getConfiguration()), null), taskAttemptContext);
   }
 
   /**
@@ -204,19 +119,13 @@ public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
    */
   public void release() throws IOException {
     close();
-    if (!fs.delete(chunkFilePath, false)) {
+    if (!chunkContext.getFs().delete(chunkFilePath, false)) {
       LOG.error("Unable to release chunk at path: " + chunkFilePath);
-      throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+      throw new IOException("Unable to release chunk at path: " +
+          chunkFilePath);
     }
   }
 
-  static FileStatus [] getListOfChunkFiles() throws IOException {
-    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
-    FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
-    numChunksLeft = chunkFiles.length;
-    return chunkFiles;
-  }
-
   /**
    * Getter for the chunk-file's path, on HDFS.
    * @return The qualified path to the chunk-file.
@@ -234,14 +143,4 @@ public SequenceFileRecordReader getReader() {
     return reader;
   }
 
-  /**
-   * Getter for the number of chunk-files left in the chunk-file directory.
-   * Useful to determine how many chunks (and hence, records) are left to be
-   * processed.
-   * @return Before the first scan of the directory, the number returned is -1.
-   * Otherwise, the number of chunk-files seen from the last scan is returned.
-   */
-  public static int getNumChunksLeft() {
-    return numChunksLeft;
-  }
 }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
index 38269c755fe..fe8604a745e 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
@@ -57,7 +57,8 @@ public class DynamicInputFormat extends InputFormat {
       = "mapred.num.splits";
   private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
       = "mapred.num.entries.per.chunk";
-
+  private DynamicInputChunkContext chunkContext = null;
+
   /**
    * Implementation of InputFormat::getSplits(). This method splits up the
    * copy-listing file into chunks, and assigns the first batch to different
@@ -72,6 +73,7 @@ public List getSplits(JobContext jobContext)
       throws IOException, InterruptedException {
     LOG.info("DynamicInputFormat: Getting splits for job:"
              + jobContext.getJobID());
+    chunkContext = getChunkContext(jobContext.getConfiguration());
     return createSplits(jobContext,
                         splitCopyListingIntoChunksWithShuffle(jobContext));
   }
@@ -101,6 +103,13 @@ private List createSplits(JobContext jobContext,
 
   private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
 
+  public DynamicInputChunkContext getChunkContext(
+      Configuration configuration) throws IOException {
+    if(chunkContext == null) {
+      chunkContext = new DynamicInputChunkContext(configuration);
+    }
+    return chunkContext;
+  }
 
   private List splitCopyListingIntoChunksWithShuffle
                                     (JobContext context) throws IOException {
@@ -146,8 +155,8 @@ private List createSplits(JobContext jobContext,
 
         closeAll(openChunks);
         chunksFinal.addAll(openChunks);
 
-        openChunks = createChunks(
-            configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+        openChunks = createChunks(chunkCount, nChunksTotal,
+            nChunksOpenAtOnce);
 
         chunkCount += openChunks.size();
 
@@ -183,9 +192,9 @@ private static void closeAll(List chunks) {
       chunk.close();
   }
 
-  private static List createChunks(Configuration config,
-      int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
-      throws IOException {
+  private List createChunks(int chunkCount,
+      int nChunksTotal, int nChunksOpenAtOnce)
+      throws IOException {
     List chunks = new ArrayList();
     int chunkIdUpperBound = Math.min(nChunksTotal,
                                      chunkCount + nChunksOpenAtOnce);
@@ -197,14 +206,13 @@ private static List createChunks(Configuration config,
       chunkIdUpperBound = nChunksTotal;
 
     for (int i=chunkCount; i < chunkIdUpperBound; ++i)
-      chunks.add(createChunk(i, config));
+      chunks.add(createChunk(i));
     return chunks;
   }
 
-  private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+  private DynamicInputChunk createChunk(int chunkId)
       throws IOException {
-    return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
-        config);
+    return chunkContext.createChunkForWrite(String.format("%05d", chunkId));
   }
 
@@ -351,6 +359,7 @@ public RecordReader createRecordReader(
           InputSplit inputSplit,
           TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
-    return new DynamicRecordReader();
+    chunkContext = getChunkContext(taskAttemptContext.getConfiguration());
+    return new DynamicRecordReader(chunkContext);
   }
 }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
index 00b3c69de3d..87b8f089b06 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
@@ -49,9 +49,14 @@ public class DynamicRecordReader extends RecordReader {
   private int numRecordsProcessedByThisMap = 0;
   private long timeOfLastChunkDirScan = 0;
   private boolean isChunkDirAlreadyScanned = false;
+  private DynamicInputChunkContext chunkContext;
 
   private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
 
+  DynamicRecordReader(DynamicInputChunkContext chunkContext) {
+    this.chunkContext = chunkContext;
+  }
+
   /**
    * Implementation for RecordReader::initialize(). Initializes the internal
    * RecordReader to read from chunks.
@@ -69,7 +74,7 @@ public void initialize(InputSplit inputSplit,
     this.taskAttemptContext = taskAttemptContext;
     configuration = taskAttemptContext.getConfiguration();
     taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
-    chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+    chunk = chunkContext.acquire(this.taskAttemptContext);
     timeOfLastChunkDirScan = System.currentTimeMillis();
     isChunkDirAlreadyScanned = false;
 
@@ -114,7 +119,7 @@ public boolean nextKeyValue()
       timeOfLastChunkDirScan = System.currentTimeMillis();
       isChunkDirAlreadyScanned = false;
 
-      chunk = DynamicInputChunk.acquire(taskAttemptContext);
+      chunk = chunkContext.acquire(taskAttemptContext);
 
       if (chunk == null) return false;
 
@@ -182,12 +187,12 @@ private int getNumChunksLeft() throws IOException {
         || (!isChunkDirAlreadyScanned
             && numRecordsProcessedByThisMap%numRecordsPerChunk
                > numRecordsPerChunk/2)) {
-      DynamicInputChunk.getListOfChunkFiles();
+      chunkContext.getListOfChunkFiles();
       isChunkDirAlreadyScanned = true;
       timeOfLastChunkDirScan = now;
     }
 
-    return DynamicInputChunk.getNumChunksLeft();
+    return chunkContext.getNumChunksLeft();
   }
 
   /**
   * Implementation of RecordReader::close().
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
index 1a2227cf742..4bb6c98c30d 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
@@ -64,6 +64,10 @@ public RecordReader getReader() {
     return reader;
   }
 
+  public void setReader(RecordReader reader) {
+    this.reader = reader;
+  }
+
   public StubInMemoryWriter getWriter() {
     return writer;
   }
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
index 8cc8317b994..bb2dd9d37d1 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
@@ -40,6 +40,7 @@
 import org.junit.Test;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -126,13 +127,14 @@ public void testGetSplits() throws Exception {
 
     int taskId = 0;
     for (InputSplit split : splits) {
-      RecordReader recordReader =
-          inputFormat.createRecordReader(split, null);
       StubContext stubContext = new StubContext(jobContext.getConfiguration(),
-          recordReader, taskId);
+          null, taskId);
       final TaskAttemptContext taskAttemptContext
               = stubContext.getContext();
-
+
+      RecordReader recordReader =
+          inputFormat.createRecordReader(split, taskAttemptContext);
+      stubContext.setReader(recordReader);
       recordReader.initialize(splits.get(0), taskAttemptContext);
       float previousProgressValue = 0f;
       while (recordReader.nextKeyValue()) {
@@ -182,4 +184,27 @@ public void testGetSplitRatio() throws Exception {
     conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
     Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
   }
+
+  @Test
+  public void testDynamicInputChunkContext() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH,
+        "/tmp/test/file1.seq");
+    DynamicInputFormat firstInputFormat = new DynamicInputFormat();
+    DynamicInputFormat secondInputFormat = new DynamicInputFormat();
+    DynamicInputChunkContext firstContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext secondContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext thirdContext =
+        secondInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext fourthContext =
+        secondInputFormat.getChunkContext(configuration);
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",firstContext.equals(secondContext));
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",thirdContext.equals(fourthContext));
+    Assert.assertTrue("Contexts from different DynamicInputFormat " +
+        "objects should be different.",!firstContext.equals(thirdContext));
+  }
 }
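
Note: the patch makes every class above depend on a new DynamicInputChunkContext, but the new source file itself is not part of this excerpt. The sketch below is not the committed implementation; it is reconstructed only from the call sites visible in the hunks (getChunkRootPath, getChunkFilePrefix, getFs, getConfiguration, createChunkForWrite, acquire, getListOfChunkFiles, getNumChunksLeft) and from the static logic removed from DynamicInputChunk, so the exact bodies, visibility, and formatting are assumptions. The point of the refactoring is visible either way: the chunk directory, prefix, FileSystem, and chunk count become per-instance state owned by one context per DynamicInputFormat, instead of process-wide statics shared by every DistCp job in the JVM.

package org.apache.hadoop.tools.mapred.lib;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.tools.DistCpConstants;

/**
 * Hypothetical sketch: holds the per-job state that used to live in
 * DynamicInputChunk's static fields, so two DistCp jobs in one JVM no
 * longer share chunkRootPath, chunkFilePrefix, fs or numChunksLeft.
 */
class DynamicInputChunkContext {

  private static final Log LOG =
      LogFactory.getLog(DynamicInputChunkContext.class);

  private final Configuration configuration;
  private final Path chunkRootPath;
  private final String chunkFilePrefix;
  private final FileSystem fs;
  private int numChunksLeft = -1; // Un-initialized before the first dir-scan.

  public DynamicInputChunkContext(Configuration config) throws IOException {
    this.configuration = config;
    Path listingFilePath = new Path(config.get(
        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""));
    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
    fs = chunkRootPath.getFileSystem(config);
    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
  }

  public Configuration getConfiguration() { return configuration; }

  public Path getChunkRootPath() { return chunkRootPath; }

  public String getChunkFilePrefix() { return chunkFilePrefix; }

  public FileSystem getFs() { return fs; }

  /** Create a chunk file for writing, named after the given chunk id. */
  public DynamicInputChunk createChunkForWrite(String chunkId)
      throws IOException {
    return new DynamicInputChunk(chunkId, this);
  }

  /** List the chunk files still present under the chunk directory. */
  public FileStatus[] getListOfChunkFiles() throws IOException {
    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
    FileStatus[] chunkFiles = fs.globStatus(chunkFilePattern);
    numChunksLeft = chunkFiles.length;
    return chunkFiles;
  }

  /** Chunk files seen by the last scan, or -1 before the first scan. */
  public int getNumChunksLeft() { return numChunksLeft; }

  /** Acquire a chunk for a task attempt, preferring a pre-assigned one. */
  public DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    String taskId
        = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
    Path acquiredFilePath = new Path(chunkRootPath, taskId);

    if (fs.exists(acquiredFilePath)) {
      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this);
    }

    for (FileStatus chunkFile : getListOfChunkFiles()) {
      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
        LOG.info(taskId + " acquired " + chunkFile.getPath());
        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext,
            this);
      }
      LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
    }
    return null;
  }
}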