From 806a8dea3894c22940a640b539782dd2c04df1f3 Mon Sep 17 00:00:00 2001
From: Chris Douglas <cdouglas@apache.org>
Date: Fri, 9 Sep 2016 11:12:44 -0700
Subject: [PATCH] MAPREDUCE-6628. Potential memory leak in CryptoOutputStream.
 Contributed by Mariappan Asokan

(cherry picked from commit 9f192cc5ac4a6145e2eeaecba0a754d31e601898)
---
 .../hadoop/crypto/CryptoOutputStream.java     | 22 ++++++-
 .../fs/crypto/CryptoFSDataOutputStream.java   |  8 ++-
 .../crypto/TestCryptoOutputStreamClosing.java | 57 +++++++++++++++++++
 .../org/apache/hadoop/mapred/MapTask.java     | 40 +++++++++++--
 .../apache/hadoop/mapreduce/CryptoUtils.java  | 56 ++++++++++++------
 5 files changed, 159 insertions(+), 24 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
index 1753019cad2..ab51bfe884a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -76,6 +76,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
   private final byte[] key;
   private final byte[] initIV;
   private byte[] iv;
+  private boolean closeOutputStream;
 
   public CryptoOutputStream(OutputStream out, CryptoCodec codec,
       int bufferSize, byte[] key, byte[] iv) throws IOException {
@@ -85,6 +86,13 @@ public class CryptoOutputStream extends FilterOutputStream implements
   public CryptoOutputStream(OutputStream out, CryptoCodec codec,
       int bufferSize, byte[] key, byte[] iv, long streamOffset)
       throws IOException {
+    this(out, codec, bufferSize, key, iv, streamOffset, true);
+  }
+
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+      int bufferSize, byte[] key, byte[] iv, long streamOffset,
+      boolean closeOutputStream)
+      throws IOException {
     super(out);
     CryptoStreamUtils.checkCodec(codec);
     this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
@@ -95,6 +103,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
     inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     this.streamOffset = streamOffset;
+    this.closeOutputStream = closeOutputStream;
     try {
       encryptor = codec.createEncryptor();
     } catch (GeneralSecurityException e) {
@@ -110,8 +119,14 @@ public class CryptoOutputStream extends FilterOutputStream implements
 
   public CryptoOutputStream(OutputStream out, CryptoCodec codec,
       byte[] key, byte[] iv, long streamOffset) throws IOException {
+    this(out, codec, key, iv, streamOffset, true);
+  }
+
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+      byte[] key, byte[] iv, long streamOffset, boolean closeOutputStream)
+      throws IOException {
     this(out, codec, CryptoStreamUtils.getBufferSize(codec.getConf()),
-        key, iv, streamOffset);
+        key, iv, streamOffset, closeOutputStream);
   }
 
   public OutputStream getWrappedStream() {
@@ -221,7 +236,10 @@ public class CryptoOutputStream extends FilterOutputStream implements
       return;
     }
     try {
-      super.close();
+      flush();
+      if (closeOutputStream) {
+        super.close();
+      }
       freeBuffers();
     } finally {
       closed = true;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
index 3beb3616191..6450bc322a7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
@@ -28,8 +28,14 @@ public class CryptoFSDataOutputStream extends FSDataOutputStream {
 
   public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
       int bufferSize, byte[] key, byte[] iv) throws IOException {
+    this(out, codec, bufferSize, key, iv, true);
+  }
+
+  public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
+      int bufferSize, byte[] key, byte[] iv, boolean closeOutputStream)
+      throws IOException {
     super(new CryptoOutputStream(out, codec, bufferSize, key, iv,
-        out.getPos()), null, out.getPos());
+        out.getPos(), closeOutputStream), null, out.getPos());
     this.fsOut = out;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java
new file mode 100644
index 00000000000..39e4bb85880
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+/**
+ * To test proper closing of underlying stream of CryptoOutputStream.
+ */
+public class TestCryptoOutputStreamClosing {
+  private static CryptoCodec codec;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    codec = CryptoCodec.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testOutputStreamClosing() throws Exception {
+    OutputStream outputStream = mock(OutputStream.class);
+    CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec,
+        new byte[16], new byte[16], 0L, true);
+    cos.close();
+    verify(outputStream).close();
+  }
+
+  @Test
+  public void testOutputStreamNotClosing() throws Exception {
+    OutputStream outputStream = mock(OutputStream.class);
+    CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec,
+        new byte[16], new byte[16], 0L, false);
+    cos.close();
+    verify(outputStream, never()).close();
+  }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index c4957b75517..82b82545c6f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -1587,6 +1587,7 @@ public class MapTask extends Task {
       final long size = distanceTo(bufstart, bufend, bufvoid) +
                   partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
+      FSDataOutputStream partitionOut = null;
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1607,7 +1608,7 @@ public class MapTask extends Task {
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
             writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                       spilledRecordsCounter);
             if (combinerRunner == null) {
@@ -1642,6 +1643,10 @@ public class MapTask extends Task {
 
             // close the writer
             writer.close();
+            if (partitionOut != out) {
+              partitionOut.close();
+              partitionOut = null;
+            }
 
             // record offsets
             rec.startOffset = segmentStart;
@@ -1670,6 +1675,9 @@ public class MapTask extends Task {
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (partitionOut != null) {
+          partitionOut.close();
+        }
       }
     }
 
@@ -1682,6 +1690,7 @@ public class MapTask extends Task {
                                        int partition) throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
+      FSDataOutputStream partitionOut = null;
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1696,7 +1705,7 @@ public class MapTask extends Task {
         try {
           long segmentStart = out.getPos();
           // Create a new codec, don't care!
-          FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+          partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
           writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                           spilledRecordsCounter);
 
@@ -1708,6 +1717,10 @@ public class MapTask extends Task {
             mapOutputByteCounter.increment(out.getPos() - recordStart);
           }
           writer.close();
+          if (partitionOut != out) {
+            partitionOut.close();
+            partitionOut = null;
+          }
 
           // record offsets
           rec.startOffset = segmentStart;
@@ -1735,6 +1748,9 @@ public class MapTask extends Task {
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (partitionOut != null) {
+          partitionOut.close();
+        }
       }
     }
 
@@ -1846,6 +1862,7 @@ public class MapTask extends Task {
 
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+      FSDataOutputStream finalPartitionOut = null;
 
      if (numSpills == 0) {
        //create dummy files
@@ -1854,10 +1871,15 @@ public class MapTask extends Task {
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
-            FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
+            finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
+                false);
            Writer<K, V> writer =
              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
            writer.close();
+            if (finalPartitionOut != finalOut) {
+              finalPartitionOut.close();
+              finalPartitionOut = null;
+            }
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
@@ -1866,6 +1888,9 @@ public class MapTask extends Task {
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
+          if (finalPartitionOut != null) {
+            finalPartitionOut.close();
+          }
        }
        sortPhase.complete();
        return;
@@ -1909,7 +1934,7 @@ public class MapTask extends Task {
 
        //write merged output to disk
        long segmentStart = finalOut.getPos();
-        FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
+        finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
        Writer<K, V> writer =
            new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                             spilledRecordsCounter);
@@ -1922,6 +1947,10 @@ public class MapTask extends Task {
 
        //close
        writer.close();
+        if (finalPartitionOut != finalOut) {
+          finalPartitionOut.close();
+          finalPartitionOut = null;
+        }
 
        sortPhase.startNextPhase();
 
@@ -1933,6 +1962,9 @@ public class MapTask extends Task {
      }
      spillRec.writeToFile(finalIndexFile, job);
      finalOut.close();
+      if (finalPartitionOut != null) {
+        finalPartitionOut.close();
+      }
      for(int i = 0; i < numSpills; i++) {
        rfs.delete(filename[i],true);
      }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
index c4130b1b07f..c05b6b08156 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
@@ -57,9 +57,9 @@ public class CryptoUtils {
   /**
    * This method creates and initializes an IV (Initialization Vector)
    *
-   * @param conf
-   * @return byte[]
-   * @throws IOException
+   * @param conf configuration
+   * @return byte[] initialization vector
+   * @throws IOException exception in case of error
    */
   public static byte[] createIV(Configuration conf) throws IOException {
     CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
@@ -94,13 +94,33 @@ public class CryptoUtils {
    * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
    * variable.
    *
-   * @param conf
-   * @param out
-   * @return FSDataOutputStream
-   * @throws IOException
+   * @param conf configuration
+   * @param out given output stream
+   * @return FSDataOutputStream encrypted output stream if encryption is
+   *         enabled; otherwise the given output stream itself
+   * @throws IOException exception in case of error
    */
   public static FSDataOutputStream wrapIfNecessary(Configuration conf,
       FSDataOutputStream out) throws IOException {
+    return wrapIfNecessary(conf, out, true);
+  }
+
+  /**
+   * Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the
+   * data buffer required for the stream is specified by the
+   * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
+   * variable.
+   *
+   * @param conf configuration
+   * @param out given output stream
+   * @param closeOutputStream flag to indicate whether closing the wrapped
+   *        stream will close the given output stream
+   * @return FSDataOutputStream encrypted output stream if encryption is
+   *         enabled; otherwise the given output stream itself
+   * @throws IOException exception in case of error
+   */
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, boolean closeOutputStream) throws IOException {
     if (isEncryptedSpillEnabled(conf)) {
       out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
       byte[] iv = createIV(conf);
@@ -110,7 +130,7 @@ public class CryptoUtils {
             + Base64.encodeBase64URLSafeString(iv) + "]");
       }
       return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
-          getBufferSize(conf), getEncryptionKey(), iv);
+          getBufferSize(conf), getEncryptionKey(), iv, closeOutputStream);
     } else {
       return out;
     }
@@ -128,11 +148,12 @@ public class CryptoUtils {
    * LimitInputStream will ensure that the CryptoStream does not read past the
    * provided length from the given Input Stream.
    *
-   * @param conf
-   * @param in
-   * @param length
-   * @return InputStream
-   * @throws IOException
+   * @param conf configuration
+   * @param in given input stream
+   * @param length maximum number of bytes to read from the input stream
+   * @return InputStream encrypted input stream if encryption is
+   *         enabled; otherwise the given input stream itself
+   * @throws IOException exception in case of error
    */
   public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
       long length) throws IOException {
@@ -166,10 +187,11 @@ public class CryptoUtils {
    * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
    * variable.
    *
-   * @param conf
-   * @param in
-   * @return FSDataInputStream
-   * @throws IOException
+   * @param conf configuration
+   * @param in given input stream
+   * @return FSDataInputStream encrypted input stream if encryption is
+   *         enabled; otherwise the given input stream itself
+   * @throws IOException exception in case of error
    */
   public static FSDataInputStream wrapIfNecessary(Configuration conf,
       FSDataInputStream in) throws IOException {
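
Usage sketch (illustrative only, not part of the patch): the pattern the MapTask changes follow, shown standalone. Passing closeOutputStream=false lets the caller close the crypto wrapper as soon as a segment is written, freeing its direct ByteBuffers immediately instead of leaking them until GC, while the shared underlying stream stays open for further segments. The class name, file path, and payload bytes below are placeholders; when encrypted intermediate data is disabled, wrapIfNecessary returns the given stream itself, which is why an identity check guards the early close. Note that CryptoUtils is MapReduce-internal API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.CryptoUtils;

public class SpillWrapSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/spill-example.out"));
    // Track the wrapper separately so a failure path can still close it.
    FSDataOutputStream partitionOut = null;
    try {
      // false: closing the wrapper must not close `out`, which may still
      // carry further segments (MapTask writes one segment per partition).
      partitionOut = CryptoUtils.wrapIfNecessary(conf, out, false);
      partitionOut.write(new byte[] { 1, 2, 3 });
      if (partitionOut != out) {
        // Close eagerly: this flushes and frees the CryptoOutputStream's
        // direct buffers without touching the underlying stream.
        partitionOut.close();
        partitionOut = null;
      }
    } finally {
      // Close a wrapper left open by a failure, then the shared stream;
      // the identity check avoids closing `out` twice.
      if (partitionOut != null && partitionOut != out) {
        partitionOut.close();
      }
      out.close();
    }
  }
}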