HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek.

Contributed by Thomas Marquardt

(cherry picked from commit d91b7a8451)
This commit is contained in:
Steve Loughran 2017-08-06 20:30:40 +01:00
parent b9549e108e
commit 7f7ab0302f
No known key found for this signature in database
GPG Key ID: 950CC3E032B79CA2
2 changed files with 155 additions and 31 deletions

View File

@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
private InputStream blobInputStream = null;
private int minimumReadSizeInBytes = 0;
private long streamPositionAfterLastRead = -1;
// position of next network read within stream
private long streamPosition = 0;
// length of stream
private long streamLength = 0;
private boolean closed = false;
// internal buffer, re-used for performance optimization
private byte[] streamBuffer;
// zero-based offset within streamBuffer of current read position
private int streamBufferPosition;
// length of data written to streamBuffer, streamBuffer may be larger
private int streamBufferLength;
/**
@ -81,6 +86,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
}
}
/**
* Reset the internal stream buffer but do not release the memory.
* The buffer can be reused to avoid frequent memory allocations of
* a large buffer.
*/
private void resetStreamBuffer() {
streamBufferPosition = 0;
streamBufferLength = 0;
}
/**
* Gets the read position of the stream.
* @return the zero-based byte offset of the read position.
@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
@Override
public synchronized long getPos() throws IOException {
checkState();
return streamPosition;
return (streamBuffer != null)
? streamPosition - streamBufferLength + streamBufferPosition
: streamPosition;
}
/**
@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
throw new EOFException(
FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
}
if (pos == getPos()) {
// calculate offset between the target and current position in the stream
long offset = pos - getPos();
if (offset == 0) {
// no=op, no state change
return;
}
if (streamBuffer != null) {
long offset = streamPosition - pos;
if (offset > 0 && offset < streamBufferLength) {
streamBufferPosition = streamBufferLength - (int) offset;
} else {
streamBufferPosition = streamBufferLength;
if (offset > 0) {
// forward seek, data can be skipped as an optimization
if (skip(offset) != offset) {
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
return;
}
// reverse seek, offset is negative
if (streamBuffer != null) {
if (streamBufferPosition + offset >= 0) {
// target position is inside the stream buffer,
// only need to move backwards within the stream buffer
streamBufferPosition += offset;
} else {
// target position is outside the stream buffer,
// need to reset stream buffer and move position for next network read
resetStreamBuffer();
streamPosition = pos;
}
} else {
streamPosition = pos;
}
streamPosition = pos;
// close BlobInputStream after seek is invoked because BlobInputStream
// does not support seek
closeBlobInputStream();
@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes,
streamLength)];
}
streamBufferPosition = 0;
streamBufferLength = 0;
resetStreamBuffer();
outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition,
streamBuffer.length);
needToCopy = true;
@ -295,27 +329,44 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
* @throws IOException IO failure
* @throws IndexOutOfBoundsException if n is negative or if the sum of n
* and the current value of getPos() is greater than the length of the stream.
*/
@Override
public synchronized long skip(long n) throws IOException {
checkState();
if (blobInputStream != null) {
return blobInputStream.skip(n);
} else {
if (n < 0 || streamPosition + n > streamLength) {
throw new IndexOutOfBoundsException("skip range");
}
if (streamBuffer != null) {
streamBufferPosition = (n < streamBufferLength - streamBufferPosition)
? streamBufferPosition + (int) n
: streamBufferLength;
}
streamPosition += n;
return n;
// blobInput stream is open; delegate the work to it
long skipped = blobInputStream.skip(n);
// update position to the actual skip value
streamPosition += skipped;
return skipped;
}
// no blob stream; implement the skip logic directly
if (n < 0 || n > streamLength - getPos()) {
throw new IndexOutOfBoundsException("skip range");
}
if (streamBuffer != null) {
// there's a buffer, so seek with it
if (n < streamBufferLength - streamBufferPosition) {
// new range is in the buffer, so just update the buffer position
// skip within the buffer.
streamBufferPosition += (int) n;
} else {
// skip is out of range, so move position to ne value and reset
// the buffer ready for the next read()
streamPosition = getPos() + n;
resetStreamBuffer();
}
} else {
// no stream buffer; increment the stream position ready for
// the next triggered connection & read
streamPosition += n;
}
return n;
}
/**

View File

@ -155,7 +155,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
}
LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
TEST_FILE_SIZE );
TEST_FILE_SIZE);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
@ -198,7 +198,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
}
@Test
public void test_0200_BasicReadTestV2() throws Exception {
public void test_0200_BasicReadTest() throws Exception {
assumeHugeFileExists();
try (
@ -214,12 +214,12 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
// v1 forward seek and read a kilobyte into first kilobyte of bufferV1
inputStreamV1.seek(5 * MEGABYTE);
int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
assertEquals(numBytesReadV1, KILOBYTE);
assertEquals(KILOBYTE, numBytesReadV1);
// v2 forward seek and read a kilobyte into first kilobyte of bufferV2
inputStreamV2.seek(5 * MEGABYTE);
int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
assertEquals(numBytesReadV2, KILOBYTE);
assertEquals(KILOBYTE, numBytesReadV2);
assertArrayEquals(bufferV1, bufferV2);
@ -229,17 +229,90 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
// v1 reverse seek and read a megabyte into last megabyte of bufferV1
inputStreamV1.seek(3 * MEGABYTE);
numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
assertEquals(numBytesReadV1, len);
assertEquals(len, numBytesReadV1);
// v2 reverse seek and read a megabyte into last megabyte of bufferV2
inputStreamV2.seek(3 * MEGABYTE);
numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
assertEquals(numBytesReadV2, len);
assertEquals(len, numBytesReadV2);
assertArrayEquals(bufferV1, bufferV2);
}
}
@Test
public void test_0201_RandomReadTest() throws Exception {
assumeHugeFileExists();
try (
FSDataInputStream inputStreamV1
= accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
FSDataInputStream inputStreamV2
= accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
) {
final int bufferSize = 4 * KILOBYTE;
byte[] bufferV1 = new byte[bufferSize];
byte[] bufferV2 = new byte[bufferV1.length];
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
inputStreamV1.seek(0);
inputStreamV2.seek(0);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
int seekPosition = 2 * KILOBYTE;
inputStreamV1.seek(seekPosition);
inputStreamV2.seek(seekPosition);
inputStreamV1.seek(0);
inputStreamV2.seek(0);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
seekPosition = 5 * KILOBYTE;
inputStreamV1.seek(seekPosition);
inputStreamV2.seek(seekPosition);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
seekPosition = 10 * KILOBYTE;
inputStreamV1.seek(seekPosition);
inputStreamV2.seek(seekPosition);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
seekPosition = 4100 * KILOBYTE;
inputStreamV1.seek(seekPosition);
inputStreamV2.seek(seekPosition);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
}
}
private void verifyConsistentReads(FSDataInputStream inputStreamV1,
FSDataInputStream inputStreamV2,
byte[] bufferV1,
byte[] bufferV2) throws IOException {
int size = bufferV1.length;
final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
}
/**
* Validates the implementation of InputStream.markSupported.
* @throws IOException