MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when strategy is dynamic. Contributed by Kuhu Shukla.

This commit is contained in:
Kihwal Lee 2015-10-30 14:56:41 -05:00
parent 18727c63da
commit 2868ca0328
6 changed files with 83 additions and 138 deletions

View File

@ -674,6 +674,9 @@ Release 2.7.2 - UNRELEASED
MAPREDUCE-6528. Memory leak for HistoryFileManager.getJobSummary()
(Junping Du via jlowe)
MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
strategy is dynamic (Kuhu Shukla via kihwal)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -20,14 +20,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@ -47,71 +43,27 @@
*/
class DynamicInputChunk<K, V> {
private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
private static Configuration configuration;
private static Path chunkRootPath;
private static String chunkFilePrefix;
private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
private static FileSystem fs;
private Path chunkFilePath;
private SequenceFileRecordReader<K, V> reader;
private SequenceFile.Writer writer;
private DynamicInputChunkContext chunkContext;
private static void initializeChunkInvariants(Configuration config)
throws IOException {
configuration = config;
Path listingFilePath = new Path(getListingFilePath(configuration));
chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
fs = chunkRootPath.getFileSystem(configuration);
chunkFilePrefix = listingFilePath.getName() + ".chunk.";
}
private static String getListingFilePath(Configuration configuration) {
final String listingFileString = configuration.get(
DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
assert !listingFileString.equals("") : "Listing file not found.";
return listingFileString;
}
private static boolean areInvariantsInitialized() {
return chunkRootPath != null;
}
private DynamicInputChunk(String chunkId, Configuration configuration)
DynamicInputChunk(String chunkId, DynamicInputChunkContext chunkContext)
throws IOException {
if (!areInvariantsInitialized())
initializeChunkInvariants(configuration);
chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
this.chunkContext = chunkContext;
chunkFilePath = new Path(chunkContext.getChunkRootPath(),
chunkContext.getChunkFilePrefix() + chunkId);
openForWrite();
}
private void openForWrite() throws IOException {
writer = SequenceFile.createWriter(
chunkFilePath.getFileSystem(configuration), configuration,
chunkContext.getFs(), chunkContext.getConfiguration(),
chunkFilePath, Text.class, CopyListingFileStatus.class,
SequenceFile.CompressionType.NONE);
}
/**
* Factory method to create chunk-files for writing to.
* (For instance, when the DynamicInputFormat splits the input-file into
* chunks.)
* @param chunkId String to identify the chunk.
* @param configuration Configuration, describing the location of the listing-
* file, file-system for the map-job, etc.
* @return A DynamicInputChunk, corresponding to a chunk-file, with the name
* incorporating the chunk-id.
* @throws IOException Exception on failure to create the chunk.
*/
public static DynamicInputChunk createChunkForWrite(String chunkId,
Configuration configuration) throws IOException {
return new DynamicInputChunk(chunkId, configuration);
}
/**
* Method to write records into a chunk.
* @param key Key from the listing file.
@ -135,19 +87,19 @@ public void close() {
* @throws IOException Exception on failure to reassign.
*/
public void assignTo(TaskID taskId) throws IOException {
Path newPath = new Path(chunkRootPath, taskId.toString());
if (!fs.rename(chunkFilePath, newPath)) {
Path newPath = new Path(chunkContext.getChunkRootPath(), taskId.toString());
if (!chunkContext.getFs().rename(chunkFilePath, newPath)) {
LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
}
}
private DynamicInputChunk(Path chunkFilePath,
TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
if (!areInvariantsInitialized())
initializeChunkInvariants(taskAttemptContext.getConfiguration());
public DynamicInputChunk(Path chunkFilePath,
TaskAttemptContext taskAttemptContext,
DynamicInputChunkContext chunkContext) throws IOException,
InterruptedException {
this.chunkFilePath = chunkFilePath;
this.chunkContext = chunkContext;
openForRead(taskAttemptContext);
}
@ -155,45 +107,8 @@ private void openForRead(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
reader = new SequenceFileRecordReader<K, V>();
reader.initialize(new FileSplit(chunkFilePath, 0,
DistCpUtils.getFileSize(chunkFilePath, configuration), null),
taskAttemptContext);
}
/**
* Factory method that
* 1. acquires a chunk for the specified map-task attempt
* 2. returns a DynamicInputChunk associated with the acquired chunk-file.
* @param taskAttemptContext The attempt-context for the map task that's
* trying to acquire a chunk.
* @return The acquired dynamic-chunk. The chunk-file is renamed to the
* attempt-id (from the attempt-context.)
* @throws IOException Exception on failure.
* @throws InterruptedException Exception on failure.
*/
public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
if (!areInvariantsInitialized())
initializeChunkInvariants(taskAttemptContext.getConfiguration());
String taskId
= taskAttemptContext.getTaskAttemptID().getTaskID().toString();
Path acquiredFilePath = new Path(chunkRootPath, taskId);
if (fs.exists(acquiredFilePath)) {
LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
}
for (FileStatus chunkFile : getListOfChunkFiles()) {
if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
LOG.info(taskId + " acquired " + chunkFile.getPath());
return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
}
else
LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
}
return null;
DistCpUtils.getFileSize(chunkFilePath,
chunkContext.getConfiguration()), null), taskAttemptContext);
}
/**
@ -204,19 +119,13 @@ public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
*/
public void release() throws IOException {
close();
if (!fs.delete(chunkFilePath, false)) {
if (!chunkContext.getFs().delete(chunkFilePath, false)) {
LOG.error("Unable to release chunk at path: " + chunkFilePath);
throw new IOException("Unable to release chunk at path: " + chunkFilePath);
throw new IOException("Unable to release chunk at path: " +
chunkFilePath);
}
}
static FileStatus [] getListOfChunkFiles() throws IOException {
Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
numChunksLeft = chunkFiles.length;
return chunkFiles;
}
/**
* Getter for the chunk-file's path, on HDFS.
* @return The qualified path to the chunk-file.
@ -234,14 +143,4 @@ public SequenceFileRecordReader<K,V> getReader() {
return reader;
}
/**
* Getter for the number of chunk-files left in the chunk-file directory.
* Useful to determine how many chunks (and hence, records) are left to be
* processed.
* @return Before the first scan of the directory, the number returned is -1.
* Otherwise, the number of chunk-files seen from the last scan is returned.
*/
public static int getNumChunksLeft() {
return numChunksLeft;
}
}

