From 0bf97794897f576f28248dfc793e61127a314626 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Tue, 24 Jan 2012 21:30:48 +0000
Subject: [PATCH] MAPREDUCE-3710. Improved FileInputFormat to return better
 locality for the last split. Contributed by Siddarth Seth.

svn merge --ignore-ancestry -c 1235510 ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1235511 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../apache/hadoop/mapred/FileInputFormat.java |   6 +-
 .../mapreduce/lib/input/FileInputFormat.java  |   3 +-
 .../hadoop/mapred/TestFileInputFormat.java    | 101 ++++++++++++++++
 .../lib/input/TestFileInputFormat.java        | 111 ++++++++++++++++++
 5 files changed, 221 insertions(+), 3 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9baf946b2e5..1d2b3c42e69 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -127,6 +127,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli)
 
+    MAPREDUCE-3710. Improved FileInputFormat to return better locality for the
+    last split. (Siddarth Seth via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index a7e59562bd5..aaf3c26b789 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -289,8 +289,10 @@ public InputSplit[] getSplits(JobConf job, int numSplits)
         }
 
         if (bytesRemaining != 0) {
-          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
-                     blkLocations[blkLocations.length-1].getHosts()));
+          String[] splitHosts = getSplitHosts(blkLocations, length
+              - bytesRemaining, bytesRemaining, clusterMap);
+          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+              splitHosts));
         }
       } else if (length != 0) {
         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 781715dbeef..d86ad156bda 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -286,8 +286,9 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
         }
 
         if (bytesRemaining != 0) {
+          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
           splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
-                     blkLocations[blkLocations.length-1].getHosts()));
+                     blkLocations[blkIndex].getHosts()));
         }
       } else { // not splitable
         splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
index 3476e635c5c..fca9b358647 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -32,6 +36,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 
+@SuppressWarnings("deprecation")
 public class TestFileInputFormat extends TestCase {
 
   Configuration conf = new Configuration();
@@ -186,6 +191,102 @@ public void testMultiLevelInput() throws IOException {
     assertEquals(splits.length, 2);
   }
 
+  @SuppressWarnings("rawtypes")
+  public void testLastInputSplitAtSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
+        128l * 1024 * 1024);
+    JobConf job = new JobConf();
+    InputSplit[] splits = fif.getSplits(job, 8);
+    assertEquals(8, splits.length);
+    for (int i = 0; i < splits.length; i++) {
+      InputSplit split = splits[i];
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void testLastInputSplitExceedingSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
+        128l * 1024 * 1024);
+    JobConf job = new JobConf();
+    InputSplit[] splits = fif.getSplits(job, 8);
+    assertEquals(8, splits.length);
+    for (int i = 0; i < splits.length; i++) {
+      InputSplit split = splits[i];
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void testLastInputSplitSingleSplit() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
+        128l * 1024 * 1024);
+    JobConf job = new JobConf();
+    InputSplit[] splits = fif.getSplits(job, 1);
+    assertEquals(1, splits.length);
+    for (int i = 0; i < splits.length; i++) {
+      InputSplit split = splits[i];
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
+
+    long splitSize;
+    long length;
+
+    FileInputFormatForTest(long length, long splitSize) {
+      this.length = length;
+      this.splitSize = splitSize;
+    }
+
+    @Override
+    public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
+        Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override
+    protected FileStatus[] listStatus(JobConf job) throws IOException {
+      FileStatus mockFileStatus = mock(FileStatus.class);
+      when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
+      when(mockFileStatus.isDirectory()).thenReturn(false);
+      Path mockPath = mock(Path.class);
+      FileSystem mockFs = mock(FileSystem.class);
+
+      BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
+      when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
+          blockLocations);
+      when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
+
+      when(mockFileStatus.getPath()).thenReturn(mockPath);
+      when(mockFileStatus.getLen()).thenReturn(length);
+
+      FileStatus[] fs = new FileStatus[1];
+      fs[0] = mockFileStatus;
+      return fs;
+    }
+
+    @Override
+    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+      return splitSize;
+    }
+
+    private BlockLocation[] mockBlockLocations(long size, long splitSize) {
+      int numLocations = (int) (size / splitSize);
+      if (size % splitSize != 0)
+        numLocations++;
+      BlockLocation[] blockLocations = new BlockLocation[numLocations];
+      for (int i = 0; i < numLocations; i++) {
+        String[] names = new String[] { "b" + i };
+        String[] hosts = new String[] { "host" + i };
+        blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
+            Math.min(splitSize, size - (splitSize * i)));
+      }
+      return blockLocations;
+    }
+  }
+
   static void writeFile(Configuration conf, Path name,
                         short replication, int numBlocks) throws IOException {
     FileSystem fileSys = FileSystem.get(conf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
index 692a6b6da83..824e6842cff 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.mapreduce.lib.input;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -28,10 +30,15 @@
 import static org.apache.hadoop.test.MockitoMaker.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 public class TestFileInputFormat {
 
@@ -80,4 +87,108 @@ public void testNumInputFiles() throws Exception {
     ispy.getSplits(job);
     verify(conf).setLong(FileInputFormat.NUM_INPUT_FILES, 1);
   }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testLastInputSplitAtSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
+        128l * 1024 * 1024);
+    Configuration conf = new Configuration();
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(conf);
+    List<InputSplit> splits = fif.getSplits(jobContext);
+    assertEquals(8, splits.size());
+    for (int i = 0 ; i < splits.size() ; i++) {
+      InputSplit split = splits.get(i);
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testLastInputSplitExceedingSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
+        128l * 1024 * 1024);
+    Configuration conf = new Configuration();
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(conf);
+    List<InputSplit> splits = fif.getSplits(jobContext);
+    assertEquals(8, splits.size());
+    for (int i = 0; i < splits.size(); i++) {
+      InputSplit split = splits.get(i);
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testLastInputSplitSingleSplit() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
+        128l * 1024 * 1024);
+    Configuration conf = new Configuration();
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(conf);
+    List<InputSplit> splits = fif.getSplits(jobContext);
+    assertEquals(1, splits.size());
+    for (int i = 0; i < splits.size(); i++) {
+      InputSplit split = splits.get(i);
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
+
+    long splitSize;
+    long length;
+
+    FileInputFormatForTest(long length, long splitSize) {
+      this.length = length;
+      this.splitSize = splitSize;
+    }
+
+    @Override
+    public RecordReader<K, V> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+      FileStatus mockFileStatus = mock(FileStatus.class);
+      when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
+      Path mockPath = mock(Path.class);
+      FileSystem mockFs = mock(FileSystem.class);
+
+      BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
+      when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
+          blockLocations);
+      when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
+
+      when(mockFileStatus.getPath()).thenReturn(mockPath);
+      when(mockFileStatus.getLen()).thenReturn(length);
+
+      List<FileStatus> list = new ArrayList<FileStatus>();
+      list.add(mockFileStatus);
+      return list;
+    }
+
+    @Override
+    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+      return splitSize;
+    }
+
+    private BlockLocation[] mockBlockLocations(long size, long splitSize) {
+      int numLocations = (int) (size / splitSize);
+      if (size % splitSize != 0)
+        numLocations++;
+      BlockLocation[] blockLocations = new BlockLocation[numLocations];
+      for (int i = 0; i < numLocations; i++) {
+        String[] names = new String[] { "b" + i };
+        String[] hosts = new String[] { "host" + i };
+        blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
+            Math.min(splitSize, size - (splitSize * i)));
+      }
+      return blockLocations;
+    }
+  }
 }