diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
index aaac4904133..40bf6f4ae1a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -25,12 +25,14 @@ import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
import com.microsoft.azure.storage.OperationContext;
@@ -58,7 +60,9 @@ final class PageBlobInputStream extends InputStream {
// The buffer holding the current data we last read from the server.
private byte[] currentBuffer;
// The current byte offset we're at in the buffer.
- private int currentOffsetInBuffer;
+ private int currentBufferOffset;
+ // The current buffer length
+ private int currentBufferLength;
// Maximum number of pages to get per any one request.
private static final int MAX_PAGES_PER_DOWNLOAD =
4 * 1024 * 1024 / PAGE_SIZE;
@@ -174,7 +178,7 @@ final class PageBlobInputStream extends InputStream {
private boolean dataAvailableInBuffer() {
return currentBuffer != null
- && currentOffsetInBuffer < currentBuffer.length;
+ && currentBufferOffset < currentBufferLength;
}
/**
@@ -194,6 +198,8 @@ final class PageBlobInputStream extends InputStream {
return true;
}
currentBuffer = null;
+ currentBufferOffset = 0;
+ currentBufferLength = 0;
if (numberOfPagesRemaining == 0) {
// No more data to read.
return false;
@@ -209,43 +215,48 @@ final class PageBlobInputStream extends InputStream {
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
withMD5Checking(), opContext);
- currentBuffer = baos.toByteArray();
+ validateDataIntegrity(baos.toByteArray());
} catch (StorageException e) {
throw new IOException(e);
}
numberOfPagesRemaining -= pagesToRead;
currentOffsetInBlob += bufferSize;
- currentOffsetInBuffer = PAGE_HEADER_SIZE;
-
- // Since we just downloaded a new buffer, validate its consistency.
- validateCurrentBufferConsistency();
return true;
}
- private void validateCurrentBufferConsistency()
+ private void validateDataIntegrity(byte[] buffer)
throws IOException {
- if (currentBuffer.length % PAGE_SIZE != 0) {
+
+ if (buffer.length % PAGE_SIZE != 0) {
throw new AssertionError("Unexpected buffer size: "
- + currentBuffer.length);
+ + buffer.length);
}
- int numberOfPages = currentBuffer.length / PAGE_SIZE;
+
+ int bufferLength = 0;
+ int numberOfPages = buffer.length / PAGE_SIZE;
+ long totalPagesAfterCurrent = numberOfPagesRemaining;
+
for (int page = 0; page < numberOfPages; page++) {
- short currentPageSize = getPageSize(blob, currentBuffer,
- page * PAGE_SIZE);
- // Calculate the number of pages that exist after this one
- // in the blob.
- long totalPagesAfterCurrent =
- (numberOfPages - page - 1) + numberOfPagesRemaining;
- // Only the last page is allowed to be not filled completely.
- if (currentPageSize < PAGE_DATA_SIZE
+ // Calculate the number of pages that exist in the blob after this one
+ totalPagesAfterCurrent--;
+
+ short currentPageSize = getPageSize(blob, buffer, page * PAGE_SIZE);
+
+ // Only the last page can be partially filled.
+ if (currentPageSize < PAGE_DATA_SIZE
&& totalPagesAfterCurrent > 0) {
throw fileCorruptException(blob, String.format(
- "Page with partial data found in the middle (%d pages from the"
- + " end) that only has %d bytes of data.",
- totalPagesAfterCurrent, currentPageSize));
+ "Page with partial data found in the middle (%d pages from the"
+ + " end) that only has %d bytes of data.",
+ totalPagesAfterCurrent, currentPageSize));
}
+ bufferLength += currentPageSize + PAGE_HEADER_SIZE;
}
+
+ currentBufferOffset = PAGE_HEADER_SIZE;
+ currentBufferLength = bufferLength;
+ currentBuffer = buffer;
}
// Reads the page size from the page header at the given offset.
@@ -275,7 +286,7 @@ final class PageBlobInputStream extends InputStream {
}
int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
- System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
+ System.arraycopy(currentBuffer, currentBufferOffset, outputBuffer,
offset, numBytesToRead);
numberOfBytesRead += numBytesToRead;
offset += numBytesToRead;
@@ -284,7 +295,7 @@ final class PageBlobInputStream extends InputStream {
// We've finished this page, move on to the next.
advancePagesInBuffer(1);
} else {
- currentOffsetInBuffer += numBytesToRead;
+ currentBufferOffset += numBytesToRead;
}
}
@@ -309,9 +320,26 @@ final class PageBlobInputStream extends InputStream {
}
/**
- * Skips over and discards n bytes of data from this input stream.
- * @param n the number of bytes to be skipped.
- * @return the actual number of bytes skipped.
+ * Skips over and discards n
bytes of data from this input
+ * stream. The skip
method may, for a variety of reasons, end
+ * up skipping over some smaller number of bytes, possibly 0
.
+ * This may result from any of a number of conditions; reaching end of file
+ * before n
bytes have been skipped is only one possibility.
+ * The actual number of bytes skipped is returned. If {@code n} is
+ * negative, the {@code skip} method for class {@code InputStream} always
+ * returns 0, and no bytes are skipped. Subclasses may handle the negative
+ * value differently.
+ *
+ *
The skip
method of this class creates a
+ * byte array and then repeatedly reads into it until n
bytes
+ * have been read or the end of the stream has been reached. Subclasses are
+ * encouraged to provide a more efficient implementation of this method.
+ * For instance, the implementation may depend on the ability to seek.
+ *
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ * @exception IOException if the stream does not support seek,
+ * or if some other I/O error occurs.
*/
@Override
public synchronized long skip(long n) throws IOException {
@@ -338,18 +366,23 @@ final class PageBlobInputStream extends InputStream {
n -= skippedWithinBuffer;
long skipped = skippedWithinBuffer;
- // Empty the current buffer, we're going beyond it.
- currentBuffer = null;
+ if (n == 0) {
+ return skipped;
+ }
+
+ if (numberOfPagesRemaining == 0) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ } else if (numberOfPagesRemaining > 1) {
+ // skip over as many pages as we can, but we must read the last
+ // page as it may not be full
+ long pagesToSkipOver = Math.min(n / PAGE_DATA_SIZE,
+ numberOfPagesRemaining - 1);
+ numberOfPagesRemaining -= pagesToSkipOver;
+ currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
+ skipped += pagesToSkipOver * PAGE_DATA_SIZE;
+ n -= pagesToSkipOver * PAGE_DATA_SIZE;
+ }
- // Skip over whole pages as necessary without retrieving them from the
- // server.
- long pagesToSkipOver = Math.max(0, Math.min(
- n / PAGE_DATA_SIZE,
- numberOfPagesRemaining - 1));
- numberOfPagesRemaining -= pagesToSkipOver;
- currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
- skipped += pagesToSkipOver * PAGE_DATA_SIZE;
- n -= pagesToSkipOver * PAGE_DATA_SIZE;
if (n == 0) {
return skipped;
}
@@ -387,14 +420,14 @@ final class PageBlobInputStream extends InputStream {
// Calculate how many whole pages (pages before the possibly partially
// filled last page) remain.
- int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
+ int currentPageIndex = currentBufferOffset / PAGE_SIZE;
int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
// I'm within one of the whole pages remaining, skip in there.
advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
- currentOffsetInBuffer += n % PAGE_DATA_SIZE;
+ currentBufferOffset += n % PAGE_DATA_SIZE;
return n + skipped;
}
@@ -417,8 +450,8 @@ final class PageBlobInputStream extends InputStream {
*/
private long skipWithinCurrentPage(long n) throws IOException {
int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
- if (n < remainingBytesInCurrentPage) {
- currentOffsetInBuffer += n;
+ if (n <= remainingBytesInCurrentPage) {
+ currentBufferOffset += n;
return n;
} else {
advancePagesInBuffer(1);
@@ -438,7 +471,7 @@ final class PageBlobInputStream extends InputStream {
// Calculate our current position relative to the start of the current
// page.
int currentDataOffsetInPage =
- (currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
+ (currentBufferOffset % PAGE_SIZE) - PAGE_HEADER_SIZE;
int pageBoundary = getCurrentPageStartInBuffer();
// Get the data size of the current page from the header.
short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
@@ -454,14 +487,14 @@ final class PageBlobInputStream extends InputStream {
}
private void advancePagesInBuffer(int numberOfPages) {
- currentOffsetInBuffer =
+ currentBufferOffset =
getCurrentPageStartInBuffer()
+ (numberOfPages * PAGE_SIZE)
+ PAGE_HEADER_SIZE;
}
private int getCurrentPageStartInBuffer() {
- return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
+ return PAGE_SIZE * (currentBufferOffset / PAGE_SIZE);
}
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
new file mode 100644
index 00000000000..8c939fc089a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+
+import org.junit.FixMethodOrder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test semantics of the page blob input stream
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+
+public class ITestPageBlobInputStream extends AbstractWasbTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestPageBlobInputStream.class);
+ private static final int KILOBYTE = 1024;
+ private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+ private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
+ private static final Path TEST_FILE_PATH = new Path(
+ "TestPageBlobInputStream.txt");
+
+ private long testFileLength;
+
+ /**
+ * Long test timeout.
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(10 * 60 * 1000);
+ private FileStatus testFileStatus;
+ private Path hugefile;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ createTestAccount();
+
+ hugefile = fs.makeQualified(TEST_FILE_PATH);
+ try {
+ testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+ testFileLength = testFileStatus.getLen();
+ } catch (FileNotFoundException e) {
+ // file doesn't exist
+ testFileLength = 0;
+ }
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ Configuration conf = new Configuration();
+
+ // Configure the page blob directories key so every file created is a page blob.
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+ return AzureBlobStorageTestAccount.create(
+ "testpageblobinputstream",
+ EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+ conf,
+ true);
+ }
+
+ /**
+ * Create a test file by repeating the characters in the alphabet.
+ * @throws IOException
+ */
+ private void createTestFileAndSetLength() throws IOException {
+ // To reduce test run time, the test file can be reused.
+ if (fs.exists(TEST_FILE_PATH)) {
+ testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+ testFileLength = testFileStatus.getLen();
+ LOG.info("Reusing test file: {}", testFileStatus);
+ return;
+ }
+
+ byte[] buffer = new byte[256];
+ for (int i = 0; i < buffer.length; i++) {
+ buffer[i] = (byte) i;
+ }
+
+ LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
+ TEST_FILE_SIZE);
+
+ try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+ int bytesWritten = 0;
+ while (bytesWritten < TEST_FILE_SIZE) {
+ outputStream.write(buffer);
+ bytesWritten += buffer.length;
+ }
+ LOG.info("Closing stream {}", outputStream);
+ outputStream.close();
+ }
+ testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
+ }
+
+ void assumeHugeFileExists() throws IOException {
+ ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+ FileStatus status = fs.getFileStatus(hugefile);
+ ContractTestUtils.assertIsFile(hugefile, status);
+ assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+ }
+
+ @Test
+ public void test_0100_CreateHugeFile() throws IOException {
+ createTestFileAndSetLength();
+ }
+
+ @Test
+ public void test_0200_BasicReadTest() throws Exception {
+ assumeHugeFileExists();
+
+ try (
+ FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+ ) {
+ byte[] buffer = new byte[3 * MEGABYTE];
+
+ // v1 forward seek and read a kilobyte into first kilobyte of buffer
+ long position = 5 * MEGABYTE;
+ inputStream.seek(position);
+ int numBytesRead = inputStream.read(buffer, 0, KILOBYTE);
+ assertEquals(KILOBYTE, numBytesRead);
+
+ byte[] expected = new byte[3 * MEGABYTE];
+
+ for (int i = 0; i < KILOBYTE; i++) {
+ expected[i] = (byte) ((i + position) % 256);
+ }
+
+ assertArrayEquals(expected, buffer);
+
+ int len = MEGABYTE;
+ int offset = buffer.length - len;
+
+ // v1 reverse seek and read a megabyte into last megabyte of buffer
+ position = 3 * MEGABYTE;
+ inputStream.seek(position);
+ numBytesRead = inputStream.read(buffer, offset, len);
+ assertEquals(len, numBytesRead);
+
+ for (int i = offset; i < offset + len; i++) {
+ expected[i] = (byte) ((i + position) % 256);
+ }
+
+ assertArrayEquals(expected, buffer);
+ }
+ }
+
+ @Test
+ public void test_0201_RandomReadTest() throws Exception {
+ assumeHugeFileExists();
+
+ try (
+ FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+ ) {
+ final int bufferSize = 4 * KILOBYTE;
+ byte[] buffer = new byte[bufferSize];
+ long position = 0;
+
+ verifyConsistentReads(inputStream, buffer, position);
+
+ inputStream.seek(0);
+
+ verifyConsistentReads(inputStream, buffer, position);
+
+ int seekPosition = 2 * KILOBYTE;
+ inputStream.seek(seekPosition);
+ position = seekPosition;
+ verifyConsistentReads(inputStream, buffer, position);
+
+ inputStream.seek(0);
+ position = 0;
+ verifyConsistentReads(inputStream, buffer, position);
+
+ seekPosition = 5 * KILOBYTE;
+ inputStream.seek(seekPosition);
+ position = seekPosition;
+ verifyConsistentReads(inputStream, buffer, position);
+
+ seekPosition = 10 * KILOBYTE;
+ inputStream.seek(seekPosition);
+ position = seekPosition;
+ verifyConsistentReads(inputStream, buffer, position);
+
+ seekPosition = 4100 * KILOBYTE;
+ inputStream.seek(seekPosition);
+ position = seekPosition;
+ verifyConsistentReads(inputStream, buffer, position);
+
+ for (int i = 4 * 1024 * 1023; i < 5000; i++) {
+ seekPosition = i;
+ inputStream.seek(seekPosition);
+ position = seekPosition;
+ verifyConsistentReads(inputStream, buffer, position);
+ }
+
+ inputStream.seek(0);
+ position = 0;
+ buffer = new byte[1];
+
+ for (int i = 0; i < 5000; i++) {
+ assertEquals(1, inputStream.skip(1));
+ position++;
+ verifyConsistentReads(inputStream, buffer, position);
+ position++;
+ }
+ }
+ }
+
+ private void verifyConsistentReads(FSDataInputStream inputStream,
+ byte[] buffer,
+ long position) throws IOException {
+ int size = buffer.length;
+ final int numBytesRead = inputStream.read(buffer, 0, size);
+ assertEquals("Bytes read from stream", size, numBytesRead);
+
+ byte[] expected = new byte[size];
+ for (int i = 0; i < expected.length; i++) {
+ expected[i] = (byte) ((position + i) % 256);
+ }
+
+ assertArrayEquals("Mismatch", expected, buffer);
+ }
+
+ /**
+ * Validates the implementation of InputStream.markSupported.
+ * @throws IOException
+ */
+ @Test
+ public void test_0301_MarkSupported() throws IOException {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ assertTrue("mark is not supported", inputStream.markSupported());
+ }
+ }
+
+ /**
+ * Validates the implementation of InputStream.mark and reset
+ * for version 1 of the block blob input stream.
+ * @throws Exception
+ */
+ @Test
+ public void test_0303_MarkAndResetV1() throws Exception {
+ assumeHugeFileExists();
+ try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+ inputStream.mark(KILOBYTE - 1);
+
+ byte[] buffer = new byte[KILOBYTE];
+ int bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+
+ inputStream.reset();
+ assertEquals("rest -> pos 0", 0, inputStream.getPos());
+
+ inputStream.mark(8 * KILOBYTE - 1);
+
+ buffer = new byte[8 * KILOBYTE];
+ bytesRead = inputStream.read(buffer);
+ assertEquals(buffer.length, bytesRead);
+
+ intercept(IOException.class,
+ "Resetting to invalid mark",
+ new Callable