MAPREDUCE-5896. InputSplits should indicate which locations have the block cached in memory. (Sandy Ryza via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603670 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-06-18 23:28:50 +00:00
parent db437b7cbb
commit bd23a2ff22
10 changed files with 269 additions and 18 deletions

View File

@ -213,6 +213,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
occassional failures. (Mit Desai via vinodkv)
MAPREDUCE-5896. InputSplits should indicate which locations have the block
cached in memory. (Sandy Ryza via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -295,6 +295,15 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
@ -337,22 +346,22 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts));
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
splitHosts[0], splitHosts[1]));
}
} else {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts));
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
@ -538,10 +547,30 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
* @param blkLocations The list of block locations
* @param offset
* @param splitSize
* @return array of hosts that contribute most to this split
* @return an array of hosts that contribute most to this split
* @throws IOException
*/
protected String[] getSplitHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
clusterMap)[0];
}
/**
* This function identifies and returns the hosts that contribute
* most for a given split. For calculating the contribution, rack
* locality is treated on par with host locality, so hosts from racks
* that contribute the most are preferred over hosts on racks that
* contribute less
* @param blkLocations The list of block locations
* @param offset
* @param splitSize
* @return two arrays - one of hosts that contribute most to this split, and
* one of hosts that contribute most to this split that have the data
* cached on them
* @throws IOException
*/
private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap)
throws IOException {
@ -552,7 +581,8 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
//If this is the only block, just return
if (bytesInThisBlock >= splitSize) {
return blkLocations[startIndex].getHosts();
return new String[][] { blkLocations[startIndex].getHosts(),
blkLocations[startIndex].getCachedHosts() };
}
long bytesInFirstBlock = bytesInThisBlock;
@ -639,7 +669,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
} // for all indices
return identifyHosts(allTopos.length, racksMap);
// We don't yet support cached hosts when bytesInThisBlock > splitSize
return new String[][] { identifyHosts(allTopos.length, racksMap),
new String[0]};
}
private String[] identifyHosts(int replicationFactor,

View File

@ -24,6 +24,7 @@ import java.io.DataOutput;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
implements InputSplit {
implements InputSplitWithLocationInfo {
org.apache.hadoop.mapreduce.lib.input.FileSplit fs;
protected FileSplit() {
fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@ -62,6 +63,20 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
length, hosts);
}
/** Constructs a split with host information
*
* @param file the file name
* @param start the position of the first byte in the file to process
* @param length the number of bytes in the file to process
* @param hosts the list of hosts containing the block, possibly null
* @param inMemoryHosts the list of hosts containing the block in memory
*/
public FileSplit(Path file, long start, long length, String[] hosts,
String[] inMemoryHosts) {
fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
length, hosts, inMemoryHosts);
}
public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
this.fs = fs;
}
@ -92,4 +107,9 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
return fs.getLocations();
}
@Override
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return fs.getLocationInfo();
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Public
@Evolving
public interface InputSplitWithLocationInfo extends InputSplit {
/**
* Gets info about which nodes the input split is stored on and how it is
* stored at each location.
*
* @return list of <code>SplitLocationInfo</code>s describing how the split
* data is stored at each location. A null value indicates that all the
* locations have the data stored on disk.
* @throws IOException
*/
SplitLocationInfo[] getLocationInfo() throws IOException;
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Public
@Evolving
public class SplitLocationInfo {
private boolean inMemory;
private String location;
public SplitLocationInfo(String location, boolean inMemory) {
this.location = location;
this.inMemory = inMemory;
}
public boolean isOnDisk() {
return true;
}
public boolean isInMemory() {
return inMemory;
}
public String getLocation() {
return location;
}
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@ -51,10 +53,25 @@ public abstract class InputSplit {
/**
* Get the list of nodes by name where the data for the split would be local.
* The locations do not need to be serialized.
*
* @return a new array of the node nodes.
* @throws IOException
* @throws InterruptedException
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
/**
* Gets info about which nodes the input split is stored on and how it is
* stored at each location.
*
* @return list of <code>SplitLocationInfo</code>s describing how the split
* data is stored at each location. A null value indicates that all the
* locations have the data stored on disk.
* @throws IOException
*/
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@ -359,6 +360,15 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
/**
* Generate the list of files and make them into FileSplits.
@ -392,17 +402,20 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts()));
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files

View File

@ -22,11 +22,13 @@ import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@ -41,6 +43,7 @@ public class FileSplit extends InputSplit implements Writable {
private long start;
private long length;
private String[] hosts;
private SplitLocationInfo[] hostInfos;
public FileSplit() {}
@ -57,6 +60,31 @@ public class FileSplit extends InputSplit implements Writable {
this.length = length;
this.hosts = hosts;
}
/** Constructs a split with host and cached-blocks information
*
* @param file the file name
* @param start the position of the first byte in the file to process
* @param length the number of bytes in the file to process
* @param hosts the list of hosts containing the block
* @param inMemoryHosts the list of hosts containing the block in memory
*/
public FileSplit(Path file, long start, long length, String[] hosts,
String[] inMemoryHosts) {
this(file, start, length, hosts);
hostInfos = new SplitLocationInfo[hosts.length];
for (int i = 0; i < hosts.length; i++) {
// because N will be tiny, scanning is probably faster than a HashSet
boolean inMemory = false;
for (String inMemoryHost : inMemoryHosts) {
if (inMemoryHost.equals(hosts[i])) {
inMemory = true;
break;
}
}
hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
}
}
/** The file containing this split's data. */
public Path getPath() { return file; }
@ -98,4 +126,10 @@ public class FileSplit extends InputSplit implements Writable {
return this.hosts;
}
}
@Override
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return hostInfos;
}
}

