diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 82fb1063d3b..bccbc9bdb96 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -19,145 +19,197 @@ package org.apache.hadoop.hdds.scm.storage; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.ozone.common.Checksum; -import org.apache.hadoop.ozone.common.ChecksumData; -import org.apache.hadoop.ozone.common.OzoneChecksumException; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - ContainerCommandRequestProto; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** - * An {@link InputStream} used by the REST service in combination with the - * SCMClient to read the value of a key from a sequence - * of container chunks. All bytes of the key value are stored in container - * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} - * instances. This class encapsulates all state management for iterating - * through the sequence of chunks and the sequence of buffers within each chunk. + * An {@link InputStream} called from KeyInputStream to read a block from the + * container. + * This class encapsulates all state management for iterating + * through the sequence of chunks through {@link ChunkInputStream}. 
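+ * + * A minimal usage sketch (hypothetical wiring; initialize() otherwise runs + * lazily on the first read): + * + *   BlockInputStream in = new BlockInputStream(blockID, blockLen, pipeline, + *       token, true, traceID, xceiverClientManager); + *   in.seek(pos);    // buffered in blockPosition until initialization + *   int n = in.read(buf, 0, buf.length);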
*/ public class BlockInputStream extends InputStream implements Seekable { + private static final Logger LOG = + LoggerFactory.getLogger(BlockInputStream.class); + private static final int EOF = -1; private final BlockID blockID; + private final long length; + private Pipeline pipeline; + private final Token<OzoneBlockTokenIdentifier> token; + private final boolean verifyChecksum; private final String traceID; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; - private List<ChunkInfo> chunks; - // ChunkIndex points to the index current chunk in the buffers or the the - // index of chunk which will be read next into the buffers in - // readChunkFromContainer(). + private boolean initialized = false; + + // List of ChunkInputStreams, one for each chunk in the block + private List<ChunkInputStream> chunkStreams; + + // chunkOffsets[i] stores the index of the first data byte in + // chunkStream i w.r.t the block data. + // Let’s say we have chunk size as 40 bytes. And let's say the parent + // block stores data from index 200 and has length 400. + // The first 40 bytes of this block will be stored in chunk[0], next 40 in + // chunk[1] and so on. But since the chunkOffsets are w.r.t the block only + // and not the key, the values in chunkOffsets will be [0, 40, 80,....]. + private long[] chunkOffsets = null; + + // Index of the chunkStream corresponding to the current position of the + // BlockInputStream i.e. offset of the data to be read next from this block private int chunkIndex; - // ChunkIndexOfCurrentBuffer points to the index of chunk read into the - // buffers or index of the last chunk in the buffers. It is updated only - // when a new chunk is read from container into the buffers. - private int chunkIndexOfCurrentBuffer; - private long[] chunkOffset; - private List<ByteBuffer> buffers; - private int bufferIndex; - private long bufferPosition; - private boolean verifyChecksum; + + // Position of the BlockInputStream is maintained by this variable till + // the stream is initialized. This position is w.r.t. the block only and + // not the key. + // For the above example, if the key is seeked to position 240 before the + // stream is initialized, then the value of blockPosition will be set to + // 40 (= 240 - 200). + // Once the stream is initialized, the position of the stream + // will be determined by the current chunkStream and its position. + private long blockPosition = 0; + + // Tracks the chunkIndex corresponding to the last blockPosition so that it + // can be reset if a new position is seeked. + private int chunkIndexOfPrevPosition; + + public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, + Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum, + String traceId, XceiverClientManager xceiverClientManager) { + this.blockID = blockId; + this.length = blockLen; + this.pipeline = pipeline; + this.token = token; + this.verifyChecksum = verifyChecksum; + this.traceID = traceId; + this.xceiverClientManager = xceiverClientManager; + } /** - * Creates a new BlockInputStream. - * - * @param blockID block ID of the chunk - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param chunks list of chunks to read - * @param traceID container protocol call traceID - * @param verifyChecksum verify checksum - * @param initialPosition the initial position of the stream pointer. This - * position is seeked now if the up-stream was seeked - * before this was created. + * Initialize the BlockInputStream. 
Get the BlockData (list of chunks) from + * the Container and create the ChunkInputStreams for each Chunk in the Block. */ - public BlockInputStream( - BlockID blockID, XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID, - boolean verifyChecksum, long initialPosition) throws IOException { - this.blockID = blockID; - this.traceID = traceID; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.chunks = chunks; - this.chunkIndex = 0; - this.chunkIndexOfCurrentBuffer = -1; - // chunkOffset[i] stores offset at which chunk i stores data in - // BlockInputStream - this.chunkOffset = new long[this.chunks.size()]; - initializeChunkOffset(); - this.buffers = null; - this.bufferIndex = 0; - this.bufferPosition = -1; - this.verifyChecksum = verifyChecksum; - if (initialPosition > 0) { - // The stream was seeked to a position before the stream was - // initialized. So seeking to the position now. - seek(initialPosition); + public synchronized void initialize() throws IOException { + + // Pre-check that the stream has not been initialized already + if (initialized) { + return; + } + + List<ChunkInfo> chunks = getChunkInfos(); + if (chunks != null && !chunks.isEmpty()) { + // For each chunk in the block, create a ChunkInputStream and compute + // its chunkOffset + this.chunkOffsets = new long[chunks.size()]; + long tempOffset = 0; + + this.chunkStreams = new ArrayList<>(chunks.size()); + for (int i = 0; i < chunks.size(); i++) { + addStream(chunks.get(i)); + chunkOffsets[i] = tempOffset; + tempOffset += chunks.get(i).getLen(); + } + + initialized = true; + this.chunkIndex = 0; + + if (blockPosition > 0) { + // Stream was seeked to blockPosition before initialization. Seek to the + // blockPosition now. + seek(blockPosition); + } + } } - private void initializeChunkOffset() { - long tempOffset = 0; - for (int i = 0; i < chunks.size(); i++) { - chunkOffset[i] = tempOffset; - tempOffset += chunks.get(i).getLen(); + /** + * Send RPC call to get the block info from the container. + * @return List of chunks in this block. + */ + protected List<ChunkInfo> getChunkInfos() throws IOException { + // Irrespective of the container state, we will always read via Standalone + // protocol. + if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { + pipeline = Pipeline.newBuilder(pipeline) + .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); } + xceiverClient = xceiverClientManager.acquireClient(pipeline); + boolean success = false; + List<ChunkInfo> chunks; + try { + LOG.debug("Initializing BlockInputStream for get key to access {}", + blockID.getContainerID()); + + if (token != null) { + UserGroupInformation.getCurrentUser().addToken(token); + } + DatanodeBlockID datanodeBlockID = blockID + .getDatanodeBlockIDProtobuf(); + GetBlockResponseProto response = ContainerProtocolCalls + .getBlock(xceiverClient, datanodeBlockID, traceID); + + chunks = response.getBlockData().getChunksList(); + success = true; + } finally { + if (!success) { + xceiverClientManager.releaseClient(xceiverClient, false); + } + } + + return chunks; } + /** + * Append another ChunkInputStream to the end of the list. Note that the + * ChunkInputStream is only created here. The chunk will be read from the + * Datanode only when a read operation is performed for that chunk. 
+ */ + protected synchronized void addStream(ChunkInfo chunkInfo) { + chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID, + xceiverClient, verifyChecksum)); + } + + public synchronized long getRemaining() throws IOException { + return length - getPos(); + } + + /** + * {@inheritDoc} + */ @Override - public synchronized int read() - throws IOException { - checkOpen(); - int available = prepareRead(1); - int dataout = EOF; - - if (available == EOF) { - Preconditions - .checkState(buffers == null); //should have released by now, see below - } else { - dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get()); + public synchronized int read() throws IOException { + byte[] buf = new byte[1]; + if (read(buf, 0, 1) == EOF) { + return EOF; } - - if (blockStreamEOF()) { - // consumer might use getPos to determine EOF, - // so release buffers when serving the last byte of data - releaseBuffers(); - } - - return dataout; + return Byte.toUnsignedInt(buf[0]); } + /** + * {@inheritDoc} + */ @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - // According to the JavaDocs for InputStream, it is recommended that - // subclasses provide an override of bulk read if possible for performance - // reasons. In addition to performance, we need to do it for correctness - // reasons. The Ozone REST service uses PipedInputStream and - // PipedOutputStream to relay HTTP response data between a Jersey thread and - // a Netty thread. It turns out that PipedInputStream/PipedOutputStream - // have a subtle dependency (bug?) on the wrapped stream providing separate - // implementations of single-byte read and bulk read. Without this, get key - // responses might close the connection before writing all of the bytes - // advertised in the Content-Length. if (b == null) { throw new NullPointerException(); } @@ -167,50 +219,124 @@ public class BlockInputStream extends InputStream implements Seekable { if (len == 0) { return 0; } + + if (!initialized) { + initialize(); + } + checkOpen(); - int total = 0; + int totalReadLen = 0; while (len > 0) { - int available = prepareRead(len); - if (available == EOF) { - Preconditions - .checkState(buffers == null); //should have been released by now - return total != 0 ? total : EOF; + // if we are at the last chunk and have read the entire chunk, return + if (chunkStreams.size() == 0 || + (chunkStreams.size() - 1 <= chunkIndex && + chunkStreams.get(chunkIndex) + .getRemaining() == 0)) { + return totalReadLen == 0 ? EOF : totalReadLen; } - buffers.get(bufferIndex).get(b, off + total, available); - len -= available; - total += available; - } - if (blockStreamEOF()) { - // smart consumers determine EOF by calling getPos() - // so we release buffers when serving the final bytes of data - releaseBuffers(); + // Get the current chunkStream and read data from it + ChunkInputStream current = chunkStreams.get(chunkIndex); + int numBytesToRead = Math.min(len, (int)current.getRemaining()); + int numBytesRead = current.read(b, off, numBytesToRead); + if (numBytesRead != numBytesToRead) { + // This implies that there is either data loss or corruption in the + // chunk entries. Even EOF in the current stream would be covered in + // this case. 
+ throw new IOException(String.format( + "Inconsistent read for chunkName=%s length=%d numBytesRead=%d", + current.getChunkName(), current.getLength(), numBytesRead)); + } + totalReadLen += numBytesRead; + off += numBytesRead; + len -= numBytesRead; + if (current.getRemaining() <= 0 && + ((chunkIndex + 1) < chunkStreams.size())) { + chunkIndex += 1; + } } - - return total; + return totalReadLen; } /** - * Determines if all data in the stream has been consumed. + * Seeks the BlockInputStream to the specified position. If the stream is + * not initialized, save the seeked position via blockPosition. Otherwise, + * update the position in 2 steps: + * 1. Update the chunkIndex to the chunkStream corresponding to the + * seeked position. + * 2. Seek that chunkStream to the adjusted position. * - * @return true if EOF, false if more data is available + * Let’s say we have chunk size as 40 bytes. And let's say the parent block + * stores data from index 200 and has length 400. If the key is seeked to + * position 290, then this block will be seeked to position 90 (= 290 - 200). + * When seek(90) is called on this blockStream, then + * 1. chunkIndex will be set to 2 (as indices 80 - 120 reside in chunk[2]). + * 2. chunkStream[2] will be seeked to position 10 + * (= 90 - chunkOffset[2] (= 80)). */ - protected boolean blockStreamEOF() { - if (buffersHaveData() || chunksRemaining()) { - return false; + @Override + public synchronized void seek(long pos) throws IOException { + if (!initialized) { + // Stream has not been initialized yet. Save the position so that it + // can be seeked when the stream is initialized. + blockPosition = pos; + return; + } + + checkOpen(); + if (pos < 0 || pos >= length) { + if (pos == 0) { + // It is possible for length and pos to be zero in which case + // seek should return instead of throwing exception + return; + } + throw new EOFException( + "EOF encountered at pos: " + pos + " for block: " + blockID); + } + + if (chunkIndex >= chunkStreams.size()) { + chunkIndex = Arrays.binarySearch(chunkOffsets, pos); + } else if (pos < chunkOffsets[chunkIndex]) { + chunkIndex = + Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos); + } else if (pos >= chunkOffsets[chunkIndex] + chunkStreams + .get(chunkIndex).getLength()) { + chunkIndex = Arrays.binarySearch(chunkOffsets, + chunkIndex + 1, chunkStreams.size(), pos); + } + if (chunkIndex < 0) { + // Binary search returns -insertionPoint - 1 if element is not present + // in the array. insertionPoint is the point at which element would be + // inserted in the sorted array. We need to adjust the chunkIndex + // accordingly so that chunkIndex = insertionPoint - 1 + chunkIndex = -chunkIndex - 2; + } + + // Reset the previous chunkStream's position + chunkStreams.get(chunkIndexOfPrevPosition).resetPosition(); + + // Seek to the proper offset in the ChunkInputStream + chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]); + chunkIndexOfPrevPosition = chunkIndex; + } + + @Override + public synchronized long getPos() throws IOException { + if (length == 0) { + return 0; + } + + if (!initialized) { + // The stream is not initialized yet. 
Return the blockPosition + return blockPosition; } else { - // if there are any chunks, we better be at the last chunk for EOF - Preconditions.checkState(((chunks == null) || chunks.isEmpty() || - chunkIndex == (chunks.size() - 1)), - "EOF detected, but not at the last chunk"); - return true; + return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos(); } } - private void releaseBuffers() { - //ashes to ashes, dust to dust - buffers = null; - bufferIndex = 0; + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; } @Override @@ -222,267 +348,41 @@ public class BlockInputStream extends InputStream implements Seekable { } } + public synchronized void resetPosition() { + this.blockPosition = 0; + } + /** - * Checks if the stream is open. If not, throws an exception. + * Checks if the stream is open. If not, throw an exception. * * @throws IOException if stream is closed */ - private synchronized void checkOpen() throws IOException { + protected synchronized void checkOpen() throws IOException { if (xceiverClient == null) { throw new IOException("BlockInputStream has been closed."); } } - /** - * Prepares to read by advancing through chunks and buffers as needed until it - * finds data to return or encounters EOF. - * - * @param len desired length of data to read - * @return length of data available to read, possibly less than desired length - */ - private synchronized int prepareRead(int len) throws IOException { - for (;;) { - if (!buffersAllocated()) { - // The current chunk at chunkIndex has not been read from the - // container. Read the chunk and put the data into buffers. - readChunkFromContainer(); - } - if (buffersHaveData()) { - // Data is available from buffers - ByteBuffer bb = buffers.get(bufferIndex); - return len > bb.remaining() ? bb.remaining() : len; - } else if (chunksRemaining()) { - // There are additional chunks available. - // Read the next chunk in the block. - chunkIndex += 1; - readChunkFromContainer(); - } else { - // All available input has been consumed. - return EOF; - } - } - } - - private boolean buffersAllocated() { - if (buffers == null || buffers.isEmpty()) { - return false; - } - return true; - } - - private boolean buffersHaveData() { - boolean hasData = false; - - if (buffersAllocated()) { - while (bufferIndex < (buffers.size())) { - if (buffers.get(bufferIndex).hasRemaining()) { - // current buffer has data - hasData = true; - break; - } else { - if (buffersRemaining()) { - // move to next available buffer - ++bufferIndex; - Preconditions.checkState(bufferIndex < buffers.size()); - } else { - // no more buffers remaining - break; - } - } - } - } - - return hasData; - } - - private boolean buffersRemaining() { - return (bufferIndex < (buffers.size() - 1)); - } - - private boolean chunksRemaining() { - if ((chunks == null) || chunks.isEmpty()) { - return false; - } - // Check if more chunks are remaining in the stream after chunkIndex - if (chunkIndex < (chunks.size() - 1)) { - return true; - } - // ChunkIndex is the last chunk in the stream. Check if this chunk has - // been read from container or not. Return true if chunkIndex has not - // been read yet and false otherwise. - return chunkIndexOfCurrentBuffer != chunkIndex; - } - - /** - * Attempts to read the chunk at the specified offset in the chunk list. If - * successful, then the data of the read chunk is saved so that its bytes can - * be returned from subsequent read calls. 
- * - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void readChunkFromContainer() throws IOException { - // Read the chunk at chunkIndex - final ChunkInfo chunkInfo = chunks.get(chunkIndex); - ByteString byteString; - byteString = readChunk(chunkInfo); - buffers = byteString.asReadOnlyByteBufferList(); - bufferIndex = 0; - chunkIndexOfCurrentBuffer = chunkIndex; - - // The bufferIndex and position might need to be adjusted if seek() was - // called on the stream before. This needs to be done so that the buffer - // position can be advanced to the 'seeked' position. - adjustBufferIndex(); - } - - /** - * Send RPC call to get the chunk from the container. - */ - @VisibleForTesting - protected ByteString readChunk(final ChunkInfo chunkInfo) - throws IOException { - ReadChunkResponseProto readChunkResponse; - try { - List validators = - ContainerProtocolCalls.getValidatorList(); - validators.add(validator); - readChunkResponse = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID, validators); - } catch (IOException e) { - if (e instanceof StorageContainerException) { - throw e; - } - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } - return readChunkResponse.getData(); - } - - @VisibleForTesting - protected List getDatanodeList() { - return xceiverClient.getPipeline().getNodes(); - } - - private CheckedBiFunction validator = - (request, response) -> { - ReadChunkResponseProto readChunkResponse = response.getReadChunk(); - final ChunkInfo chunkInfo = readChunkResponse.getChunkData(); - ByteString byteString = readChunkResponse.getData(); - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new OzoneChecksumException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), - byteString.size())); - } - ChecksumData checksumData = - ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData()); - if (verifyChecksum) { - Checksum.verifyChecksum(byteString, checksumData); - } - }; - - @Override - public synchronized void seek(long pos) throws IOException { - if (pos < 0 || (chunks.size() == 0 && pos > 0) - || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1) - .getLen()) { - throw new EOFException("EOF encountered pos: " + pos + " container key: " - + blockID.getLocalID()); - } - - if (pos < chunkOffset[chunkIndex]) { - chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos); - } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex) - .getLen()) { - chunkIndex = - Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos); - } - if (chunkIndex < 0) { - // Binary search returns -insertionPoint - 1 if element is not present - // in the array. insertionPoint is the point at which element would be - // inserted in the sorted array. We need to adjust the chunkIndex - // accordingly so that chunkIndex = insertionPoint - 1 - chunkIndex = -chunkIndex -2; - } - - // The bufferPosition should be adjusted to account for the chunk offset - // of the chunk the the pos actually points to. - bufferPosition = pos - chunkOffset[chunkIndex]; - - // Check if current buffers correspond to the chunk index being seeked - // and if the buffers have any data. - if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) { - // Position the buffer to the seeked position. - adjustBufferIndex(); - } else { - // Release the current buffers. 
The next readChunkFromContainer will - // read the required chunk and position the buffer to the seeked - // position. - releaseBuffers(); - } - } - - private void adjustBufferIndex() { - if (bufferPosition == -1) { - // The stream has not been seeked to a position. No need to adjust the - // buffer Index and position. - return; - } - // The bufferPosition is w.r.t the buffers for current chunk. - // Adjust the bufferIndex and position to the seeked position. - long tempOffest = 0; - for (int i = 0; i < buffers.size(); i++) { - if (bufferPosition - tempOffest >= buffers.get(i).capacity()) { - tempOffest += buffers.get(i).capacity(); - } else { - bufferIndex = i; - break; - } - } - buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest)); - // Reset the bufferPosition as the seek() operation has been completed. - bufferPosition = -1; - } - - @Override - public synchronized long getPos() throws IOException { - // position = chunkOffset of current chunk (at chunkIndex) + position of - // the buffer corresponding to the chunk. - long bufferPos = 0; - - if (bufferPosition >= 0) { - // seek has been called but the buffers were empty. Hence, the buffer - // position will be advanced after the buffers are filled. - // We return the chunkOffset + bufferPosition here as that will be the - // position of the buffer pointer after reading the chunk file. - bufferPos = bufferPosition; - - } else if (blockStreamEOF()) { - // all data consumed, buffers have been released. - // get position from the chunk offset and chunk length of last chunk - bufferPos = chunks.get(chunkIndex).getLen(); - - } else if (buffersAllocated()) { - // get position from available buffers of current chunk - bufferPos = buffers.get(bufferIndex).position(); - - } - - return chunkOffset[chunkIndex] + bufferPos; - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - public BlockID getBlockID() { return blockID; } + public long getLength() { + return length; + } + @VisibleForTesting - protected int getChunkIndex() { + synchronized int getChunkIndex() { return chunkIndex; } + + @VisibleForTesting + synchronized long getBlockPosition() { + return blockPosition; + } + + @VisibleForTesting + synchronized List<ChunkInputStream> getChunkStreams() { + return chunkStreams; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java new file mode 100644 index 00000000000..8d30c225400 --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ + +package org.apache.hadoop.hdds.scm.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * An {@link InputStream} called from BlockInputStream to read a chunk from the + * container. Each chunk may contain multiple underlying {@link ByteBuffer} + * instances. + */ +public class ChunkInputStream extends InputStream implements Seekable { + + private ChunkInfo chunkInfo; + private final long length; + private final BlockID blockID; + private final String traceID; + private XceiverClientSpi xceiverClient; + private boolean verifyChecksum; + private boolean allocated = false; + + // Buffers to store the chunk data read from the DN container + private List<ByteBuffer> buffers; + + // Index of the buffer corresponding to the current position in the buffers + private int bufferIndex; + + // The offset of the current data residing in the buffers w.r.t the start + // of chunk data + private long bufferOffset; + + // The number of bytes of chunk data residing in the buffers currently + private long bufferLength; + + // Position of the ChunkInputStream is maintained by this variable (if a + // seek is performed). This position is w.r.t. the chunk only and not the + // block or key. This variable is set only if either the buffers are not + // yet allocated or if the allocated buffers do not cover the seeked + // position. Once the chunk is read, this variable is reset. + private long chunkPosition = -1; + + private static final int EOF = -1; + + ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId, + String traceId, XceiverClientSpi xceiverClient, boolean verifyChecksum) { + this.chunkInfo = chunkInfo; + this.length = chunkInfo.getLen(); + this.blockID = blockId; + this.traceID = traceId; + this.xceiverClient = xceiverClient; + this.verifyChecksum = verifyChecksum; + } + + public synchronized long getRemaining() throws IOException { + return length - getPos(); + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized int read() throws IOException { + checkOpen(); + int available = prepareRead(1); + int dataout = EOF; + + if (available == EOF) { + // There is no more data in the chunk stream. 
The buffers should have + // been released by now + Preconditions.checkState(buffers == null); + } else { + dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get()); + } + + if (chunkStreamEOF()) { + // consumer might use getPos to determine EOF, + // so release buffers when serving the last byte of data + releaseBuffers(); + } + + return dataout; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + // According to the JavaDocs for InputStream, it is recommended that + // subclasses provide an override of bulk read if possible for performance + // reasons. In addition to performance, we need to do it for correctness + // reasons. The Ozone REST service uses PipedInputStream and + // PipedOutputStream to relay HTTP response data between a Jersey thread and + // a Netty thread. It turns out that PipedInputStream/PipedOutputStream + // have a subtle dependency (bug?) on the wrapped stream providing separate + // implementations of single-byte read and bulk read. Without this, get key + // responses might close the connection before writing all of the bytes + // advertised in the Content-Length. + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + checkOpen(); + int total = 0; + while (len > 0) { + int available = prepareRead(len); + if (available == EOF) { + // There is no more data in the chunk stream. The buffers should have + // been released by now + Preconditions.checkState(buffers == null); + return total != 0 ? total : EOF; + } + buffers.get(bufferIndex).get(b, off + total, available); + len -= available; + total += available; + } + + if (chunkStreamEOF()) { + // smart consumers determine EOF by calling getPos() + // so we release buffers when serving the final bytes of data + releaseBuffers(); + } + + return total; + } + + /** + * Seeks the ChunkInputStream to the specified position. This is done by + * updating the chunkPosition to the seeked position in case the buffers + * are not allocated or buffers do not contain the data corresponding to + * the seeked position (determined by buffersHavePosition()). Otherwise, + * the buffers position is updated to the seeked position. + */ + @Override + public synchronized void seek(long pos) throws IOException { + if (pos < 0 || pos >= length) { + if (pos == 0) { + // It is possible for length and pos to be zero in which case + // seek should return instead of throwing exception + return; + } + throw new EOFException("EOF encountered at pos: " + pos + " for chunk: " + + chunkInfo.getChunkName()); + } + + if (buffersHavePosition(pos)) { + // The bufferPosition is w.r.t the current chunk. + // Adjust the bufferIndex and position to the seeked position. 
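+      // e.g. (hypothetical numbers) if the buffers currently hold chunk bytes + // 20 to 59 (bufferOffset = 20, bufferLength = 40) and pos = 35, the + // buffers are simply advanced to relative position 15 below.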
+ adjustBufferPosition(pos - bufferOffset); + } else { + chunkPosition = pos; + } + } + + @Override + public synchronized long getPos() throws IOException { + if (chunkPosition >= 0) { + return chunkPosition; + } + if (chunkStreamEOF()) { + return length; + } + if (buffersHaveData()) { + return bufferOffset + buffers.get(bufferIndex).position(); + } + if (buffersAllocated()) { + return bufferOffset + bufferLength; + } + return 0; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public synchronized void close() { + if (xceiverClient != null) { + xceiverClient = null; + } + } + + /** + * Checks if the stream is open. If not, throw an exception. + * + * @throws IOException if stream is closed + */ + protected synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkInputStream has been closed."); + } + } + + /** + * Prepares to read by advancing through buffers or allocating new buffers, + * as needed until it finds data to return, or encounters EOF. + * @param len desired length of data to read + * @return length of data available to read, possibly less than desired length + */ + private synchronized int prepareRead(int len) throws IOException { + for (;;) { + if (chunkPosition >= 0) { + if (buffersHavePosition(chunkPosition)) { + // The current buffers have the seeked position. Adjust the buffer + // index and position to point to the chunkPosition. + adjustBufferPosition(chunkPosition - bufferOffset); + } else { + // Read the required chunk data to fill the buffers with the + // seeked position data + readChunkFromContainer(len); + } + } + if (buffersHaveData()) { + // Data is available from buffers + ByteBuffer bb = buffers.get(bufferIndex); + return len > bb.remaining() ? bb.remaining() : len; + } else if (dataRemainingInChunk()) { + // There is more data in the chunk stream which has not + // been read into the buffers yet. + readChunkFromContainer(len); + } else { + // All available input from this chunk stream has been consumed. + return EOF; + } + } + } + + /** + * Reads full or partial Chunk from DN Container based on the current + * position of the ChunkInputStream, the number of bytes of data to read + * and the checksum boundaries. + * If successful, then the read data is saved in the buffers so that + * subsequent read calls can utilize it. + * @param len number of bytes of data to be read + * @throws IOException if there is an I/O error while performing the call + * to Datanode + */ + private synchronized void readChunkFromContainer(int len) throws IOException { + + // index of first byte to be read from the chunk + long startByteIndex; + if (chunkPosition >= 0) { + // If seek operation was called to advance the buffer position, the + // chunk should be read from that position onwards. + startByteIndex = chunkPosition; + } else { + // Continue reading the chunk from where the last read ended. + startByteIndex = bufferOffset + bufferLength; + } + + if (verifyChecksum) { + // Update the bufferOffset and bufferLength as per the checksum + // boundary requirement. + computeChecksumBoundaries(startByteIndex, len); + } else { + // Read from the startByteIndex + bufferOffset = startByteIndex; + bufferLength = len; + } + + // Adjust the chunkInfo so that only the required bytes are read from + // the chunk. 
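+    // (When verifyChecksum is false, this window is exactly + // [startByteIndex, startByteIndex + len), with no widening.)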
+ final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo) + .setOffset(bufferOffset) + .setLen(bufferLength) + .build(); + + ByteString byteString = readChunk(adjustedChunkInfo); + + buffers = byteString.asReadOnlyByteBufferList(); + bufferIndex = 0; + allocated = true; + + // If the stream was seeked to position before, then the buffer + // position should be adjusted as the reads happen at checksum boundaries. + // The buffers position might need to be adjusted for the following + // scenarios: + // 1. Stream was seeked to a position before the chunk was read + // 2. Chunk was read from index < the current position to account for + // checksum boundaries. + adjustBufferPosition(startByteIndex - bufferOffset); + } + + /** + * Send RPC call to get the chunk from the container. + */ + @VisibleForTesting + protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { + ReadChunkResponseProto readChunkResponse; + + try { + List<CheckedBiFunction> validators = + ContainerProtocolCalls.getValidatorList(); + validators.add(validator); + + readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, + readChunkInfo, blockID, traceID, validators); + + } catch (IOException e) { + if (e instanceof StorageContainerException) { + throw e; + } + throw new IOException("Unexpected OzoneException: " + e.toString(), e); + } + + return readChunkResponse.getData(); + } + + private CheckedBiFunction<ContainerCommandRequestProto, + ContainerCommandResponseProto, IOException> validator = + (request, response) -> { + final ChunkInfo reqChunkInfo = + request.getReadChunk().getChunkData(); + + ReadChunkResponseProto readChunkResponse = response.getReadChunk(); + ByteString byteString = readChunkResponse.getData(); + + if (byteString.size() != reqChunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. + throw new OzoneChecksumException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + reqChunkInfo.getChunkName(), reqChunkInfo.getLen(), + byteString.size())); + } + + if (verifyChecksum) { + ChecksumData checksumData = ChecksumData.getFromProtoBuf( + chunkInfo.getChecksumData()); + + // ChecksumData stores checksum for each 'numBytesPerChecksum' + // number of bytes in a list. Compute the index of the first + // checksum to match with the read data + + int checksumStartIndex = (int) (reqChunkInfo.getOffset() / + checksumData.getBytesPerChecksum()); + Checksum.verifyChecksum( + byteString, checksumData, checksumStartIndex); + } + }; + + /** + * Return the offset and length of bytes that need to be read from the + * chunk file to cover the checksum boundaries covering the actual start and + * end of the chunk data to be read. + * For example, let's say the client is reading from index 120 to 450 in the + * chunk. And let's say checksum is stored for every 100 bytes in the chunk + * i.e. the first checksum is for bytes from index 0 to 99, the next for + * bytes from index 100 to 199 and so on. To verify bytes from 120 to 450, + * we would need to read from bytes 100 to 499 so that checksum + * verification can be done. + * + * @param startByteIndex the first byte index to be read by client + * @param dataLen number of bytes to be read from the chunk + */ + private void computeChecksumBoundaries(long startByteIndex, int dataLen) { + + int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum(); + // index of the last byte to be read from chunk, inclusively. 
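+    // Worked example with the javadoc's numbers: startByteIndex = 120, + // dataLen = 331 (bytes 120 to 450) and bytesPerChecksum = 100 give + // endByteIndex = 450, bufferOffset = (120 / 100) * 100 = 100 and + // endIndex = ((450 / 100) + 1) * 100 = 500, i.e. bytes 100 to 499 + // (bufferLength = 400) are read to cover whole checksum boundaries.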
+ final long endByteIndex = startByteIndex + dataLen - 1; + + bufferOffset = (startByteIndex / bytesPerChecksum) + * bytesPerChecksum; // inclusive + final long endIndex = ((endByteIndex / bytesPerChecksum) + 1) + * bytesPerChecksum; // exclusive + bufferLength = Math.min(endIndex, length) - bufferOffset; + } + + /** + * Adjust the buffers position to account for seeked position and/or checksum + * boundary reads. + * @param bufferPosition the position to which the buffers must be advanced + */ + private void adjustBufferPosition(long bufferPosition) { + // The bufferPosition is w.r.t the current chunk. + // Adjust the bufferIndex and position to the seeked chunkPosition. + long tempOffset = 0; + for (int i = 0; i < buffers.size(); i++) { + if (bufferPosition - tempOffset >= buffers.get(i).capacity()) { + tempOffset += buffers.get(i).capacity(); + } else { + bufferIndex = i; + break; + } + } + buffers.get(bufferIndex).position((int) (bufferPosition - tempOffset)); + + // Reset the chunkPosition as chunk stream has been initialized i.e. the + // buffers have been allocated. + resetPosition(); + } + + /** + * Return true if the buffers have been allocated data, false otherwise. + */ + private boolean buffersAllocated() { + return buffers != null && !buffers.isEmpty(); + } + + /** + * Check if the buffers have any data remaining between the current + * position and the limit. + */ + private boolean buffersHaveData() { + boolean hasData = false; + + if (buffersAllocated()) { + while (bufferIndex < (buffers.size())) { + if (buffers.get(bufferIndex).hasRemaining()) { + // current buffer has data + hasData = true; + break; + } else { + if (buffersRemaining()) { + // move to next available buffer + ++bufferIndex; + Preconditions.checkState(bufferIndex < buffers.size()); + } else { + // no more buffers remaining + break; + } + } + } + } + + return hasData; + } + + private boolean buffersRemaining() { + return (bufferIndex < (buffers.size() - 1)); + } + + /** + * Check if current buffers have the data corresponding to the input position. + */ + private boolean buffersHavePosition(long pos) { + // Check if buffers have been allocated + if (buffersAllocated()) { + // Check if the current buffers cover the input position + return pos >= bufferOffset && + pos < bufferOffset + bufferLength; + } + return false; + } + + /** + * Check if there is more data in the chunk which has not yet been read + * into the buffers. + */ + private boolean dataRemainingInChunk() { + long bufferPos; + if (chunkPosition >= 0) { + bufferPos = chunkPosition; + } else { + bufferPos = bufferOffset + bufferLength; + } + + return bufferPos < length; + } + + /** + * Check if end of chunkStream has been reached. + */ + private boolean chunkStreamEOF() { + if (!allocated) { + // Chunk data has not been read yet + return false; + } + + if (buffersHaveData() || dataRemainingInChunk()) { + return false; + } else { + Preconditions.checkState(bufferOffset + bufferLength == length, + "EOF detected, but not at the last byte of the chunk"); + return true; + } + } + + /** + * If EOF is reached, release the buffers. + */ + private void releaseBuffers() { + buffers = null; + bufferIndex = 0; + } + + /** + * Reset the chunkPosition once the buffers are allocated. 
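+   * That is, set it back to the -1 sentinel so that getPos() is derived + * from the buffer state rather than from an explicitly seeked position.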
+ */ + void resetPosition() { + this.chunkPosition = -1; + } + + String getChunkName() { + return chunkInfo.getChunkName(); + } + + protected long getLength() { + return length; + } + + @VisibleForTesting + protected long getChunkPosition() { + return chunkPosition; + } +} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index b6ceb2b2af7..a1985f05eea 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -1,32 +1,33 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.hadoop.hdds.scm.storage; +import com.google.common.primitives.Bytes; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ChecksumData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ChecksumType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.security.token.Token; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -34,106 +35,127 @@ import org.junit.Test; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; -import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData; /** - * Tests {@link BlockInputStream}. + * Tests for {@link BlockInputStream}'s functionality. 
*/ public class TestBlockInputStream { - private static BlockInputStream blockInputStream; - private static List<ChunkInfo> chunks; - private static int blockSize; + private static final int CHUNK_SIZE = 100; + private static Checksum checksum; - private static final int CHUNK_SIZE = 20; + private BlockInputStream blockStream; + private byte[] blockData; + private int blockSize; + private List<ChunkInfo> chunks; + private Map<String, byte[]> chunkDataMap; @Before public void setup() throws Exception { BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); - chunks = createChunkList(10); - String traceID = UUID.randomUUID().toString(); - blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks, - traceID, false, 0); + checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE); + createChunkList(5); - blockSize = 0; - for (ChunkInfo chunk : chunks) { - blockSize += chunk.getLen(); - } + blockStream = new DummyBlockInputStream(blockID, blockSize, null, null, + false, null, null); } /** * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE * and the last chunk with length CHUNK_SIZE/2. - * @param numChunks - * @return */ - private static List<ChunkInfo> createChunkList(int numChunks) { - ChecksumData dummyChecksumData = ChecksumData.newBuilder() - .setType(ChecksumType.NONE) - .setBytesPerChecksum(100) - .build(); - List<ChunkInfo> chunkList = new ArrayList<>(numChunks); - int i; - for (i = 0; i < numChunks - 1; i++) { - String chunkName = "chunk-" + i; + private void createChunkList(int numChunks) + throws Exception { + + chunks = new ArrayList<>(numChunks); + chunkDataMap = new HashMap<>(); + blockData = new byte[0]; + int i, chunkLen; + byte[] byteData; + String chunkName; + + for (i = 0; i < numChunks; i++) { + chunkName = "chunk-" + i; + chunkLen = CHUNK_SIZE; + if (i == numChunks - 1) { + chunkLen = CHUNK_SIZE / 2; + } + byteData = generateRandomData(chunkLen); ChunkInfo chunkInfo = ChunkInfo.newBuilder() .setChunkName(chunkName) .setOffset(0) - .setLen(CHUNK_SIZE) - .setChecksumData(dummyChecksumData) + .setLen(chunkLen) + .setChecksumData(checksum.computeChecksum( + byteData, 0, chunkLen).getProtoBufMessage()) .build(); - chunkList.add(chunkInfo); - } - ChunkInfo chunkInfo = ChunkInfo.newBuilder() - .setChunkName("chunk-" + i) - .setOffset(0) - .setLen(CHUNK_SIZE/2) - .setChecksumData(dummyChecksumData) - .build(); - chunkList.add(chunkInfo); - return chunkList; + chunkDataMap.put(chunkName, byteData); + chunks.add(chunkInfo); + + blockSize += chunkLen; + blockData = Bytes.concat(blockData, byteData); + } } /** - * A dummy BlockInputStream to test the functionality of BlockInputStream. + * A dummy BlockInputStream to mock the read block calls to the DN. 
*/ - private static class DummyBlockInputStream extends BlockInputStream { + private class DummyBlockInputStream extends BlockInputStream { - DummyBlockInputStream(BlockID blockID, - XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, - List<ChunkInfo> chunks, - String traceID, + DummyBlockInputStream(BlockID blockId, + long blockLen, + Pipeline pipeline, + Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum, - long initialPosition) throws IOException { - super(blockID, xceiverClientManager, xceiverClient, chunks, traceID, - verifyChecksum, initialPosition); + String traceId, + XceiverClientManager xceiverClientManager) { + super(blockId, blockLen, pipeline, token, verifyChecksum, + traceId, xceiverClientManager); } @Override - protected ByteString readChunk(final ChunkInfo chunkInfo) - throws IOException { - return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen()); + protected List<ChunkInfo> getChunkInfos() { + return chunks; } @Override - protected List<DatanodeDetails> getDatanodeList() { - // return an empty dummy list of size 10 - return new ArrayList<>(10); + protected void addStream(ChunkInfo chunkInfo) { + TestChunkInputStream testChunkInputStream = new TestChunkInputStream(); + getChunkStreams().add(testChunkInputStream.new DummyChunkInputStream( + chunkInfo, null, null, null, false, + chunkDataMap.get(chunkInfo.getChunkName()).clone())); } - /** - * Create ByteString with the input data to return when a readChunk call is - * placed. - */ - private static ByteString getByteString(String data, int length) { - while (data.length() < length) { - data = data + "0"; - } - return ByteString.copyFrom(data.getBytes(), 0, length); + @Override + protected synchronized void checkOpen() throws IOException { + // No action needed + } + } + + private void seekAndVerify(int pos) throws Exception { + blockStream.seek(pos); + Assert.assertEquals("Current position of buffer does not match with the " + + "seeked position", pos, blockStream.getPos()); + } + + /** + * Match readData with the blockData byte-wise. + * @param readData Data read through BlockInputStream + * @param inputDataStartIndex first index (inclusive) in blockData to compare + * with read data + * @param length the number of bytes of data to match starting from + * inputDataStartIndex + */ + private void matchWithInputData(byte[] readData, int inputDataStartIndex, + int length) { + for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) { + Assert.assertEquals(blockData[i], readData[i - inputDataStartIndex]); + } } @@ -143,17 +165,26 @@ public class TestBlockInputStream { int pos = 0; seekAndVerify(pos); Assert.assertEquals("ChunkIndex is incorrect", 0, - blockInputStream.getChunkIndex()); + blockStream.getChunkIndex()); + // Before BlockInputStream is initialized (initialization happens during + // read operation), seek should update the BlockInputStream#blockPosition pos = CHUNK_SIZE; seekAndVerify(pos); - Assert.assertEquals("ChunkIndex is incorrect", 1, - blockInputStream.getChunkIndex()); + Assert.assertEquals("ChunkIndex is incorrect", 0, + blockStream.getChunkIndex()); + Assert.assertEquals(pos, blockStream.getBlockPosition()); - pos = (CHUNK_SIZE * 5) + 5; + // Initialize the BlockInputStream. After initialization, the chunkIndex + // should be updated to correspond to the seeked position. 
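+    // With 5 chunks of size 100 (the last one 50), chunkOffsets is + // [0, 100, 200, 300, 400]; the binary search in seek() for pos = 100 + // should then resolve to chunkIndex 1.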
+ blockStream.initialize(); + Assert.assertEquals("ChunkIndex is incorrect", 1, + blockStream.getChunkIndex()); + + pos = (CHUNK_SIZE * 4) + 5; seekAndVerify(pos); - Assert.assertEquals("ChunkIndex is incorrect", 5, - blockInputStream.getChunkIndex()); + Assert.assertEquals("ChunkIndex is incorrect", 4, + blockStream.getChunkIndex()); try { // Try seeking beyond the blockSize. @@ -161,7 +192,7 @@ public class TestBlockInputStream { seekAndVerify(pos); Assert.fail("Seek to position beyond block size should fail."); } catch (EOFException e) { - // Expected + System.out.println(e); } // Seek to random positions between 0 and the block size. @@ -173,20 +204,32 @@ public class TestBlockInputStream { } @Test - public void testBlockEOF() throws Exception { - // Seek to some position < blockSize and verify EOF is not reached. - seekAndVerify(CHUNK_SIZE); - Assert.assertFalse(blockInputStream.blockStreamEOF()); + public void testRead() throws Exception { + // Read 200 bytes of data starting from position 50. Chunk0 contains + // indices 0 to 99, chunk1 from 100 to 199 and chunk2 from 200 to 299. So + // the read should result in 3 ChunkInputStream reads - // Seek to blockSize-1 and verify that EOF is not reached as the chunk - // has not been read from container yet. - seekAndVerify(blockSize-1); - Assert.assertFalse(blockInputStream.blockStreamEOF()); + seekAndVerify(50); + byte[] b = new byte[200]; + blockStream.read(b, 0, 200); + matchWithInputData(b, 50, 200); - // The new position of the blockInputStream should be the last index read - // + 1. + Assert.assertEquals(250, blockStream.getPos()); + Assert.assertEquals(2, blockStream.getChunkIndex()); } - private void seekAndVerify(int pos) throws Exception { - blockInputStream.seek(pos); - Assert.assertEquals("Current position of buffer does not match with the " - + "seeked position", pos, blockInputStream.getPos()); + @Test + public void testSeekAndRead() throws Exception { + // Seek to a position and read data + seekAndVerify(50); + byte[] b1 = new byte[100]; + blockStream.read(b1, 0, 100); + matchWithInputData(b1, 50, 100); + + // Next read should start from the position of the last read + 1 i.e. 150 + byte[] b2 = new byte[100]; + blockStream.read(b2, 0, 100); + matchWithInputData(b2, 150, 100); } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java new file mode 100644 index 00000000000..b113bc7f685 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ + +package org.apache.hadoop.hdds.scm.storage; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.EOFException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * Tests for {@link ChunkInputStream}'s functionality. + */ +public class TestChunkInputStream { + + private static final int CHUNK_SIZE = 100; + private static final int BYTES_PER_CHECKSUM = 20; + private static final String CHUNK_NAME = "dummyChunk"; + private static final Random RANDOM = new Random(); + private static Checksum checksum; + + private DummyChunkInputStream chunkStream; + private ChunkInfo chunkInfo; + private byte[] chunkData; + + @Before + public void setup() throws Exception { + checksum = new Checksum(ChecksumType.valueOf( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT), + BYTES_PER_CHECKSUM); + + chunkData = generateRandomData(CHUNK_SIZE); + + chunkInfo = ChunkInfo.newBuilder() + .setChunkName(CHUNK_NAME) + .setOffset(0) + .setLen(CHUNK_SIZE) + .setChecksumData(checksum.computeChecksum( + chunkData, 0, CHUNK_SIZE).getProtoBufMessage()) + .build(); + + chunkStream = new DummyChunkInputStream(chunkInfo, null, null, null, true); + } + + static byte[] generateRandomData(int length) { + byte[] bytes = new byte[length]; + RANDOM.nextBytes(bytes); + return bytes; + } + + /** + * A dummy ChunkInputStream to mock read chunk calls to DN. + */ + public class DummyChunkInputStream extends ChunkInputStream { + + // Stores the read chunk data in each readChunk call + private List<ByteString> readByteBuffers = new ArrayList<>(); + + DummyChunkInputStream(ChunkInfo chunkInfo, + BlockID blockId, + String traceId, + XceiverClientSpi xceiverClient, + boolean verifyChecksum) { + super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum); + } + + public DummyChunkInputStream(ChunkInfo chunkInfo, + BlockID blockId, + String traceId, + XceiverClientSpi xceiverClient, + boolean verifyChecksum, + byte[] data) { + super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum); + chunkData = data; + } + + @Override + protected ByteString readChunk(ChunkInfo readChunkInfo) { + ByteString byteString = ByteString.copyFrom(chunkData, + (int) readChunkInfo.getOffset(), + (int) readChunkInfo.getLen()); + readByteBuffers.add(byteString); + return byteString; + } + + @Override + protected void checkOpen() { + // No action needed + } + } + + /** + * Match readData with the chunkData byte-wise. + * @param readData Data read through ChunkInputStream + * @param inputDataStartIndex first index (inclusive) in chunkData to compare + * with read data + * @param length the number of bytes of data to match starting from + * inputDataStartIndex + */ + private void matchWithInputData(byte[] readData, int inputDataStartIndex, + int length) { + for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) { + Assert.assertEquals(chunkData[i], readData[i - inputDataStartIndex]); + } + } + + /** + * Seek to a position and verify through getPos(). 
+ */ + private void seekAndVerify(int pos) throws Exception { + chunkStream.seek(pos); + Assert.assertEquals("Current position of buffer does not match with the " + + "seeked position", pos, chunkStream.getPos()); + } + + @Test + public void testFullChunkRead() throws Exception { + byte[] b = new byte[CHUNK_SIZE]; + chunkStream.read(b, 0, CHUNK_SIZE); + + matchWithInputData(b, 0, CHUNK_SIZE); + } + + @Test + public void testPartialChunkRead() throws Exception { + int len = CHUNK_SIZE / 2; + byte[] b = new byte[len]; + + chunkStream.read(b, 0, len); + + matchWithInputData(b, 0, len); + + // To read chunk data from index 0 to 49 (len = 50), we need to read + // chunk from offset 0 to 60 as the checksum boundary is at every 20 + // bytes. Verify that 60 bytes of chunk data are read and stored in the + // buffers. + matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(), + 0, 60); + + } + + @Test + public void testSeek() throws Exception { + seekAndVerify(0); + + try { + seekAndVerify(CHUNK_SIZE); + Assert.fail("Seeking to Chunk Length should fail."); + } catch (EOFException e) { + GenericTestUtils.assertExceptionContains("EOF encountered at pos: " + + CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e); + } + + // Seek before read should update the ChunkInputStream#chunkPosition + seekAndVerify(25); + Assert.assertEquals(25, chunkStream.getChunkPosition()); + + // Read from the seeked position. + // Reading from index 25 to 54 should result in the ChunkInputStream + // copying chunk data from index 20 to 59 into the buffers (checksum + // boundaries). + byte[] b = new byte[30]; + chunkStream.read(b, 0, 30); + matchWithInputData(b, 25, 30); + matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(), + 20, 40); + + // After read, the position of the chunkStream is evaluated from the + // buffers and the chunkPosition should be reset to -1. + Assert.assertEquals(-1, chunkStream.getChunkPosition()); + + // Seek to a position within the current buffers. Current buffers contain + // data from index 20 to 59. ChunkPosition should still not be used to + // set the position. + seekAndVerify(35); + Assert.assertEquals(-1, chunkStream.getChunkPosition()); + + // Seek to a position outside the current buffers. In this case, the + // chunkPosition should be updated to the seeked position. + seekAndVerify(75); + Assert.assertEquals(75, chunkStream.getChunkPosition()); + } + + @Test + public void testSeekAndRead() throws Exception { + // Seek to a position and read data + seekAndVerify(50); + byte[] b1 = new byte[20]; + chunkStream.read(b1, 0, 20); + matchWithInputData(b1, 50, 20); + + // Next read should start from the position of the last read + 1 i.e. 70 + byte[] b2 = new byte[20]; + chunkStream.read(b2, 0, 20); + matchWithInputData(b2, 70, 20); + } +} \ No newline at end of file diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index 1a359fe5c4d..0e70515a492 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -225,15 +225,17 @@ public class Checksum { /** * Computes the ChecksumData for the input data and verifies that it - * matches with that of the input checksumData. + * matches with that of the input checksumData, starting from index + * startIndex. 
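The rounding rule the tests above describe (read [0, 50) fetches [0, 60); read [25, 55) fetches [20, 60)) can be written down directly. A hedged sketch of the arithmetic only, not the actual ChunkInputStream internals; all names are illustrative:

```java
/**
 * Widens a requested range [off, off + len) to whole checksum windows so
 * that every fetched checksum covers exactly the data it was computed over.
 * Returns {adjustedOffset, adjustedLength}.
 */
static long[] alignToChecksumBoundary(long off, long len,
    int bytesPerChecksum, long chunkLen) {
  long start = (off / bytesPerChecksum) * bytesPerChecksum;   // round down
  long end = Math.min(chunkLen,
      ((off + len + bytesPerChecksum - 1) / bytesPerChecksum)
          * bytesPerChecksum);                                 // round up
  return new long[] {start, end - start};
}
// alignToChecksumBoundary(25, 30, 20, 100) -> {20, 40}, i.e. bytes [20, 60),
// matching the range verified in testSeek above.
```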
   * @param byteString input data
   * @param checksumData checksumData to match with
+  * @param startIndex index of first checksum in checksumData to match with
+  *                   data's computed checksum.
   * @throws OzoneChecksumException is thrown if checksums do not match
   */
-  public static boolean verifyChecksum(
-      ByteString byteString, ChecksumData checksumData)
-      throws OzoneChecksumException {
-    return verifyChecksum(byteString.toByteArray(), checksumData);
+  public static boolean verifyChecksum(ByteString byteString,
+      ChecksumData checksumData, int startIndex) throws OzoneChecksumException {
+    return verifyChecksum(byteString.toByteArray(), checksumData, startIndex);
   }
 
   /**
@@ -245,6 +247,20 @@ public class Checksum {
    */
   public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
       throws OzoneChecksumException {
+    return verifyChecksum(data, checksumData, 0);
+  }
+
+  /**
+   * Computes the ChecksumData for the input data and verifies that it
+   * matches with that of the input checksumData.
+   * @param data input data
+   * @param checksumData checksumData to match with
+   * @param startIndex index of first checksum in checksumData to match with
+   *                   data's computed checksum.
+   * @throws OzoneChecksumException is thrown if checksums do not match
+   */
+  public static boolean verifyChecksum(byte[] data, ChecksumData checksumData,
+      int startIndex) throws OzoneChecksumException {
     ChecksumType checksumType = checksumData.getChecksumType();
     if (checksumType == ChecksumType.NONE) {
       // Checksum is set to NONE. No further verification is required.
@@ -256,7 +272,8 @@ public class Checksum {
 
     ChecksumData computedChecksumData =
         checksum.computeChecksum(data, 0, data.length);
-    return checksumData.verifyChecksumDataMatches(computedChecksumData);
+    return checksumData.verifyChecksumDataMatches(computedChecksumData,
+        startIndex);
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
index dafa0e32a25..c0799bb25ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
@@ -111,13 +111,20 @@ public class ChecksumData {
   }
 
   /**
-   * Verify that this ChecksumData matches with the input ChecksumData.
+   * Verify that this ChecksumData, starting from the checksum at startIndex,
+   * matches the provided ChecksumData.
+   * The checksum at startIndex of this ChecksumData will be matched with the
+   * checksum at index 0 of the provided ChecksumData, the checksum at
+   * (startIndex + 1) of this ChecksumData with the checksum at index 1 of
+   * the provided ChecksumData, and so on.
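Putting the pieces together: once a read has been widened to a window boundary, the startIndex passed to the overloads introduced in this patch is simply the window number of the first fetched byte. A usage sketch against the new API (illustrative helper, not code from the patch):

```java
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;

/**
 * Verifies bytes fetched from adjustedOffset (a multiple of
 * bytesPerChecksum) against the chunk's stored checksums.
 */
static boolean verifyPartialRead(byte[] fetched, ChecksumData stored,
    long adjustedOffset, int bytesPerChecksum) throws OzoneChecksumException {
  // The first fetched byte sits in window adjustedOffset / bytesPerChecksum,
  // so matching against the stored checksums starts at that index.
  int startIndex = (int) (adjustedOffset / bytesPerChecksum);
  return Checksum.verifyChecksum(fetched, stored, startIndex);
}
```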
   * @param that the ChecksumData to match with
+  * @param startIndex index of the first checksum from this ChecksumData
+  *                   which will be used to compare checksums
   * @return true if checksums match
   * @throws OzoneChecksumException
   */
-  public boolean verifyChecksumDataMatches(ChecksumData that) throws
-      OzoneChecksumException {
+  public boolean verifyChecksumDataMatches(ChecksumData that, int startIndex)
+      throws OzoneChecksumException {
 
     // pre checks
     if (this.checksums.size() == 0) {
@@ -130,18 +137,22 @@ public class ChecksumData {
           "checksums");
     }
 
-    if (this.checksums.size() != that.checksums.size()) {
-      throw new OzoneChecksumException("Original and Computed checksumData's " +
-          "has different number of checksums");
-    }
+    int numChecksums = that.checksums.size();
 
-    // Verify that checksum matches at each index
-    for (int index = 0; index < this.checksums.size(); index++) {
-      if (!matchChecksumAtIndex(this.checksums.get(index),
-          that.checksums.get(index))) {
-        // checksum mismatch. throw exception.
-        throw new OzoneChecksumException(index);
+    try {
+      // Verify that checksum matches at each index
+      for (int index = 0; index < numChecksums; index++) {
+        if (!matchChecksumAtIndex(this.checksums.get(startIndex + index),
+            that.checksums.get(index))) {
+          // checksum mismatch. throw exception.
+          throw new OzoneChecksumException(index);
+        }
       }
+    } catch (IndexOutOfBoundsException e) {
+      throw new OzoneChecksumException("Computed checksumData has " +
+          numChecksums + " checksums; original checksumData has only " +
+          (this.checksums.size() - startIndex) + " checksums starting " +
+          "from index " + startIndex);
     }
     return true;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 5b6342028a4..41ac60f0bd8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,19 +20,10 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,60 +44,93 @@ public class KeyInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
-  private final ArrayList<ChunkInputStreamEntry> streamEntries;
-  // streamOffset[i] stores the offset at which blockInputStream i stores
-  // data in the key
-  private long[] streamOffset = null;
-  private int currentStreamIndex;
+  private String key;
   private long length = 0;
   private boolean closed = false;
-  private String key;
+
+  // List of BlockInputStreams, one for each block in the key
+  private final List<BlockInputStream> blockStreams;
+
+  // blockOffsets[i] stores the index of the first data byte in
+  // blockStream i w.r.t the key data.
+  // For example, let’s say the block size is 200 bytes and block[0] stores
+  // data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
+  // Then, blockOffset[0] = 0 (the offset of the first byte of data in
+  // block[0]), blockOffset[1] = 200 and so on.
+  private long[] blockOffsets = null;
+
+  // Index of the blockStream corresponding to the current position of the
+  // KeyInputStream i.e. offset of the data to be read next
+  private int blockIndex;
+
+  // Tracks the blockIndex corresponding to the last seeked position so that it
+  // can be reset if a new position is seeked.
+  private int blockIndexOfPrevPosition;
 
   public KeyInputStream() {
-    streamEntries = new ArrayList<>();
-    currentStreamIndex = 0;
-  }
-
-  @VisibleForTesting
-  public synchronized int getCurrentStreamIndex() {
-    return currentStreamIndex;
-  }
-
-  @VisibleForTesting
-  public long getRemainingOfIndex(int index) throws IOException {
-    return streamEntries.get(index).getRemaining();
+    blockStreams = new ArrayList<>();
+    blockIndex = 0;
   }
 
   /**
-   * Append another stream to the end of the list.
-   *
-   * @param stream the stream instance.
-   * @param streamLength the max number of bytes that should be written to this
-   *                     stream.
+   * For each block in keyInfo, add a BlockInputStream to blockStreams.
    */
-  @VisibleForTesting
-  public synchronized void addStream(BlockInputStream stream,
-      long streamLength) {
-    streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
+  public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
+      XceiverClientManager xceiverClientManager, String requestId,
+      boolean verifyChecksum) {
+    List<OmKeyLocationInfo> keyLocationInfos = keyInfo
+        .getLatestVersionLocations().getBlocksLatestVersionOnly();
+
+    KeyInputStream keyInputStream = new KeyInputStream();
+    keyInputStream.initialize(keyInfo.getKeyName(), keyLocationInfos,
+        xceiverClientManager, requestId, verifyChecksum);
+
+    return new LengthInputStream(keyInputStream, keyInputStream.length);
+  }
+
+  private synchronized void initialize(String keyName,
+      List<OmKeyLocationInfo> blockInfos,
+      XceiverClientManager xceiverClientManager, String requestId,
+      boolean verifyChecksum) {
+    this.key = keyName;
+    this.blockOffsets = new long[blockInfos.size()];
+    long keyLength = 0;
+    for (int i = 0; i < blockInfos.size(); i++) {
+      OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
+      LOG.debug("Adding stream for accessing {}. The stream will be " +
+          "initialized later.", omKeyLocationInfo);
+
+      addStream(omKeyLocationInfo, xceiverClientManager, requestId,
+          verifyChecksum);
+
+      this.blockOffsets[i] = keyLength;
+      keyLength += omKeyLocationInfo.getLength();
+    }
+    this.length = keyLength;
   }
 
   /**
-   * Append another ChunkInputStreamEntry to the end of the list.
-   * The stream will be constructed from the input information when it needs
-   * to be accessed.
+   * Append another BlockInputStream to the end of the list. Note that the
+   * BlockInputStream is only created here and not initialized. The
+   * BlockInputStream is initialized when a read operation is performed on
+   * the block for the first time.
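The blockOffsets bookkeeping in initialize() above is a plain prefix sum over the block lengths. Condensed, with an illustrative blockLengths array standing in for the OmKeyLocationInfo list:

```java
long[] blockLengths = {200, 200, 100};       // illustrative only
long[] blockOffsets = new long[blockLengths.length];
long keyLength = 0;
for (int i = 0; i < blockLengths.length; i++) {
  blockOffsets[i] = keyLength;               // first byte of block i
  keyLength += blockLengths[i];
}
// blockOffsets == {0, 200, 400}; keyLength == 500
```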
*/ - private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo, + private synchronized void addStream(OmKeyLocationInfo blockInfo, XceiverClientManager xceiverClientMngr, String clientRequestId, boolean verifyChecksum) { - streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo, - xceiverClientMngr, clientRequestId, verifyChecksum)); + blockStreams.add(new BlockInputStream(blockInfo.getBlockID(), + blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(), + verifyChecksum, clientRequestId, xceiverClientMngr)); } - private synchronized ChunkInputStreamEntry getStreamEntry(int index) - throws IOException { - return streamEntries.get(index).getStream(); + @VisibleForTesting + public void addStream(BlockInputStream blockInputStream) { + blockStreams.add(blockInputStream); } + /** + * {@inheritDoc} + */ @Override public synchronized int read() throws IOException { byte[] buf = new byte[1]; @@ -116,9 +140,12 @@ public class KeyInputStream extends InputStream implements Seekable { return Byte.toUnsignedInt(buf[0]); } + /** + * {@inheritDoc} + */ @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - checkNotClosed(); + checkOpen(); if (b == null) { throw new NullPointerException(); } @@ -131,13 +158,15 @@ public class KeyInputStream extends InputStream implements Seekable { int totalReadLen = 0; while (len > 0) { // if we are at the last block and have read the entire block, return - if (streamEntries.size() == 0 || - (streamEntries.size() - 1 <= currentStreamIndex && - streamEntries.get(currentStreamIndex) - .getRemaining() == 0)) { + if (blockStreams.size() == 0 || + (blockStreams.size() - 1 <= blockIndex && + blockStreams.get(blockIndex) + .getRemaining() == 0)) { return totalReadLen == 0 ? EOF : totalReadLen; } - ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex); + + // Get the current blockStream and read data from it + BlockInputStream current = blockStreams.get(blockIndex); int numBytesToRead = Math.min(len, (int)current.getRemaining()); int numBytesRead = current.read(b, off, numBytesToRead); if (numBytesRead != numBytesToRead) { @@ -146,23 +175,35 @@ public class KeyInputStream extends InputStream implements Seekable { // this case. throw new IOException(String.format( "Inconsistent read for blockID=%s length=%d numBytesRead=%d", - current.blockInputStream.getBlockID(), current.length, - numBytesRead)); + current.getBlockID(), current.getLength(), numBytesRead)); } totalReadLen += numBytesRead; off += numBytesRead; len -= numBytesRead; if (current.getRemaining() <= 0 && - ((currentStreamIndex + 1) < streamEntries.size())) { - currentStreamIndex += 1; + ((blockIndex + 1) < blockStreams.size())) { + blockIndex += 1; } } return totalReadLen; } + /** + * Seeks the KeyInputStream to the specified position. This involves 2 steps: + * 1. Updating the blockIndex to the blockStream corresponding to the + * seeked position. + * 2. Seeking the corresponding blockStream to the adjusted position. + * + * For example, let’s say the block size is 200 bytes and block[0] stores + * data from indices 0 - 199, block[1] from indices 200 - 399 and so on. + * Let’s say we seek to position 240. In the first step, the blockIndex + * would be updated to 1 as indices 200 - 399 reside in blockStream[1]. In + * the second step, the blockStream[1] would be seeked to position 40 (= + * 240 - blockOffset[1] (= 200)). 
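A worked example of the Arrays.binarySearch adjustment the implementation below relies on, using the javadoc's own numbers:

```java
import java.util.Arrays;

long[] blockOffsets = {0, 200, 400};
long pos = 240;                              // falls inside block 1
int blockIndex = Arrays.binarySearch(blockOffsets, pos);
// 240 is absent; binarySearch returns -(insertionPoint) - 1 = -(2) - 1 = -3
if (blockIndex < 0) {
  blockIndex = -blockIndex - 2;              // -(-3) - 2 = 1
}
// blockStreams.get(1) is then seeked to pos - blockOffsets[1] = 40
```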
+ */ @Override - public void seek(long pos) throws IOException { - checkNotClosed(); + public synchronized void seek(long pos) throws IOException { + checkOpen(); if (pos < 0 || pos >= length) { if (pos == 0) { // It is possible for length and pos to be zero in which case @@ -172,35 +213,39 @@ public class KeyInputStream extends InputStream implements Seekable { throw new EOFException( "EOF encountered at pos: " + pos + " for key: " + key); } - Preconditions.assertTrue(currentStreamIndex >= 0); - if (currentStreamIndex >= streamEntries.size()) { - currentStreamIndex = Arrays.binarySearch(streamOffset, pos); - } else if (pos < streamOffset[currentStreamIndex]) { - currentStreamIndex = - Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos); - } else if (pos >= streamOffset[currentStreamIndex] + streamEntries - .get(currentStreamIndex).length) { - currentStreamIndex = Arrays - .binarySearch(streamOffset, currentStreamIndex + 1, - streamEntries.size(), pos); + + // 1. Update the blockIndex + if (blockIndex >= blockStreams.size()) { + blockIndex = Arrays.binarySearch(blockOffsets, pos); + } else if (pos < blockOffsets[blockIndex]) { + blockIndex = + Arrays.binarySearch(blockOffsets, 0, blockIndex, pos); + } else if (pos >= blockOffsets[blockIndex] + blockStreams + .get(blockIndex).getLength()) { + blockIndex = Arrays + .binarySearch(blockOffsets, blockIndex + 1, + blockStreams.size(), pos); } - if (currentStreamIndex < 0) { + if (blockIndex < 0) { // Binary search returns -insertionPoint - 1 if element is not present // in the array. insertionPoint is the point at which element would be - // inserted in the sorted array. We need to adjust the currentStreamIndex - // accordingly so that currentStreamIndex = insertionPoint - 1 - currentStreamIndex = -currentStreamIndex - 2; + // inserted in the sorted array. We need to adjust the blockIndex + // accordingly so that blockIndex = insertionPoint - 1 + blockIndex = -blockIndex - 2; } - // seek to the proper offset in the BlockInputStream - streamEntries.get(currentStreamIndex) - .seek(pos - streamOffset[currentStreamIndex]); + + // Reset the previous blockStream's position + blockStreams.get(blockIndexOfPrevPosition).resetPosition(); + + // 2. Seek the blockStream to the adjusted position + blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]); + blockIndexOfPrevPosition = blockIndex; } @Override - public long getPos() throws IOException { - return length == 0 ? 0 : - streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex) - .getPos(); + public synchronized long getPos() throws IOException { + return length == 0 ? 0 : blockOffsets[blockIndex] + + blockStreams.get(blockIndex).getPos(); } @Override @@ -210,7 +255,7 @@ public class KeyInputStream extends InputStream implements Seekable { @Override public int available() throws IOException { - checkNotClosed(); + checkOpen(); long remaining = length - getPos(); return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; } @@ -218,177 +263,30 @@ public class KeyInputStream extends InputStream implements Seekable { @Override public void close() throws IOException { closed = true; - for (int i = 0; i < streamEntries.size(); i++) { - streamEntries.get(i).close(); + for (BlockInputStream blockStream : blockStreams) { + blockStream.close(); } } - /** - * Encapsulates BlockInputStream. 
- */ - public static class ChunkInputStreamEntry extends InputStream - implements Seekable { - - private BlockInputStream blockInputStream; - private final OmKeyLocationInfo blockLocationInfo; - private final long length; - private final XceiverClientManager xceiverClientManager; - private final String requestId; - private boolean verifyChecksum; - - // the position of the blockInputStream is maintained by this variable - // till the stream is initialized - private long position; - - public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo, - XceiverClientManager xceiverClientMngr, String clientRequestId, - boolean verifyChecksum) { - this.blockLocationInfo = omKeyLocationInfo; - this.length = omKeyLocationInfo.getLength(); - this.xceiverClientManager = xceiverClientMngr; - this.requestId = clientRequestId; - this.verifyChecksum = verifyChecksum; - } - - @VisibleForTesting - public ChunkInputStreamEntry(BlockInputStream blockInputStream, - long length) { - this.blockInputStream = blockInputStream; - this.length = length; - this.blockLocationInfo = null; - this.xceiverClientManager = null; - this.requestId = null; - } - - private ChunkInputStreamEntry getStream() throws IOException { - if (this.blockInputStream == null) { - initializeBlockInputStream(); - } - return this; - } - - private void initializeBlockInputStream() throws IOException { - BlockID blockID = blockLocationInfo.getBlockID(); - long containerID = blockID.getContainerID(); - Pipeline pipeline = blockLocationInfo.getPipeline(); - - // irrespective of the container state, we will always read via Standalone - // protocol. - if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { - pipeline = Pipeline.newBuilder(pipeline) - .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); - } - XceiverClientSpi xceiverClient = xceiverClientManager - .acquireClient(pipeline); - boolean success = false; - long containerKey = blockLocationInfo.getLocalID(); - try { - LOG.debug("Initializing stream for get key to access {} {}", - containerID, containerKey); - ContainerProtos.DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); - if (blockLocationInfo.getToken() != null) { - UserGroupInformation.getCurrentUser(). 
-            addToken(blockLocationInfo.getToken());
-        }
-        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId);
-        List<ContainerProtos.ChunkInfo> chunks =
-            response.getBlockData().getChunksList();
-        success = true;
-        this.blockInputStream = new BlockInputStream(
-            blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
-            chunks, requestId, verifyChecksum, position);
-      } finally {
-        if (!success) {
-          xceiverClientManager.releaseClient(xceiverClient, false);
-        }
-      }
-    }
-
-    synchronized long getRemaining() throws IOException {
-      return length - getPos();
-    }
-
-    @Override
-    public synchronized int read(byte[] b, int off, int len)
-        throws IOException {
-      int readLen = blockInputStream.read(b, off, len);
-      return readLen;
-    }
-
-    @Override
-    public synchronized int read() throws IOException {
-      int data = blockInputStream.read();
-      return data;
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-      if (blockInputStream != null) {
-        blockInputStream.close();
-      }
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      if (blockInputStream != null) {
-        blockInputStream.seek(pos);
-      } else {
-        position = pos;
-      }
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      if (blockInputStream != null) {
-        return blockInputStream.getPos();
-      } else {
-        return position;
-      }
-    }
-
-    @Override
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
-    }
-  }
-
-  public static LengthInputStream getFromOmKeyInfo(
-      OmKeyInfo keyInfo,
-      XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocol
-          storageContainerLocationClient,
-      String requestId, boolean verifyChecksum) throws IOException {
-    long length = 0;
-    KeyInputStream groupInputStream = new KeyInputStream();
-    groupInputStream.key = keyInfo.getKeyName();
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
-    groupInputStream.streamOffset = new long[keyLocationInfos.size()];
-    for (int i = 0; i < keyLocationInfos.size(); i++) {
-      OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
-      LOG.debug("Adding stream for accessing {}. The stream will be " +
-          "initialized later.", omKeyLocationInfo);
-      groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
-          requestId, verifyChecksum);
-
-      groupInputStream.streamOffset[i] = length;
-      length += omKeyLocationInfo.getLength();
-    }
-    groupInputStream.length = length;
-    return new LengthInputStream(groupInputStream, length);
-  }
-
   /**
    * Verify that the input stream is open. Non blocking; this gives
    * the last state of the volatile {@link #closed} field.
    * @throws IOException if the connection is closed.
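The removed ChunkInputStreamEntry above buffered seeks in a position field until the real stream existed; the patch keeps the same lazy-seek idea, now inside BlockInputStream itself (the blockPosition field and the resetPosition() call seen earlier). A minimal sketch of the pattern with hypothetical names, not the actual BlockInputStream code:

```java
// Hypothetical distillation of the lazy-seek pattern.
abstract class LazySeekStream {
  private boolean initialized = false;
  private long bufferedPosition = 0;        // plays the role of blockPosition

  void seek(long pos) {
    if (!initialized) {
      bufferedPosition = pos;               // just remember the position
    } else {
      seekUnderlying(pos);                  // delegate to the real streams
    }
  }

  void initialize() {
    // e.g. fetch the chunk list from the datanode, then honour the
    // buffered seek before the first read.
    initialized = true;
    seekUnderlying(bufferedPosition);
  }

  abstract void seekUnderlying(long pos);   // hypothetical hook
}
```

The benefit is that no datanode round trip happens for keys that are opened and seeked but never read.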
*/ - private void checkNotClosed() throws IOException { + private void checkOpen() throws IOException { if (closed) { throw new IOException( ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key); } } + + @VisibleForTesting + public synchronized int getCurrentStreamIndex() { + return blockIndex; + } + + @VisibleForTesting + public long getRemainingOfIndex(int index) throws IOException { + return blockStreams.get(index).getRemaining(); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 48968a4647f..5f2df7dd917 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1072,8 +1072,8 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer { private OzoneInputStream createInputStream(OmKeyInfo keyInfo, String requestId) throws IOException { LengthInputStream lengthInputStream = KeyInputStream - .getFromOmKeyInfo(keyInfo, xceiverClientManager, - storageContainerLocationClient, requestId, verifyChecksum); + .getFromOmKeyInfo(keyInfo, xceiverClientManager, requestId, + verifyChecksum); FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo(); if (feInfo != null) { final KeyProvider.KeyVersion decrypted = getDEK(feInfo); diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index a4aa361e398..6876166f8b0 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -503,8 +503,7 @@ public final class DistributedStorageHandler implements StorageHandler { .build(); OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); return KeyInputStream.getFromOmKeyInfo( - keyInfo, xceiverClientManager, storageContainerLocationClient, - args.getRequestID(), verifyChecksum); + keyInfo, xceiverClientManager, args.getRequestID(), verifyChecksum); } @Override diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java index 45f04dfae0b..80717dde86f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java @@ -25,7 +25,6 @@ import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.ArrayList; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; @@ -48,8 +47,7 @@ public class TestChunkStreams { for (int i = 0; i < 5; i++) { int tempOffset = offset; BlockInputStream in = - new BlockInputStream(null, null, null, new ArrayList<>(), null, - true, 0) { + new BlockInputStream(null, 100, null, null, true, null, null) { private long pos = 0; private ByteArrayInputStream in = new ByteArrayInputStream(buf, tempOffset, 100); @@ -84,7 +82,7 @@ public class TestChunkStreams { } }; offset += 100; - groupInputStream.addStream(in, 100); + groupInputStream.addStream(in); } 
byte[] resBuf = new byte[500]; @@ -105,8 +103,7 @@ public class TestChunkStreams { for (int i = 0; i < 5; i++) { int tempOffset = offset; BlockInputStream in = - new BlockInputStream(null, null, null, new ArrayList<>(), null, - true, 0) { + new BlockInputStream(null, 100, null, null, true, null, null) { private long pos = 0; private ByteArrayInputStream in = new ByteArrayInputStream(buf, tempOffset, 100); @@ -141,7 +138,7 @@ public class TestChunkStreams { } }; offset += 100; - groupInputStream.addStream(in, 100); + groupInputStream.addStream(in); } byte[] resBuf = new byte[600];
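For readers of the two TestChunkStreams hunks above: the seven positional arguments in the updated stub follow the new BlockInputStream constructor, i.e. (blockId, blockLen, pipeline, token, verifyChecksum, traceId, xceiverClientManager). An annotated restatement, not code from the patch; the nulls are safe only because the anonymous subclass in the tests overrides all I/O methods:

```java
static BlockInputStream newStub() {
  return new BlockInputStream(
      null,   // blockId              - unused, the stub overrides all I/O
      100,    // blockLen             - each stub serves 100 bytes
      null,   // pipeline             - no datanode is contacted
      null,   // token                - security is not exercised here
      true,   // verifyChecksum
      null,   // traceId
      null);  // xceiverClientManager
}
```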