From bd23a2ff22dba8a5203e8e498244f985e728da51 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 18 Jun 2014 23:28:50 +0000 Subject: [PATCH] MAPREDUCE-5896. InputSplits should indicate which locations have the block cached in memory. (Sandy Ryza via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603670 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../apache/hadoop/mapred/FileInputFormat.java | 50 +++++++++++++++---- .../org/apache/hadoop/mapred/FileSplit.java | 22 +++++++- .../mapred/InputSplitWithLocationInfo.java | 39 +++++++++++++++ .../hadoop/mapred/SplitLocationInfo.java | 46 +++++++++++++++++ .../apache/hadoop/mapreduce/InputSplit.java | 17 +++++++ .../mapreduce/lib/input/FileInputFormat.java | 19 +++++-- .../hadoop/mapreduce/lib/input/FileSplit.java | 34 +++++++++++++ .../hadoop/mapred/TestFileInputFormat.java | 28 ++++++++++- .../lib/input/TestFileInputFormat.java | 29 +++++++++-- 10 files changed, 269 insertions(+), 18 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f3c4d0aa473..ae327063310 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -213,6 +213,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid occassional failures. (Mit Desai via vinodkv) + MAPREDUCE-5896. InputSplits should indicate which locations have the block + cached in memory. (Sandy Ryza via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index 9863427076e..0ae56717ab9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -295,6 +295,15 @@ public abstract class FileInputFormat implements InputFormat { String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ @@ -337,22 +346,22 @@ public abstract class FileInputFormat implements InputFormat { long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { - String[] splitHosts = getSplitHosts(blkLocations, + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts)); + splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { - String[] splitHosts = getSplitHosts(blkLocations, length + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts)); + splitHosts[0], splitHosts[1])); } } else { - String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts)); + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files @@ -538,10 +547,30 @@ public abstract class FileInputFormat implements InputFormat { * @param blkLocations The list of block locations * @param offset * @param splitSize - * @return array of hosts that contribute most to this split + * @return an array of hosts that contribute most to this split * @throws IOException */ protected String[] getSplitHosts(BlockLocation[] blkLocations, + long offset, long splitSize, NetworkTopology clusterMap) throws IOException { + return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize, + clusterMap)[0]; + } + + /** + * This function identifies and returns the hosts that contribute + * most for a given split. For calculating the contribution, rack + * locality is treated on par with host locality, so hosts from racks + * that contribute the most are preferred over hosts on racks that + * contribute less + * @param blkLocations The list of block locations + * @param offset + * @param splitSize + * @return two arrays - one of hosts that contribute most to this split, and + * one of hosts that contribute most to this split that have the data + * cached on them + * @throws IOException + */ + private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, long offset, long splitSize, NetworkTopology clusterMap) throws IOException { @@ -552,7 +581,8 @@ public abstract class FileInputFormat implements InputFormat { //If this is the only block, just return if (bytesInThisBlock >= splitSize) { - return blkLocations[startIndex].getHosts(); + return new String[][] { blkLocations[startIndex].getHosts(), + blkLocations[startIndex].getCachedHosts() }; } long bytesInFirstBlock = bytesInThisBlock; @@ -639,7 +669,9 @@ public abstract class FileInputFormat implements InputFormat { } // for all indices - return identifyHosts(allTopos.length, racksMap); + // We don't yet support cached hosts when bytesInThisBlock > splitSize + return new String[][] { identifyHosts(allTopos.length, racksMap), + new String[0]}; } private String[] identifyHosts(int replicationFactor, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java index fb1c651a9a9..c38f2f78f83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java @@ -24,6 +24,7 @@ import java.io.DataOutput; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; /** A section of an input file. Returned by {@link @@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path; @InterfaceAudience.Public @InterfaceStability.Stable public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit - implements InputSplit { + implements InputSplitWithLocationInfo { org.apache.hadoop.mapreduce.lib.input.FileSplit fs; protected FileSplit() { fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(); @@ -62,6 +63,20 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit length, hosts); } + /** Constructs a split with host information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block, possibly null + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start, + length, hosts, inMemoryHosts); + } + public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) { this.fs = fs; } @@ -92,4 +107,9 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit return fs.getLocations(); } + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return fs.getLocationInfo(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java new file mode 100644 index 00000000000..bb95882188a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public interface InputSplitWithLocationInfo extends InputSplit { + /** + * Gets info about which nodes the input split is stored on and how it is + * stored at each location. + * + * @return list of SplitLocationInfos describing how the split + * data is stored at each location. A null value indicates that all the + * locations have the data stored on disk. + * @throws IOException + */ + SplitLocationInfo[] getLocationInfo() throws IOException; +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java new file mode 100644 index 00000000000..a8e69fb52dd --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public class SplitLocationInfo { + private boolean inMemory; + private String location; + + public SplitLocationInfo(String location, boolean inMemory) { + this.location = location; + this.inMemory = inMemory; + } + + public boolean isOnDisk() { + return true; + } + + public boolean isInMemory() { + return inMemory; + } + + public String getLocation() { + return location; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java index 95d4a8c4796..515b423f935 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; @@ -51,10 +53,25 @@ public abstract class InputSplit { /** * Get the list of nodes by name where the data for the split would be local. * The locations do not need to be serialized. + * * @return a new array of the node nodes. * @throws IOException * @throws InterruptedException */ public abstract String[] getLocations() throws IOException, InterruptedException; + + /** + * Gets info about which nodes the input split is stored on and how it is + * stored at each location. + * + * @return list of SplitLocationInfos describing how the split + * data is stored at each location. A null value indicates that all the + * locations have the data stored on disk. + * @throws IOException + */ + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return null; + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 5f32f11ca0c..56fb9fcdf11 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -359,6 +360,15 @@ public abstract class FileInputFormat extends InputFormat { String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** * Generate the list of files and make them into FileSplits. @@ -392,17 +402,20 @@ public abstract class FileInputFormat extends InputFormat { while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable - splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); + splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), + blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java index 72c845060e3..9fba79c2a42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java @@ -22,11 +22,13 @@ import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +43,7 @@ public class FileSplit extends InputSplit implements Writable { private long start; private long length; private String[] hosts; + private SplitLocationInfo[] hostInfos; public FileSplit() {} @@ -57,6 +60,31 @@ public class FileSplit extends InputSplit implements Writable { this.length = length; this.hosts = hosts; } + + /** Constructs a split with host and cached-blocks information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + this(file, start, length, hosts); + hostInfos = new SplitLocationInfo[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + // because N will be tiny, scanning is probably faster than a HashSet + boolean inMemory = false; + for (String inMemoryHost : inMemoryHosts) { + if (inMemoryHost.equals(hosts[i])) { + inMemory = true; + break; + } + } + hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory); + } + } /** The file containing this split's data. */ public Path getPath() { return file; } @@ -98,4 +126,10 @@ public class FileSplit extends InputSplit implements Writable { return this.hosts; } } + + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return hostInfos; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java index 0bb4e96470f..ba636b60db7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java @@ -102,6 +102,29 @@ public class TestFileInputFormat { FileSystem.closeAll(); } + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1); + String[] locations = splits[0].getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits[0].getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? + locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } + @Test public void testListStatusSimple() throws IOException { Configuration conf = new Configuration(); @@ -223,8 +246,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java index 246c158732f..3f877f11eb9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.junit.After; @@ -139,6 +140,28 @@ public class TestFileInputFormat { 1, mockFs.numListLocatedStatusCalls); FileSystem.closeAll(); } + + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + Job job = Job.getInstance(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + List splits = fileInputFormat.getSplits(job); + String[] locations = splits.get(0).getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? + locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } @Test public void testListStatusSimple() throws IOException { @@ -402,9 +425,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; - } + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override protected RemoteIterator listLocatedStatus(Path f,