From c3981d3f2c41e528e85c9be0490b7e8ba93e53cd Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Wed, 27 Feb 2013 18:53:03 +0000
Subject: [PATCH] MAPREDUCE-4892. Modify CombineFileInputFormat to not skew
 input splits' allocation on small clusters. Contributed by Bikas Saha.

svn merge --ignore-ancestry -c 1450912 ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1450915 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt               |   3 +
 .../lib/input/CombineFileInputFormat.java          | 208 +++++++++++-------
 .../lib/input/TestCombineFileInputFormat.java      | 123 ++++++++---
 3 files changed, 223 insertions(+), 111 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ec1610e4410..b8bfb4e0ca1 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -11,6 +11,9 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-5033. mapred shell script should respect usage flags
     (--help -help -h). (Andrew Wang via atm)
 
+    MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits'
+    allocation on small clusters. (Bikas Saha via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index 42ab9f3b952..504c7b711a2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -49,6 +49,8 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
  * {@link InputFormat#getSplits(JobContext)} method.
@@ -76,7 +78,7 @@
 @InterfaceStability.Stable
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
-  
+
   public static final String SPLIT_MINSIZE_PERNODE = 
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK = 
@@ -163,7 +165,6 @@ public CombineFileInputFormat() {
   @Override
   public List<InputSplit> getSplits(JobContext job) 
     throws IOException {
-
     long minSizeNode = 0;
     long minSizeRack = 0;
     long maxSize = 0;
@@ -286,56 +287,100 @@ private void getMoreSplits(JobContext job, Path[] paths,
                          rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
+    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
+                 maxSize, minSizeNode, minSizeRack, splits);
+  }
 
+  @VisibleForTesting
+  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                    HashMap<OneBlockInfo, String[]> blockToNodes,
+                    HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                    long totLength,
+                    long maxSize,
+                    long minSizeNode,
+                    long minSizeRack,
+                    List<InputSplit> splits
+                   ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
     Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
+
+    int numNodes = nodeToBlocks.size();
+    long totalLength = totLength;
 
-    // process all nodes and create splits that are local
-    // to a node.
-    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
-         iter.hasNext();) {
+    while(true) {
+      // it is allowed for maxSize to be 0. Disable smoothing load for such cases
+      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
+                                        ((int) (totalLength/maxSize))/numNodes
+                                        : Integer.MAX_VALUE;
+      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+      numNodes = 0;
 
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
+      // process all nodes and create splits that are local to a node.
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        nodes.add(one.getKey());
+        List<OneBlockInfo> blocksInNode = one.getValue();
 
-      // for each block, copy it into validBlocks. Delete it from
-      // blockToNodes so that the same block does not appear in
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        int splitsInNode = 0;
+        for (OneBlockInfo oneblock : blocksInNode) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
 
-          // if the accumulated split size exceeds the maximum, then
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, nodes, validBlocks);
+              totalLength -= curSplitSize;
+              curSplitSize = 0;
+              validBlocks.clear();
+              splitsInNode++;
+              if (splitsInNode == maxSplitsByNodeOnly) {
+                // stop grouping on a node so as not to create
+                // disproportionately more splits on a node because it happens
+                // to have many blocks
+                // consider only these nodes in next round of grouping because
+                // they have leftover blocks that may need to be grouped
+                numNodes++;
+                break;
+              }
+            }
           }
         }
-      }
 
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely
-      // that they will be combined with other blocks from the
-      // same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
+        // if there were any blocks left over and their combined size is
+        // larger than minSplitNode, then combine them into one split.
+        // Otherwise add them back to the unprocessed pool. It is likely
+        // that they will be combined with other blocks from the
+        // same rack later on.
+        if (minSizeNode != 0 && curSplitSize >= minSizeNode
+            && splitsInNode == 0) {
+          // haven't created any split on this machine. so its ok to add a
+          // smaller
+          // one for parallelism. Otherwise group it in the rack for balanced
+          // size
+          // create an input split and add it to the splits array
+          addCreatedSplit(splits, nodes, validBlocks);
+          totalLength -= curSplitSize;
+        } else {
+          for (OneBlockInfo oneblock : validBlocks) {
+            blockToNodes.put(oneblock, oneblock.hosts);
+          }
         }
+        validBlocks.clear();
+        nodes.clear();
+        curSplitSize = 0;
+      }
+
+      if(!(numNodes>0 && totalLength>0)) {
+        break;
       }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
     }
 
     // if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ private void addCreatedSplit(List<InputSplit> splitList,
       offset[i] = validBlocks.get(i).offset;
       length[i] = validBlocks.get(i).length;
     }
-
     // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                    length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract RecordReader<K, V> createRecordReader(InputSplit split,
   /**
    * information about one file from the File System
   */
-  private static class OneFileInfo {
+  @VisibleForTesting
+  static class OneFileInfo {
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
@@ -545,45 +590,55 @@ private static class OneFileInfo {
           }
           blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
         }
+
+        populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                          nodeToBlocks, rackToNodes);
+      }
+    }
+
+    @VisibleForTesting
+    static void populateBlockInfo(OneBlockInfo[] blocks,
+                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                          HashMap<OneBlockInfo, String[]> blockToNodes,
+                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                          HashMap<String, Set<String>> rackToNodes) {
+      for (OneBlockInfo oneblock : blocks) {
+        // add this block to the block --> node locations map
+        blockToNodes.put(oneblock, oneblock.hosts);
 
-        for (OneBlockInfo oneblock : blocks) {
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
+        // For blocks that do not have host/rack information,
+        // assign to default rack.
+        String[] racks = null;
+        if (oneblock.hosts.length == 0) {
+          racks = new String[]{NetworkTopology.DEFAULT_RACK};
+        } else {
+          racks = oneblock.racks;
+        }
 
-          // For blocks that do not have host/rack information,
-          // assign to default rack.
-          String[] racks = null;
-          if (oneblock.hosts.length == 0) {
-            racks = new String[]{NetworkTopology.DEFAULT_RACK};
-          } else {
-            racks = oneblock.racks;
+        // add this block to the rack --> block map
+        for (int j = 0; j < racks.length; j++) {
+          String rack = racks[j];
+          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            rackToBlocks.put(rack, blklist);
           }
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < racks.length; j++) {
-            String rack = racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
-            }
+          blklist.add(oneblock);
+          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
           }
+        }
 
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
+        // add this block to the node --> block map
+        for (int j = 0; j < oneblock.hosts.length; j++) {
+          String node = oneblock.hosts[j];
+          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            nodeToBlocks.put(node, blklist);
          }
+          blklist.add(oneblock);
        }
      }
    }
@@ -600,7 +655,8 @@ OneBlockInfo[] getBlocks() {
   /**
    * information about one block from the File System
   */
-  private static class OneBlockInfo {
+  @VisibleForTesting
+  static class OneBlockInfo {
     Path onepath;               // name of this file
     long offset;                // offset in file
     long length;                // length of this block
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index 889443a84c3..07ff2922d06 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -20,11 +20,14 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.concurrent.TimeoutException;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.*;
@@ -42,9 +45,13 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
+import com.google.common.collect.HashMultiset;
+
 public class TestCombineFileInputFormat extends TestCase {
 
   private static final String rack1[] = new String[] {
@@ -476,23 +483,23 @@ public void testSplitPlacement() throws Exception {
     assertEquals(BLOCKSIZE, fileSplit.getLength(1));
     assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
     fileSplit = (CombineFileSplit) splits.get(1);
-    assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-    assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
+    assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+    assertEquals(0, fileSplit.getOffset(0));
     assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-    assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-    assertEquals(0, fileSplit.getOffset(1));
+    assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+    assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
     assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-    assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+    assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
     fileSplit = (CombineFileSplit) splits.get(2);
     assertEquals(2, fileSplit.getNumPaths());
     assertEquals(1, fileSplit.getLocations().length);
-    assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-    assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
+    assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+    assertEquals(0, fileSplit.getOffset(0));
     assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-    assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+    assertEquals(file3.getName(), fileSplit.getPath(1).getName());
     assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
     assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-    assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+    assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
     // maximum split size is 3 blocks
     inFormat = new DummyInputFormat();
@@ -504,7 +511,7 @@ public void testSplitPlacement() throws Exception {
     for (InputSplit split : splits) {
       System.out.println("File split(Test5): " + split);
     }
-    assertEquals(4, splits.size());
+    assertEquals(3, splits.size());
     fileSplit = (CombineFileSplit) splits.get(0);
     assertEquals(3, fileSplit.getNumPaths());
     assertEquals(1, fileSplit.getLocations().length);
@@ -519,32 +526,28 @@ public void testSplitPlacement() throws Exception {
     assertEquals(BLOCKSIZE, fileSplit.getLength(2));
     assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
     fileSplit = (CombineFileSplit) splits.get(1);
-    assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-    assertEquals(0, fileSplit.getOffset(0));
-    assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-    assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-    assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-    assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-    assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-    assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2));
-    assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-    assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
-    fileSplit = (CombineFileSplit) splits.get(2);
-    assertEquals(2, fileSplit.getNumPaths());
-    assertEquals(1, fileSplit.getLocations().length);
     assertEquals(file2.getName(), fileSplit.getPath(0).getName());
     assertEquals(0, fileSplit.getOffset(0));
     assertEquals(BLOCKSIZE, fileSplit.getLength(0));
     assertEquals(file2.getName(), fileSplit.getPath(1).getName());
     assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
     assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+    assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+    assertEquals(0, fileSplit.getOffset(2));
+    assertEquals(BLOCKSIZE, fileSplit.getLength(2));
     assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-    fileSplit = (CombineFileSplit) splits.get(3);
-    assertEquals(1, fileSplit.getNumPaths());
+    fileSplit = (CombineFileSplit) splits.get(2);
+    assertEquals(3, fileSplit.getNumPaths());
     assertEquals(1, fileSplit.getLocations().length);
     assertEquals(file1.getName(), fileSplit.getPath(0).getName());
     assertEquals(0, fileSplit.getOffset(0));
     assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+    assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+    assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+    assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+    assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+    assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
+    assertEquals(BLOCKSIZE, fileSplit.getLength(2));
     assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
     // maximum split size is 4 blocks
@@ -713,6 +716,56 @@ private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
 
+  public void testNodeInputSplit() throws IOException, InterruptedException {
+    // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
+    // both nodes. The grouping ensures that both nodes get splits instead of
+    // just the first node
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 12;
+    long totLength = 0;
+    long blockSize = 100;
+    long maxSize = 200;
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    String[] locations = { "h1", "h2" };
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+    for(int i=0; i<numBlocks; ++i) {
+      blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
+      totLength += blockSize;
+    }
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes =
+        new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks =
+        new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes =
+        new HashMap<OneBlockInfo, String[]>();
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+        new HashMap<String, List<OneBlockInfo>>();
+
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+        nodeToBlocks, rackToNodes);
+
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+        maxSize, minSizeNode, minSizeRack, splits);
+
+    int expectedSplitCount = (int)(totLength/maxSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+    HashMultiset<String> nodeSplits = HashMultiset.create();
+    for(int i=0; i<expectedSplitCount; ++i) {
+      CombineFileSplit inSplit = (CombineFileSplit)splits.get(i);
+      Assert.assertEquals(maxSize, inSplit.getLength());
+      Assert.assertEquals(1, inSplit.getLocations().length);
+      nodeSplits.add(inSplit.getLocations()[0]);
+    }
+    Assert.assertEquals(3, nodeSplits.count(locations[0]));
+    Assert.assertEquals(3, nodeSplits.count(locations[1]));
+  }
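
Note on the core of the change, for readers skimming the patch: createSplits() now caps the number of node-local splits any one node may claim per pass at roughly (totalLength / maxSize) / numNodes, and carries leftover blocks into the next pass, so a node that happens to host every block no longer absorbs every split. The stand-alone sketch below (hypothetical class name SplitSkewSketch, plain Java collections rather than the real CombineFileInputFormat types) condenses that capping idea in the same 2-node, 12-block scenario as testNodeInputSplit; it is an illustration under those assumptions, not part of the patch, and it simplifies the leftover-block handling that the real code performs.

// SplitSkewSketch.java -- illustrative only; not part of MAPREDUCE-4892.
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SplitSkewSketch {
  public static void main(String[] args) {
    long blockSize = 100;
    long maxSize = 200;
    int numBlocks = 12;
    List<String> nodes = Arrays.asList("h1", "h2");

    // Every block is replicated on every node, as in testNodeInputSplit.
    Set<Integer> unassigned = new HashSet<Integer>();
    for (int b = 0; b < numBlocks; b++) {
      unassigned.add(b);
    }

    long totalLength = numBlocks * blockSize;
    Map<String, Integer> splitsPerNode = new HashMap<String, Integer>();

    while (!unassigned.isEmpty()) {
      // The cap: on average each node should own about
      // (totalLength / maxSize) / numNodes splits, so no single node
      // is allowed to grab more than that in one pass.
      int avgSplitsPerNode = (int) ((totalLength / maxSize) / nodes.size());
      int cap = Math.max(avgSplitsPerNode, 1);

      for (String node : nodes) {
        int created = 0;
        long cur = 0;
        Iterator<Integer> it = unassigned.iterator();
        while (it.hasNext() && created < cap) {
          it.next();
          it.remove();
          cur += blockSize;
          if (cur >= maxSize) {     // enough blocks accumulated for one split
            created++;
            totalLength -= cur;
            cur = 0;
          }
        }
        splitsPerNode.put(node, splitsPerNode.getOrDefault(node, 0) + created);
      }
    }

    // Prints 3 splits for h1 and 3 for h2; a greedy per-node grouping with no
    // cap would have handed all 6 splits to whichever node was visited first.
    System.out.println(splitsPerNode);
  }
}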