View File

@ -102,6 +102,29 @@ public class TestFileInputFormat {
FileSystem.closeAll();
}
@Test
public void testSplitLocationInfo() throws Exception {
Configuration conf = getConfiguration();
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
JobConf job = new JobConf(conf);
TextInputFormat fileInputFormat = new TextInputFormat();
fileInputFormat.configure(job);
FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1);
String[] locations = splits[0].getLocations();
Assert.assertEquals(2, locations.length);
SplitLocationInfo[] locationInfo = splits[0].getLocationInfo();
Assert.assertEquals(2, locationInfo.length);
SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
locationInfo[0] : locationInfo[1];
SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
locationInfo[0] : locationInfo[1];
Assert.assertTrue(localhostInfo.isOnDisk());
Assert.assertTrue(localhostInfo.isInMemory());
Assert.assertTrue(otherhostInfo.isOnDisk());
Assert.assertFalse(otherhostInfo.isInMemory());
}
@Test
public void testListStatusSimple() throws IOException {
Configuration conf = new Configuration();
@ -223,8 +246,9 @@ public class TestFileInputFormat {
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
new BlockLocation(new String[] { "localhost:50010" },
new String[] { "localhost" }, 0, len) };
new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
new String[0], 0, len, false) };
}
@Override

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
@ -139,6 +140,28 @@ public class TestFileInputFormat {
1, mockFs.numListLocatedStatusCalls);
FileSystem.closeAll();
}
@Test
public void testSplitLocationInfo() throws Exception {
Configuration conf = getConfiguration();
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
Job job = Job.getInstance(conf);
TextInputFormat fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
String[] locations = splits.get(0).getLocations();
Assert.assertEquals(2, locations.length);
SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
Assert.assertEquals(2, locationInfo.length);
SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
locationInfo[0] : locationInfo[1];
SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
locationInfo[0] : locationInfo[1];
Assert.assertTrue(localhostInfo.isOnDisk());
Assert.assertTrue(localhostInfo.isInMemory());
Assert.assertTrue(otherhostInfo.isOnDisk());
Assert.assertFalse(otherhostInfo.isInMemory());
}
@Test
public void testListStatusSimple() throws IOException {
@ -402,9 +425,9 @@ public class TestFileInputFormat {
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
new BlockLocation(new String[] { "localhost:50010" },
new String[] { "localhost" }, 0, len) };
}
new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
new String[0], 0, len, false) }; }
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,