HDDS-1491. Ozone KeyInputStream seek() should not read the chunk file. (#795)

This commit is contained in:
Hanisha Koneru 2019-05-13 20:49:52 -07:00 committed by GitHub
parent 389e640f0c
commit 02c9efcb81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 654 additions and 112 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@ -61,10 +62,18 @@ public class BlockInputStream extends InputStream implements Seekable {
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks;
// ChunkIndex points to the index current chunk in the buffers or the the
// index of chunk which will be read next into the buffers in
// readChunkFromContainer().
private int chunkIndex;
// ChunkIndexOfCurrentBuffer points to the index of chunk read into the
// buffers or index of the last chunk in the buffers. It is updated only
// when a new chunk is read from container into the buffers.
private int chunkIndexOfCurrentBuffer;
private long[] chunkOffset;
private List<ByteBuffer> buffers;
private int bufferIndex;
private long bufferPosition;
private final boolean verifyChecksum;
/**
@ -76,24 +85,34 @@ public class BlockInputStream extends InputStream implements Seekable {
* @param chunks list of chunks to read
* @param traceID container protocol call traceID
* @param verifyChecksum verify checksum
* @param initialPosition the initial position of the stream pointer. This
* position is seeked now if the up-stream was seeked
* before this was created.
*/
public BlockInputStream(
BlockID blockID, XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
boolean verifyChecksum) {
boolean verifyChecksum, long initialPosition) throws IOException {
this.blockID = blockID;
this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.chunks = chunks;
this.chunkIndex = -1;
this.chunkIndex = 0;
this.chunkIndexOfCurrentBuffer = -1;
// chunkOffset[i] stores offset at which chunk i stores data in
// BlockInputStream
this.chunkOffset = new long[this.chunks.size()];
initializeChunkOffset();
this.buffers = null;
this.bufferIndex = 0;
this.bufferPosition = -1;
this.verifyChecksum = verifyChecksum;
if (initialPosition > 0) {
// The stream was seeked to a position before the stream was
// initialized. So seeking to the position now.
seek(initialPosition);
}
}
private void initializeChunkOffset() {
@ -176,7 +195,7 @@ public class BlockInputStream extends InputStream implements Seekable {
*
* @return true if EOF, false if more data is available
*/
private boolean blockStreamEOF() {
protected boolean blockStreamEOF() {
if (buffersHaveData() || chunksRemaining()) {
return false;
} else {
@ -223,12 +242,19 @@ public class BlockInputStream extends InputStream implements Seekable {
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
if (!buffersAllocated()) {
// The current chunk at chunkIndex has not been read from the
// container. Read the chunk and put the data into buffers.
readChunkFromContainer();
}
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
} else if (chunksRemaining()) {
// There are additional chunks available.
// Read the next chunk in the block.
chunkIndex += 1;
readChunkFromContainer();
} else {
// All available input has been consumed.
@ -237,13 +263,17 @@ public class BlockInputStream extends InputStream implements Seekable {
}
}
private boolean buffersHaveData() {
boolean hasData = false;
private boolean buffersAllocated() {
if (buffers == null || buffers.isEmpty()) {
return false;
}
return true;
}
private boolean buffersHaveData() {
boolean hasData = false;
if (buffersAllocated()) {
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
@ -260,6 +290,7 @@ public class BlockInputStream extends InputStream implements Seekable {
}
}
}
}
return hasData;
}
@ -272,7 +303,14 @@ public class BlockInputStream extends InputStream implements Seekable {
if ((chunks == null) || chunks.isEmpty()) {
return false;
}
return (chunkIndex < (chunks.size() - 1));
// Check if more chunks are remaining in the stream after chunkIndex
if (chunkIndex < (chunks.size() - 1)) {
return true;
}
// ChunkIndex is the last chunk in the stream. Check if this chunk has
// been read from container or not. Return true if chunkIndex has not
// been read yet and false otherwise.
return chunkIndexOfCurrentBuffer != chunkIndex;
}
/**
@ -283,34 +321,14 @@ public class BlockInputStream extends InputStream implements Seekable {
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void readChunkFromContainer() throws IOException {
// On every chunk read chunkIndex should be increased so as to read the
// next chunk
chunkIndex += 1;
XceiverClientReply reply;
ReadChunkResponseProto readChunkResponse = null;
// Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
List<DatanodeDetails> excludeDns = null;
ByteString byteString;
List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
List<DatanodeDetails> dnList = getDatanodeList();
while (true) {
try {
reply = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
ContainerProtos.ContainerCommandResponseProto response;
response = reply.getResponse().get();
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} catch (ExecutionException | InterruptedException e) {
throw new IOException(
"Failed to execute ReadChunk command for chunk " + chunkInfo
.getChunkName(), e);
}
byteString = readChunkResponse.getData();
List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
try {
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
@ -333,7 +351,7 @@ public class BlockInputStream extends InputStream implements Seekable {
if (excludeDns == null) {
excludeDns = new ArrayList<>();
}
excludeDns.addAll(reply.getDatanodes());
excludeDns.addAll(dnListFromReadChunkCall);
if (excludeDns.size() == dnList.size()) {
throw ioe;
}
@ -342,6 +360,47 @@ public class BlockInputStream extends InputStream implements Seekable {
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
chunkIndexOfCurrentBuffer = chunkIndex;
// The bufferIndex and position might need to be adjusted if seek() was
// called on the stream before. This needs to be done so that the buffer
// position can be advanced to the 'seeked' position.
adjustBufferIndex();
}
/**
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
throws IOException {
XceiverClientReply reply;
ReadChunkResponseProto readChunkResponse = null;
try {
reply = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
ContainerProtos.ContainerCommandResponseProto response;
response = reply.getResponse().get();
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
dnListFromReply.addAll(reply.getDatanodes());
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} catch (ExecutionException | InterruptedException e) {
throw new IOException(
"Failed to execute ReadChunk command for chunk " + chunkInfo
.getChunkName(), e);
}
return readChunkResponse.getData();
}
@VisibleForTesting
protected List<DatanodeDetails> getDatanodeList() {
return xceiverClient.getPipeline().getNodes();
}
@Override
@ -352,9 +411,8 @@ public class BlockInputStream extends InputStream implements Seekable {
throw new EOFException("EOF encountered pos: " + pos + " container key: "
+ blockID.getLocalID());
}
if (chunkIndex == -1) {
chunkIndex = Arrays.binarySearch(chunkOffset, pos);
} else if (pos < chunkOffset[chunkIndex]) {
if (pos < chunkOffset[chunkIndex]) {
chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
} else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
.getLen()) {
@ -368,40 +426,71 @@ public class BlockInputStream extends InputStream implements Seekable {
// accordingly so that chunkIndex = insertionPoint - 1
chunkIndex = -chunkIndex -2;
}
// adjust chunkIndex so that readChunkFromContainer reads the correct chunk
chunkIndex -= 1;
readChunkFromContainer();
adjustBufferIndex(pos);
// The bufferPosition should be adjusted to account for the chunk offset
// of the chunk the the pos actually points to.
bufferPosition = pos - chunkOffset[chunkIndex];
// Check if current buffers correspond to the chunk index being seeked
// and if the buffers have any data.
if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
// Position the buffer to the seeked position.
adjustBufferIndex();
} else {
// Release the current buffers. The next readChunkFromContainer will
// read the required chunk and position the buffer to the seeked
// position.
releaseBuffers();
}
}
private void adjustBufferIndex(long pos) {
long tempOffest = chunkOffset[chunkIndex];
private void adjustBufferIndex() {
if (bufferPosition == -1) {
// The stream has not been seeked to a position. No need to adjust the
// buffer Index and position.
return;
}
// The bufferPosition is w.r.t the buffers for current chunk.
// Adjust the bufferIndex and position to the seeked position.
long tempOffest = 0;
for (int i = 0; i < buffers.size(); i++) {
if (pos - tempOffest >= buffers.get(i).capacity()) {
if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
tempOffest += buffers.get(i).capacity();
} else {
bufferIndex = i;
break;
}
}
buffers.get(bufferIndex).position((int) (pos - tempOffest));
buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
// Reset the bufferPosition as the seek() operation has been completed.
bufferPosition = -1;
}
@Override
public synchronized long getPos() throws IOException {
if (chunkIndex == -1) {
// no data consumed yet, a new stream OR after seek
return 0;
}
// position = chunkOffset of current chunk (at chunkIndex) + position of
// the buffer corresponding to the chunk.
long bufferPos = 0;
if (blockStreamEOF()) {
if (bufferPosition >= 0) {
// seek has been called but the buffers were empty. Hence, the buffer
// position will be advanced after the buffers are filled.
// We return the chunkOffset + bufferPosition here as that will be the
// position of the buffer pointer after reading the chunk file.
bufferPos = bufferPosition;
} else if (blockStreamEOF()) {
// all data consumed, buffers have been released.
// get position from the chunk offset and chunk length of last chunk
return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
bufferPos = chunks.get(chunkIndex).getLen();
} else if (buffersAllocated()) {
// get position from available buffers of current chunk
bufferPos = buffers.get(bufferIndex).position();
}
// get position from available buffers of current chunk
return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
return chunkOffset[chunkIndex] + bufferPos;
}
@Override
@ -412,4 +501,9 @@ public class BlockInputStream extends InputStream implements Seekable {
public BlockID getBlockID() {
return blockID;
}
@VisibleForTesting
protected int getChunkIndex() {
return chunkIndex;
}
}

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
/**
* Tests {@link BlockInputStream}.
*/
public class TestBlockInputStream {
private static BlockInputStream blockInputStream;
private static List<ChunkInfo> chunks;
private static int blockSize;
private static final int CHUNK_SIZE = 20;
@Before
public void setup() throws Exception {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
chunks = createChunkList(10);
String traceID = UUID.randomUUID().toString();
blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
traceID, false, 0);
blockSize = 0;
for (ChunkInfo chunk : chunks) {
blockSize += chunk.getLen();
}
}
/**
* Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
* and the last chunk with length CHUNK_SIZE/2.
* @param numChunks
* @return
*/
private static List<ChunkInfo> createChunkList(int numChunks) {
ChecksumData dummyChecksumData = ChecksumData.newBuilder()
.setType(ChecksumType.NONE)
.setBytesPerChecksum(100)
.build();
List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
int i;
for (i = 0; i < numChunks - 1; i++) {
String chunkName = "chunk-" + i;
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(chunkName)
.setOffset(0)
.setLen(CHUNK_SIZE)
.setChecksumData(dummyChecksumData)
.build();
chunkList.add(chunkInfo);
}
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName("chunk-" + i)
.setOffset(0)
.setLen(CHUNK_SIZE/2)
.setChecksumData(dummyChecksumData)
.build();
chunkList.add(chunkInfo);
return chunkList;
}
/**
* A dummy BlockInputStream to test the functionality of BlockInputStream.
*/
private static class DummyBlockInputStream extends BlockInputStream {
DummyBlockInputStream(BlockID blockID,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient,
List<ChunkInfo> chunks,
String traceID,
boolean verifyChecksum,
long initialPosition) throws IOException {
super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
verifyChecksum, initialPosition);
}
@Override
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
throws IOException {
return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
}
@Override
protected List<DatanodeDetails> getDatanodeList() {
// return an empty dummy list of size 10
return new ArrayList<>(10);
}
/**
* Create ByteString with the input data to return when a readChunk call is
* placed.
*/
private static ByteString getByteString(String data, int length) {
while (data.length() < length) {
data = data + "0";
}
return ByteString.copyFrom(data.getBytes(), 0, length);
}
}
@Test
public void testSeek() throws Exception {
// Seek to position 0
int pos = 0;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 0,
blockInputStream.getChunkIndex());
pos = CHUNK_SIZE;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 1,
blockInputStream.getChunkIndex());
pos = (CHUNK_SIZE * 5) + 5;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 5,
blockInputStream.getChunkIndex());
try {
// Try seeking beyond the blockSize.
pos = blockSize + 10;
seekAndVerify(pos);
Assert.fail("Seek to position beyond block size should fail.");
} catch (EOFException e) {
// Expected
}
// Seek to random positions between 0 and the block size.
Random random = new Random();
for (int i = 0; i < 10; i++) {
pos = random.nextInt(blockSize);
seekAndVerify(pos);
}
}
@Test
public void testBlockEOF() throws Exception {
// Seek to some position < blockSize and verify EOF is not reached.
seekAndVerify(CHUNK_SIZE);
Assert.assertFalse(blockInputStream.blockStreamEOF());
// Seek to blockSize-1 and verify that EOF is not reached as the chunk
// has not been read from container yet.
seekAndVerify(blockSize-1);
Assert.assertFalse(blockInputStream.blockStreamEOF());
}
private void seekAndVerify(int pos) throws Exception {
blockInputStream.seek(pos);
Assert.assertEquals("Current position of buffer does not match with the " +
"seeked position", pos, blockInputStream.getPos());
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* This package contains Ozone InputStream related tests.
*/
package org.apache.hadoop.hdds.scm.storage;

View File

@ -84,11 +84,28 @@ public class KeyInputStream extends InputStream implements Seekable {
* @param streamLength the max number of bytes that should be written to this
* stream.
*/
@VisibleForTesting
public synchronized void addStream(BlockInputStream stream,
long streamLength) {
streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
}
/**
* Append another ChunkInputStreamEntry to the end of the list.
* The stream will be constructed from the input information when it needs
* to be accessed.
*/
private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
XceiverClientManager xceiverClientMngr, String clientRequestId,
boolean verifyChecksum) {
streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo,
xceiverClientMngr, clientRequestId, verifyChecksum));
}
private synchronized ChunkInputStreamEntry getStreamEntry(int index)
throws IOException {
return streamEntries.get(index).getStream();
}
@Override
public synchronized int read() throws IOException {
@ -120,7 +137,7 @@ public class KeyInputStream extends InputStream implements Seekable {
.getRemaining() == 0)) {
return totalReadLen == 0 ? EOF : totalReadLen;
}
ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);
if (numBytesRead != numBytesToRead) {
@ -212,13 +229,81 @@ public class KeyInputStream extends InputStream implements Seekable {
public static class ChunkInputStreamEntry extends InputStream
implements Seekable {
private final BlockInputStream blockInputStream;
private BlockInputStream blockInputStream;
private final OmKeyLocationInfo blockLocationInfo;
private final long length;
private final XceiverClientManager xceiverClientManager;
private final String requestId;
private boolean verifyChecksum;
// the position of the blockInputStream is maintained by this variable
// till the stream is initialized
private long position;
public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
XceiverClientManager xceiverClientMngr, String clientRequestId,
boolean verifyChecksum) {
this.blockLocationInfo = omKeyLocationInfo;
this.length = omKeyLocationInfo.getLength();
this.xceiverClientManager = xceiverClientMngr;
this.requestId = clientRequestId;
this.verifyChecksum = verifyChecksum;
}
@VisibleForTesting
public ChunkInputStreamEntry(BlockInputStream blockInputStream,
long length) {
this.blockInputStream = blockInputStream;
this.length = length;
this.blockLocationInfo = null;
this.xceiverClientManager = null;
this.requestId = null;
}
private ChunkInputStreamEntry getStream() throws IOException {
if (this.blockInputStream == null) {
initializeBlockInputStream();
}
return this;
}
private void initializeBlockInputStream() throws IOException {
BlockID blockID = blockLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
Pipeline pipeline = blockLocationInfo.getPipeline();
// irrespective of the container state, we will always read via Standalone
// protocol.
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
XceiverClientSpi xceiverClient = xceiverClientManager
.acquireClient(pipeline);
boolean success = false;
long containerKey = blockLocationInfo.getLocalID();
try {
LOG.debug("Initializing stream for get key to access {} {}",
containerID, containerKey);
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
if (blockLocationInfo.getToken() != null) {
UserGroupInformation.getCurrentUser().
addToken(blockLocationInfo.getToken());
}
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId);
List<ContainerProtos.ChunkInfo> chunks =
response.getBlockData().getChunksList();
success = true;
this.blockInputStream = new BlockInputStream(
blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
chunks, requestId, verifyChecksum, position);
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient, false);
}
}
}
synchronized long getRemaining() throws IOException {
@ -240,17 +325,27 @@ public class KeyInputStream extends InputStream implements Seekable {
@Override
public synchronized void close() throws IOException {
if (blockInputStream != null) {
blockInputStream.close();
}
}
@Override
public void seek(long pos) throws IOException {
if (blockInputStream != null) {
blockInputStream.seek(pos);
} else {
position = pos;
}
}
@Override
public long getPos() throws IOException {
if (blockInputStream != null) {
return blockInputStream.getPos();
} else {
return position;
}
}
@Override
@ -266,7 +361,6 @@ public class KeyInputStream extends InputStream implements Seekable {
storageContainerLocationClient,
String requestId, boolean verifyChecksum) throws IOException {
long length = 0;
long containerKey;
KeyInputStream groupInputStream = new KeyInputStream();
groupInputStream.key = keyInfo.getKeyName();
List<OmKeyLocationInfo> keyLocationInfos =
@ -274,48 +368,13 @@ public class KeyInputStream extends InputStream implements Seekable {
groupInputStream.streamOffset = new long[keyLocationInfos.size()];
for (int i = 0; i < keyLocationInfos.size(); i++) {
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
BlockID blockID = omKeyLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
Pipeline pipeline = omKeyLocationInfo.getPipeline();
LOG.debug("Adding stream for accessing {}. The stream will be " +
"initialized later.", omKeyLocationInfo);
groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
requestId, verifyChecksum);
// irrespective of the container state, we will always read via Standalone
// protocol.
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
XceiverClientSpi xceiverClient = xceiverClientManager
.acquireClient(pipeline);
boolean success = false;
containerKey = omKeyLocationInfo.getLocalID();
try {
LOG.debug("get key accessing {} {}",
containerID, containerKey);
groupInputStream.streamOffset[i] = length;
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
if (omKeyLocationInfo.getToken() != null) {
UserGroupInformation.getCurrentUser().
addToken(omKeyLocationInfo.getToken());
}
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId);
List<ContainerProtos.ChunkInfo> chunks =
response.getBlockData().getChunksList();
for (ContainerProtos.ChunkInfo chunk : chunks) {
length += chunk.getLen();
}
success = true;
BlockInputStream inputStream = new BlockInputStream(
omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
chunks, requestId, verifyChecksum);
groupInputStream.addStream(inputStream,
omKeyLocationInfo.getLength());
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient, false);
}
}
length += omKeyLocationInfo.getLength();
}
groupInputStream.length = length;
return new LengthInputStream(groupInputStream, length);

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests {@link KeyInputStream}.
*/
public class TestKeyInputStream {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf = new OzoneConfiguration();
private static OzoneClient client;
private static ObjectStore objectStore;
private static int chunkSize;
private static int flushSize;
private static int maxFlushSize;
private static int blockSize;
private static String volumeName;
private static String bucketName;
private static String keyString;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
chunkSize = 100;
flushSize = 4 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
volumeName = "test-key-input-stream-volume";
bucketName = "test-key-input-stream-bucket";
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
private String getKeyName() {
return UUID.randomUUID().toString();
}
private OzoneOutputStream createKey(String keyName, ReplicationType type,
long size) throws Exception {
return ContainerTestHelper
.createKey(keyName, type, size, objectStore, volumeName, bucketName);
}
@Test
public void testSeek() throws Exception {
XceiverClientMetrics metrics = XceiverClientManager
.getXceiverClientMetrics();
long writeChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.WriteChunk);
long readChunkCount = metrics.getContainerOpCountMetrics(
ContainerProtos.Type.ReadChunk);
String keyName = getKeyName();
OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
// write data spanning 3 chunks
int dataLength = (2 * chunkSize) + (chunkSize / 2);
byte[] inputData = ContainerTestHelper.getFixedLengthString(
keyString, dataLength).getBytes(UTF_8);
key.write(inputData);
key.close();
Assert.assertEquals(writeChunkCount + 3,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
KeyInputStream keyInputStream = (KeyInputStream) objectStore
.getVolume(volumeName).getBucket(bucketName).readKey(keyName)
.getInputStream();
// Seek to position 150
keyInputStream.seek(150);
Assert.assertEquals(150, keyInputStream.getPos());
// Seek operation should not result in any readChunk operation.
Assert.assertEquals(readChunkCount, metrics
.getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
Assert.assertEquals(readChunkCount, metrics
.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
byte[] readData = new byte[chunkSize];
keyInputStream.read(readData, 0, chunkSize);
// Since we reading data from index 150 to 250 and the chunk boundary is
// 100 bytes, we need to read 2 chunks.
Assert.assertEquals(readChunkCount + 2,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
keyInputStream.close();
// Verify that the data read matches with the input data at corresponding
// indices.
for (int i = 0; i < chunkSize; i++) {
Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
}
}
}

View File

@ -49,7 +49,7 @@ public class TestChunkStreams {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, null, null, new ArrayList<>(), null,
true) {
true, 0) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@ -106,7 +106,7 @@ public class TestChunkStreams {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, null, null, new ArrayList<>(), null,
true) {
true, 0) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);