MAPREDUCE-5352. Optimize node local splits generated by CombineFileInputFormat. (sseth)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509345 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-08-01 17:42:24 +00:00
parent 1ff7043a1f
commit 381a4c4213
3 changed files with 202 additions and 82 deletions

View File

@ -195,6 +195,9 @@ Release 2.1.1-beta - UNRELEASED
OPTIMIZATIONS
MAPREDUCE-5352. Optimize node local splits generated by
CombineFileInputFormat. (sseth)
BUG FIXES
MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit

View File

@ -21,13 +21,18 @@ package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -49,6 +54,8 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
/**
* An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
@ -78,6 +85,8 @@ import com.google.common.annotations.VisibleForTesting;
public abstract class CombineFileInputFormat<K, V>
extends FileInputFormat<K, V> {
private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
@ -185,6 +194,8 @@ public abstract class CombineFileInputFormat<K, V>
maxSize = maxSplitSize;
} else {
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
// If maxSize is not configured, a single split will be generated per
// node.
}
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode +
@ -257,8 +268,8 @@ public abstract class CombineFileInputFormat<K, V>
new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
new HashMap<String, List<OneBlockInfo>>();
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
files = new OneFileInfo[stats.size()];
if (stats.size() == 0) {
@ -279,9 +290,9 @@ public abstract class CombineFileInputFormat<K, V>
}
@VisibleForTesting
void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<OneBlockInfo, String[]> blockToNodes,
Map<String, List<OneBlockInfo>> rackToBlocks,
long totLength,
long maxSize,
long minSizeNode,
@ -289,83 +300,118 @@ public abstract class CombineFileInputFormat<K, V>
List<InputSplit> splits
) {
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
int numNodes = nodeToBlocks.size();
int totalNodes = nodeToBlocks.size();
long totalLength = totLength;
Multiset<String> splitsPerNode = HashMultiset.create();
Set<String> completedNodes = new HashSet<String>();
while(true) {
// it is allowed for maxSize to be 0. Disable smoothing load for such cases
int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
((int) (totalLength/maxSize))/numNodes
: Integer.MAX_VALUE;
int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
numNodes = 0;
// process all nodes and create splits that are local to a node.
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
// process all nodes and create splits that are local to a node. Generate
// one split per node iteration, and walk over nodes multiple times to
// distribute the splits across nodes.
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
nodes.add(one.getKey());
List<OneBlockInfo> blocksInNode = one.getValue();
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
String node = one.getKey();
// Skip the node if it has previously been marked as completed.
if (completedNodes.contains(node)) {
continue;
}
Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
int splitsInNode = 0;
for (OneBlockInfo oneblock : blocksInNode) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
while (oneBlockIter.hasNext()) {
OneBlockInfo oneblock = oneBlockIter.next();
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, nodes, validBlocks);
totalLength -= curSplitSize;
curSplitSize = 0;
validBlocks.clear();
splitsInNode++;
if (splitsInNode == maxSplitsByNodeOnly) {
// stop grouping on a node so as not to create
// disproportionately more splits on a node because it happens
// to have many blocks
// consider only these nodes in next round of grouping because
// they have leftover blocks that may need to be grouped
numNodes++;
break;
}
// Remove all blocks which may already have been assigned to other
// splits.
if(!blockToNodes.containsKey(oneblock)) {
oneBlockIter.remove();
continue;
}
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
curSplitSize = 0;
splitsPerNode.add(node);
// Remove entries from blocksInNode so that we don't walk these
// again.
blocksInCurrentNode.removeAll(validBlocks);
validBlocks.clear();
// Done creating a single split for this node. Move on to the next
// node so that splits are distributed across nodes.
break;
}
}
if (validBlocks.size() != 0) {
// This implies that the last few blocks (or all in case maxSize=0)
// were not part of a split. The node is complete.
// if there were any blocks left over and their combined size is
// larger than minSplitNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
// This condition also kicks in when max split size is not set. All
// blocks on a node will be grouped together into a single split.
if (minSizeNode != 0 && curSplitSize >= minSizeNode
&& splitsPerNode.count(node) == 0) {
// haven't created any split on this machine. so its ok to add a
// smaller one for parallelism. Otherwise group it in the rack for
// balanced size create an input split and add it to the splits
// array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
splitsPerNode.add(node);
// Remove entries from blocksInNode so that we don't walk this again.
blocksInCurrentNode.removeAll(validBlocks);
// The node is done. This was the last set of blocks for this node.
} else {
// Put the unplaced blocks back into the pool for later rack-allocation.
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
}
}
validBlocks.clear();
curSplitSize = 0;
completedNodes.add(node);
} else { // No in-flight blocks.
if (blocksInCurrentNode.size() == 0) {
// Node is done. All blocks were fit into node-local splits.
completedNodes.add(node);
} // else Run through the node again.
}
// if there were any blocks left over and their combined size is
// larger than minSplitNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
if (minSizeNode != 0 && curSplitSize >= minSizeNode
&& splitsInNode == 0) {
// haven't created any split on this machine. so its ok to add a
// smaller
// one for parallelism. Otherwise group it in the rack for balanced
// size
// create an input split and add it to the splits array
addCreatedSplit(splits, nodes, validBlocks);
totalLength -= curSplitSize;
} else {
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
}
}
validBlocks.clear();
nodes.clear();
curSplitSize = 0;
}
if(!(numNodes>0 && totalLength>0)) {
// Check if node-local assignments are complete.
if (completedNodes.size() == totalNodes || totalLength == 0) {
// All nodes have been walked over and marked as completed or all blocks
// have been assigned. The rest should be handled via rackLock assignment.
LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+ completedNodes.size() + ", size left: " + totalLength);
break;
}
}
@ -514,7 +560,7 @@ public abstract class CombineFileInputFormat<K, V>
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes,
long maxSize)
throws IOException {
@ -588,10 +634,10 @@ public abstract class CombineFileInputFormat<K, V>
@VisibleForTesting
static void populateBlockInfo(OneBlockInfo[] blocks,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes) {
Map<String, List<OneBlockInfo>> rackToBlocks,
Map<OneBlockInfo, String[]> blockToNodes,
Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<String, Set<String>> rackToNodes) {
for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
@ -623,9 +669,9 @@ public abstract class CombineFileInputFormat<K, V>
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
String node = oneblock.hosts[j];
List<OneBlockInfo> blklist = nodeToBlocks.get(node);
Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
blklist = new LinkedHashSet<OneBlockInfo>();
nodeToBlocks.put(node, blklist);
}
blklist.add(oneblock);
@ -689,7 +735,7 @@ public abstract class CombineFileInputFormat<K, V>
return fs.getFileBlockLocations(stat, 0, stat.getLen());
}
private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
private static void addHostToRack(Map<String, Set<String>> rackToNodes,
String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
if (hosts == null) {

View File

@ -20,23 +20,31 @@ package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@ -716,6 +724,69 @@ public class TestCombineFileInputFormat extends TestCase {
DFSTestUtil.waitReplication(fileSys, name, replication);
}
public void testNodeDistribution() throws IOException, InterruptedException {
DummyInputFormat inFormat = new DummyInputFormat();
int numBlocks = 60;
long totLength = 0;
long blockSize = 100;
int numNodes = 10;
long minSizeNode = 50;
long minSizeRack = 50;
int maxSplitSize = 200; // 4 blocks per split.
String[] locations = new String[numNodes];
for (int i = 0; i < numNodes; i++) {
locations[i] = "h" + i;
}
String[] racks = new String[0];
Path path = new Path("hdfs://file");
OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
int hostCountBase = 0;
// Generate block list. Replication 3 per block.
for (int i = 0; i < numBlocks; i++) {
int localHostCount = hostCountBase;
String[] blockHosts = new String[3];
for (int j = 0; j < 3; j++) {
int hostNum = localHostCount % numNodes;
blockHosts[j] = "h" + hostNum;
localHostCount++;
}
hostCountBase++;
blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
racks);
totLength += blockSize;
}
List<InputSplit> splits = new ArrayList<InputSplit>();
HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
maxSplitSize, minSizeNode, minSizeRack, splits);
int expectedSplitCount = (int) (totLength / maxSplitSize);
Assert.assertEquals(expectedSplitCount, splits.size());
// Ensure 90+% of the splits have node local blocks.
// 100% locality may not always be achieved.
int numLocalSplits = 0;
for (InputSplit inputSplit : splits) {
Assert.assertEquals(maxSplitSize, inputSplit.getLength());
if (inputSplit.getLocations().length == 1) {
numLocalSplits++;
}
}
Assert.assertTrue(numLocalSplits >= 0.9 * splits.size());
}
public void testNodeInputSplit() throws IOException, InterruptedException {
// Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
// both nodes. The grouping ensures that both nodes get splits instead of
@ -744,8 +815,8 @@ public class TestCombineFileInputFormat extends TestCase {
new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
new HashMap<String, List<OneBlockInfo>>();
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);