View File

@ -57,7 +57,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
= "mapred.num.splits";
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
= "mapred.num.entries.per.chunk";
private DynamicInputChunkContext<K, V> chunkContext = null;
/**
* Implementation of InputFormat::getSplits(). This method splits up the
* copy-listing file into chunks, and assigns the first batch to different
@ -72,6 +73,7 @@ public List<InputSplit> getSplits(JobContext jobContext)
throws IOException, InterruptedException {
LOG.info("DynamicInputFormat: Getting splits for job:"
+ jobContext.getJobID());
chunkContext = getChunkContext(jobContext.getConfiguration());
return createSplits(jobContext,
splitCopyListingIntoChunksWithShuffle(jobContext));
}
@ -101,6 +103,13 @@ private List<InputSplit> createSplits(JobContext jobContext,
private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
public DynamicInputChunkContext<K, V> getChunkContext(
Configuration configuration) throws IOException {
if(chunkContext == null) {
chunkContext = new DynamicInputChunkContext<K, V>(configuration);
}
return chunkContext;
}
private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
(JobContext context) throws IOException {
@ -146,8 +155,8 @@ private List<InputSplit> createSplits(JobContext jobContext,
closeAll(openChunks);
chunksFinal.addAll(openChunks);
openChunks = createChunks(
configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
openChunks = createChunks(chunkCount, nChunksTotal,
nChunksOpenAtOnce);
chunkCount += openChunks.size();
@ -183,9 +192,9 @@ private static void closeAll(List<DynamicInputChunk> chunks) {
chunk.close();
}
private static List<DynamicInputChunk> createChunks(Configuration config,
int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
throws IOException {
private List<DynamicInputChunk> createChunks(int chunkCount,
int nChunksTotal, int nChunksOpenAtOnce)
throws IOException {
List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
int chunkIdUpperBound
= Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
@ -197,14 +206,13 @@ private static List<DynamicInputChunk> createChunks(Configuration config,
chunkIdUpperBound = nChunksTotal;
for (int i=chunkCount; i < chunkIdUpperBound; ++i)
chunks.add(createChunk(i, config));
chunks.add(createChunk(i));
return chunks;
}
private static DynamicInputChunk createChunk(int chunkId, Configuration config)
private DynamicInputChunk createChunk(int chunkId)
throws IOException {
return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
config);
return chunkContext.createChunkForWrite(String.format("%05d", chunkId));
}
@ -351,6 +359,7 @@ public RecordReader<K, V> createRecordReader(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
return new DynamicRecordReader<K, V>();
chunkContext = getChunkContext(taskAttemptContext.getConfiguration());
return new DynamicRecordReader<K, V>(chunkContext);
}
}

View File

@ -49,9 +49,14 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
private int numRecordsProcessedByThisMap = 0;
private long timeOfLastChunkDirScan = 0;
private boolean isChunkDirAlreadyScanned = false;
private DynamicInputChunkContext<K, V> chunkContext;
private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
DynamicRecordReader(DynamicInputChunkContext<K, V> chunkContext) {
this.chunkContext = chunkContext;
}
/**
* Implementation for RecordReader::initialize(). Initializes the internal
* RecordReader to read from chunks.
@ -69,7 +74,7 @@ public void initialize(InputSplit inputSplit,
this.taskAttemptContext = taskAttemptContext;
configuration = taskAttemptContext.getConfiguration();
taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
chunk = chunkContext.acquire(this.taskAttemptContext);
timeOfLastChunkDirScan = System.currentTimeMillis();
isChunkDirAlreadyScanned = false;
@ -114,7 +119,7 @@ public boolean nextKeyValue()
timeOfLastChunkDirScan = System.currentTimeMillis();
isChunkDirAlreadyScanned = false;
chunk = DynamicInputChunk.acquire(taskAttemptContext);
chunk = chunkContext.acquire(taskAttemptContext);
if (chunk == null) return false;
@ -182,12 +187,12 @@ private int getNumChunksLeft() throws IOException {
|| (!isChunkDirAlreadyScanned &&
numRecordsProcessedByThisMap%numRecordsPerChunk
> numRecordsPerChunk/2)) {
DynamicInputChunk.getListOfChunkFiles();
chunkContext.getListOfChunkFiles();
isChunkDirAlreadyScanned = true;
timeOfLastChunkDirScan = now;
}
return DynamicInputChunk.getNumChunksLeft();
return chunkContext.getNumChunksLeft();
}
/**
* Implementation of RecordReader::close().

View File

@ -64,6 +64,10 @@ public RecordReader<Text, CopyListingFileStatus> getReader() {
return reader;
}
public void setReader(RecordReader<Text, CopyListingFileStatus> reader) {
this.reader = reader;
}
public StubInMemoryWriter getWriter() {
return writer;
}

View File

@ -40,6 +40,7 @@
import org.junit.Test;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -126,13 +127,14 @@ public void testGetSplits() throws Exception {
int taskId = 0;
for (InputSplit split : splits) {
RecordReader<Text, CopyListingFileStatus> recordReader =
inputFormat.createRecordReader(split, null);
StubContext stubContext = new StubContext(jobContext.getConfiguration(),
recordReader, taskId);
null, taskId);
final TaskAttemptContext taskAttemptContext
= stubContext.getContext();
RecordReader<Text, CopyListingFileStatus> recordReader =
inputFormat.createRecordReader(split, taskAttemptContext);
stubContext.setReader(recordReader);
recordReader.initialize(splits.get(0), taskAttemptContext);
float previousProgressValue = 0f;
while (recordReader.nextKeyValue()) {
@ -182,4 +184,27 @@ public void testGetSplitRatio() throws Exception {
conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
}
@Test
public void testDynamicInputChunkContext() throws IOException {
Configuration configuration = new Configuration();
configuration.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH,
"/tmp/test/file1.seq");
DynamicInputFormat firstInputFormat = new DynamicInputFormat();
DynamicInputFormat secondInputFormat = new DynamicInputFormat();
DynamicInputChunkContext firstContext =
firstInputFormat.getChunkContext(configuration);
DynamicInputChunkContext secondContext =
firstInputFormat.getChunkContext(configuration);
DynamicInputChunkContext thirdContext =
secondInputFormat.getChunkContext(configuration);
DynamicInputChunkContext fourthContext =
secondInputFormat.getChunkContext(configuration);
Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
"object should be the same.",firstContext.equals(secondContext));
Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
"object should be the same.",thirdContext.equals(fourthContext));
Assert.assertTrue("Contexts from different DynamicInputFormat " +
"objects should be different.",!firstContext.equals(thirdContext));
}
}