HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek.
Contributed by Thomas Marquardt
This commit is contained in:
parent
024c3ec4a3
commit
d91b7a8451
|
@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
private InputStream blobInputStream = null;
|
||||
private int minimumReadSizeInBytes = 0;
|
||||
private long streamPositionAfterLastRead = -1;
|
||||
// position of next network read within stream
|
||||
private long streamPosition = 0;
|
||||
// length of stream
|
||||
private long streamLength = 0;
|
||||
private boolean closed = false;
|
||||
// internal buffer, re-used for performance optimization
|
||||
private byte[] streamBuffer;
|
||||
// zero-based offset within streamBuffer of current read position
|
||||
private int streamBufferPosition;
|
||||
// length of data written to streamBuffer, streamBuffer may be larger
|
||||
private int streamBufferLength;
|
||||
|
||||
/**
|
||||
|
@ -81,6 +86,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the internal stream buffer but do not release the memory.
|
||||
* The buffer can be reused to avoid frequent memory allocations of
|
||||
* a large buffer.
|
||||
*/
|
||||
private void resetStreamBuffer() {
|
||||
// move the read offset back to the start of the buffer
streamBufferPosition = 0;
|
||||
// record that the buffer holds no data; the byte[] itself is not touched
// here, so it stays allocated and can be reused
streamBufferLength = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the read position of the stream.
|
||||
* @return the zero-based byte offset of the read position.
|
||||
|
@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
@Override
|
||||
public synchronized long getPos() throws IOException {
|
||||
checkState();
|
||||
return streamPosition;
|
||||
return (streamBuffer != null)
|
||||
? streamPosition - streamBufferLength + streamBufferPosition
|
||||
: streamPosition;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
throw new EOFException(
|
||||
FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
|
||||
}
|
||||
if (pos == getPos()) {
|
||||
|
||||
// calculate offset between the target and current position in the stream
|
||||
long offset = pos - getPos();
|
||||
|
||||
if (offset == 0) {
|
||||
// no-op, no state change
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamBuffer != null) {
|
||||
long offset = streamPosition - pos;
|
||||
if (offset > 0 && offset < streamBufferLength) {
|
||||
streamBufferPosition = streamBufferLength - (int) offset;
|
||||
} else {
|
||||
streamBufferPosition = streamBufferLength;
|
||||
if (offset > 0) {
|
||||
// forward seek, data can be skipped as an optimization
|
||||
if (skip(offset) != offset) {
|
||||
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// reverse seek, offset is negative
|
||||
if (streamBuffer != null) {
|
||||
if (streamBufferPosition + offset >= 0) {
|
||||
// target position is inside the stream buffer,
|
||||
// only need to move backwards within the stream buffer
|
||||
streamBufferPosition += offset;
|
||||
} else {
|
||||
// target position is outside the stream buffer,
|
||||
// need to reset stream buffer and move position for next network read
|
||||
resetStreamBuffer();
|
||||
streamPosition = pos;
|
||||
}
|
||||
} else {
|
||||
streamPosition = pos;
|
||||
}
|
||||
|
||||
streamPosition = pos;
|
||||
// close BlobInputStream after seek is invoked because BlobInputStream
|
||||
// does not support seek
|
||||
closeBlobInputStream();
|
||||
|
@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes,
|
||||
streamLength)];
|
||||
}
|
||||
streamBufferPosition = 0;
|
||||
streamBufferLength = 0;
|
||||
resetStreamBuffer();
|
||||
outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition,
|
||||
streamBuffer.length);
|
||||
needToCopy = true;
|
||||
|
@ -295,27 +329,44 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
|
|||
* @param n the number of bytes to be skipped.
|
||||
* @return the actual number of bytes skipped.
|
||||
* @throws IOException IO failure
|
||||
* @throws IndexOutOfBoundsException if n is negative or if the sum of n
|
||||
* and the current value of getPos() is greater than the length of the stream.
|
||||
*/
|
||||
@Override
|
||||
public synchronized long skip(long n) throws IOException {
|
||||
checkState();
|
||||
|
||||
if (blobInputStream != null) {
|
||||
return blobInputStream.skip(n);
|
||||
} else {
|
||||
if (n < 0 || streamPosition + n > streamLength) {
|
||||
throw new IndexOutOfBoundsException("skip range");
|
||||
}
|
||||
|
||||
if (streamBuffer != null) {
|
||||
streamBufferPosition = (n < streamBufferLength - streamBufferPosition)
|
||||
? streamBufferPosition + (int) n
|
||||
: streamBufferLength;
|
||||
}
|
||||
|
||||
streamPosition += n;
|
||||
return n;
|
||||
// blobInput stream is open; delegate the work to it
|
||||
long skipped = blobInputStream.skip(n);
|
||||
// update position to the actual skip value
|
||||
streamPosition += skipped;
|
||||
return skipped;
|
||||
}
|
||||
|
||||
// no blob stream; implement the skip logic directly
|
||||
if (n < 0 || n > streamLength - getPos()) {
|
||||
throw new IndexOutOfBoundsException("skip range");
|
||||
}
|
||||
|
||||
if (streamBuffer != null) {
|
||||
// there's a buffer, so seek with it
|
||||
if (n < streamBufferLength - streamBufferPosition) {
|
||||
// new range is in the buffer, so just update the buffer position
|
||||
// skip within the buffer.
|
||||
streamBufferPosition += (int) n;
|
||||
} else {
|
||||
// skip is out of range, so move position to new value and reset
|
||||
// the buffer ready for the next read()
|
||||
streamPosition = getPos() + n;
|
||||
resetStreamBuffer();
|
||||
}
|
||||
} else {
|
||||
// no stream buffer; increment the stream position ready for
|
||||
// the next triggered connection & read
|
||||
streamPosition += n;
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -155,7 +155,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
|
|||
}
|
||||
|
||||
LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
|
||||
TEST_FILE_SIZE );
|
||||
TEST_FILE_SIZE);
|
||||
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||
|
||||
try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
|
||||
|
@ -198,7 +198,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test_0200_BasicReadTestV2() throws Exception {
|
||||
public void test_0200_BasicReadTest() throws Exception {
|
||||
assumeHugeFileExists();
|
||||
|
||||
try (
|
||||
|
@ -214,12 +214,12 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
|
|||
// v1 forward seek and read a kilobyte into first kilobyte of bufferV1
|
||||
inputStreamV1.seek(5 * MEGABYTE);
|
||||
int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
|
||||
assertEquals(numBytesReadV1, KILOBYTE);
|
||||
assertEquals(KILOBYTE, numBytesReadV1);
|
||||
|
||||
// v2 forward seek and read a kilobyte into first kilobyte of bufferV2
|
||||
inputStreamV2.seek(5 * MEGABYTE);
|
||||
int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
|
||||
assertEquals(numBytesReadV2, KILOBYTE);
|
||||
assertEquals(KILOBYTE, numBytesReadV2);
|
||||
|
||||
assertArrayEquals(bufferV1, bufferV2);
|
||||
|
||||
|
@ -229,17 +229,90 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
|
|||
// v1 reverse seek and read a megabyte into last megabyte of bufferV1
|
||||
inputStreamV1.seek(3 * MEGABYTE);
|
||||
numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
|
||||
assertEquals(numBytesReadV1, len);
|
||||
assertEquals(len, numBytesReadV1);
|
||||
|
||||
// v2 reverse seek and read a megabyte into last megabyte of bufferV2
|
||||
inputStreamV2.seek(3 * MEGABYTE);
|
||||
numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
|
||||
assertEquals(numBytesReadV2, len);
|
||||
assertEquals(len, numBytesReadV2);
|
||||
|
||||
assertArrayEquals(bufferV1, bufferV2);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Drives the V1 and V2 input streams through the same sequence of reads,
 * reverse seeks and forward seeks, and checks after each read that both
 * streams returned the same bytes.
 * Positions noted in the comments below assume every 4 KB read completes
 * in full, which verifyConsistentReads asserts.
 */
@Test
|
||||
public void test_0201_RandomReadTest() throws Exception {
|
||||
assumeHugeFileExists();
|
||||
|
||||
try (
|
||||
FSDataInputStream inputStreamV1
|
||||
= accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
|
||||
|
||||
FSDataInputStream inputStreamV2
|
||||
= accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
|
||||
) {
|
||||
final int bufferSize = 4 * KILOBYTE;
|
||||
byte[] bufferV1 = new byte[bufferSize];
|
||||
byte[] bufferV2 = new byte[bufferV1.length];
|
||||
|
||||
// first read from the freshly opened streams: 0 -> 4 KB
|
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
// reverse seek back to the start after a read
inputStreamV1.seek(0);
|
||||
inputStreamV2.seek(0);
|
||||
|
||||
// two consecutive reads: 0 -> 4 KB -> 8 KB
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
// reverse seek from 8 KB to 2 KB ...
int seekPosition = 2 * KILOBYTE;
|
||||
inputStreamV1.seek(seekPosition);
|
||||
inputStreamV2.seek(seekPosition);
|
||||
|
||||
// ... followed immediately by another reverse seek, with no read in between
inputStreamV1.seek(0);
|
||||
inputStreamV2.seek(0);
|
||||
|
||||
// three consecutive reads: 0 -> 4 KB -> 8 KB -> 12 KB
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
// reverse seek from 12 KB to 5 KB, then read: 5 KB -> 9 KB
seekPosition = 5 * KILOBYTE;
|
||||
inputStreamV1.seek(seekPosition);
|
||||
inputStreamV2.seek(seekPosition);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
// short forward seek from 9 KB to 10 KB, then two reads: 10 KB -> 18 KB
seekPosition = 10 * KILOBYTE;
|
||||
inputStreamV1.seek(seekPosition);
|
||||
inputStreamV2.seek(seekPosition);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
|
||||
// long forward seek from 18 KB to 4100 KB
// NOTE(review): presumably chosen to land beyond the data buffered so far;
// confirm against the configured minimum read size
seekPosition = 4100 * KILOBYTE;
|
||||
inputStreamV1.seek(seekPosition);
|
||||
inputStreamV2.seek(seekPosition);
|
||||
|
||||
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Reads {@code bufferV1.length} bytes from each stream at its current
 * position and asserts that both reads returned that many bytes and that
 * the two buffers then hold identical contents.
 * @param inputStreamV1 first stream to read from
 * @param inputStreamV2 second stream to read from
 * @param bufferV1 destination for the first read; its length sets the
 * read size used for both streams
 * @param bufferV2 destination for the second read; it is filled using
 * bufferV1's length and then compared array-to-array, so it needs to be
 * the same length as bufferV1
 * @throws IOException propagated from either stream read
 */
private void verifyConsistentReads(FSDataInputStream inputStreamV1,
|
||||
FSDataInputStream inputStreamV2,
|
||||
byte[] bufferV1,
|
||||
byte[] bufferV2) throws IOException {
|
||||
// one read size for both streams, taken from the first buffer
int size = bufferV1.length;
|
||||
final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
|
||||
// a short read counts as a failure, not something to retry
assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
|
||||
|
||||
final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
|
||||
assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
|
||||
|
||||
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.markSupported.
|
||||
* @throws IOException
|
||||
|
|
Loading…
Reference in New Issue