diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 66671637bfa..bb4a5b05633 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -61,10 +62,18 @@ public class BlockInputStream extends InputStream implements Seekable {
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private List<ChunkInfo> chunks;
+  // chunkIndex points to the index of the current chunk in the buffers, or
+  // the index of the chunk which will be read next into the buffers by
+  // readChunkFromContainer().
   private int chunkIndex;
+  // chunkIndexOfCurrentBuffer points to the index of the chunk read into
+  // the buffers, i.e. the last chunk in the buffers. It is updated only
+  // when a new chunk is read from the container into the buffers.
+  private int chunkIndexOfCurrentBuffer;
   private long[] chunkOffset;
   private List<ByteBuffer> buffers;
   private int bufferIndex;
+  private long bufferPosition;
   private final boolean verifyChecksum;
 
   /**
@@ -76,24 +85,34 @@ public class BlockInputStream extends InputStream implements Seekable {
    * @param chunks list of chunks to read
    * @param traceID container protocol call traceID
    * @param verifyChecksum verify checksum
+   * @param initialPosition the initial position of the stream pointer. This
+   *                        position is seeked now if the upstream was seeked
+   *                        before this stream was created.
    */
   public BlockInputStream(
       BlockID blockID, XceiverClientManager xceiverClientManager,
       XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
-      boolean verifyChecksum) {
+      boolean verifyChecksum, long initialPosition) throws IOException {
     this.blockID = blockID;
     this.traceID = traceID;
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
     this.chunks = chunks;
-    this.chunkIndex = -1;
+    this.chunkIndex = 0;
+    this.chunkIndexOfCurrentBuffer = -1;
     // chunkOffset[i] stores offset at which chunk i stores data in
     // BlockInputStream
     this.chunkOffset = new long[this.chunks.size()];
     initializeChunkOffset();
     this.buffers = null;
     this.bufferIndex = 0;
+    this.bufferPosition = -1;
     this.verifyChecksum = verifyChecksum;
+    if (initialPosition > 0) {
+      // The stream was seeked to a position before the stream was
+      // initialized. So seek to that position now.
+      seek(initialPosition);
+    }
   }
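For intuition, here is a minimal self-contained sketch of the seek-before-initialize contract the new `initialPosition` parameter supports: a seek issued before the real stream exists is recorded by the caller and replayed when the stream is constructed. All names in this sketch are hypothetical, not Ozone APIs.

```java
import java.io.IOException;

// Sketch: a wrapper records the seek position until the real stream is
// built, then passes it as initialPosition so the stream starts there.
class LazySeekSketch {
  /** Stand-in for a seekable stream; not the real BlockInputStream API. */
  static class SimpleSeekableStream {
    private long pos;
    SimpleSeekableStream(long initialPosition) throws IOException {
      if (initialPosition > 0) {
        seek(initialPosition);   // replay the pending seek at construction
      }
    }
    void seek(long p) throws IOException { pos = p; }
    long getPos() { return pos; }
  }

  static class LazyEntry {
    private SimpleSeekableStream stream;  // null until first access
    private long position;                // pending seek while uninitialized

    void seek(long p) throws IOException {
      if (stream != null) {
        stream.seek(p);
      } else {
        position = p;                     // remember, replay later
      }
    }

    SimpleSeekableStream get() throws IOException {
      if (stream == null) {
        stream = new SimpleSeekableStream(position);
      }
      return stream;
    }
  }

  public static void main(String[] args) throws IOException {
    LazyEntry entry = new LazyEntry();
    entry.seek(150);                           // stream does not exist yet
    System.out.println(entry.get().getPos());  // 150, replayed on access
  }
}
```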
 
   private void initializeChunkOffset() {
@@ -176,7 +195,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
    *
    * @return true if EOF, false if more data is available
    */
-  private boolean blockStreamEOF() {
+  protected boolean blockStreamEOF() {
     if (buffersHaveData() || chunksRemaining()) {
       return false;
     } else {
@@ -223,12 +242,19 @@ private synchronized void checkOpen() throws IOException {
    */
   private synchronized int prepareRead(int len) throws IOException {
     for (;;) {
+      if (!buffersAllocated()) {
+        // The current chunk at chunkIndex has not been read from the
+        // container. Read the chunk and put the data into buffers.
+        readChunkFromContainer();
+      }
       if (buffersHaveData()) {
         // Data is available from buffers
         ByteBuffer bb = buffers.get(bufferIndex);
         return len > bb.remaining() ? bb.remaining() : len;
       } else if (chunksRemaining()) {
         // There are additional chunks available.
+        // Read the next chunk in the block.
+        chunkIndex += 1;
         readChunkFromContainer();
       } else {
         // All available input has been consumed.
@@ -237,26 +263,31 @@
     }
   }
 
-  private boolean buffersHaveData() {
-    boolean hasData = false;
-
+  private boolean buffersAllocated() {
     if (buffers == null || buffers.isEmpty()) {
       return false;
     }
+    return true;
+  }
 
-    while (bufferIndex < (buffers.size())) {
-      if (buffers.get(bufferIndex).hasRemaining()) {
-        // current buffer has data
-        hasData = true;
-        break;
-      } else {
-        if (buffersRemaining()) {
-          // move to next available buffer
-          ++bufferIndex;
-          Preconditions.checkState(bufferIndex < buffers.size());
-        } else {
-          // no more buffers remaining
+  private boolean buffersHaveData() {
+    boolean hasData = false;
+
+    if (buffersAllocated()) {
+      while (bufferIndex < (buffers.size())) {
+        if (buffers.get(bufferIndex).hasRemaining()) {
+          // current buffer has data
+          hasData = true;
           break;
+        } else {
+          if (buffersRemaining()) {
+            // move to next available buffer
+            ++bufferIndex;
+            Preconditions.checkState(bufferIndex < buffers.size());
+          } else {
+            // no more buffers remaining
+            break;
+          }
         }
       }
     }
@@ -272,7 +303,14 @@ private boolean chunksRemaining() {
     if ((chunks == null) || chunks.isEmpty()) {
       return false;
     }
-    return (chunkIndex < (chunks.size() - 1));
+    // Check if more chunks are remaining in the stream after chunkIndex
+    if (chunkIndex < (chunks.size() - 1)) {
+      return true;
+    }
+    // chunkIndex points to the last chunk in the stream. Check whether that
+    // chunk has been read from the container yet: return true if it has not
+    // been read, and false otherwise.
+    return chunkIndexOfCurrentBuffer != chunkIndex;
   }
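The refactored `buffersAllocated()`/`buffersHaveData()` pair implements a cursor over a list of read-only buffers: advance `bufferIndex` past exhausted buffers and report data only if some buffer has bytes left. A standalone sketch of the same cursor logic over plain `java.nio.ByteBuffer`s (simplified, without the `Preconditions` check):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Sketch of the bufferIndex cursor: skip exhausted buffers, stop at the
// first buffer that still has remaining bytes.
class BufferCursorSketch {
  private List<ByteBuffer> buffers;
  private int bufferIndex;

  boolean buffersAllocated() {
    return buffers != null && !buffers.isEmpty();
  }

  boolean buffersHaveData() {
    if (!buffersAllocated()) {
      return false;
    }
    while (bufferIndex < buffers.size()) {
      if (buffers.get(bufferIndex).hasRemaining()) {
        return true;                    // current buffer still has data
      }
      if (bufferIndex < buffers.size() - 1) {
        ++bufferIndex;                  // move to the next buffer
      } else {
        return false;                   // last buffer is exhausted
      }
    }
    return false;
  }

  public static void main(String[] args) {
    BufferCursorSketch s = new BufferCursorSketch();
    ByteBuffer a = ByteBuffer.allocate(4);
    a.position(4);                          // fully consumed
    ByteBuffer b = ByteBuffer.allocate(4);  // 4 bytes remaining
    s.buffers = Arrays.asList(a, b);
    System.out.println(s.buffersHaveData());  // true, cursor advanced to b
    System.out.println(s.bufferIndex);        // 1
  }
}
```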
 
   /**
@@ -283,34 +321,14 @@ private boolean chunksRemaining() {
    * @throws IOException if there is an I/O error while performing the call
    */
   private synchronized void readChunkFromContainer() throws IOException {
-    // On every chunk read chunkIndex should be increased so as to read the
-    // next chunk
-    chunkIndex += 1;
-    XceiverClientReply reply;
-    ReadChunkResponseProto readChunkResponse = null;
+    // Read the chunk at chunkIndex
     final ChunkInfo chunkInfo = chunks.get(chunkIndex);
     List<DatanodeDetails> excludeDns = null;
     ByteString byteString;
-    List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
+    List<DatanodeDetails> dnList = getDatanodeList();
     while (true) {
-      try {
-        reply = ContainerProtocolCalls
-            .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
-        ContainerProtos.ContainerCommandResponseProto response;
-        response = reply.getResponse().get();
-        ContainerProtocolCalls.validateContainerResponse(response);
-        readChunkResponse = response.getReadChunk();
-      } catch (IOException e) {
-        if (e instanceof StorageContainerException) {
-          throw e;
-        }
-        throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new IOException(
-            "Failed to execute ReadChunk command for chunk " + chunkInfo
-                .getChunkName(), e);
-      }
-      byteString = readChunkResponse.getData();
+      List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
+      byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
       try {
         if (byteString.size() != chunkInfo.getLen()) {
           // Bytes read from chunk should be equal to chunk size.
@@ -333,7 +351,7 @@
         if (excludeDns == null) {
           excludeDns = new ArrayList<>();
         }
-        excludeDns.addAll(reply.getDatanodes());
+        excludeDns.addAll(dnListFromReadChunkCall);
         if (excludeDns.size() == dnList.size()) {
           throw ioe;
         }
@@ -342,6 +360,47 @@
 
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
+    chunkIndexOfCurrentBuffer = chunkIndex;
+
+    // The bufferIndex and position might need to be adjusted if seek() was
+    // called on the stream before. This needs to be done so that the buffer
+    // position can be advanced to the 'seeked' position.
+    adjustBufferIndex();
+  }
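The loop above retries the read against other replicas when verification fails, excluding the datanodes that served the bad reply and giving up once every replica has been tried. A condensed sketch of that retry pattern; the `ChunkReader` interface and datanode names are hypothetical stand-ins, not Ozone APIs:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch: retry a chunk read, excluding datanodes that served bad data,
// until a replica passes verification or all replicas are exhausted.
class RetryWithExclusionSketch {
  interface ChunkReader {
    byte[] read(List<String> excluded, List<String> servedBy)
        throws IOException;
  }

  static byte[] readWithRetry(ChunkReader reader, int replicaCount,
      int expectedLen) throws IOException {
    List<String> excluded = new ArrayList<>();
    while (true) {
      List<String> servedBy = new ArrayList<>();
      byte[] data = reader.read(excluded, servedBy);
      if (data.length == expectedLen) {
        return data;                       // verification passed
      }
      excluded.addAll(servedBy);           // don't ask these nodes again
      if (excluded.size() >= replicaCount) {
        throw new IOException("All replicas returned bad data");
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // First replica returns a short (corrupt) chunk, second one is good.
    ChunkReader reader = (excluded, servedBy) -> {
      if (excluded.isEmpty()) {
        servedBy.add("dn-1");
        return new byte[5];                // wrong length, triggers retry
      }
      servedBy.add("dn-2");
      return new byte[10];
    };
    System.out.println(readWithRetry(reader, 3, 10).length);  // 10
  }
}
```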
+
+  /**
+   * Send an RPC call to get the chunk from the container.
+   */
+  @VisibleForTesting
+  protected ByteString readChunk(final ChunkInfo chunkInfo,
+      List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+      throws IOException {
+    XceiverClientReply reply;
+    ReadChunkResponseProto readChunkResponse = null;
+    try {
+      reply = ContainerProtocolCalls
+          .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
+      ContainerProtos.ContainerCommandResponseProto response;
+      response = reply.getResponse().get();
+      ContainerProtocolCalls.validateContainerResponse(response);
+      readChunkResponse = response.getReadChunk();
+      dnListFromReply.addAll(reply.getDatanodes());
+    } catch (IOException e) {
+      if (e instanceof StorageContainerException) {
+        throw e;
+      }
+      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException(
+          "Failed to execute ReadChunk command for chunk " + chunkInfo
+              .getChunkName(), e);
+    }
+    return readChunkResponse.getData();
+  }
+
+  @VisibleForTesting
+  protected List<DatanodeDetails> getDatanodeList() {
+    return xceiverClient.getPipeline().getNodes();
+  }
 
   @Override
@@ -352,9 +411,8 @@ public synchronized void seek(long pos) throws IOException {
       throw new EOFException("EOF encountered pos: " + pos + " container key: "
           + blockID.getLocalID());
     }
-    if (chunkIndex == -1) {
-      chunkIndex = Arrays.binarySearch(chunkOffset, pos);
-    } else if (pos < chunkOffset[chunkIndex]) {
+
+    if (pos < chunkOffset[chunkIndex]) {
       chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
     } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
         .getLen()) {
@@ -368,40 +426,71 @@ public synchronized void seek(long pos) throws IOException {
       // accordingly so that chunkIndex = insertionPoint - 1
       chunkIndex = -chunkIndex -2;
     }
-    // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
-    chunkIndex -= 1;
-    readChunkFromContainer();
-    adjustBufferIndex(pos);
+
+    // The bufferPosition should be adjusted to account for the chunk offset
+    // of the chunk that pos actually points to.
+    bufferPosition = pos - chunkOffset[chunkIndex];
+
+    // Check if the current buffers correspond to the chunk index being seeked
+    // and if the buffers have any data.
+    if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
+      // Position the buffer to the seeked position.
+      adjustBufferIndex();
+    } else {
+      // Release the current buffers. The next readChunkFromContainer will
+      // read the required chunk and position the buffer to the seeked
+      // position.
+      releaseBuffers();
+    }
   }
 
-  private void adjustBufferIndex(long pos) {
-    long tempOffest = chunkOffset[chunkIndex];
+  private void adjustBufferIndex() {
+    if (bufferPosition == -1) {
+      // The stream has not been seeked to a position. No need to adjust the
+      // bufferIndex and position.
+      return;
+    }
+    // The bufferPosition is w.r.t. the buffers for the current chunk.
+    // Adjust the bufferIndex and position to the seeked position.
+    long tempOffest = 0;
     for (int i = 0; i < buffers.size(); i++) {
-      if (pos - tempOffest >= buffers.get(i).capacity()) {
+      if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
         tempOffest += buffers.get(i).capacity();
       } else {
         bufferIndex = i;
         break;
       }
    }
-    buffers.get(bufferIndex).position((int) (pos - tempOffest));
+    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
+    // Reset the bufferPosition as the seek() operation has been completed.
+    bufferPosition = -1;
   }
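`seek()` relies on `Arrays.binarySearch` over the chunk start offsets. On a miss, `binarySearch` returns `-(insertionPoint) - 1`, so `-result - 2` recovers the index of the chunk whose byte range contains the position. A small runnable check of that arithmetic, using made-up offsets:

```java
import java.util.Arrays;

// Sketch of mapping a byte position to a chunk index via binarySearch
// on chunk start offsets. chunkIndex = -result - 2 on a miss, because
// the insertion point is one past the containing chunk.
class ChunkLookupSketch {
  public static void main(String[] args) {
    long[] chunkOffset = {0, 20, 40, 60};    // four chunks of 20 bytes

    int result = Arrays.binarySearch(chunkOffset, 45);
    int chunkIndex = (result >= 0) ? result : -result - 2;
    System.out.println(chunkIndex);          // 2: chunk covering [40, 60)

    result = Arrays.binarySearch(chunkOffset, 40);
    chunkIndex = (result >= 0) ? result : -result - 2;
    System.out.println(chunkIndex);          // 2: exact hit on an offset

    long bufferPosition = 45 - chunkOffset[2];
    System.out.println(bufferPosition);      // 5 bytes into chunk 2
  }
}
```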
 
   @Override
   public synchronized long getPos() throws IOException {
-    if (chunkIndex == -1) {
-      // no data consumed yet, a new stream OR after seek
-      return 0;
-    }
+    // position = chunkOffset of the current chunk (at chunkIndex) + position
+    // of the buffer corresponding to that chunk.
+    long bufferPos = 0;
 
-    if (blockStreamEOF()) {
+    if (bufferPosition >= 0) {
+      // seek has been called but the buffers were empty. Hence, the buffer
+      // position will be advanced after the buffers are filled.
+      // We return chunkOffset + bufferPosition here as that will be the
+      // position of the buffer pointer after reading the chunk file.
+      bufferPos = bufferPosition;
+
+    } else if (blockStreamEOF()) {
       // all data consumed, buffers have been released.
       // get position from the chunk offset and chunk length of last chunk
-      return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
+      bufferPos = chunks.get(chunkIndex).getLen();
+
+    } else if (buffersAllocated()) {
+      // get position from available buffers of current chunk
+      bufferPos = buffers.get(bufferIndex).position();
+    }
 
-    // get position from available buffers of current chunk
-    return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+    return chunkOffset[chunkIndex] + bufferPos;
   }
 
   @Override
@@ -412,4 +501,9 @@ public boolean seekToNewSource(long targetPos) throws IOException {
   public BlockID getBlockID() {
     return blockID;
   }
+
+  @VisibleForTesting
+  protected int getChunkIndex() {
+    return chunkIndex;
+  }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
new file mode 100644
index 00000000000..35c102257f4
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests {@link BlockInputStream}.
+ */
+public class TestBlockInputStream {
+
+  private static BlockInputStream blockInputStream;
+  private static List<ChunkInfo> chunks;
+  private static int blockSize;
+
+  private static final int CHUNK_SIZE = 20;
+
+  @Before
+  public void setup() throws Exception {
+    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+    chunks = createChunkList(10);
+    String traceID = UUID.randomUUID().toString();
+    blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
+        traceID, false, 0);
+
+    blockSize = 0;
+    for (ChunkInfo chunk : chunks) {
+      blockSize += chunk.getLen();
+    }
+  }
+
+  /**
+   * Create a mock list of chunks. The first n-1 chunks have length
+   * CHUNK_SIZE; the last chunk has length CHUNK_SIZE/2.
+   * @param numChunks number of chunks to create
+   * @return the list of chunks
+   */
+  private static List<ChunkInfo> createChunkList(int numChunks) {
+    ChecksumData dummyChecksumData = ChecksumData.newBuilder()
+        .setType(ChecksumType.NONE)
+        .setBytesPerChecksum(100)
+        .build();
+    List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
+    int i;
+    for (i = 0; i < numChunks - 1; i++) {
+      String chunkName = "chunk-" + i;
+      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+          .setChunkName(chunkName)
+          .setOffset(0)
+          .setLen(CHUNK_SIZE)
+          .setChecksumData(dummyChecksumData)
+          .build();
+      chunkList.add(chunkInfo);
+    }
+    ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+        .setChunkName("chunk-" + i)
+        .setOffset(0)
+        .setLen(CHUNK_SIZE / 2)
+        .setChecksumData(dummyChecksumData)
+        .build();
+    chunkList.add(chunkInfo);
+
+    return chunkList;
+  }
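For reference, the chunk layout these tests build: `createChunkList(10)` with `CHUNK_SIZE = 20` yields nine 20-byte chunks plus one 10-byte chunk, so `blockSize` is 190 and the chunk start offsets are 0, 20, ..., 180. A tiny sketch of the resulting numbers:

```java
// Layout produced by createChunkList(10) above: nine full 20-byte chunks
// and a final half-size chunk, so blockSize = 190.
class ChunkLayoutSketch {
  public static void main(String[] args) {
    int chunkSize = 20;
    int numChunks = 10;
    long[] chunkOffset = new long[numChunks];
    int blockSize = 0;
    for (int i = 0; i < numChunks; i++) {
      chunkOffset[i] = blockSize;
      blockSize += (i == numChunks - 1) ? chunkSize / 2 : chunkSize;
    }
    System.out.println(blockSize);       // 190
    System.out.println(chunkOffset[5]);  // 100: seek(105) lands in chunk 5
  }
}
```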
+
+  /**
+   * A dummy BlockInputStream to test the functionality of BlockInputStream.
+   */
+  private static class DummyBlockInputStream extends BlockInputStream {
+
+    DummyBlockInputStream(BlockID blockID,
+        XceiverClientManager xceiverClientManager,
+        XceiverClientSpi xceiverClient,
+        List<ChunkInfo> chunks,
+        String traceID,
+        boolean verifyChecksum,
+        long initialPosition) throws IOException {
+      super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
+          verifyChecksum, initialPosition);
+    }
+
+    @Override
+    protected ByteString readChunk(final ChunkInfo chunkInfo,
+        List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+        throws IOException {
+      return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
+    }
+
+    @Override
+    protected List<DatanodeDetails> getDatanodeList() {
+      // return an empty dummy list of size 10
+      return new ArrayList<>(10);
+    }
+
+    /**
+     * Create a ByteString with the input data to return when a readChunk
+     * call is placed.
+     */
+    private static ByteString getByteString(String data, int length) {
+      while (data.length() < length) {
+        data = data + "0";
+      }
+      return ByteString.copyFrom(data.getBytes(), 0, length);
+    }
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    // Seek to position 0
+    int pos = 0;
+    seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 0,
+        blockInputStream.getChunkIndex());
+
+    pos = CHUNK_SIZE;
+    seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 1,
+        blockInputStream.getChunkIndex());
+
+    pos = (CHUNK_SIZE * 5) + 5;
+    seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 5,
+        blockInputStream.getChunkIndex());
+
+    try {
+      // Try seeking beyond the blockSize.
+      pos = blockSize + 10;
+      seekAndVerify(pos);
+      Assert.fail("Seek to position beyond block size should fail.");
+    } catch (EOFException e) {
+      // Expected
+    }
+
+    // Seek to random positions between 0 and the block size.
+    Random random = new Random();
+    for (int i = 0; i < 10; i++) {
+      pos = random.nextInt(blockSize);
+      seekAndVerify(pos);
+    }
+  }
+
+  @Test
+  public void testBlockEOF() throws Exception {
+    // Seek to some position < blockSize and verify EOF is not reached.
+    seekAndVerify(CHUNK_SIZE);
+    Assert.assertFalse(blockInputStream.blockStreamEOF());
+
+    // Seek to blockSize - 1 and verify that EOF is not reached as the chunk
+    // has not been read from the container yet.
+    seekAndVerify(blockSize - 1);
+    Assert.assertFalse(blockInputStream.blockStreamEOF());
+  }
+
+  private void seekAndVerify(int pos) throws Exception {
+    blockInputStream.seek(pos);
+    Assert.assertEquals("Current position of buffer does not match with the "
+        + "seeked position", pos, blockInputStream.getPos());
+  }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
new file mode 100644
index 00000000000..abdd04ea967
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This package contains Ozone InputStream related tests.
+ */
+package org.apache.hadoop.hdds.scm.storage;
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 3a92e0186e2..5b6342028a4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -84,11 +84,28 @@ public long getRemainingOfIndex(int index) throws IOException {
    * @param streamLength the max number of bytes that should be written to this
    *                     stream.
    */
+  @VisibleForTesting
   public synchronized void addStream(BlockInputStream stream,
       long streamLength) {
     streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
   }
 
+  /**
+   * Append another ChunkInputStreamEntry to the end of the list.
+   * The stream will be constructed from the input information when it needs
+   * to be accessed.
+   */
+  private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
+      XceiverClientManager xceiverClientMngr, String clientRequestId,
+      boolean verifyChecksum) {
+    streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo,
+        xceiverClientMngr, clientRequestId, verifyChecksum));
+  }
+
+  private synchronized ChunkInputStreamEntry getStreamEntry(int index)
+      throws IOException {
+    return streamEntries.get(index).getStream();
+  }
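The new private `addStream` overload defers stream construction until first access: entries are appended with only the block location info, and the expensive stream is built inside `getStream()`. The pattern in isolation, with stand-in types rather than the real Ozone classes:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the lazy-entry pattern: cheap to append, expensive work
// (the GetBlock RPC in the real code) deferred to first access.
class LazyStreamListSketch {
  static class Entry {
    private final String locationInfo;  // stand-in for OmKeyLocationInfo
    private String stream;              // stand-in for BlockInputStream

    Entry(String locationInfo) {
      this.locationInfo = locationInfo;
    }

    String getStream() throws IOException {
      if (stream == null) {
        // The costly call happens here, not at list-build time.
        stream = "stream-for-" + locationInfo;
      }
      return stream;
    }
  }

  public static void main(String[] args) throws IOException {
    List<Entry> entries = new ArrayList<>();
    entries.add(new Entry("block-1"));
    entries.add(new Entry("block-2"));               // nothing fetched yet
    System.out.println(entries.get(0).getStream());  // built on first access
  }
}
```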
 
   @Override
   public synchronized int read() throws IOException {
@@ -120,7 +137,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
           .getRemaining() == 0)) {
         return totalReadLen == 0 ? EOF : totalReadLen;
       }
-      ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
+      ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
       int numBytesToRead = Math.min(len, (int)current.getRemaining());
       int numBytesRead = current.read(b, off, numBytesToRead);
       if (numBytesRead != numBytesToRead) {
@@ -212,13 +229,81 @@ public void close() throws IOException {
   public static class ChunkInputStreamEntry extends InputStream
       implements Seekable {
 
-    private final BlockInputStream blockInputStream;
+    private BlockInputStream blockInputStream;
+    private final OmKeyLocationInfo blockLocationInfo;
     private final long length;
+    private final XceiverClientManager xceiverClientManager;
+    private final String requestId;
+    private boolean verifyChecksum;
+    // the position of the blockInputStream is maintained by this variable
+    // until the stream is initialized
+    private long position;
+
+    public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
+        XceiverClientManager xceiverClientMngr, String clientRequestId,
+        boolean verifyChecksum) {
+      this.blockLocationInfo = omKeyLocationInfo;
+      this.length = omKeyLocationInfo.getLength();
+      this.xceiverClientManager = xceiverClientMngr;
+      this.requestId = clientRequestId;
+      this.verifyChecksum = verifyChecksum;
+    }
+
+    @VisibleForTesting
     public ChunkInputStreamEntry(BlockInputStream blockInputStream,
         long length) {
       this.blockInputStream = blockInputStream;
       this.length = length;
+      this.blockLocationInfo = null;
+      this.xceiverClientManager = null;
+      this.requestId = null;
+    }
+
+    private ChunkInputStreamEntry getStream() throws IOException {
+      if (this.blockInputStream == null) {
+        initializeBlockInputStream();
+      }
+      return this;
+    }
+
+    private void initializeBlockInputStream() throws IOException {
+      BlockID blockID = blockLocationInfo.getBlockID();
+      long containerID = blockID.getContainerID();
+      Pipeline pipeline = blockLocationInfo.getPipeline();
+
+      // irrespective of the container state, we will always read via the
+      // Standalone protocol.
+      if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+        pipeline = Pipeline.newBuilder(pipeline)
+            .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
+      }
+      XceiverClientSpi xceiverClient = xceiverClientManager
+          .acquireClient(pipeline);
+      boolean success = false;
+      long containerKey = blockLocationInfo.getLocalID();
+      try {
+        LOG.debug("Initializing stream for get key to access {} {}",
+            containerID, containerKey);
+        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+            .getDatanodeBlockIDProtobuf();
+        if (blockLocationInfo.getToken() != null) {
+          UserGroupInformation.getCurrentUser().
+              addToken(blockLocationInfo.getToken());
+        }
+        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
+            .getBlock(xceiverClient, datanodeBlockID, requestId);
+        List<ContainerProtos.ChunkInfo> chunks =
+            response.getBlockData().getChunksList();
+        success = true;
+        this.blockInputStream = new BlockInputStream(
+            blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
+            chunks, requestId, verifyChecksum, position);
+      } finally {
+        if (!success) {
+          xceiverClientManager.releaseClient(xceiverClient, false);
+        }
+      }
    }
 
     synchronized long getRemaining() throws IOException {
@@ -240,17 +325,27 @@ public synchronized int read() throws IOException {
 
     @Override
     public synchronized void close() throws IOException {
-      blockInputStream.close();
+      if (blockInputStream != null) {
+        blockInputStream.close();
+      }
     }
 
     @Override
     public void seek(long pos) throws IOException {
-      blockInputStream.seek(pos);
+      if (blockInputStream != null) {
+        blockInputStream.seek(pos);
+      } else {
+        position = pos;
+      }
     }
 
     @Override
     public long getPos() throws IOException {
-      return blockInputStream.getPos();
+      if (blockInputStream != null) {
+        return blockInputStream.getPos();
+      } else {
+        return position;
+      }
     }
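`initializeBlockInputStream()` releases the acquired xceiver client only when stream construction fails; on success, the client's lifetime is handed over to the stream being built. A stripped-down sketch of that acquire/release-on-failure discipline, with hypothetical names:

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch: release the acquired resource only if construction fails;
// otherwise ownership passes to the object being constructed.
class AcquireReleaseSketch {
  static class Client implements Closeable {
    @Override public void close() { System.out.println("released"); }
  }

  static Object buildStream(Client client) throws IOException {
    boolean success = false;
    try {
      Object stream = new Object();  // may throw in the real code path
      success = true;
      return stream;                 // stream now owns the client
    } finally {
      if (!success) {
        client.close();              // release only on failure
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(buildStream(new Client()) != null);  // true
  }
}
```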
 
     @Override
@@ -266,7 +361,6 @@ public static LengthInputStream getFromOmKeyInfo(
       storageContainerLocationClient, String requestId,
       boolean verifyChecksum) throws IOException {
     long length = 0;
-    long containerKey;
     KeyInputStream groupInputStream = new KeyInputStream();
     groupInputStream.key = keyInfo.getKeyName();
     List<OmKeyLocationInfo> keyLocationInfos =
@@ -274,48 +368,13 @@ public static LengthInputStream getFromOmKeyInfo(
     groupInputStream.streamOffset = new long[keyLocationInfos.size()];
     for (int i = 0; i < keyLocationInfos.size(); i++) {
       OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
-      BlockID blockID = omKeyLocationInfo.getBlockID();
-      long containerID = blockID.getContainerID();
-      Pipeline pipeline = omKeyLocationInfo.getPipeline();
+      LOG.debug("Adding stream for accessing {}. The stream will be " +
+          "initialized later.", omKeyLocationInfo);
+      groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
+          requestId, verifyChecksum);
 
-      // irrespective of the container state, we will always read via Standalone
-      // protocol.
-      if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
-        pipeline = Pipeline.newBuilder(pipeline)
-            .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
-      }
-      XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(pipeline);
-      boolean success = false;
-      containerKey = omKeyLocationInfo.getLocalID();
-      try {
-        LOG.debug("get key accessing {} {}",
-            containerID, containerKey);
-        groupInputStream.streamOffset[i] = length;
-        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
-            .getDatanodeBlockIDProtobuf();
-        if (omKeyLocationInfo.getToken() != null) {
-          UserGroupInformation.getCurrentUser().
-              addToken(omKeyLocationInfo.getToken());
-        }
-        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId);
-        List<ContainerProtos.ChunkInfo> chunks =
-            response.getBlockData().getChunksList();
-        for (ContainerProtos.ChunkInfo chunk : chunks) {
-          length += chunk.getLen();
-        }
-        success = true;
-        BlockInputStream inputStream = new BlockInputStream(
-            omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
-            chunks, requestId, verifyChecksum);
-        groupInputStream.addStream(inputStream,
-            omKeyLocationInfo.getLength());
-      } finally {
-        if (!success) {
-          xceiverClientManager.releaseClient(xceiverClient, false);
-        }
-      }
+      groupInputStream.streamOffset[i] = length;
+      length += omKeyLocationInfo.getLength();
     }
     groupInputStream.length = length;
     return new LengthInputStream(groupInputStream, length);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
new file mode 100644
index 00000000000..fa8a289ea81
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests {@link KeyInputStream}.
+ */
+public class TestKeyInputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 4 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "test-key-input-stream-volume";
+    bucketName = "test-key-input-stream-bucket";
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    XceiverClientMetrics metrics = XceiverClientManager
+        .getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long readChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.ReadChunk);
+
+    String keyName = getKeyName();
+    OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    // write data spanning 3 chunks
+    int dataLength = (2 * chunkSize) + (chunkSize / 2);
+    byte[] inputData = ContainerTestHelper.getFixedLengthString(
+        keyString, dataLength).getBytes(UTF_8);
+    key.write(inputData);
+    key.close();
+
+    Assert.assertEquals(writeChunkCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+    KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+        .getInputStream();
+
+    // Seek to position 150
+    keyInputStream.seek(150);
+
+    Assert.assertEquals(150, keyInputStream.getPos());
+
+    // The seek operation should not result in any readChunk operation.
+    Assert.assertEquals(readChunkCount, metrics
+        .getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
+    Assert.assertEquals(readChunkCount, metrics
+        .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+    byte[] readData = new byte[chunkSize];
+    keyInputStream.read(readData, 0, chunkSize);
+
+    // Since we are reading data from index 150 to 250 and the chunk boundary
+    // falls at every 100 bytes, we need to read 2 chunks.
+    Assert.assertEquals(readChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+    keyInputStream.close();
+
+    // Verify that the data read matches with the input data at corresponding
+    // indices.
+    for (int i = 0; i < chunkSize; i++) {
+      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+    }
+  }
+}
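The arithmetic behind the `readChunkCount + 2` assertion in the test above, as a runnable check: a 100-byte read starting at position 150 crosses one 100-byte chunk boundary, touching byte ranges [150, 200) in chunk 1 and [200, 250) in chunk 2.

```java
// Worked example of the chunk-boundary math from testSeek():
// reading [150, 250) with 100-byte chunks touches chunks 1 and 2.
class ChunkBoundaryMathSketch {
  public static void main(String[] args) {
    int chunkSize = 100;
    int start = 150;
    int length = 100;
    int firstChunk = start / chunkSize;                // 1
    int lastChunk = (start + length - 1) / chunkSize;  // 2
    System.out.println(lastChunk - firstChunk + 1);    // 2 chunks read
  }
}
```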
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index e4e449bb60e..45f04dfae0b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -49,7 +49,7 @@ public void testReadGroupInputStream() throws Exception {
         int tempOffset = offset;
         BlockInputStream in =
             new BlockInputStream(null, null, null, new ArrayList<>(), null,
-                true) {
+                true, 0) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
@@ -106,7 +106,7 @@ public void testErrorReadGroupInputStream() throws Exception {
         int tempOffset = offset;
         BlockInputStream in =
             new BlockInputStream(null, null, null, new ArrayList<>(), null,
-                true) {
+                true, 0) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);