MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8. Contributed by Akira AJISAKA.

(cherry picked from commit 551615fa13)
This commit is contained in:
Tsuyoshi Ozawa 2015-05-05 10:23:13 +09:00
parent f71c49c5e3
commit 4e96175b33
3 changed files with 805 additions and 362 deletions

View File

@ -116,6 +116,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
(Gera Shegalov via jlowe) (Gera Shegalov via jlowe)
MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8.
(Akira AJISAKA via ozawa)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.Set; import java.util.Set;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -289,6 +288,26 @@ public abstract class CombineFileInputFormat<K, V>
maxSize, minSizeNode, minSizeRack, splits); maxSize, minSizeNode, minSizeRack, splits);
} }
/**
* Process all the nodes and create splits that are local to a node.
* Generate one split per node iteration, and walk over nodes multiple times
* to distribute the splits across nodes.
* <p>
* Note: The order of processing the nodes is undetermined because the
* implementation of nodeToBlocks is {@link java.util.HashMap} and its order
* of the entries is undetermined.
* @param nodeToBlocks Mapping from a node to the list of blocks that
* it contains.
* @param blockToNodes Mapping from a block to the nodes on which
* it has replicas.
* @param rackToBlocks Mapping from a rack name to the list of blocks it has.
* @param totLength Total length of the input files.
* @param maxSize Max size of each split.
* If set to 0, disable smoothing load.
* @param minSizeNode Minimum split size per node.
* @param minSizeRack Minimum split size per rack.
* @param splits New splits created by this method are added to the list.
*/
@VisibleForTesting @VisibleForTesting
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks, void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<OneBlockInfo, String[]> blockToNodes, Map<OneBlockInfo, String[]> blockToNodes,
@ -309,11 +328,6 @@ public abstract class CombineFileInputFormat<K, V>
Set<String> completedNodes = new HashSet<String>(); Set<String> completedNodes = new HashSet<String>();
while(true) { while(true) {
// it is allowed for maxSize to be 0. Disable smoothing load for such cases
// process all nodes and create splits that are local to a node. Generate
// one split per node iteration, and walk over nodes multiple times to
// distribute the splits across nodes.
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
.entrySet().iterator(); iter.hasNext();) { .entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, Set<OneBlockInfo>> one = iter.next(); Map.Entry<String, Set<OneBlockInfo>> one = iter.next();