MAPREDUCE-6174. Combine common stream code into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)

This commit is contained in:
Gera Shegalov 2015-06-03 16:26:45 -07:00
parent bc85959edd
commit d90c13e2da
6 changed files with 114 additions and 52 deletions

View File

@ -334,6 +334,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for
its test (Erik Paulson via jlowe) its test (Erik Paulson via jlowe)
MAPREDUCE-6174. Combine common stream code into parent class for
InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
* Common code for allowing MapOutput classes to handle streams.
*
* @param <K> key type for map output
* @param <V> value type for map output
*/
public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
private final Configuration conf;
private final MergeManagerImpl<K, V> merger;
public IFileWrappedMapOutput(
Configuration c, MergeManagerImpl<K, V> m, TaskAttemptID mapId,
long size, boolean primaryMapOutput) {
super(mapId, size, primaryMapOutput);
conf = c;
merger = m;
}
/**
* @return the merger
*/
protected MergeManagerImpl<K, V> getMerger() {
return merger;
}
protected abstract void doShuffle(
MapHost host, IFileInputStream iFileInputStream,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
@Override
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
IFileInputStream iFin =
new IFileInputStream(input, compressedLength, conf);
try {
this.doShuffle(host, iFin, compressedLength,
decompressedLength, metrics, reporter);
} finally {
iFin.close();
}
}
}

View File

@ -42,10 +42,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
class InMemoryMapOutput<K, V> extends MapOutput<K, V> { class InMemoryMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class); private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
private Configuration conf;
private final MergeManagerImpl<K, V> merger;
private final byte[] memory; private final byte[] memory;
private BoundedByteArrayOutputStream byteStream; private BoundedByteArrayOutputStream byteStream;
// Decompression of map-outputs // Decompression of map-outputs
@ -56,9 +54,7 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
MergeManagerImpl<K, V> merger, MergeManagerImpl<K, V> merger,
int size, CompressionCodec codec, int size, CompressionCodec codec,
boolean primaryMapOutput) { boolean primaryMapOutput) {
super(mapId, (long)size, primaryMapOutput); super(conf, merger, mapId, (long)size, primaryMapOutput);
this.conf = conf;
this.merger = merger;
this.codec = codec; this.codec = codec;
byteStream = new BoundedByteArrayOutputStream(size); byteStream = new BoundedByteArrayOutputStream(size);
memory = byteStream.getBuffer(); memory = byteStream.getBuffer();
@ -78,14 +74,11 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
} }
@Override @Override
public void shuffle(MapHost host, InputStream input, protected void doShuffle(MapHost host, IFileInputStream iFin,
long compressedLength, long decompressedLength, long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics, ShuffleClientMetrics metrics,
Reporter reporter) throws IOException { Reporter reporter) throws IOException {
IFileInputStream checksumIn = InputStream input = iFin;
new IFileInputStream(input, compressedLength, conf);
input = checksumIn;
// Are map-outputs compressed? // Are map-outputs compressed?
if (codec != null) { if (codec != null) {
@ -111,13 +104,6 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
throw new IOException("Unexpected extra bytes from input stream for " + throw new IOException("Unexpected extra bytes from input stream for " +
getMapId()); getMapId());
} }
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally { } finally {
CodecPool.returnDecompressor(decompressor); CodecPool.returnDecompressor(decompressor);
} }
@ -125,12 +111,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
@Override @Override
public void commit() throws IOException { public void commit() throws IOException {
merger.closeInMemoryFile(this); getMerger().closeInMemoryFile(this);
} }
@Override @Override
public void abort() { public void abort() {
merger.unreserve(memory.length); getMerger().unreserve(memory.length);
} }
@Override @Override

View File

@ -263,8 +263,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize + LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" + " is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")"); maxSingleShuffleLimit + ")");
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize, return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
jobConf, mapOutputFile, fetcher, true); fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
} }
// Stall shuffle if we are above the memory limit // Stall shuffle if we are above the memory limit

View File

