HDDS-1082. OutOfMemoryError because of memory leak in KeyInputStream. Contributed by Supratim Deka.

This commit is contained in:
Mukul Kumar Singh 2019-02-15 20:09:15 +05:30
parent 9385ec45d7
commit 9584b47e03
1 changed files with 100 additions and 19 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
@ -105,8 +106,21 @@ public class BlockInputStream extends InputStream implements Seekable {
throws IOException {
checkOpen();
int available = prepareRead(1);
return available == EOF ? EOF :
Byte.toUnsignedInt(buffers.get(bufferIndex).get());
int dataout = EOF;
if (available == EOF) {
Preconditions.checkState (buffers == null); //should have released by now, see below
} else {
dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
}
if (blockStreamEOF()) {
// consumer might use getPos to determine EOF,
// so release buffers when serving the last byte of data
releaseBuffers();
}
return dataout;
}
@Override
@ -135,15 +149,45 @@ public class BlockInputStream extends InputStream implements Seekable {
while (len > 0) {
int available = prepareRead(len);
if (available == EOF) {
Preconditions.checkState(buffers == null); //should have been released by now
return total != 0 ? total : EOF;
}
buffers.get(bufferIndex).get(b, off + total, available);
len -= available;
total += available;
}
if (blockStreamEOF()) {
// smart consumers determine EOF by calling getPos()
// so we release buffers when serving the final bytes of data
releaseBuffers();
}
return total;
}
/**
* Determines if all data in the stream has been consumed
*
* @return true if EOF, false if more data is available
*/
private boolean blockStreamEOF() {
if (buffersHaveData() || chunksRemaining()) {
return false;
} else {
// if there are any chunks, we better be at the last chunk for EOF
Preconditions.checkState (((chunks == null) || chunks.isEmpty() ||
chunkIndex == (chunks.size() - 1)), "EOF detected, but not at the last chunk");
return true;
}
}
private void releaseBuffers() {
//ashes to ashes, dust to dust
buffers = null;
bufferIndex = 0;
}
@Override
public synchronized void close() {
if (xceiverClientManager != null && xceiverClient != null) {
@ -173,23 +217,11 @@ public class BlockInputStream extends InputStream implements Seekable {
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
if (chunks == null || chunks.isEmpty()) {
// This must be an empty key.
return EOF;
} else if (buffers == null) {
// The first read triggers fetching the first chunk.
readChunkFromContainer();
} else if (!buffers.isEmpty() &&
buffers.get(bufferIndex).hasRemaining()) {
// Data is available from the current buffer.
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
} else if (!buffers.isEmpty() &&
!buffers.get(bufferIndex).hasRemaining() &&
bufferIndex < buffers.size() - 1) {
// There are additional buffers available.
++bufferIndex;
} else if (chunkIndex < chunks.size() - 1) {
} else if (chunksRemaining()) {
// There are additional chunks available.
readChunkFromContainer();
} else {
@ -199,6 +231,44 @@ public class BlockInputStream extends InputStream implements Seekable {
}
}
private boolean buffersHaveData() {
boolean hasData = false;
if (buffers == null || buffers.isEmpty()) {
return false;
}
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState (bufferIndex < buffers.size());
} else {
// no more buffers remaining
break;
}
}
}
return hasData;
}
private boolean buffersRemaining() {
return (bufferIndex < (buffers.size() - 1));
}
private boolean chunksRemaining() {
if ((chunks == null) || chunks.isEmpty()) {
return false;
}
return (chunkIndex < (chunks.size() - 1));
}
/**
* Attempts to read the chunk at the specified offset in the chunk list. If
* successful, then the data of the read chunk is saved so that its bytes can
@ -311,8 +381,19 @@ public class BlockInputStream extends InputStream implements Seekable {
@Override
public synchronized long getPos() throws IOException {
return chunkIndex == -1 ? 0 :
chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
if (chunkIndex == -1) {
// no data consumed yet, a new stream OR after seek
return 0;
}
if (blockStreamEOF()) {
// all data consumed, buffers have been released.
// get position from the chunk offset and chunk length of last chunk
return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
}
// get position from available buffers of current chunk
return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
}
@Override