MAPREDUCE-6628. Potential memory leak in CryptoOutputStream. Contributed by Mariappan Asokan

(cherry picked from commit 9f192cc5ac)
This commit is contained in:
Chris Douglas 2016-09-09 11:12:44 -07:00
parent 6a016faf59
commit 806a8dea38
5 changed files with 159 additions and 24 deletions

View File

@ -76,6 +76,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
private final byte[] key;
private final byte[] initIV;
private byte[] iv;
private boolean closeOutputStream;
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
@ -85,6 +86,13 @@ public class CryptoOutputStream extends FilterOutputStream implements
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv, long streamOffset)
throws IOException {
this(out, codec, bufferSize, key, iv, streamOffset, true);
}
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv, long streamOffset,
boolean closeOutputStream)
throws IOException {
super(out);
CryptoStreamUtils.checkCodec(codec);
this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
@ -95,6 +103,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
this.streamOffset = streamOffset;
this.closeOutputStream = closeOutputStream;
try {
encryptor = codec.createEncryptor();
} catch (GeneralSecurityException e) {
@ -110,8 +119,14 @@ public class CryptoOutputStream extends FilterOutputStream implements
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
byte[] key, byte[] iv, long streamOffset) throws IOException {
this(out, codec, key, iv, streamOffset, true);
}
public CryptoOutputStream(OutputStream out, CryptoCodec codec,
byte[] key, byte[] iv, long streamOffset, boolean closeOutputStream)
throws IOException {
this(out, codec, CryptoStreamUtils.getBufferSize(codec.getConf()),
key, iv, streamOffset);
key, iv, streamOffset, closeOutputStream);
}
public OutputStream getWrappedStream() {
@ -221,7 +236,10 @@ public class CryptoOutputStream extends FilterOutputStream implements
return;
}
try {
flush();
if (closeOutputStream) {
super.close();
}
freeBuffers();
} finally {
closed = true;

View File

@ -28,8 +28,14 @@ public class CryptoFSDataOutputStream extends FSDataOutputStream {
public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
this(out, codec, bufferSize, key, iv, true);
}
public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv, boolean closeOutputStream)
throws IOException {
super(new CryptoOutputStream(out, codec, bufferSize, key, iv,
out.getPos()), null, out.getPos());
out.getPos(), closeOutputStream), null, out.getPos());
this.fsOut = out;
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.mockito.Mockito.*;
/**
* To test proper closing of underlying stream of CryptoOutputStream.
*/
public class TestCryptoOutputStreamClosing {
private static CryptoCodec codec;
@BeforeClass
public static void init() throws Exception {
codec = CryptoCodec.getInstance(new Configuration());
}
@Test
public void testOutputStreamClosing() throws Exception {
OutputStream outputStream = mock(OutputStream.class);
CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec,
new byte[16], new byte[16], 0L, true);
cos.close();
verify(outputStream).close();
}
@Test
public void testOutputStreamNotClosing() throws Exception {
OutputStream outputStream = mock(OutputStream.class);
CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec,
new byte[16], new byte[16], 0L, false);
cos.close();
verify(outputStream, never()).close();
}
}

View File