@ -18,13 +18,11 @@
package org.apache.hadoop.mapreduce.task.reduce; package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -46,41 +44,46 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
class OnDiskMapOutput<K, V> extends MapOutput<K, V> { class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class); private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
private final FileSystem fs; private final FileSystem fs;
private final Path tmpOutputPath; private final Path tmpOutputPath;
private final Path outputPath; private final Path outputPath;
private final MergeManagerImpl<K, V> merger;
private final OutputStream disk; private final OutputStream disk;
private long compressedSize; private long compressedSize;
private final Configuration conf;
@Deprecated
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K,V> merger, long size, MergeManagerImpl<K,V> merger, long size,
JobConf conf, JobConf conf,
MapOutputFile mapOutputFile, MapOutputFile mapOutputFile,
int fetcher, boolean primaryMapOutput) int fetcher, boolean primaryMapOutput)
throws IOException { throws IOException {
this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher, this(mapId, merger, size, conf, fetcher,
primaryMapOutput, FileSystem.getLocal(conf).getRaw(), primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size)); mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
} }
@VisibleForTesting @Deprecated
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId, OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K,V> merger, long size, MergeManagerImpl<K,V> merger, long size,
JobConf conf, JobConf conf,
MapOutputFile mapOutputFile, MapOutputFile mapOutputFile,
int fetcher, boolean primaryMapOutput, int fetcher, boolean primaryMapOutput,
FileSystem fs, Path outputPath) throws IOException { FileSystem fs, Path outputPath) throws IOException {
super(mapId, size, primaryMapOutput); this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath);
}
OnDiskMapOutput(TaskAttemptID mapId,
MergeManagerImpl<K, V> merger, long size,
JobConf conf,
int fetcher, boolean primaryMapOutput,
FileSystem fs, Path outputPath) throws IOException {
super(conf, merger, mapId, size, primaryMapOutput);
this.fs = fs; this.fs = fs;
this.merger = merger;
this.outputPath = outputPath; this.outputPath = outputPath;
tmpOutputPath = getTempPath(outputPath, fetcher); tmpOutputPath = getTempPath(outputPath, fetcher);
disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath)); disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
this.conf = conf;
} }
@VisibleForTesting @VisibleForTesting
@ -89,18 +92,18 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
} }
@Override @Override
public void shuffle(MapHost host, InputStream input, protected void doShuffle(MapHost host, IFileInputStream input,
long compressedLength, long decompressedLength, long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics, ShuffleClientMetrics metrics,
Reporter reporter) throws IOException { Reporter reporter) throws IOException {
input = new IFileInputStream(input, compressedLength, conf);
// Copy data to local-disk // Copy data to local-disk
long bytesLeft = compressedLength; long bytesLeft = compressedLength;
try { try {
final int BYTES_TO_READ = 64 * 1024; final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ]; byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) { while (bytesLeft > 0) {
int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); int n = input.readWithChecksum(buf, 0,
(int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) { if (n < 0) {
throw new IOException("read past end of stream reading " + throw new IOException("read past end of stream reading " +
getMapId()); getMapId());
@ -117,7 +120,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
disk.close(); disk.close();
} catch (IOException ioe) { } catch (IOException ioe) {
// Close the streams // Close the streams
IOUtils.cleanup(LOG, input, disk); IOUtils.cleanup(LOG, disk);
// Re-throw // Re-throw
throw ioe; throw ioe;
@ -139,7 +142,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
fs.rename(tmpOutputPath, outputPath); fs.rename(tmpOutputPath, outputPath);
CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath, CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
getSize(), this.compressedSize); getSize(), this.compressedSize);
merger.closeOnDiskFile(compressAwarePath); getMerger().closeOnDiskFile(compressAwarePath);
} }
@Override @Override

View File

@ -19,9 +19,7 @@
package org.apache.hadoop.mapreduce.task.reduce; package org.apache.hadoop.mapreduce.task.reduce;
import java.io.FilterInputStream; import java.io.FilterInputStream;
import java.lang.Void; import java.lang.Void;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
@ -30,13 +28,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import static org.junit.Assert.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*; import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -65,10 +62,11 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.nimbusds.jose.util.StringUtils;
/** /**
* Test that the Fetcher does what we expect it to. * Test that the Fetcher does what we expect it to.
*/ */
@ -453,9 +451,9 @@ public class TestFetcher {
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in); when(connection.getInputStream()).thenReturn(in);
// 8 < 10 therefore there appear to be extra bytes in the IFileInputStream // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream
InMemoryMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>( IFileWrappedMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
job, map1ID, mm, 8, null, true ); job, map1ID, mm, 8, null, true );
InMemoryMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>( IFileWrappedMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
job, map2ID, mm, 10, null, true ); job, map2ID, mm, 10, null, true );
when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut); when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut);
@ -478,9 +476,9 @@ public class TestFetcher {
Path shuffledToDisk = Path shuffledToDisk =
OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher); OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
fs = FileSystem.getLocal(job).getRaw(); fs = FileSystem.getLocal(job).getRaw();
MapOutputFile mof = mock(MapOutputFile.class); IFileWrappedMapOutput<Text,Text> odmo =
OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID, new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath); fs, onDiskMapOutputPath);
String mapData = "MAPDATA12345678901234567890"; String mapData = "MAPDATA12345678901234567890";
@ -538,7 +536,7 @@ public class TestFetcher {
@Test(timeout=10000) @Test(timeout=10000)
public void testInterruptInMemory() throws Exception { public void testInterruptInMemory() throws Exception {
final int FETCHER = 2; final int FETCHER = 2;
InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>( IFileWrappedMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
job, id, mm, 100, null, true)); job, id, mm, 100, null, true));
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo); .thenReturn(immo);
@ -584,10 +582,9 @@ public class TestFetcher {
Path p = new Path("file:///tmp/foo"); Path p = new Path("file:///tmp/foo");
Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER); Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS); FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
MapOutputFile mof = mock(MapOutputFile.class); IFileWrappedMapOutput<Text,Text> odmo =
when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p); spy(new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job,
OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID, FETCHER, true, mFs, p));
id, mm, 100L, job, mof, FETCHER, true, mFs, p));
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(odmo); .thenReturn(odmo);
doNothing().when(mm).waitForResource(); doNothing().when(mm).waitForResource();