diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 001fedab1dd..d53d20e1a79 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.XceiverClientReply; @@ -105,8 +106,21 @@ public class BlockInputStream extends InputStream implements Seekable { throws IOException { checkOpen(); int available = prepareRead(1); - return available == EOF ? EOF : - Byte.toUnsignedInt(buffers.get(bufferIndex).get()); + int dataout = EOF; + + if (available == EOF) { + Preconditions.checkState (buffers == null); //should have released by now, see below + } else { + dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get()); + } + + if (blockStreamEOF()) { + // consumer might use getPos to determine EOF, + // so release buffers when serving the last byte of data + releaseBuffers(); + } + + return dataout; } @Override @@ -135,15 +149,45 @@ public class BlockInputStream extends InputStream implements Seekable { while (len > 0) { int available = prepareRead(len); if (available == EOF) { + Preconditions.checkState(buffers == null); //should have been released by now return total != 0 ? total : EOF; } buffers.get(bufferIndex).get(b, off + total, available); len -= available; total += available; } + + if (blockStreamEOF()) { + // smart consumers determine EOF by calling getPos() + // so we release buffers when serving the final bytes of data + releaseBuffers(); + } + return total; } + /** + * Determines if all data in the stream has been consumed + * + * @return true if EOF, false if more data is available + */ + private boolean blockStreamEOF() { + if (buffersHaveData() || chunksRemaining()) { + return false; + } else { + // if there are any chunks, we better be at the last chunk for EOF + Preconditions.checkState (((chunks == null) || chunks.isEmpty() || + chunkIndex == (chunks.size() - 1)), "EOF detected, but not at the last chunk"); + return true; + } + } + + private void releaseBuffers() { + //ashes to ashes, dust to dust + buffers = null; + bufferIndex = 0; + } + @Override public synchronized void close() { if (xceiverClientManager != null && xceiverClient != null) { @@ -173,23 +217,11 @@ public class BlockInputStream extends InputStream implements Seekable { */ private synchronized int prepareRead(int len) throws IOException { for (;;) { - if (chunks == null || chunks.isEmpty()) { - // This must be an empty key. - return EOF; - } else if (buffers == null) { - // The first read triggers fetching the first chunk. - readChunkFromContainer(); - } else if (!buffers.isEmpty() && - buffers.get(bufferIndex).hasRemaining()) { - // Data is available from the current buffer. + if (buffersHaveData()) { + // Data is available from buffers ByteBuffer bb = buffers.get(bufferIndex); return len > bb.remaining() ? bb.remaining() : len; - } else if (!buffers.isEmpty() && - !buffers.get(bufferIndex).hasRemaining() && - bufferIndex < buffers.size() - 1) { - // There are additional buffers available. - ++bufferIndex; - } else if (chunkIndex < chunks.size() - 1) { + } else if (chunksRemaining()) { // There are additional chunks available. readChunkFromContainer(); } else { @@ -199,6 +231,44 @@ public class BlockInputStream extends InputStream implements Seekable { } } + private boolean buffersHaveData() { + boolean hasData = false; + + if (buffers == null || buffers.isEmpty()) { + return false; + } + + while (bufferIndex < (buffers.size())) { + if (buffers.get(bufferIndex).hasRemaining()) { + // current buffer has data + hasData = true; + break; + } else { + if (buffersRemaining()) { + // move to next available buffer + ++bufferIndex; + Preconditions.checkState (bufferIndex < buffers.size()); + } else { + // no more buffers remaining + break; + } + } + } + + return hasData; + } + + private boolean buffersRemaining() { + return (bufferIndex < (buffers.size() - 1)); + } + + private boolean chunksRemaining() { + if ((chunks == null) || chunks.isEmpty()) { + return false; + } + return (chunkIndex < (chunks.size() - 1)); + } + /** * Attempts to read the chunk at the specified offset in the chunk list. If * successful, then the data of the read chunk is saved so that its bytes can @@ -311,8 +381,19 @@ public class BlockInputStream extends InputStream implements Seekable { @Override public synchronized long getPos() throws IOException { - return chunkIndex == -1 ? 0 : - chunkOffset[chunkIndex] + buffers.get(bufferIndex).position(); + if (chunkIndex == -1) { + // no data consumed yet, a new stream OR after seek + return 0; + } + + if (blockStreamEOF()) { + // all data consumed, buffers have been released. + // get position from the chunk offset and chunk length of last chunk + return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen(); + } + + // get position from available buffers of current chunk + return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position(); } @Override