HDDS-1496. Support partial chunk reads and checksum verification (#804)

This commit is contained in:
Hanisha Koneru 2019-06-06 19:44:40 -07:00 committed by GitHub
parent 0b115b60b0
commit a91d24fea4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1389 additions and 754 deletions

View File

@ -19,145 +19,197 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerCommandRequestProto;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* An {@link InputStream} used by the REST service in combination with the
* SCMClient to read the value of a key from a sequence
* of container chunks. All bytes of the key value are stored in container
* chunks. Each chunk may contain multiple underlying {@link ByteBuffer}
* instances. This class encapsulates all state management for iterating
* through the sequence of chunks and the sequence of buffers within each chunk.
* An {@link InputStream} called from KeyInputStream to read a block from the
* container.
* This class encapsulates all state management for iterating
* through the sequence of chunks through {@link ChunkInputStream}.
*/
public class BlockInputStream extends InputStream implements Seekable {
private static final Logger LOG =
LoggerFactory.getLogger(BlockInputStream.class);
private static final int EOF = -1;
private final BlockID blockID;
private final long length;
private Pipeline pipeline;
private final Token<OzoneBlockTokenIdentifier> token;
private final boolean verifyChecksum;
private final String traceID;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks;
// ChunkIndex points to the index current chunk in the buffers or the the
// index of chunk which will be read next into the buffers in
// readChunkFromContainer().
private boolean initialized = false;
// List of ChunkInputStreams, one for each chunk in the block
private List<ChunkInputStream> chunkStreams;
// chunkOffsets[i] stores the index of the first data byte in
// chunkStream i w.r.t the block data.
// Lets say we have chunk size as 40 bytes. And let's say the parent
// block stores data from index 200 and has length 400.
// The first 40 bytes of this block will be stored in chunk[0], next 40 in
// chunk[1] and so on. But since the chunkOffsets are w.r.t the block only
// and not the key, the values in chunkOffsets will be [0, 40, 80,....].
private long[] chunkOffsets = null;
// Index of the chunkStream corresponding to the current position of the
// BlockInputStream i.e offset of the data to be read next from this block
private int chunkIndex;
// ChunkIndexOfCurrentBuffer points to the index of chunk read into the
// buffers or index of the last chunk in the buffers. It is updated only
// when a new chunk is read from container into the buffers.
private int chunkIndexOfCurrentBuffer;
private long[] chunkOffset;
private List<ByteBuffer> buffers;
private int bufferIndex;
private long bufferPosition;
private boolean verifyChecksum;
// Position of the BlockInputStream is maintainted by this variable till
// the stream is initialized. This position is w.r.t to the block only and
// not the key.
// For the above example, if we seek to position 240 before the stream is
// initialized, then value of blockPosition will be set to 40.
// Once, the stream is initialized, the position of the stream
// will be determined by the current chunkStream and its position.
private long blockPosition = 0;
// Tracks the chunkIndex corresponding to the last blockPosition so that it
// can be reset if a new position is seeked.
private int chunkIndexOfPrevPosition;
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
String traceId, XceiverClientManager xceiverClientManager) {
this.blockID = blockId;
this.length = blockLen;
this.pipeline = pipeline;
this.token = token;
this.verifyChecksum = verifyChecksum;
this.traceID = traceId;
this.xceiverClientManager = xceiverClientManager;
}
/**
* Creates a new BlockInputStream.
*
* @param blockID block ID of the chunk
* @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls
* @param chunks list of chunks to read
* @param traceID container protocol call traceID
* @param verifyChecksum verify checksum
* @param initialPosition the initial position of the stream pointer. This
* position is seeked now if the up-stream was seeked
* before this was created.
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
* the Container and create the ChunkInputStreams for each Chunk in the Block.
*/
public BlockInputStream(
BlockID blockID, XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
boolean verifyChecksum, long initialPosition) throws IOException {
this.blockID = blockID;
this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.chunks = chunks;
this.chunkIndex = 0;
this.chunkIndexOfCurrentBuffer = -1;
// chunkOffset[i] stores offset at which chunk i stores data in
// BlockInputStream
this.chunkOffset = new long[this.chunks.size()];
initializeChunkOffset();
this.buffers = null;
this.bufferIndex = 0;
this.bufferPosition = -1;
this.verifyChecksum = verifyChecksum;
if (initialPosition > 0) {
// The stream was seeked to a position before the stream was
// initialized. So seeking to the position now.
seek(initialPosition);
public synchronized void initialize() throws IOException {
// Pre-check that the stream has not been intialized already
if (initialized) {
return;
}
List<ChunkInfo> chunks = getChunkInfos();
if (chunks != null && !chunks.isEmpty()) {
// For each chunk in the block, create a ChunkInputStream and compute
// its chunkOffset
this.chunkOffsets = new long[chunks.size()];
long tempOffset = 0;
this.chunkStreams = new ArrayList<>(chunks.size());
for (int i = 0; i < chunks.size(); i++) {
addStream(chunks.get(i));
chunkOffsets[i] = tempOffset;
tempOffset += chunks.get(i).getLen();
}
initialized = true;
this.chunkIndex = 0;
if (blockPosition > 0) {
// Stream was seeked to blockPosition before initialization. Seek to the
// blockPosition now.
seek(blockPosition);
}
}
}
private void initializeChunkOffset() {
long tempOffset = 0;
for (int i = 0; i < chunks.size(); i++) {
chunkOffset[i] = tempOffset;
tempOffset += chunks.get(i).getLen();
/**
* Send RPC call to get the block info from the container.
* @return List of chunks in this block.
*/
protected List<ChunkInfo> getChunkInfos() throws IOException {
// irrespective of the container state, we will always read via Standalone
// protocol.
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
xceiverClient = xceiverClientManager.acquireClient(pipeline);
boolean success = false;
List<ChunkInfo> chunks;
try {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
if (token != null) {
UserGroupInformation.getCurrentUser().addToken(token);
}
DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, traceID);
chunks = response.getBlockData().getChunksList();
success = true;
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient, false);
}
}
return chunks;
}
/**
* Append another ChunkInputStream to the end of the list. Note that the
* ChunkInputStream is only created here. The chunk will be read from the
* Datanode only when a read operation is performed on for that chunk.
*/
protected synchronized void addStream(ChunkInfo chunkInfo) {
chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID,
xceiverClient, verifyChecksum));
}
public synchronized long getRemaining() throws IOException {
return length - getPos();
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read()
throws IOException {
checkOpen();
int available = prepareRead(1);
int dataout = EOF;
if (available == EOF) {
Preconditions
.checkState(buffers == null); //should have released by now, see below
} else {
dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
public synchronized int read() throws IOException {
byte[] buf = new byte[1];
if (read(buf, 0, 1) == EOF) {
return EOF;
}
if (blockStreamEOF()) {
// consumer might use getPos to determine EOF,
// so release buffers when serving the last byte of data
releaseBuffers();
}
return dataout;
return Byte.toUnsignedInt(buf[0]);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
// According to the JavaDocs for InputStream, it is recommended that
// subclasses provide an override of bulk read if possible for performance
// reasons. In addition to performance, we need to do it for correctness
// reasons. The Ozone REST service uses PipedInputStream and
// PipedOutputStream to relay HTTP response data between a Jersey thread and
// a Netty thread. It turns out that PipedInputStream/PipedOutputStream
// have a subtle dependency (bug?) on the wrapped stream providing separate
// implementations of single-byte read and bulk read. Without this, get key
// responses might close the connection before writing all of the bytes
// advertised in the Content-Length.
if (b == null) {
throw new NullPointerException();
}
@ -167,50 +219,124 @@ public class BlockInputStream extends InputStream implements Seekable {
if (len == 0) {
return 0;
}
if (!initialized) {
initialize();
}
checkOpen();
int total = 0;
int totalReadLen = 0;
while (len > 0) {
int available = prepareRead(len);
if (available == EOF) {
Preconditions
.checkState(buffers == null); //should have been released by now
return total != 0 ? total : EOF;
// if we are at the last chunk and have read the entire chunk, return
if (chunkStreams.size() == 0 ||
(chunkStreams.size() - 1 <= chunkIndex &&
chunkStreams.get(chunkIndex)
.getRemaining() == 0)) {
return totalReadLen == 0 ? EOF : totalReadLen;
}
buffers.get(bufferIndex).get(b, off + total, available);
len -= available;
total += available;
}
if (blockStreamEOF()) {
// smart consumers determine EOF by calling getPos()
// so we release buffers when serving the final bytes of data
releaseBuffers();
// Get the current chunkStream and read data from it
ChunkInputStream current = chunkStreams.get(chunkIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);
if (numBytesRead != numBytesToRead) {
// This implies that there is either data loss or corruption in the
// chunk entries. Even EOF in the current stream would be covered in
// this case.
throw new IOException(String.format(
"Inconsistent read for chunkName=%s length=%d numBytesRead=%d",
current.getChunkName(), current.getLength(), numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
len -= numBytesRead;
if (current.getRemaining() <= 0 &&
((chunkIndex + 1) < chunkStreams.size())) {
chunkIndex += 1;
}
}
return total;
return totalReadLen;
}
/**
* Determines if all data in the stream has been consumed.
* Seeks the BlockInputStream to the specified position. If the stream is
* not initialized, save the seeked position via blockPosition. Otherwise,
* update the position in 2 steps:
* 1. Updating the chunkIndex to the chunkStream corresponding to the
* seeked position.
* 2. Seek the corresponding chunkStream to the adjusted position.
*
* @return true if EOF, false if more data is available
* Lets say we have chunk size as 40 bytes. And let's say the parent block
* stores data from index 200 and has length 400. If the key was seeked to
* position 90, then this block will be seeked to position 90.
* When seek(90) is called on this blockStream, then
* 1. chunkIndex will be set to 2 (as indices 80 - 120 reside in chunk[2]).
* 2. chunkStream[2] will be seeked to position 10
* (= 90 - chunkOffset[2] (= 80)).
*/
protected boolean blockStreamEOF() {
if (buffersHaveData() || chunksRemaining()) {
return false;
@Override
public synchronized void seek(long pos) throws IOException {
if (!initialized) {
// Stream has not been initialized yet. Save the position so that it
// can be seeked when the stream is initialized.
blockPosition = pos;
return;
}
checkOpen();
if (pos < 0 || pos >= length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
return;
}
throw new EOFException(
"EOF encountered at pos: " + pos + " for block: " + blockID);
}
if (chunkIndex >= chunkStreams.size()) {
chunkIndex = Arrays.binarySearch(chunkOffsets, pos);
} else if (pos < chunkOffsets[chunkIndex]) {
chunkIndex =
Arrays.binarySearch(chunkOffsets, 0, chunkIndex, pos);
} else if (pos >= chunkOffsets[chunkIndex] + chunkStreams
.get(chunkIndex).getLength()) {
chunkIndex = Arrays.binarySearch(chunkOffsets,
chunkIndex + 1, chunkStreams.size(), pos);
}
if (chunkIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the chunkIndex
// accordingly so that chunkIndex = insertionPoint - 1
chunkIndex = -chunkIndex - 2;
}
// Reset the previous chunkStream's position
chunkStreams.get(chunkIndexOfPrevPosition).resetPosition();
// seek to the proper offset in the ChunkInputStream
chunkStreams.get(chunkIndex).seek(pos - chunkOffsets[chunkIndex]);
chunkIndexOfPrevPosition = chunkIndex;
}
@Override
public synchronized long getPos() throws IOException {
if (length == 0) {
return 0;
}
if (!initialized) {
// The stream is not initialized yet. Return the blockPosition
return blockPosition;
} else {
// if there are any chunks, we better be at the last chunk for EOF
Preconditions.checkState(((chunks == null) || chunks.isEmpty() ||
chunkIndex == (chunks.size() - 1)),
"EOF detected, but not at the last chunk");
return true;
return chunkOffsets[chunkIndex] + chunkStreams.get(chunkIndex).getPos();
}
}
private void releaseBuffers() {
//ashes to ashes, dust to dust
buffers = null;
bufferIndex = 0;
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
@ -222,267 +348,41 @@ public class BlockInputStream extends InputStream implements Seekable {
}
}
public synchronized void resetPosition() {
this.blockPosition = 0;
}
/**
* Checks if the stream is open. If not, throws an exception.
* Checks if the stream is open. If not, throw an exception.
*
* @throws IOException if stream is closed
*/
private synchronized void checkOpen() throws IOException {
protected synchronized void checkOpen() throws IOException {
if (xceiverClient == null) {
throw new IOException("BlockInputStream has been closed.");
}
}
/**
* Prepares to read by advancing through chunks and buffers as needed until it
* finds data to return or encounters EOF.
*
* @param len desired length of data to read
* @return length of data available to read, possibly less than desired length
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
if (!buffersAllocated()) {
// The current chunk at chunkIndex has not been read from the
// container. Read the chunk and put the data into buffers.
readChunkFromContainer();
}
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
} else if (chunksRemaining()) {
// There are additional chunks available.
// Read the next chunk in the block.
chunkIndex += 1;
readChunkFromContainer();
} else {
// All available input has been consumed.
return EOF;
}
}
}
private boolean buffersAllocated() {
if (buffers == null || buffers.isEmpty()) {
return false;
}
return true;
}
private boolean buffersHaveData() {
boolean hasData = false;
if (buffersAllocated()) {
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState(bufferIndex < buffers.size());
} else {
// no more buffers remaining
break;
}
}
}
}
return hasData;
}
private boolean buffersRemaining() {
return (bufferIndex < (buffers.size() - 1));
}
private boolean chunksRemaining() {
if ((chunks == null) || chunks.isEmpty()) {
return false;
}
// Check if more chunks are remaining in the stream after chunkIndex
if (chunkIndex < (chunks.size() - 1)) {
return true;
}
// ChunkIndex is the last chunk in the stream. Check if this chunk has
// been read from container or not. Return true if chunkIndex has not
// been read yet and false otherwise.
return chunkIndexOfCurrentBuffer != chunkIndex;
}
/**
* Attempts to read the chunk at the specified offset in the chunk list. If
* successful, then the data of the read chunk is saved so that its bytes can
* be returned from subsequent read calls.
*
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void readChunkFromContainer() throws IOException {
// Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
ByteString byteString;
byteString = readChunk(chunkInfo);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
chunkIndexOfCurrentBuffer = chunkIndex;
// The bufferIndex and position might need to be adjusted if seek() was
// called on the stream before. This needs to be done so that the buffer
// position can be advanced to the 'seeked' position.
adjustBufferIndex();
}
/**
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
ReadChunkResponseProto readChunkResponse;
try {
List<CheckedBiFunction> validators =
ContainerProtocolCalls.getValidatorList();
validators.add(validator);
readChunkResponse = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, validators);
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
return readChunkResponse.getData();
}
@VisibleForTesting
protected List<DatanodeDetails> getDatanodeList() {
return xceiverClient.getPipeline().getNodes();
}
private CheckedBiFunction<ContainerCommandRequestProto,
ContainerCommandResponseProto, IOException> validator =
(request, response) -> {
ReadChunkResponseProto readChunkResponse = response.getReadChunk();
final ChunkInfo chunkInfo = readChunkResponse.getChunkData();
ByteString byteString = readChunkResponse.getData();
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
chunkInfo.getChunkName(), chunkInfo.getLen(),
byteString.size()));
}
ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
if (verifyChecksum) {
Checksum.verifyChecksum(byteString, checksumData);
}
};
@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || (chunks.size() == 0 && pos > 0)
|| pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
.getLen()) {
throw new EOFException("EOF encountered pos: " + pos + " container key: "
+ blockID.getLocalID());
}
if (pos < chunkOffset[chunkIndex]) {
chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
} else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
.getLen()) {
chunkIndex =
Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
}
if (chunkIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the chunkIndex
// accordingly so that chunkIndex = insertionPoint - 1
chunkIndex = -chunkIndex -2;
}
// The bufferPosition should be adjusted to account for the chunk offset
// of the chunk the the pos actually points to.
bufferPosition = pos - chunkOffset[chunkIndex];
// Check if current buffers correspond to the chunk index being seeked
// and if the buffers have any data.
if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
// Position the buffer to the seeked position.
adjustBufferIndex();
} else {
// Release the current buffers. The next readChunkFromContainer will
// read the required chunk and position the buffer to the seeked
// position.
releaseBuffers();
}
}
private void adjustBufferIndex() {
if (bufferPosition == -1) {
// The stream has not been seeked to a position. No need to adjust the
// buffer Index and position.
return;
}
// The bufferPosition is w.r.t the buffers for current chunk.
// Adjust the bufferIndex and position to the seeked position.
long tempOffest = 0;
for (int i = 0; i < buffers.size(); i++) {
if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
tempOffest += buffers.get(i).capacity();
} else {
bufferIndex = i;
break;
}
}
buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
// Reset the bufferPosition as the seek() operation has been completed.
bufferPosition = -1;
}
@Override
public synchronized long getPos() throws IOException {
// position = chunkOffset of current chunk (at chunkIndex) + position of
// the buffer corresponding to the chunk.
long bufferPos = 0;
if (bufferPosition >= 0) {
// seek has been called but the buffers were empty. Hence, the buffer
// position will be advanced after the buffers are filled.
// We return the chunkOffset + bufferPosition here as that will be the
// position of the buffer pointer after reading the chunk file.
bufferPos = bufferPosition;
} else if (blockStreamEOF()) {
// all data consumed, buffers have been released.
// get position from the chunk offset and chunk length of last chunk
bufferPos = chunks.get(chunkIndex).getLen();
} else if (buffersAllocated()) {
// get position from available buffers of current chunk
bufferPos = buffers.get(bufferIndex).position();
}
return chunkOffset[chunkIndex] + bufferPos;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
public BlockID getBlockID() {
return blockID;
}
public long getLength() {
return length;
}
@VisibleForTesting
protected int getChunkIndex() {
synchronized int getChunkIndex() {
return chunkIndex;
}
@VisibleForTesting
synchronized long getBlockPosition() {
return blockPosition;
}
@VisibleForTesting
synchronized List<ChunkInputStream> getChunkStreams() {
return chunkStreams;
}
}

View File

@ -0,0 +1,546 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
/**
* An {@link InputStream} called from BlockInputStream to read a chunk from the
* container. Each chunk may contain multiple underlying {@link ByteBuffer}
* instances.
*/
public class ChunkInputStream extends InputStream implements Seekable {
private ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private final String traceID;
private XceiverClientSpi xceiverClient;
private boolean verifyChecksum;
private boolean allocated = false;
// Buffer to store the chunk data read from the DN container
private List<ByteBuffer> buffers;
// Index of the buffers corresponding to the current position of the buffers
private int bufferIndex;
// The offset of the current data residing in the buffers w.r.t the start
// of chunk data
private long bufferOffset;
// The number of bytes of chunk data residing in the buffers currently
private long bufferLength;
// Position of the ChunkInputStream is maintained by this variable (if a
// seek is performed. This position is w.r.t to the chunk only and not the
// block or key. This variable is set only if either the buffers are not
// yet allocated or the if the allocated buffers do not cover the seeked
// position. Once the chunk is read, this variable is reset.
private long chunkPosition = -1;
private static final int EOF = -1;
ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
String traceId, XceiverClientSpi xceiverClient, boolean verifyChecksum) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
this.blockID = blockId;
this.traceID = traceId;
this.xceiverClient = xceiverClient;
this.verifyChecksum = verifyChecksum;
}
public synchronized long getRemaining() throws IOException {
return length - getPos();
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read() throws IOException {
checkOpen();
int available = prepareRead(1);
int dataout = EOF;
if (available == EOF) {
// There is no more data in the chunk stream. The buffers should have
// been released by now
Preconditions.checkState(buffers == null);
} else {
dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
}
if (chunkStreamEOF()) {
// consumer might use getPos to determine EOF,
// so release buffers when serving the last byte of data
releaseBuffers();
}
return dataout;
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
// According to the JavaDocs for InputStream, it is recommended that
// subclasses provide an override of bulk read if possible for performance
// reasons. In addition to performance, we need to do it for correctness
// reasons. The Ozone REST service uses PipedInputStream and
// PipedOutputStream to relay HTTP response data between a Jersey thread and
// a Netty thread. It turns out that PipedInputStream/PipedOutputStream
// have a subtle dependency (bug?) on the wrapped stream providing separate
// implementations of single-byte read and bulk read. Without this, get key
// responses might close the connection before writing all of the bytes
// advertised in the Content-Length.
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return 0;
}
checkOpen();
int total = 0;
while (len > 0) {
int available = prepareRead(len);
if (available == EOF) {
// There is no more data in the chunk stream. The buffers should have
// been released by now
Preconditions.checkState(buffers == null);
return total != 0 ? total : EOF;
}
buffers.get(bufferIndex).get(b, off + total, available);
len -= available;
total += available;
}
if (chunkStreamEOF()) {
// smart consumers determine EOF by calling getPos()
// so we release buffers when serving the final bytes of data
releaseBuffers();
}
return total;
}
/**
* Seeks the ChunkInputStream to the specified position. This is done by
* updating the chunkPosition to the seeked position in case the buffers
* are not allocated or buffers do not contain the data corresponding to
* the seeked position (determined by buffersHavePosition()). Otherwise,
* the buffers position is updated to the seeked position.
*/
@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || pos >= length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
return;
}
throw new EOFException("EOF encountered at pos: " + pos + " for chunk: "
+ chunkInfo.getChunkName());
}
if (buffersHavePosition(pos)) {
// The bufferPosition is w.r.t the current chunk.
// Adjust the bufferIndex and position to the seeked position.
adjustBufferPosition(pos - bufferOffset);
} else {
chunkPosition = pos;
}
}
@Override
public synchronized long getPos() throws IOException {
if (chunkPosition >= 0) {
return chunkPosition;
}
if (chunkStreamEOF()) {
return length;
}
if (buffersHaveData()) {
return bufferOffset + buffers.get(bufferIndex).position();
}
if (buffersAllocated()) {
return bufferOffset + bufferLength;
}
return 0;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public synchronized void close() {
if (xceiverClient != null) {
xceiverClient = null;
}
}
/**
* Checks if the stream is open. If not, throw an exception.
*
* @throws IOException if stream is closed
*/
protected synchronized void checkOpen() throws IOException {
if (xceiverClient == null) {
throw new IOException("BlockInputStream has been closed.");
}
}
/**
* Prepares to read by advancing through buffers or allocating new buffers,
* as needed until it finds data to return, or encounters EOF.
* @param len desired lenght of data to read
* @return length of data available to read, possibly less than desired length
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
if (chunkPosition >= 0) {
if (buffersHavePosition(chunkPosition)) {
// The current buffers have the seeked position. Adjust the buffer
// index and position to point to the chunkPosition.
adjustBufferPosition(chunkPosition - bufferOffset);
} else {
// Read a required chunk data to fill the buffers with seeked
// position data
readChunkFromContainer(len);
}
}
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len;
} else if (dataRemainingInChunk()) {
// There is more data in the chunk stream which has not
// been read into the buffers yet.
readChunkFromContainer(len);
} else {
// All available input from this chunk stream has been consumed.
return EOF;
}
}
}
/**
* Reads full or partial Chunk from DN Container based on the current
* position of the ChunkInputStream, the number of bytes of data to read
* and the checksum boundaries.
* If successful, then the read data in saved in the buffers so that
* subsequent read calls can utilize it.
* @param len number of bytes of data to be read
* @throws IOException if there is an I/O error while performing the call
* to Datanode
*/
private synchronized void readChunkFromContainer(int len) throws IOException {
// index of first byte to be read from the chunk
long startByteIndex;
if (chunkPosition >= 0) {
// If seek operation was called to advance the buffer position, the
// chunk should be read from that position onwards.
startByteIndex = chunkPosition;
} else {
// Start reading the chunk from the last chunkPosition onwards.
startByteIndex = bufferOffset + bufferLength;
}
if (verifyChecksum) {
// Update the bufferOffset and bufferLength as per the checksum
// boundary requirement.
computeChecksumBoundaries(startByteIndex, len);
} else {
// Read from the startByteIndex
bufferOffset = startByteIndex;
bufferLength = len;
}
// Adjust the chunkInfo so that only the required bytes are read from
// the chunk.
final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo)
.setOffset(bufferOffset)
.setLen(bufferLength)
.build();
ByteString byteString = readChunk(adjustedChunkInfo);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
allocated = true;
// If the stream was seeked to position before, then the buffer
// position should be adjusted as the reads happen at checksum boundaries.
// The buffers position might need to be adjusted for the following
// scenarios:
// 1. Stream was seeked to a position before the chunk was read
// 2. Chunk was read from index < the current position to account for
// checksum boundaries.
adjustBufferPosition(startByteIndex - bufferOffset);
}
/**
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
ReadChunkResponseProto readChunkResponse;
try {
List<CheckedBiFunction> validators =
ContainerProtocolCalls.getValidatorList();
validators.add(validator);
readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, traceID, validators);
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
}
return readChunkResponse.getData();
}
private CheckedBiFunction<ContainerCommandRequestProto,
ContainerCommandResponseProto, IOException> validator =
(request, response) -> {
final ChunkInfo reqChunkInfo =
request.getReadChunk().getChunkData();
ReadChunkResponseProto readChunkResponse = response.getReadChunk();
ByteString byteString = readChunkResponse.getData();
if (byteString.size() != reqChunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
byteString.size()));
}
if (verifyChecksum) {
ChecksumData checksumData = ChecksumData.getFromProtoBuf(
chunkInfo.getChecksumData());
// ChecksumData stores checksum for each 'numBytesPerChecksum'
// number of bytes in a list. Compute the index of the first
// checksum to match with the read data
int checkumStartIndex = (int) (reqChunkInfo.getOffset() /
checksumData.getBytesPerChecksum());
Checksum.verifyChecksum(
byteString, checksumData, checkumStartIndex);
}
};
/**
* Return the offset and length of bytes that need to be read from the
* chunk file to cover the checksum boundaries covering the actual start and
* end of the chunk index to be read.
* For example, lets say the client is reading from index 120 to 450 in the
* chunk. And let's say checksum is stored for every 100 bytes in the chunk
* i.e. the first checksum is for bytes from index 0 to 99, the next for
* bytes from index 100 to 199 and so on. To verify bytes from 120 to 450,
* we would need to read from bytes 100 to 499 so that checksum
* verification can be done.
*
* @param startByteIndex the first byte index to be read by client
* @param dataLen number of bytes to be read from the chunk
*/
private void computeChecksumBoundaries(long startByteIndex, int dataLen) {
int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
// index of the last byte to be read from chunk, inclusively.
final long endByteIndex = startByteIndex + dataLen - 1;
bufferOffset = (startByteIndex / bytesPerChecksum)
* bytesPerChecksum; // inclusive
final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
* bytesPerChecksum; // exclusive
bufferLength = Math.min(endIndex, length) - bufferOffset;
}
/**
* Adjust the buffers position to account for seeked position and/ or checksum
* boundary reads.
* @param bufferPosition the position to which the buffers must be advanced
*/
private void adjustBufferPosition(long bufferPosition) {
// The bufferPosition is w.r.t the current chunk.
// Adjust the bufferIndex and position to the seeked chunkPosition.
long tempOffest = 0;
for (int i = 0; i < buffers.size(); i++) {
if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
tempOffest += buffers.get(i).capacity();
} else {
bufferIndex = i;
break;
}
}
buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
// Reset the chunkPosition as chunk stream has been initialized i.e. the
// buffers have been allocated.
resetPosition();
}
/**
* Check if the buffers have been allocated data and false otherwise.
*/
private boolean buffersAllocated() {
return buffers != null && !buffers.isEmpty();
}
/**
* Check if the buffers have any data remaining between the current
* position and the limit.
*/
private boolean buffersHaveData() {
boolean hasData = false;
if (buffersAllocated()) {
while (bufferIndex < (buffers.size())) {
if (buffers.get(bufferIndex).hasRemaining()) {
// current buffer has data
hasData = true;
break;
} else {
if (buffersRemaining()) {
// move to next available buffer
++bufferIndex;
Preconditions.checkState(bufferIndex < buffers.size());
} else {
// no more buffers remaining
break;
}
}
}
}
return hasData;
}
private boolean buffersRemaining() {
return (bufferIndex < (buffers.size() - 1));
}
/**
* Check if curernt buffers have the data corresponding to the input position.
*/
private boolean buffersHavePosition(long pos) {
// Check if buffers have been allocated
if (buffersAllocated()) {
// Check if the current buffers cover the input position
return pos >= bufferOffset &&
pos < bufferOffset + bufferLength;
}
return false;
}
/**
* Check if there is more data in the chunk which has not yet been read
* into the buffers.
*/
private boolean dataRemainingInChunk() {
long bufferPos;
if (chunkPosition >= 0) {
bufferPos = chunkPosition;
} else {
bufferPos = bufferOffset + bufferLength;
}
return bufferPos < length;
}
/**
* Check if end of chunkStream has been reached.
*/
private boolean chunkStreamEOF() {
if (!allocated) {
// Chunk data has not been read yet
return false;
}
if (buffersHaveData() || dataRemainingInChunk()) {
return false;
} else {
Preconditions.checkState(bufferOffset + bufferLength == length,
"EOF detected, but not at the last byte of the chunk");
return true;
}
}
/**
* If EOF is reached, release the buffers.
*/
private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
}
/**
* Reset the chunkPosition once the buffers are allocated.
*/
void resetPosition() {
this.chunkPosition = -1;
}
String getChunkName() {
return chunkInfo.getChunkName();
}
protected long getLength() {
return length;
}
@VisibleForTesting
protected long getChunkPosition() {
return chunkPosition;
}
}

View File

@ -1,32 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.primitives.Bytes;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -34,106 +35,127 @@ import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
/**
* Tests {@link BlockInputStream}.
* Tests for {@link BlockInputStream}'s functionality.
*/
public class TestBlockInputStream {
private static BlockInputStream blockInputStream;
private static List<ChunkInfo> chunks;
private static int blockSize;
private static final int CHUNK_SIZE = 100;
private static Checksum checksum;
private static final int CHUNK_SIZE = 20;
private BlockInputStream blockStream;
private byte[] blockData;
private int blockSize;
private List<ChunkInfo> chunks;
private Map<String, byte[]> chunkDataMap;
@Before
public void setup() throws Exception {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
chunks = createChunkList(10);
String traceID = UUID.randomUUID().toString();
blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
traceID, false, 0);
checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
createChunkList(5);
blockSize = 0;
for (ChunkInfo chunk : chunks) {
blockSize += chunk.getLen();
}
blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
false, null, null);
}
/**
* Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
* and the last chunk with length CHUNK_SIZE/2.
* @param numChunks
* @return
*/
private static List<ChunkInfo> createChunkList(int numChunks) {
ChecksumData dummyChecksumData = ChecksumData.newBuilder()
.setType(ChecksumType.NONE)
.setBytesPerChecksum(100)
.build();
List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
int i;
for (i = 0; i < numChunks - 1; i++) {
String chunkName = "chunk-" + i;
private void createChunkList(int numChunks)
throws Exception {
chunks = new ArrayList<>(numChunks);
chunkDataMap = new HashMap<>();
blockData = new byte[0];
int i, chunkLen;
byte[] byteData;
String chunkName;
for (i = 0; i < numChunks; i++) {
chunkName = "chunk-" + i;
chunkLen = CHUNK_SIZE;
if (i == numChunks - 1) {
chunkLen = CHUNK_SIZE / 2;
}
byteData = generateRandomData(chunkLen);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(chunkName)
.setOffset(0)
.setLen(CHUNK_SIZE)
.setChecksumData(dummyChecksumData)
.setLen(chunkLen)
.setChecksumData(checksum.computeChecksum(
byteData, 0, chunkLen).getProtoBufMessage())
.build();
chunkList.add(chunkInfo);
}
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName("chunk-" + i)
.setOffset(0)
.setLen(CHUNK_SIZE/2)
.setChecksumData(dummyChecksumData)
.build();
chunkList.add(chunkInfo);
return chunkList;
chunkDataMap.put(chunkName, byteData);
chunks.add(chunkInfo);
blockSize += chunkLen;
blockData = Bytes.concat(blockData, byteData);
}
}
/**
* A dummy BlockInputStream to test the functionality of BlockInputStream.
* A dummy BlockInputStream to mock read block call to DN.
*/
private static class DummyBlockInputStream extends BlockInputStream {
private class DummyBlockInputStream extends BlockInputStream {
DummyBlockInputStream(BlockID blockID,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient,
List<ChunkInfo> chunks,
String traceID,
DummyBlockInputStream(BlockID blockId,
long blockLen,
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
long initialPosition) throws IOException {
super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
verifyChecksum, initialPosition);
String traceId,
XceiverClientManager xceiverClientManager) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
traceId, xceiverClientManager);
}
@Override
protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
protected List<ChunkInfo> getChunkInfos() {
return chunks;
}
@Override
protected List<DatanodeDetails> getDatanodeList() {
// return an empty dummy list of size 10
return new ArrayList<>(10);
protected void addStream(ChunkInfo chunkInfo) {
TestChunkInputStream testChunkInputStream = new TestChunkInputStream();
getChunkStreams().add(testChunkInputStream.new DummyChunkInputStream(
chunkInfo, null, null, null, false,
chunkDataMap.get(chunkInfo.getChunkName()).clone()));
}
/**
* Create ByteString with the input data to return when a readChunk call is
* placed.
*/
private static ByteString getByteString(String data, int length) {
while (data.length() < length) {
data = data + "0";
}
return ByteString.copyFrom(data.getBytes(), 0, length);
@Override
protected synchronized void checkOpen() throws IOException {
// No action needed
}
}
private void seekAndVerify(int pos) throws Exception {
blockStream.seek(pos);
Assert.assertEquals("Current position of buffer does not match with the " +
"seeked position", pos, blockStream.getPos());
}
/**
* Match readData with the chunkData byte-wise.
* @param readData Data read through ChunkInputStream
* @param inputDataStartIndex first index (inclusive) in chunkData to compare
* with read data
* @param length the number of bytes of data to match starting from
* inputDataStartIndex
*/
private void matchWithInputData(byte[] readData, int inputDataStartIndex,
int length) {
for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
Assert.assertEquals(blockData[i], readData[i - inputDataStartIndex]);
}
}
@ -143,17 +165,26 @@ public class TestBlockInputStream {
int pos = 0;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 0,
blockInputStream.getChunkIndex());
blockStream.getChunkIndex());
// Before BlockInputStream is initialized (initialization happens during
// read operation), seek should update the BlockInputStream#blockPosition
pos = CHUNK_SIZE;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 1,
blockInputStream.getChunkIndex());
Assert.assertEquals("ChunkIndex is incorrect", 0,
blockStream.getChunkIndex());
Assert.assertEquals(pos, blockStream.getBlockPosition());
pos = (CHUNK_SIZE * 5) + 5;
// Initialize the BlockInputStream. After initializtion, the chunkIndex
// should be updated to correspond to the seeked position.
blockStream.initialize();
Assert.assertEquals("ChunkIndex is incorrect", 1,
blockStream.getChunkIndex());
pos = (CHUNK_SIZE * 4) + 5;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 5,
blockInputStream.getChunkIndex());
Assert.assertEquals("ChunkIndex is incorrect", 4,
blockStream.getChunkIndex());
try {
// Try seeking beyond the blockSize.
@ -161,7 +192,7 @@ public class TestBlockInputStream {
seekAndVerify(pos);
Assert.fail("Seek to position beyond block size should fail.");
} catch (EOFException e) {
// Expected
System.out.println(e);
}
// Seek to random positions between 0 and the block size.
@ -173,20 +204,32 @@ public class TestBlockInputStream {
}
@Test
public void testBlockEOF() throws Exception {
// Seek to some position < blockSize and verify EOF is not reached.
seekAndVerify(CHUNK_SIZE);
Assert.assertFalse(blockInputStream.blockStreamEOF());
public void testRead() throws Exception {
// read 200 bytes of data starting from position 50. Chunk0 contains
// indices 0 to 99, chunk1 from 100 to 199 and chunk3 from 200 to 299. So
// the read should result in 3 ChunkInputStream reads
seekAndVerify(50);
byte[] b = new byte[200];
blockStream.read(b, 0, 200);
matchWithInputData(b, 50, 200);
// Seek to blockSize-1 and verify that EOF is not reached as the chunk
// has not been read from container yet.
seekAndVerify(blockSize-1);
Assert.assertFalse(blockInputStream.blockStreamEOF());
// The new position of the blockInputStream should be the last index read
// + 1.
Assert.assertEquals(250, blockStream.getPos());
Assert.assertEquals(2, blockStream.getChunkIndex());
}
private void seekAndVerify(int pos) throws Exception {
blockInputStream.seek(pos);
Assert.assertEquals("Current position of buffer does not match with the " +
"seeked position", pos, blockInputStream.getPos());
@Test
public void testSeekAndRead() throws Exception {
// Seek to a position and read data
seekAndVerify(50);
byte[] b1 = new byte[100];
blockStream.read(b1, 0, 100);
matchWithInputData(b1, 50, 100);
// Next read should start from the position of the last read + 1 i.e. 100
byte[] b2 = new byte[100];
blockStream.read(b2, 0, 100);
matchWithInputData(b2, 150, 100);
}
}

View File

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* Tests for {@link ChunkInputStream}'s functionality.
*/
public class TestChunkInputStream {
private static final int CHUNK_SIZE = 100;
private static final int BYTES_PER_CHECKSUM = 20;
private static final String CHUNK_NAME = "dummyChunk";
private static final Random RANDOM = new Random();
private static Checksum checksum;
private DummyChunkInputStream chunkStream;
private ChunkInfo chunkInfo;
private byte[] chunkData;
@Before
public void setup() throws Exception {
checksum = new Checksum(ChecksumType.valueOf(
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT),
BYTES_PER_CHECKSUM);
chunkData = generateRandomData(CHUNK_SIZE);
chunkInfo = ChunkInfo.newBuilder()
.setChunkName(CHUNK_NAME)
.setOffset(0)
.setLen(CHUNK_SIZE)
.setChecksumData(checksum.computeChecksum(
chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
.build();
chunkStream = new DummyChunkInputStream(chunkInfo, null, null, null, true);
}
static byte[] generateRandomData(int length) {
byte[] bytes = new byte[length];
RANDOM.nextBytes(bytes);
return bytes;
}
/**
* A dummy ChunkInputStream to mock read chunk calls to DN.
*/
public class DummyChunkInputStream extends ChunkInputStream {
// Stores the read chunk data in each readChunk call
private List<ByteString> readByteBuffers = new ArrayList<>();
DummyChunkInputStream(ChunkInfo chunkInfo,
BlockID blockId,
String traceId,
XceiverClientSpi xceiverClient,
boolean verifyChecksum) {
super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
}
public DummyChunkInputStream(ChunkInfo chunkInfo,
BlockID blockId,
String traceId,
XceiverClientSpi xceiverClient,
boolean verifyChecksum,
byte[] data) {
super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
chunkData = data;
}
@Override
protected ByteString readChunk(ChunkInfo readChunkInfo) {
ByteString byteString = ByteString.copyFrom(chunkData,
(int) readChunkInfo.getOffset(),
(int) readChunkInfo.getLen());
readByteBuffers.add(byteString);
return byteString;
}
@Override
protected void checkOpen() {
// No action needed
}
}
/**
* Match readData with the chunkData byte-wise.
* @param readData Data read through ChunkInputStream
* @param inputDataStartIndex first index (inclusive) in chunkData to compare
* with read data
* @param length the number of bytes of data to match starting from
* inputDataStartIndex
*/
private void matchWithInputData(byte[] readData, int inputDataStartIndex,
int length) {
for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
Assert.assertEquals(chunkData[i], readData[i - inputDataStartIndex]);
}
}
/**
* Seek to a position and verify through getPos().
*/
private void seekAndVerify(int pos) throws Exception {
chunkStream.seek(pos);
Assert.assertEquals("Current position of buffer does not match with the " +
"seeked position", pos, chunkStream.getPos());
}
@Test
public void testFullChunkRead() throws Exception {
byte[] b = new byte[CHUNK_SIZE];
chunkStream.read(b, 0, CHUNK_SIZE);
matchWithInputData(b, 0, CHUNK_SIZE);
}
@Test
public void testPartialChunkRead() throws Exception {
int len = CHUNK_SIZE / 2;
byte[] b = new byte[len];
chunkStream.read(b, 0, len);
matchWithInputData(b, 0, len);
// To read chunk data from index 0 to 49 (len = 50), we need to read
// chunk from offset 0 to 60 as the checksum boundary is at every 20
// bytes. Verify that 60 bytes of chunk data are read and stored in the
// buffers.
matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
0, 60);
}
@Test
public void testSeek() throws Exception {
seekAndVerify(0);
try {
seekAndVerify(CHUNK_SIZE);
Assert.fail("Seeking to Chunk Length should fail.");
} catch (EOFException e) {
GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
+ CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
}
// Seek before read should update the ChunkInputStream#chunkPosition
seekAndVerify(25);
Assert.assertEquals(25, chunkStream.getChunkPosition());
// Read from the seeked position.
// Reading from index 25 to 54 should result in the ChunkInputStream
// copying chunk data from index 20 to 59 into the buffers (checksum
// boundaries).
byte[] b = new byte[30];
chunkStream.read(b, 0, 30);
matchWithInputData(b, 25, 30);
matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
20, 40);
// After read, the position of the chunkStream is evaluated from the
// buffers and the chunkPosition should be reset to -1.
Assert.assertEquals(-1, chunkStream.getChunkPosition());
// Seek to a position within the current buffers. Current buffers contain
// data from index 20 to 59. ChunkPosition should still not be used to
// set the position.
seekAndVerify(35);
Assert.assertEquals(-1, chunkStream.getChunkPosition());
// Seek to a position outside the current buffers. In this case, the
// chunkPosition should be updated to the seeked position.
seekAndVerify(75);
Assert.assertEquals(75, chunkStream.getChunkPosition());
}
@Test
public void testSeekAndRead() throws Exception {
// Seek to a position and read data
seekAndVerify(50);
byte[] b1 = new byte[20];
chunkStream.read(b1, 0, 20);
matchWithInputData(b1, 50, 20);
// Next read should start from the position of the last read + 1 i.e. 70
byte[] b2 = new byte[20];
chunkStream.read(b2, 0, 20);
matchWithInputData(b2, 70, 20);
}
}

View File

@ -225,15 +225,17 @@ public class Checksum {
/**
* Computes the ChecksumData for the input data and verifies that it
* matches with that of the input checksumData.
* matches with that of the input checksumData, starting from index
* startIndex.
* @param byteString input data
* @param checksumData checksumData to match with
* @param startIndex index of first checksum in checksumData to match with
* data's computed checksum.
* @throws OzoneChecksumException is thrown if checksums do not match
*/
public static boolean verifyChecksum(
ByteString byteString, ChecksumData checksumData)
throws OzoneChecksumException {
return verifyChecksum(byteString.toByteArray(), checksumData);
public static boolean verifyChecksum(ByteString byteString,
ChecksumData checksumData, int startIndex) throws OzoneChecksumException {
return verifyChecksum(byteString.toByteArray(), checksumData, startIndex);
}
/**
@ -245,6 +247,20 @@ public class Checksum {
*/
public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
throws OzoneChecksumException {
return verifyChecksum(data, checksumData, 0);
}
/**
* Computes the ChecksumData for the input data and verifies that it
* matches with that of the input checksumData.
* @param data input data
* @param checksumData checksumData to match with
* @param startIndex index of first checksum in checksumData to match with
* data's computed checksum.
* @throws OzoneChecksumException is thrown if checksums do not match
*/
public static boolean verifyChecksum(byte[] data, ChecksumData checksumData,
int startIndex) throws OzoneChecksumException {
ChecksumType checksumType = checksumData.getChecksumType();
if (checksumType == ChecksumType.NONE) {
// Checksum is set to NONE. No further verification is required.
@ -256,7 +272,8 @@ public class Checksum {
ChecksumData computedChecksumData =
checksum.computeChecksum(data, 0, data.length);
return checksumData.verifyChecksumDataMatches(computedChecksumData);
return checksumData.verifyChecksumDataMatches(computedChecksumData,
startIndex);
}
/**

View File

@ -111,13 +111,20 @@ public class ChecksumData {
}
/**
* Verify that this ChecksumData matches with the input ChecksumData.
* Verify that this ChecksumData from startIndex to endIndex matches with the
* provided ChecksumData.
* The checksum at startIndex of this ChecksumData will be matched with the
* checksum at index 0 of the provided ChecksumData, and checksum at
* (startIndex + 1) of this ChecksumData with checksum at index 1 of
* provided ChecksumData and so on.
* @param that the ChecksumData to match with
* @param startIndex index of the first checksum from this ChecksumData
* which will be used to compare checksums
* @return true if checksums match
* @throws OzoneChecksumException
*/
public boolean verifyChecksumDataMatches(ChecksumData that) throws
OzoneChecksumException {
public boolean verifyChecksumDataMatches(ChecksumData that, int startIndex)
throws OzoneChecksumException {
// pre checks
if (this.checksums.size() == 0) {
@ -130,18 +137,22 @@ public class ChecksumData {
"checksums");
}
if (this.checksums.size() != that.checksums.size()) {
throw new OzoneChecksumException("Original and Computed checksumData's " +
"has different number of checksums");
}
int numChecksums = that.checksums.size();
// Verify that checksum matches at each index
for (int index = 0; index < this.checksums.size(); index++) {
if (!matchChecksumAtIndex(this.checksums.get(index),
that.checksums.get(index))) {
// checksum mismatch. throw exception.
throw new OzoneChecksumException(index);
try {
// Verify that checksum matches at each index
for (int index = 0; index < numChecksums; index++) {
if (!matchChecksumAtIndex(this.checksums.get(startIndex + index),
that.checksums.get(index))) {
// checksum mismatch. throw exception.
throw new OzoneChecksumException(index);
}
}
} catch (ArrayIndexOutOfBoundsException e) {
throw new OzoneChecksumException("Computed checksum has "
+ numChecksums + " number of checksums. Original checksum has " +
(this.checksums.size() - startIndex) + " number of checksums " +
"starting from index " + startIndex);
}
return true;
}

View File

@ -20,19 +20,10 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,60 +44,93 @@ public class KeyInputStream extends InputStream implements Seekable {
private static final int EOF = -1;
private final ArrayList<ChunkInputStreamEntry> streamEntries;
// streamOffset[i] stores the offset at which blockInputStream i stores
// data in the key
private long[] streamOffset = null;
private int currentStreamIndex;
private String key;
private long length = 0;
private boolean closed = false;
private String key;
// List of BlockInputStreams, one for each block in the key
private final List<BlockInputStream> blockStreams;
// blockOffsets[i] stores the index of the first data byte in
// blockStream w.r.t the key data.
// For example, lets say the block size is 200 bytes and block[0] stores
// data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
// Then, blockOffset[0] = 0 (the offset of the first byte of data in
// block[0]), blockOffset[1] = 200 and so on.
private long[] blockOffsets = null;
// Index of the blockStream corresponding to the current position of the
// KeyInputStream i.e. offset of the data to be read next
private int blockIndex;
// Tracks the blockIndex corresponding to the last seeked position so that it
// can be reset if a new position is seeked.
private int blockIndexOfPrevPosition;
public KeyInputStream() {
streamEntries = new ArrayList<>();
currentStreamIndex = 0;
}
@VisibleForTesting
public synchronized int getCurrentStreamIndex() {
return currentStreamIndex;
}
@VisibleForTesting
public long getRemainingOfIndex(int index) throws IOException {
return streamEntries.get(index).getRemaining();
blockStreams = new ArrayList<>();
blockIndex = 0;
}
/**
* Append another stream to the end of the list.
*
* @param stream the stream instance.
* @param streamLength the max number of bytes that should be written to this
* stream.
* For each block in keyInfo, add a BlockInputStream to blockStreams.
*/
@VisibleForTesting
public synchronized void addStream(BlockInputStream stream,
long streamLength) {
streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
XceiverClientManager xceiverClientManager, String requestId,
boolean verifyChecksum) {
List<OmKeyLocationInfo> keyLocationInfos = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly();
KeyInputStream keyInputStream = new KeyInputStream();
keyInputStream.initialize(keyInfo.getKeyName(), keyLocationInfos,
xceiverClientManager, requestId, verifyChecksum);
return new LengthInputStream(keyInputStream, keyInputStream.length);
}
private synchronized void initialize(String keyName,
List<OmKeyLocationInfo> blockInfos,
XceiverClientManager xceiverClientManager, String requestId,
boolean verifyChecksum) {
this.key = keyName;
this.blockOffsets = new long[blockInfos.size()];
long keyLength = 0;
for (int i = 0; i < blockInfos.size(); i++) {
OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
LOG.debug("Adding stream for accessing {}. The stream will be " +
"initialized later.", omKeyLocationInfo);
addStream(omKeyLocationInfo, xceiverClientManager, requestId,
verifyChecksum);
this.blockOffsets[i] = keyLength;
keyLength += omKeyLocationInfo.getLength();
}
this.length = keyLength;
}
/**
* Append another ChunkInputStreamEntry to the end of the list.
* The stream will be constructed from the input information when it needs
* to be accessed.
* Append another BlockInputStream to the end of the list. Note that the
* BlockInputStream is only created here and not initialized. The
* BlockInputStream is initialized when a read operation is performed on
* the block for the first time.
*/
private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
private synchronized void addStream(OmKeyLocationInfo blockInfo,
XceiverClientManager xceiverClientMngr, String clientRequestId,
boolean verifyChecksum) {
streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo,
xceiverClientMngr, clientRequestId, verifyChecksum));
blockStreams.add(new BlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(),
verifyChecksum, clientRequestId, xceiverClientMngr));
}
private synchronized ChunkInputStreamEntry getStreamEntry(int index)
throws IOException {
return streamEntries.get(index).getStream();
@VisibleForTesting
public void addStream(BlockInputStream blockInputStream) {
blockStreams.add(blockInputStream);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read() throws IOException {
byte[] buf = new byte[1];
@ -116,9 +140,12 @@ public class KeyInputStream extends InputStream implements Seekable {
return Byte.toUnsignedInt(buf[0]);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
checkNotClosed();
checkOpen();
if (b == null) {
throw new NullPointerException();
}
@ -131,13 +158,15 @@ public class KeyInputStream extends InputStream implements Seekable {
int totalReadLen = 0;
while (len > 0) {
// if we are at the last block and have read the entire block, return
if (streamEntries.size() == 0 ||
(streamEntries.size() - 1 <= currentStreamIndex &&
streamEntries.get(currentStreamIndex)
.getRemaining() == 0)) {
if (blockStreams.size() == 0 ||
(blockStreams.size() - 1 <= blockIndex &&
blockStreams.get(blockIndex)
.getRemaining() == 0)) {
return totalReadLen == 0 ? EOF : totalReadLen;
}
ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
// Get the current blockStream and read data from it
BlockInputStream current = blockStreams.get(blockIndex);
int numBytesToRead = Math.min(len, (int)current.getRemaining());
int numBytesRead = current.read(b, off, numBytesToRead);
if (numBytesRead != numBytesToRead) {
@ -146,23 +175,35 @@ public class KeyInputStream extends InputStream implements Seekable {
// this case.
throw new IOException(String.format(
"Inconsistent read for blockID=%s length=%d numBytesRead=%d",
current.blockInputStream.getBlockID(), current.length,
numBytesRead));
current.getBlockID(), current.getLength(), numBytesRead));
}
totalReadLen += numBytesRead;
off += numBytesRead;
len -= numBytesRead;
if (current.getRemaining() <= 0 &&
((currentStreamIndex + 1) < streamEntries.size())) {
currentStreamIndex += 1;
((blockIndex + 1) < blockStreams.size())) {
blockIndex += 1;
}
}
return totalReadLen;
}
/**
* Seeks the KeyInputStream to the specified position. This involves 2 steps:
* 1. Updating the blockIndex to the blockStream corresponding to the
* seeked position.
* 2. Seeking the corresponding blockStream to the adjusted position.
*
* For example, lets say the block size is 200 bytes and block[0] stores
* data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
* Lets say we seek to position 240. In the first step, the blockIndex
* would be updated to 1 as indices 200 - 399 reside in blockStream[1]. In
* the second step, the blockStream[1] would be seeked to position 40 (=
* 240 - blockOffset[1] (= 200)).
*/
@Override
public void seek(long pos) throws IOException {
checkNotClosed();
public synchronized void seek(long pos) throws IOException {
checkOpen();
if (pos < 0 || pos >= length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
@ -172,35 +213,39 @@ public class KeyInputStream extends InputStream implements Seekable {
throw new EOFException(
"EOF encountered at pos: " + pos + " for key: " + key);
}
Preconditions.assertTrue(currentStreamIndex >= 0);
if (currentStreamIndex >= streamEntries.size()) {
currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
} else if (pos < streamOffset[currentStreamIndex]) {
currentStreamIndex =
Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
} else if (pos >= streamOffset[currentStreamIndex] + streamEntries
.get(currentStreamIndex).length) {
currentStreamIndex = Arrays
.binarySearch(streamOffset, currentStreamIndex + 1,
streamEntries.size(), pos);
// 1. Update the blockIndex
if (blockIndex >= blockStreams.size()) {
blockIndex = Arrays.binarySearch(blockOffsets, pos);
} else if (pos < blockOffsets[blockIndex]) {
blockIndex =
Arrays.binarySearch(blockOffsets, 0, blockIndex, pos);
} else if (pos >= blockOffsets[blockIndex] + blockStreams
.get(blockIndex).getLength()) {
blockIndex = Arrays
.binarySearch(blockOffsets, blockIndex + 1,
blockStreams.size(), pos);
}
if (currentStreamIndex < 0) {
if (blockIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the currentStreamIndex
// accordingly so that currentStreamIndex = insertionPoint - 1
currentStreamIndex = -currentStreamIndex - 2;
// inserted in the sorted array. We need to adjust the blockIndex
// accordingly so that blockIndex = insertionPoint - 1
blockIndex = -blockIndex - 2;
}
// seek to the proper offset in the BlockInputStream
streamEntries.get(currentStreamIndex)
.seek(pos - streamOffset[currentStreamIndex]);
// Reset the previous blockStream's position
blockStreams.get(blockIndexOfPrevPosition).resetPosition();
// 2. Seek the blockStream to the adjusted position
blockStreams.get(blockIndex).seek(pos - blockOffsets[blockIndex]);
blockIndexOfPrevPosition = blockIndex;
}
@Override
public long getPos() throws IOException {
return length == 0 ? 0 :
streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
.getPos();
public synchronized long getPos() throws IOException {
return length == 0 ? 0 : blockOffsets[blockIndex] +
blockStreams.get(blockIndex).getPos();
}
@Override
@ -210,7 +255,7 @@ public class KeyInputStream extends InputStream implements Seekable {
@Override
public int available() throws IOException {
checkNotClosed();
checkOpen();
long remaining = length - getPos();
return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
}
@ -218,177 +263,30 @@ public class KeyInputStream extends InputStream implements Seekable {
@Override
public void close() throws IOException {
closed = true;
for (int i = 0; i < streamEntries.size(); i++) {
streamEntries.get(i).close();
for (BlockInputStream blockStream : blockStreams) {
blockStream.close();
}
}
/**
* Encapsulates BlockInputStream.
*/
public static class ChunkInputStreamEntry extends InputStream
implements Seekable {
private BlockInputStream blockInputStream;
private final OmKeyLocationInfo blockLocationInfo;
private final long length;
private final XceiverClientManager xceiverClientManager;
private final String requestId;
private boolean verifyChecksum;
// the position of the blockInputStream is maintained by this variable
// till the stream is initialized
private long position;
public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
XceiverClientManager xceiverClientMngr, String clientRequestId,
boolean verifyChecksum) {
this.blockLocationInfo = omKeyLocationInfo;
this.length = omKeyLocationInfo.getLength();
this.xceiverClientManager = xceiverClientMngr;
this.requestId = clientRequestId;
this.verifyChecksum = verifyChecksum;
}
@VisibleForTesting
public ChunkInputStreamEntry(BlockInputStream blockInputStream,
long length) {
this.blockInputStream = blockInputStream;
this.length = length;
this.blockLocationInfo = null;
this.xceiverClientManager = null;
this.requestId = null;
}
private ChunkInputStreamEntry getStream() throws IOException {
if (this.blockInputStream == null) {
initializeBlockInputStream();
}
return this;
}
private void initializeBlockInputStream() throws IOException {
BlockID blockID = blockLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
Pipeline pipeline = blockLocationInfo.getPipeline();
// irrespective of the container state, we will always read via Standalone
// protocol.
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
XceiverClientSpi xceiverClient = xceiverClientManager
.acquireClient(pipeline);
boolean success = false;
long containerKey = blockLocationInfo.getLocalID();
try {
LOG.debug("Initializing stream for get key to access {} {}",
containerID, containerKey);
ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
if (blockLocationInfo.getToken() != null) {
UserGroupInformation.getCurrentUser().
addToken(blockLocationInfo.getToken());
}
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, requestId);
List<ContainerProtos.ChunkInfo> chunks =
response.getBlockData().getChunksList();
success = true;
this.blockInputStream = new BlockInputStream(
blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
chunks, requestId, verifyChecksum, position);
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient, false);
}
}
}
synchronized long getRemaining() throws IOException {
return length - getPos();
}
@Override
public synchronized int read(byte[] b, int off, int len)
throws IOException {
int readLen = blockInputStream.read(b, off, len);
return readLen;
}
@Override
public synchronized int read() throws IOException {
int data = blockInputStream.read();
return data;
}
@Override
public synchronized void close() throws IOException {
if (blockInputStream != null) {
blockInputStream.close();
}
}
@Override
public void seek(long pos) throws IOException {
if (blockInputStream != null) {
blockInputStream.seek(pos);
} else {
position = pos;
}
}
@Override
public long getPos() throws IOException {
if (blockInputStream != null) {
return blockInputStream.getPos();
} else {
return position;
}
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}
public static LengthInputStream getFromOmKeyInfo(
OmKeyInfo keyInfo,
XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocol
storageContainerLocationClient,
String requestId, boolean verifyChecksum) throws IOException {
long length = 0;
KeyInputStream groupInputStream = new KeyInputStream();
groupInputStream.key = keyInfo.getKeyName();
List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
groupInputStream.streamOffset = new long[keyLocationInfos.size()];
for (int i = 0; i < keyLocationInfos.size(); i++) {
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
LOG.debug("Adding stream for accessing {}. The stream will be " +
"initialized later.", omKeyLocationInfo);
groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
requestId, verifyChecksum);
groupInputStream.streamOffset[i] = length;
length += omKeyLocationInfo.getLength();
}
groupInputStream.length = length;
return new LengthInputStream(groupInputStream, length);
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
private void checkOpen() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
}
}
@VisibleForTesting
public synchronized int getCurrentStreamIndex() {
return blockIndex;
}
@VisibleForTesting
public long getRemainingOfIndex(int index) throws IOException {
return blockStreams.get(index).getRemaining();
}
}

View File

@ -1072,8 +1072,8 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
String requestId) throws IOException {
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
storageContainerLocationClient, requestId, verifyChecksum);
.getFromOmKeyInfo(keyInfo, xceiverClientManager, requestId,
verifyChecksum);
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
if (feInfo != null) {
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);

View File

@ -503,8 +503,7 @@ public final class DistributedStorageHandler implements StorageHandler {
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return KeyInputStream.getFromOmKeyInfo(
keyInfo, xceiverClientManager, storageContainerLocationClient,
args.getRequestID(), verifyChecksum);
keyInfo, xceiverClientManager, args.getRequestID(), verifyChecksum);
}
@Override

View File

@ -25,7 +25,6 @@ import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
@ -48,8 +47,7 @@ public class TestChunkStreams {
for (int i = 0; i < 5; i++) {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, null, null, new ArrayList<>(), null,
true, 0) {
new BlockInputStream(null, 100, null, null, true, null, null) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@ -84,7 +82,7 @@ public class TestChunkStreams {
}
};
offset += 100;
groupInputStream.addStream(in, 100);
groupInputStream.addStream(in);
}
byte[] resBuf = new byte[500];
@ -105,8 +103,7 @@ public class TestChunkStreams {
for (int i = 0; i < 5; i++) {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, null, null, new ArrayList<>(), null,
true, 0) {
new BlockInputStream(null, 100, null, null, true, null, null) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@ -141,7 +138,7 @@ public class TestChunkStreams {
}
};
offset += 100;
groupInputStream.addStream(in, 100);
groupInputStream.addStream(in);
}
byte[] resBuf = new byte[600];