@ -1587,6 +1587,7 @@ public class MapTask extends Task {
final long size = distanceTo(bufstart, bufend, bufvoid) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
FSDataOutputStream partitionOut = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
@ -1607,7 +1608,7 @@ public class MapTask extends Task {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
@ -1642,6 +1643,10 @@ public class MapTask extends Task {
// close the writer
writer.close();
if (partitionOut != out) {
partitionOut.close();
partitionOut = null;
}
// record offsets
rec.startOffset = segmentStart;
@ -1670,6 +1675,9 @@ public class MapTask extends Task {
++numSpills;
} finally {
if (out != null) out.close();
if (partitionOut != null) {
partitionOut.close();
}
}
}
@ -1682,6 +1690,7 @@ public class MapTask extends Task {
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
FSDataOutputStream partitionOut = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
@ -1696,7 +1705,7 @@ public class MapTask extends Task {
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
@ -1708,6 +1717,10 @@ public class MapTask extends Task {
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
writer.close();
if (partitionOut != out) {
partitionOut.close();
partitionOut = null;
}
// record offsets
rec.startOffset = segmentStart;
@ -1735,6 +1748,9 @@ public class MapTask extends Task {
++numSpills;
} finally {
if (out != null) out.close();
if (partitionOut != null) {
partitionOut.close();
}
}
}
@ -1846,6 +1862,7 @@ public class MapTask extends Task {
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
FSDataOutputStream finalPartitionOut = null;
if (numSpills == 0) {
//create dummy files
@ -1854,10 +1871,15 @@ public class MapTask extends Task {
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
false);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
writer.close();
if (finalPartitionOut != finalOut) {
finalPartitionOut.close();
finalPartitionOut = null;
}
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
@ -1866,6 +1888,9 @@ public class MapTask extends Task {
sr.writeToFile(finalIndexFile, job);
} finally {
finalOut.close();
if (finalPartitionOut != null) {
finalPartitionOut.close();
}
}
sortPhase.complete();
return;
@ -1909,7 +1934,7 @@ public class MapTask extends Task {
//write merged output to disk
long segmentStart = finalOut.getPos();
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
@ -1922,6 +1947,10 @@ public class MapTask extends Task {
//close
writer.close();
if (finalPartitionOut != finalOut) {
finalPartitionOut.close();
finalPartitionOut = null;
}
sortPhase.startNextPhase();
@ -1933,6 +1962,9 @@ public class MapTask extends Task {
}
spillRec.writeToFile(finalIndexFile, job);
finalOut.close();
if (finalPartitionOut != null) {
finalPartitionOut.close();
}
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}

View File

@ -57,9 +57,9 @@ public class CryptoUtils {
/**
* This method creates and initializes an IV (Initialization Vector)
*
* @param conf
* @return byte[]
* @throws IOException
* @param conf configuration
* @return byte[] initialization vector
* @throws IOException exception in case of error
*/
public static byte[] createIV(Configuration conf) throws IOException {
CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
@ -94,13 +94,33 @@ public class CryptoUtils {
* "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
* variable.
*
* @param conf
* @param out
* @return FSDataOutputStream
* @throws IOException
* @param conf configuration
* @param out given output stream
* @return FSDataOutputStream encrypted output stream if encryption is
* enabled; otherwise the given output stream itself
* @throws IOException exception in case of error
*/
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
FSDataOutputStream out) throws IOException {
return wrapIfNecessary(conf, out, true);
}
/**
* Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the
* data buffer required for the stream is specified by the
* "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
* variable.
*
* @param conf configuration
* @param out given output stream
* @param closeOutputStream flag to indicate whether closing the wrapped
* stream will close the given output stream
* @return FSDataOutputStream encrypted output stream if encryption is
* enabled; otherwise the given output stream itself
* @throws IOException exception in case of error
*/
public static FSDataOutputStream wrapIfNecessary(Configuration conf,
FSDataOutputStream out, boolean closeOutputStream) throws IOException {
if (isEncryptedSpillEnabled(conf)) {
out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
byte[] iv = createIV(conf);
@ -110,7 +130,7 @@ public class CryptoUtils {
+ Base64.encodeBase64URLSafeString(iv) + "]");
}
return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
getBufferSize(conf), getEncryptionKey(), iv);
getBufferSize(conf), getEncryptionKey(), iv, closeOutputStream);
} else {
return out;
}
@ -128,11 +148,12 @@ public class CryptoUtils {
* LimitInputStream will ensure that the CryptoStream does not read past the
* provided length from the given Input Stream.
*
* @param conf
* @param in
* @param length
* @return InputStream
* @throws IOException
* @param conf configuration
* @param in given input stream
* @param length maximum number of bytes to read from the input stream
* @return InputStream encrypted input stream if encryption is
* enabled; otherwise the given input stream itself
* @throws IOException exception in case of error
*/
public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
long length) throws IOException {
@ -166,10 +187,11 @@ public class CryptoUtils {
* "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
* variable.
*
* @param conf
* @param in
* @return FSDataInputStream
* @throws IOException
* @param conf configuration
* @param in given input stream
* @return FSDataInputStream encrypted input stream if encryption is
* enabled; otherwise the given input stream itself
* @throws IOException exception in case of error
*/
public static FSDataInputStream wrapIfNecessary(Configuration conf,
FSDataInputStream in) throws IOException {