MAPREDUCE-5402. In DynamicInputFormat, change MAX_CHUNKS_TOLERABLE, MAX_CHUNKS_IDEAL, MIN_RECORDS_PER_CHUNK and SPLIT_RATIO to be configurable. Contributed by Tsuyoshi OZAWA
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1592703 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
17d4fbbf0a
commit
03db13206f
|
@ -178,6 +178,10 @@ Release 2.5.0 - UNRELEASED
|
||||||
MAPREDUCE-5638. Port Hadoop Archives document to trunk (Akira AJISAKA via
|
MAPREDUCE-5638. Port Hadoop Archives document to trunk (Akira AJISAKA via
|
||||||
jeagles)
|
jeagles)
|
||||||
|
|
||||||
|
MAPREDUCE-5402. In DynamicInputFormat, change MAX_CHUNKS_TOLERABLE,
|
||||||
|
MAX_CHUNKS_IDEAL, MIN_RECORDS_PER_CHUNK and SPLIT_RATIO to be configurable.
|
||||||
|
(Tsuyoshi OZAWA via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -51,7 +51,16 @@ public class DistCpConstants {
|
||||||
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
|
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
|
||||||
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
|
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
|
||||||
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
||||||
|
|
||||||
|
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
|
||||||
|
"distcp.dynamic.max.chunks.tolerable";
|
||||||
|
public static final String CONF_LABEL_MAX_CHUNKS_IDEAL =
|
||||||
|
"distcp.dynamic.max.chunks.ideal";
|
||||||
|
public static final String CONF_LABEL_MIN_RECORDS_PER_CHUNK =
|
||||||
|
"distcp.dynamic.min.records_per_chunk";
|
||||||
|
public static final String CONF_LABEL_SPLIT_RATIO =
|
||||||
|
"distcp.dynamic.split.ratio";
|
||||||
|
|
||||||
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
|
/* Total bytes to be copied. Updated by copylisting. Unfiltered count */
|
||||||
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
|
public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
|
||||||
|
|
||||||
|
@ -107,4 +116,13 @@ public class DistCpConstants {
|
||||||
public static final int INVALID_ARGUMENT = -1;
|
public static final int INVALID_ARGUMENT = -1;
|
||||||
public static final int DUPLICATE_INPUT = -2;
|
public static final int DUPLICATE_INPUT = -2;
|
||||||
public static final int UNKNOWN_ERROR = -999;
|
public static final int UNKNOWN_ERROR = -999;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constants for DistCp default values of configurable values
|
||||||
|
*/
|
||||||
|
public static final int MAX_CHUNKS_TOLERABLE_DEFAULT = 400;
|
||||||
|
public static final int MAX_CHUNKS_IDEAL_DEFAULT = 100;
|
||||||
|
public static final int MIN_RECORDS_PER_CHUNK_DEFAULT = 5;
|
||||||
|
public static final int SPLIT_RATIO_DEFAULT = 2;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
= "mapred.num.splits";
|
= "mapred.num.splits";
|
||||||
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
|
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
|
||||||
= "mapred.num.entries.per.chunk";
|
= "mapred.num.entries.per.chunk";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of InputFormat::getSplits(). This method splits up the
|
* Implementation of InputFormat::getSplits(). This method splits up the
|
||||||
* copy-listing file into chunks, and assigns the first batch to different
|
* copy-listing file into chunks, and assigns the first batch to different
|
||||||
|
@ -91,7 +91,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
// Setting non-zero length for FileSplit size, to avoid a possible
|
// Setting non-zero length for FileSplit size, to avoid a possible
|
||||||
// future when 0-sized file-splits are considered "empty" and skipped
|
// future when 0-sized file-splits are considered "empty" and skipped
|
||||||
// over.
|
// over.
|
||||||
MIN_RECORDS_PER_CHUNK,
|
getMinRecordsPerChunk(jobContext.getConfiguration()),
|
||||||
null));
|
null));
|
||||||
}
|
}
|
||||||
DistCpUtils.publish(jobContext.getConfiguration(),
|
DistCpUtils.publish(jobContext.getConfiguration(),
|
||||||
|
@ -107,9 +107,11 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
final Configuration configuration = context.getConfiguration();
|
final Configuration configuration = context.getConfiguration();
|
||||||
int numRecords = getNumberOfRecords(configuration);
|
int numRecords = getNumberOfRecords(configuration);
|
||||||
int numMaps = getNumMapTasks(configuration);
|
int numMaps = getNumMapTasks(configuration);
|
||||||
|
int maxChunksTolerable = getMaxChunksTolerable(configuration);
|
||||||
|
|
||||||
// Number of chunks each map will process, on average.
|
// Number of chunks each map will process, on average.
|
||||||
int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
|
int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
|
||||||
validateNumChunksUsing(splitRatio, numMaps);
|
validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable);
|
||||||
|
|
||||||
int numEntriesPerChunk = (int)Math.ceil((float)numRecords
|
int numEntriesPerChunk = (int)Math.ceil((float)numRecords
|
||||||
/(splitRatio * numMaps));
|
/(splitRatio * numMaps));
|
||||||
|
@ -168,9 +170,9 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
return chunksFinal;
|
return chunksFinal;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void validateNumChunksUsing(int splitRatio, int numMaps)
|
private static void validateNumChunksUsing(int splitRatio, int numMaps,
|
||||||
throws IOException {
|
int maxChunksTolerable) throws IOException {
|
||||||
if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
|
if (splitRatio * numMaps > maxChunksTolerable)
|
||||||
throw new IOException("Too many chunks created with splitRatio:"
|
throw new IOException("Too many chunks created with splitRatio:"
|
||||||
+ splitRatio + ", numMaps:" + numMaps
|
+ splitRatio + ", numMaps:" + numMaps
|
||||||
+ ". Reduce numMaps or decrease split-ratio to proceed.");
|
+ ". Reduce numMaps or decrease split-ratio to proceed.");
|
||||||
|
@ -238,14 +240,61 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
int numMaps, int numPaths) {
|
int numMaps, int numPaths) {
|
||||||
return configuration.getInt(
|
return configuration.getInt(
|
||||||
CONF_LABEL_LISTING_SPLIT_RATIO,
|
CONF_LABEL_LISTING_SPLIT_RATIO,
|
||||||
getSplitRatio(numMaps, numPaths));
|
getSplitRatio(numMaps, numPaths, configuration));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getMaxChunksTolerable(Configuration conf) {
|
||||||
|
int maxChunksTolerable = conf.getInt(
|
||||||
|
DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE,
|
||||||
|
DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
|
||||||
|
if (maxChunksTolerable <= 0) {
|
||||||
|
LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE +
|
||||||
|
" should be positive. Fall back to default value: "
|
||||||
|
+ DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
|
||||||
|
maxChunksTolerable = DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT;
|
||||||
|
}
|
||||||
|
return maxChunksTolerable;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getMaxChunksIdeal(Configuration conf) {
|
||||||
|
int maxChunksIdeal = conf.getInt(
|
||||||
|
DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL,
|
||||||
|
DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
|
||||||
|
if (maxChunksIdeal <= 0) {
|
||||||
|
LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL +
|
||||||
|
" should be positive. Fall back to default value: "
|
||||||
|
+ DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
|
||||||
|
maxChunksIdeal = DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT;
|
||||||
|
}
|
||||||
|
return maxChunksIdeal;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getMinRecordsPerChunk(Configuration conf) {
|
||||||
|
int minRecordsPerChunk = conf.getInt(
|
||||||
|
DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK,
|
||||||
|
DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
|
||||||
|
if (minRecordsPerChunk <= 0) {
|
||||||
|
LOG.warn(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK +
|
||||||
|
" should be positive. Fall back to default value: "
|
||||||
|
+ DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
|
||||||
|
minRecordsPerChunk = DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT;
|
||||||
|
}
|
||||||
|
return minRecordsPerChunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int MAX_CHUNKS_TOLERABLE = 400;
|
private static int getSplitRatio(Configuration conf) {
|
||||||
private static final int MAX_CHUNKS_IDEAL = 100;
|
int splitRatio = conf.getInt(
|
||||||
private static final int MIN_RECORDS_PER_CHUNK = 5;
|
DistCpConstants.CONF_LABEL_SPLIT_RATIO,
|
||||||
private static final int SPLIT_RATIO_DEFAULT = 2;
|
DistCpConstants.SPLIT_RATIO_DEFAULT);
|
||||||
|
if (splitRatio <= 0) {
|
||||||
|
LOG.warn(DistCpConstants.CONF_LABEL_SPLIT_RATIO +
|
||||||
|
" should be positive. Fall back to default value: "
|
||||||
|
+ DistCpConstants.SPLIT_RATIO_DEFAULT);
|
||||||
|
splitRatio = DistCpConstants.SPLIT_RATIO_DEFAULT;
|
||||||
|
}
|
||||||
|
return splitRatio;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Package private, for testability.
|
* Package private, for testability.
|
||||||
* @param nMaps The number of maps requested for.
|
* @param nMaps The number of maps requested for.
|
||||||
|
@ -253,19 +302,34 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
||||||
* @return The number of splits each map should handle, ideally.
|
* @return The number of splits each map should handle, ideally.
|
||||||
*/
|
*/
|
||||||
static int getSplitRatio(int nMaps, int nRecords) {
|
static int getSplitRatio(int nMaps, int nRecords) {
|
||||||
|
return getSplitRatio(nMaps, nRecords,new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Package private, for testability.
|
||||||
|
* @param nMaps The number of maps requested for.
|
||||||
|
* @param nRecords The number of records to be copied.
|
||||||
|
* @param conf The configuration set by users.
|
||||||
|
* @return The number of splits each map should handle, ideally.
|
||||||
|
*/
|
||||||
|
static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
|
||||||
|
int maxChunksIdeal = getMaxChunksIdeal(conf);
|
||||||
|
int minRecordsPerChunk = getMinRecordsPerChunk(conf);
|
||||||
|
int splitRatio = getSplitRatio(conf);
|
||||||
|
|
||||||
if (nMaps == 1) {
|
if (nMaps == 1) {
|
||||||
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
|
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nMaps > MAX_CHUNKS_IDEAL)
|
if (nMaps > maxChunksIdeal)
|
||||||
return SPLIT_RATIO_DEFAULT;
|
return splitRatio;
|
||||||
|
|
||||||
int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
|
int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
|
||||||
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
|
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
|
||||||
|
|
||||||
return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
|
return nRecordsPerChunk < minRecordsPerChunk ?
|
||||||
SPLIT_RATIO_DEFAULT : nPickups;
|
splitRatio : nPickups;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int getNumEntriesPerChunk(Configuration configuration) {
|
static int getNumEntriesPerChunk(Configuration configuration) {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.tools.mapred.lib;
|
package org.apache.hadoop.tools.mapred.lib;
|
||||||
|
|
||||||
|
import org.apache.hadoop.tools.DistCpConstants;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -160,5 +161,25 @@ public class TestDynamicInputFormat {
|
||||||
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
|
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(11000000, 10));
|
||||||
Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
|
Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700));
|
||||||
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
|
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200));
|
||||||
|
|
||||||
|
// Tests with negative value configuration
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, -1);
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, -1);
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, -1);
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, -1);
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
DynamicInputFormat.getSplitRatio(1, 1000000000, conf));
|
||||||
|
Assert.assertEquals(2,
|
||||||
|
DynamicInputFormat.getSplitRatio(11000000, 10, conf));
|
||||||
|
Assert.assertEquals(4, DynamicInputFormat.getSplitRatio(30, 700, conf));
|
||||||
|
Assert.assertEquals(2, DynamicInputFormat.getSplitRatio(30, 200, conf));
|
||||||
|
|
||||||
|
// Tests with valid configuration
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE, 100);
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL, 30);
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK, 10);
|
||||||
|
conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
|
||||||
|
Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue