diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0ecc4f5f67c..73f9b93fba6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -39,6 +39,10 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5638. Port Hadoop Archives document to trunk (Akira AJISAKA via jeagles) + MAPREDUCE-5402. In DynamicInputFormat, change MAX_CHUNKS_TOLERABLE, + MAX_CHUNKS_IDEAL, MIN_RECORDS_PER_CHUNK and SPLIT_RATIO to be configurable. + (Tsuyoshi OZAWA via szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index e4b3e42e936..804d2802931 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -51,7 +51,16 @@ public class DistCpConstants { public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc"; public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; - + + public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = + "distcp.dynamic.max.chunks.tolerable"; + public static final String CONF_LABEL_MAX_CHUNKS_IDEAL = + "distcp.dynamic.max.chunks.ideal"; + public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK = + "distcp.dynamic.min.records_per_chunk"; + public static final String CONF_LABEL_SPLIT_RATIO = + "distcp.dynamic.split.ratio"; + /* Total bytes to be copied. Updated by copylisting. Unfiltered count */ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected"; @@ -107,4 +116,13 @@ public class DistCpConstants { public static final int INVALID_ARGUMENT = -1; public static final int DUPLICATE_INPUT = -2; public static final int UNKNOWN_ERROR = -999; + + /** + * Constants for DistCp default values of configurable values + */ + public static final int MAX_CHUNKS_TOLERABLE_DEFAULT = 400; + public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100; + public static final int MIN_RECORDS_PER_CHUNK_DEFAULT = 5; + public static final int SPLIT_RATIO_DEFAULT = 2; + } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java index e1ae9f90f57..14895d30af0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java @@ -57,7 +57,7 @@ public class DynamicInputFormat extends InputFormat { = "mapred.num.splits"; private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK = "mapred.num.entries.per.chunk"; - + /** * Implementation of InputFormat::getSplits(). This method splits up the * copy-listing file into chunks, and assigns the first batch to different @@ -91,7 +91,7 @@ public class DynamicInputFormat extends InputFormat { // Setting non-zero length for FileSplit size, to avoid a possible // future when 0-sized file-splits are considered "empty" and skipped // over. - MIN_RECORDS_PER_CHUNK, + getMinRecordsPerChunk(jobContext.getConfiguration()), null)); } DistCpUtils.publish(jobContext.getConfiguration(), @@ -107,9 +107,11 @@ public class DynamicInputFormat extends InputFormat { final Configuration configuration = context.getConfiguration(); int numRecords = getNumberOfRecords(configuration); int numMaps = getNumMapTasks(configuration); + int maxChunksTolerable = getMaxChunksTolerable(configuration); + // Number of chunks each map will process, on average. int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords); - validateNumChunksUsing(splitRatio, numMaps); + validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable); int numEntriesPerChunk = (int)Math.ceil((float)numRecords /(splitRatio * numMaps)); @@ -168,9 +170,9 @@ public class DynamicInputFormat extends InputFormat { return chunksFinal; } - private static void validateNumChunksUsing(int splitRatio, int numMaps) - throws IOException { - if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE) + private static void validateNumChunksUsing(int splitRatio, int numMaps, + int maxChunksTolerable) throws IOException { + if (splitRatio * numMaps > maxChunksTolerable) throw new IOException("Too many chunks created with splitRatio:" + splitRatio + ", numMaps:" + numMaps + ". Reduce numMaps or decrease split-ratio to proceed."); @@ -238,14 +240,61 @@ public class DynamicInputFormat extends InputFormat { int numMaps, int numPaths) { return configuration.getInt( CONF_LABEL_LISTING_SPLIT_RATIO, - getSplitRatio(numMaps, numPaths)); + getSplitRatio(numMaps, numPaths, configuration)); + } + + private static int getMaxChunksTolerable(Configuration conf) { + int maxChunksTolerable = conf.getInt( + DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, + DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT); + if (maxChunksTolerable <= 0) { + LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE + + " should be positive. Fall back to default value: " + + DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT); + maxChunksTolerable = DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT; + } + return maxChunksTolerable; + } + + private static int getMaxChunksIdeal(Configuration conf) { + int maxChunksIdeal = conf.getInt( + DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, + DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT); + if (maxChunksIdeal <= 0) { + LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL + + " should be positive. Fall back to default value: " + + DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT); + maxChunksIdeal = DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT; + } + return maxChunksIdeal; + } + + private static int getMinRecordsPerChunk(Configuration conf) { + int minRecordsPerChunk = conf.getInt( + DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, + DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT); + if (minRecordsPerChunk <= 0) { + LOG.warn(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK + + " should be positive. Fall back to default value: " + + DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT); + minRecordsPerChunk = DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT; + } + return minRecordsPerChunk; } - private static final int MAX_CHUNKS_TOLERABLE = 400; - private static final int MAX_CHUNKS_IDEAL = 100; - private static final int MIN_RECORDS_PER_CHUNK = 5; - private static final int SPLIT_RATIO_DEFAULT = 2; - + private static int getSplitRatio(Configuration conf) { + int splitRatio = conf.getInt( + DistCpConstants.CONF_LABEL_SPLIT_RATIO, + DistCpConstants.SPLIT_RATIO_DEFAULT); + if (splitRatio <= 0) { + LOG.warn(DistCpConstants.CONF_LABEL_SPLIT_RATIO + + " should be positive. Fall back to default value: " + + DistCpConstants.SPLIT_RATIO_DEFAULT); + splitRatio = DistCpConstants.SPLIT_RATIO_DEFAULT; + } + return splitRatio; + } + /** * Package private, for testability. * @param nMaps The number of maps requested for. @@ -253,19 +302,34 @@ public class DynamicInputFormat extends InputFormat { * @return The number of splits each map should handle, ideally. */ static int getSplitRatio(int nMaps, int nRecords) { + return getSplitRatio(nMaps, nRecords,new Configuration()); + } + + /** + * Package private, for testability. + * @param nMaps The number of maps requested for. + * @param nRecords The number of records to be copied. + * @param conf The configuration set by users. + * @return The number of splits each map should handle, ideally. + */ + static int getSplitRatio(int nMaps, int nRecords, Configuration conf) { + int maxChunksIdeal = getMaxChunksIdeal(conf); + int minRecordsPerChunk = getMinRecordsPerChunk(conf); + int splitRatio = getSplitRatio(conf); + if (nMaps == 1) { LOG.warn("nMaps == 1. Why use DynamicInputFormat?"); return 1; } - if (nMaps > MAX_CHUNKS_IDEAL) - return SPLIT_RATIO_DEFAULT; + if (nMaps > maxChunksIdeal) + return splitRatio; - int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps); + int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps); int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups)); - return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ? - SPLIT_RATIO_DEFAULT : nPickups; + return nRecordsPerChunk < minRecordsPerChunk ? + splitRatio : nPickups; } static int getNumEntriesPerChunk(Configuration configuration) { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java index e5e636da532..ad67eb0371c 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.mapred.lib; +import org.apache.hadoop.tools.DistCpConstants; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -160,5 +161,25 @@ public class TestDynamicInputFormat { Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10)); Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700)); Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200)); + + // Tests with negative value configuration + Configuration conf = new Configuration(); + conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, -1); + conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, -1); + conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, -1); + conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, -1); + Assert.assertEquals(1, + DynamicInputFormat.getSplitRatio(1, 1000000000, conf)); + Assert.assertEquals(2, + DynamicInputFormat.getSplitRatio(11000000, 10, conf)); + Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700, conf)); + Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200, conf)); + + // Tests with valid configuration + conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 100); + conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 30); + conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 10); + conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53); + Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf)); } }