MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when strategy is dynamic. Contributed by Kuhu Shukla.

(cherry picked from commit 2868ca0328)
Kihwal Lee 2015-10-30 14:57:57 -05:00
parent a7d8895b60
commit 52819fe4ef
6 changed files with 83 additions and 138 deletions
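
In brief: DynamicInputChunk previously kept its chunk bookkeeping (chunkRootPath, chunkFilePrefix, the FileSystem handle, and numChunksLeft) in static fields that were initialized once per JVM and never reset, so a second DistCp job running with -strategy dynamic in the same JVM silently reused the first job's chunkFilePath layout. This patch moves that state into a per-job DynamicInputChunkContext, created lazily by DynamicInputFormat.getChunkContext() and passed explicitly to every DynamicInputChunk and DynamicRecordReader.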

hadoop-mapreduce-project/CHANGES.txt

@@ -393,6 +393,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6528. Memory leak for HistoryFileManager.getJobSummary()
     (Junping Du via jlowe)
 
+    MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
+    strategy is dynamic (Kuhu Shukla via kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java

@@ -20,14 +20,10 @@ package org.apache.hadoop.tools.mapred.lib;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.tools.CopyListingFileStatus;
-import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -47,71 +43,27 @@ import java.io.IOException;
  */
 class DynamicInputChunk<K, V> {
   private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
-  private static Configuration configuration;
-  private static Path chunkRootPath;
-  private static String chunkFilePrefix;
-  private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
-  private static FileSystem fs;
 
   private Path chunkFilePath;
   private SequenceFileRecordReader<K, V> reader;
   private SequenceFile.Writer writer;
+  private DynamicInputChunkContext chunkContext;
 
-  private static void initializeChunkInvariants(Configuration config)
-      throws IOException {
-    configuration = config;
-    Path listingFilePath = new Path(getListingFilePath(configuration));
-    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
-    fs = chunkRootPath.getFileSystem(configuration);
-    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
-  }
-
-  private static String getListingFilePath(Configuration configuration) {
-    final String listingFileString = configuration.get(
-        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
-    assert !listingFileString.equals("") : "Listing file not found.";
-    return listingFileString;
-  }
-
-  private static boolean areInvariantsInitialized() {
-    return chunkRootPath != null;
-  }
-
-  private DynamicInputChunk(String chunkId, Configuration configuration)
-      throws IOException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(configuration);
-
-    chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+  DynamicInputChunk(String chunkId, DynamicInputChunkContext chunkContext)
+      throws IOException {
+    this.chunkContext = chunkContext;
+    chunkFilePath = new Path(chunkContext.getChunkRootPath(),
+        chunkContext.getChunkFilePrefix() + chunkId);
     openForWrite();
   }
 
   private void openForWrite() throws IOException {
     writer = SequenceFile.createWriter(
-            chunkFilePath.getFileSystem(configuration), configuration,
+            chunkContext.getFs(), chunkContext.getConfiguration(),
             chunkFilePath, Text.class, CopyListingFileStatus.class,
             SequenceFile.CompressionType.NONE);
   }
 
-  /**
-   * Factory method to create chunk-files for writing to.
-   * (For instance, when the DynamicInputFormat splits the input-file into
-   * chunks.)
-   * @param chunkId String to identify the chunk.
-   * @param configuration Configuration, describing the location of the listing-
-   * file, file-system for the map-job, etc.
-   * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
-   * incorporating the chunk-id.
-   * @throws IOException Exception on failure to create the chunk.
-   */
-  public static DynamicInputChunk createChunkForWrite(String chunkId,
-      Configuration configuration) throws IOException {
-    return new DynamicInputChunk(chunkId, configuration);
-  }
-
   /**
    * Method to write records into a chunk.
    * @param key Key from the listing file.
@@ -135,19 +87,19 @@ class DynamicInputChunk<K, V> {
    * @throws IOException Exception on failure to reassign.
    */
   public void assignTo(TaskID taskId) throws IOException {
-    Path newPath = new Path(chunkRootPath, taskId.toString());
-    if (!fs.rename(chunkFilePath, newPath)) {
+    Path newPath = new Path(chunkContext.getChunkRootPath(), taskId.toString());
+    if (!chunkContext.getFs().rename(chunkFilePath, newPath)) {
       LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
     }
   }
 
-  private DynamicInputChunk(Path chunkFilePath,
-      TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
-
+  public DynamicInputChunk(Path chunkFilePath,
+      TaskAttemptContext taskAttemptContext,
+      DynamicInputChunkContext chunkContext) throws IOException,
+      InterruptedException {
     this.chunkFilePath = chunkFilePath;
+    this.chunkContext = chunkContext;
     openForRead(taskAttemptContext);
   }
@@ -155,45 +107,8 @@ class DynamicInputChunk<K, V> {
       throws IOException, InterruptedException {
     reader = new SequenceFileRecordReader<K, V>();
     reader.initialize(new FileSplit(chunkFilePath, 0,
-        DistCpUtils.getFileSize(chunkFilePath, configuration), null),
-        taskAttemptContext);
+        DistCpUtils.getFileSize(chunkFilePath,
+            chunkContext.getConfiguration()), null), taskAttemptContext);
   }
 
-  /**
-   * Factory method that
-   * 1. acquires a chunk for the specified map-task attempt
-   * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
-   * @param taskAttemptContext The attempt-context for the map task that's
-   * trying to acquire a chunk.
-   * @return The acquired dynamic-chunk. The chunk-file is renamed to the
-   * attempt-id (from the attempt-context.)
-   * @throws IOException Exception on failure.
-   * @throws InterruptedException Exception on failure.
-   */
-  public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
-
-    String taskId
-        = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
-    Path acquiredFilePath = new Path(chunkRootPath, taskId);
-
-    if (fs.exists(acquiredFilePath)) {
-      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
-      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-    }
-
-    for (FileStatus chunkFile : getListOfChunkFiles()) {
-      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
-        LOG.info(taskId + " acquired " + chunkFile.getPath());
-        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-      }
-      else
-        LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
-    }
-
-    return null;
-  }
 
   /**
@@ -204,19 +119,13 @@ class DynamicInputChunk<K, V> {
    */
   public void release() throws IOException {
     close();
-    if (!fs.delete(chunkFilePath, false)) {
+    if (!chunkContext.getFs().delete(chunkFilePath, false)) {
       LOG.error("Unable to release chunk at path: " + chunkFilePath);
-      throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+      throw new IOException("Unable to release chunk at path: " +
+          chunkFilePath);
     }
   }
 
-  static FileStatus [] getListOfChunkFiles() throws IOException {
-    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
-    FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
-    numChunksLeft = chunkFiles.length;
-    return chunkFiles;
-  }
-
   /**
    * Getter for the chunk-file's path, on HDFS.
    * @return The qualified path to the chunk-file.
@@ -234,14 +143,4 @@ class DynamicInputChunk<K, V> {
     return reader;
   }
 
-  /**
-   * Getter for the number of chunk-files left in the chunk-file directory.
-   * Useful to determine how many chunks (and hence, records) are left to be
-   * processed.
-   * @return Before the first scan of the directory, the number returned is -1.
-   *         Otherwise, the number of chunk-files seen from the last scan is returned.
-   */
-  public static int getNumChunksLeft() {
-    return numChunksLeft;
-  }
 }
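
The new DynamicInputChunkContext class that all of the hunks above call into is part of this commit but is not expanded in this view. The following is a rough sketch of its shape, inferred from the call sites above (getChunkRootPath, getChunkFilePrefix, getFs, getConfiguration, createChunkForWrite, acquire, getListOfChunkFiles, getNumChunksLeft) and from the static code deleted out of DynamicInputChunk; it is an illustration, not the committed source:

// Illustrative sketch only: NOT the committed DynamicInputChunkContext.java.
// Shape inferred from the call sites in this diff and from the statics
// removed from DynamicInputChunk; logging and argument checks omitted.
package org.apache.hadoop.tools.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.tools.DistCpConstants;

class DynamicInputChunkContext<K, V> {

  private final Configuration configuration;
  private final Path chunkRootPath;     // <listing-file parent>/chunkDir
  private final String chunkFilePrefix; // <listing-file name>.chunk.
  private final FileSystem fs;
  private int numChunksLeft = -1;       // -1 until the first directory scan

  public DynamicInputChunkContext(Configuration config) throws IOException {
    // Same invariants the old static initializeChunkInvariants() set up,
    // now computed once per context (i.e. once per job) instead of per JVM.
    this.configuration = config;
    Path listingFilePath = new Path(configuration.get(
        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""));
    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
    fs = chunkRootPath.getFileSystem(configuration);
    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
  }

  public Configuration getConfiguration() { return configuration; }
  public Path getChunkRootPath() { return chunkRootPath; }
  public String getChunkFilePrefix() { return chunkFilePrefix; }
  public FileSystem getFs() { return fs; }

  /** Replaces the static DynamicInputChunk.createChunkForWrite(). */
  public DynamicInputChunk createChunkForWrite(String chunkId)
      throws IOException {
    return new DynamicInputChunk(chunkId, this);
  }

  /** Replaces the static DynamicInputChunk.getListOfChunkFiles(). */
  public FileStatus[] getListOfChunkFiles() throws IOException {
    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
    FileStatus[] chunkFiles = fs.globStatus(chunkFilePattern);
    numChunksLeft = chunkFiles.length;
    return chunkFiles;
  }

  public int getNumChunksLeft() {
    return numChunksLeft;
  }

  /** Replaces the static DynamicInputChunk.acquire(); same rename protocol. */
  public DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    String taskId =
        taskAttemptContext.getTaskAttemptID().getTaskID().toString();
    Path acquiredFilePath = new Path(chunkRootPath, taskId);
    if (fs.exists(acquiredFilePath)) {
      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this);
    }
    for (FileStatus chunkFile : getListOfChunkFiles()) {
      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext,
            this);
      }
    }
    return null; // no unclaimed chunks left
  }
}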

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java

@@ -57,7 +57,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       = "mapred.num.splits";
   private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
       = "mapred.num.entries.per.chunk";
+  private DynamicInputChunkContext<K, V> chunkContext = null;
 
   /**
    * Implementation of InputFormat::getSplits(). This method splits up the
    * copy-listing file into chunks, and assigns the first batch to different
@@ -72,6 +73,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       throws IOException, InterruptedException {
     LOG.info("DynamicInputFormat: Getting splits for job:"
              + jobContext.getJobID());
+    chunkContext = getChunkContext(jobContext.getConfiguration());
     return createSplits(jobContext,
                         splitCopyListingIntoChunksWithShuffle(jobContext));
   }
@@ -101,6 +103,13 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
   private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
 
+  public DynamicInputChunkContext<K, V> getChunkContext(
+      Configuration configuration) throws IOException {
+    if(chunkContext == null) {
+      chunkContext = new DynamicInputChunkContext<K, V>(configuration);
+    }
+    return chunkContext;
+  }
+
   private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
                                     (JobContext context) throws IOException {
@@ -146,8 +155,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
         closeAll(openChunks);
         chunksFinal.addAll(openChunks);
 
-        openChunks = createChunks(
-                configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+        openChunks = createChunks(chunkCount, nChunksTotal,
+            nChunksOpenAtOnce);
 
         chunkCount += openChunks.size();
@@ -183,9 +192,9 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       chunk.close();
   }
 
-  private static List<DynamicInputChunk> createChunks(Configuration config,
-      int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
+  private List<DynamicInputChunk> createChunks(int chunkCount,
+      int nChunksTotal, int nChunksOpenAtOnce)
       throws IOException {
     List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
     int chunkIdUpperBound
             = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
@@ -197,14 +206,13 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       chunkIdUpperBound = nChunksTotal;
 
     for (int i=chunkCount; i < chunkIdUpperBound; ++i)
-      chunks.add(createChunk(i, config));
+      chunks.add(createChunk(i));
     return chunks;
   }
 
-  private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+  private DynamicInputChunk createChunk(int chunkId)
       throws IOException {
-    return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
-            config);
+    return chunkContext.createChunkForWrite(String.format("%05d", chunkId));
   }
@@ -351,6 +359,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
-    return new DynamicRecordReader<K, V>();
+    chunkContext = getChunkContext(taskAttemptContext.getConfiguration());
+    return new DynamicRecordReader<K, V>(chunkContext);
   }
 }
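
Note: getChunkContext() lazily creates a single DynamicInputChunkContext per DynamicInputFormat instance and reuses it for both getSplits() (driver side) and createRecordReader() (task side), so two jobs, each with its own input format, no longer share chunk state through JVM-wide statics. The new testDynamicInputChunkContext case at the bottom of this commit asserts exactly this behavior.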

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java

@@ -49,9 +49,14 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
   private int numRecordsProcessedByThisMap = 0;
   private long timeOfLastChunkDirScan = 0;
   private boolean isChunkDirAlreadyScanned = false;
+  private DynamicInputChunkContext<K, V> chunkContext;
 
   private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
 
+  DynamicRecordReader(DynamicInputChunkContext<K, V> chunkContext) {
+    this.chunkContext = chunkContext;
+  }
+
   /**
    * Implementation for RecordReader::initialize(). Initializes the internal
    * RecordReader to read from chunks.
@@ -69,7 +74,7 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
     this.taskAttemptContext = taskAttemptContext;
     configuration = taskAttemptContext.getConfiguration();
     taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
-    chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+    chunk = chunkContext.acquire(this.taskAttemptContext);
     timeOfLastChunkDirScan = System.currentTimeMillis();
     isChunkDirAlreadyScanned = false;
@@ -114,7 +119,7 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
       timeOfLastChunkDirScan = System.currentTimeMillis();
       isChunkDirAlreadyScanned = false;
 
-      chunk = DynamicInputChunk.acquire(taskAttemptContext);
+      chunk = chunkContext.acquire(taskAttemptContext);
 
       if (chunk == null) return false;
@@ -182,12 +187,12 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
         || (!isChunkDirAlreadyScanned &&
                 numRecordsProcessedByThisMap%numRecordsPerChunk
                               > numRecordsPerChunk/2)) {
-      DynamicInputChunk.getListOfChunkFiles();
+      chunkContext.getListOfChunkFiles();
       isChunkDirAlreadyScanned = true;
       timeOfLastChunkDirScan = now;
     }
 
-    return DynamicInputChunk.getNumChunksLeft();
+    return chunkContext.getNumChunksLeft();
   }
 
   /**
    * Implementation of RecordReader::close().
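
With the context injected through its new constructor, DynamicRecordReader no longer calls static methods on DynamicInputChunk: chunk acquisition in initialize() and nextKeyValue(), and the chunk-directory scans behind getProgress(), all go through the per-job context.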

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java

@@ -64,6 +64,10 @@ public class StubContext {
     return reader;
   }
 
+  public void setReader(RecordReader<Text, CopyListingFileStatus> reader) {
+    this.reader = reader;
+  }
+
   public StubInMemoryWriter getWriter() {
     return writer;
   }

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java

@@ -40,6 +40,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -126,13 +127,14 @@ public class TestDynamicInputFormat {
     int taskId = 0;
 
     for (InputSplit split : splits) {
-      RecordReader<Text, CopyListingFileStatus> recordReader =
-           inputFormat.createRecordReader(split, null);
       StubContext stubContext = new StubContext(jobContext.getConfiguration(),
-                                                recordReader, taskId);
+                                                null, taskId);
       final TaskAttemptContext taskAttemptContext
              = stubContext.getContext();
 
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+          inputFormat.createRecordReader(split, taskAttemptContext);
+      stubContext.setReader(recordReader);
       recordReader.initialize(splits.get(0), taskAttemptContext);
       float previousProgressValue = 0f;
       while (recordReader.nextKeyValue()) {
@@ -182,4 +184,27 @@ public class TestDynamicInputFormat {
     conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
     Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
   }
+
+  @Test
+  public void testDynamicInputChunkContext() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH,
+        "/tmp/test/file1.seq");
+    DynamicInputFormat firstInputFormat = new DynamicInputFormat();
+    DynamicInputFormat secondInputFormat = new DynamicInputFormat();
+    DynamicInputChunkContext firstContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext secondContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext thirdContext =
+        secondInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext fourthContext =
+        secondInputFormat.getChunkContext(configuration);
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",firstContext.equals(secondContext));
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",thirdContext.equals(fourthContext));
+    Assert.assertTrue("Contexts from different DynamicInputFormat " +
+        "objects should be different.",!firstContext.equals(thirdContext));
+  }
 }