From 67468651b19ae9de04d20decb79c23e2541df584 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 7 May 2018 11:54:08 +0100
Subject: [PATCH] HADOOP-15446. WASB: PageBlobInputStream.skip breaks HBASE
 replication.
 Contributed by Thomas Marquardt

(cherry picked from commit 5b11b9fd413470e134ecdc7c50468f8c7b39fa50)
---
 .../hadoop/fs/azure/PageBlobInputStream.java  | 123 ++--
 .../fs/azure/ITestPageBlobInputStream.java    | 527 ++++++++++++++++++
 2 files changed, 605 insertions(+), 45 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
index aaac4904133..40bf6f4ae1a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -25,12 +25,14 @@ import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
 import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
 
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
 
 import com.microsoft.azure.storage.OperationContext;
@@ -58,7 +60,9 @@ final class PageBlobInputStream extends InputStream {
   // The buffer holding the current data we last read from the server.
   private byte[] currentBuffer;
   // The current byte offset we're at in the buffer.
-  private int currentOffsetInBuffer;
+  private int currentBufferOffset;
+  // The current buffer length
+  private int currentBufferLength;
   // Maximum number of pages to get per any one request.
   private static final int MAX_PAGES_PER_DOWNLOAD =
       4 * 1024 * 1024 / PAGE_SIZE;
@@ -174,7 +178,7 @@ final class PageBlobInputStream extends InputStream {
 
   private boolean dataAvailableInBuffer() {
     return currentBuffer != null
-        && currentOffsetInBuffer < currentBuffer.length;
+        && currentBufferOffset < currentBufferLength;
   }
 
   /**
@@ -194,6 +198,8 @@ final class PageBlobInputStream extends InputStream {
       return true;
     }
     currentBuffer = null;
+    currentBufferOffset = 0;
+    currentBufferLength = 0;
     if (numberOfPagesRemaining == 0) {
       // No more data to read.
       return false;
@@ -209,43 +215,48 @@ final class PageBlobInputStream extends InputStream {
       ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
       blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
           withMD5Checking(), opContext);
-      currentBuffer = baos.toByteArray();
+      validateDataIntegrity(baos.toByteArray());
     } catch (StorageException e) {
       throw new IOException(e);
     }
     numberOfPagesRemaining -= pagesToRead;
     currentOffsetInBlob += bufferSize;
-    currentOffsetInBuffer = PAGE_HEADER_SIZE;
-
-    // Since we just downloaded a new buffer, validate its consistency.
-    validateCurrentBufferConsistency();
 
     return true;
   }
 
-  private void validateCurrentBufferConsistency()
+  private void validateDataIntegrity(byte[] buffer)
       throws IOException {
-    if (currentBuffer.length % PAGE_SIZE != 0) {
+
+    if (buffer.length % PAGE_SIZE != 0) {
       throw new AssertionError("Unexpected buffer size: "
-          + currentBuffer.length);
+          + buffer.length);
     }
-    int numberOfPages = currentBuffer.length / PAGE_SIZE;
+
+    int bufferLength = 0;
+    int numberOfPages = buffer.length / PAGE_SIZE;
+    long totalPagesAfterCurrent = numberOfPagesRemaining;
+
     for (int page = 0; page < numberOfPages; page++) {
-      short currentPageSize = getPageSize(blob, currentBuffer,
-          page * PAGE_SIZE);
-      // Calculate the number of pages that exist after this one
-      // in the blob.
-      long totalPagesAfterCurrent =
-          (numberOfPages - page - 1) + numberOfPagesRemaining;
-      // Only the last page is allowed to be not filled completely.
-      if (currentPageSize < PAGE_DATA_SIZE
+      // Calculate the number of pages that exist in the blob after this one
+      totalPagesAfterCurrent--;
+
+      short currentPageSize = getPageSize(blob, buffer, page * PAGE_SIZE);
+
+      // Only the last page can be partially filled.
+      if (currentPageSize < PAGE_DATA_SIZE
           && totalPagesAfterCurrent > 0) {
         throw fileCorruptException(blob, String.format(
-            "Page with partial data found in the middle (%d pages from the"
-                + " end) that only has %d bytes of data.",
-            totalPagesAfterCurrent, currentPageSize));
+            "Page with partial data found in the middle (%d pages from the"
+            + " end) that only has %d bytes of data.",
+            totalPagesAfterCurrent, currentPageSize));
       }
+      bufferLength += currentPageSize + PAGE_HEADER_SIZE;
     }
+
+    currentBufferOffset = PAGE_HEADER_SIZE;
+    currentBufferLength = bufferLength;
+    currentBuffer = buffer;
   }
 
   // Reads the page size from the page header at the given offset.
@@ -275,7 +286,7 @@ final class PageBlobInputStream extends InputStream {
     }
     int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
     int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
-    System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
+    System.arraycopy(currentBuffer, currentBufferOffset, outputBuffer,
         offset, numBytesToRead);
     numberOfBytesRead += numBytesToRead;
     offset += numBytesToRead;
@@ -284,7 +295,7 @@ final class PageBlobInputStream extends InputStream {
       // We've finished this page, move on to the next.
       advancePagesInBuffer(1);
     } else {
-      currentOffsetInBuffer += numBytesToRead;
+      currentBufferOffset += numBytesToRead;
     }
   }
 
@@ -309,9 +320,26 @@ final class PageBlobInputStream extends InputStream {
   }
 
   /**
-   * Skips over and discards n bytes of data from this input stream.
-   * @param n the number of bytes to be skipped.
-   * @return the actual number of bytes skipped.
+   * Skips over and discards n bytes of data from this input
+   * stream. The skip method may, for a variety of reasons, end
+   * up skipping over some smaller number of bytes, possibly 0.
+   * This may result from any of a number of conditions; reaching end of file
+   * before n bytes have been skipped is only one possibility.
+   * The actual number of bytes skipped is returned. If {@code n} is
+   * negative, the {@code skip} method for class {@code InputStream} always
+   * returns 0, and no bytes are skipped. Subclasses may handle the negative
+   * value differently.
+   *
+   * <p> The skip method of this class creates a
+   * byte array and then repeatedly reads into it until n bytes
+   * have been read or the end of the stream has been reached. Subclasses are
+   * encouraged to provide a more efficient implementation of this method.
+   * For instance, the implementation may depend on the ability to seek.
+   *
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   * @exception IOException if the stream does not support seek,
+   *            or if some other I/O error occurs.
    */
   @Override
   public synchronized long skip(long n) throws IOException {
@@ -338,18 +366,23 @@ final class PageBlobInputStream extends InputStream {
     n -= skippedWithinBuffer;
     long skipped = skippedWithinBuffer;
 
-    // Empty the current buffer, we're going beyond it.
-    currentBuffer = null;
+    if (n == 0) {
+      return skipped;
+    }
+
+    if (numberOfPagesRemaining == 0) {
+      throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+    } else if (numberOfPagesRemaining > 1) {
+      // skip over as many pages as we can, but we must read the last
+      // page as it may not be full
+      long pagesToSkipOver = Math.min(n / PAGE_DATA_SIZE,
+          numberOfPagesRemaining - 1);
+      numberOfPagesRemaining -= pagesToSkipOver;
+      currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
+      skipped += pagesToSkipOver * PAGE_DATA_SIZE;
+      n -= pagesToSkipOver * PAGE_DATA_SIZE;
+    }
 
-    // Skip over whole pages as necessary without retrieving them from the
-    // server.
-    long pagesToSkipOver = Math.max(0, Math.min(
-        n / PAGE_DATA_SIZE,
-        numberOfPagesRemaining - 1));
-    numberOfPagesRemaining -= pagesToSkipOver;
-    currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
-    skipped += pagesToSkipOver * PAGE_DATA_SIZE;
-    n -= pagesToSkipOver * PAGE_DATA_SIZE;
     if (n == 0) {
       return skipped;
     }
@@ -387,14 +420,14 @@ final class PageBlobInputStream extends InputStream {
 
     // Calculate how many whole pages (pages before the possibly partially
     // filled last page) remain.
-    int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
+    int currentPageIndex = currentBufferOffset / PAGE_SIZE;
     int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
     int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
 
     if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
       // I'm within one of the whole pages remaining, skip in there.
       advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
-      currentOffsetInBuffer += n % PAGE_DATA_SIZE;
+      currentBufferOffset += n % PAGE_DATA_SIZE;
       return n + skipped;
     }
 
@@ -417,8 +450,8 @@ final class PageBlobInputStream extends InputStream {
    */
   private long skipWithinCurrentPage(long n) throws IOException {
     int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
-    if (n < remainingBytesInCurrentPage) {
-      currentOffsetInBuffer += n;
+    if (n <= remainingBytesInCurrentPage) {
+      currentBufferOffset += n;
       return n;
     } else {
       advancePagesInBuffer(1);
@@ -438,7 +471,7 @@ final class PageBlobInputStream extends InputStream {
     // Calculate our current position relative to the start of the current
     // page.
     int currentDataOffsetInPage =
-        (currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
+        (currentBufferOffset % PAGE_SIZE) - PAGE_HEADER_SIZE;
     int pageBoundary = getCurrentPageStartInBuffer();
     // Get the data size of the current page from the header.
     short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
@@ -454,14 +487,14 @@ final class PageBlobInputStream extends InputStream {
   }
 
   private void advancePagesInBuffer(int numberOfPages) {
-    currentOffsetInBuffer =
+    currentBufferOffset =
         getCurrentPageStartInBuffer() +
         (numberOfPages * PAGE_SIZE) +
         PAGE_HEADER_SIZE;
   }
 
   private int getCurrentPageStartInBuffer() {
-    return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
+    return PAGE_SIZE * (currentBufferOffset / PAGE_SIZE);
   }
 
   private static IOException fileCorruptException(CloudPageBlobWrapper blob,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
new file mode 100644
index 00000000000..8c939fc089a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+
+import org.junit.FixMethodOrder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test semantics of the page blob input stream.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestPageBlobInputStream extends AbstractWasbTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestPageBlobInputStream.class);
+  private static final int KILOBYTE = 1024;
+  private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+  private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
+  private static final Path TEST_FILE_PATH = new Path(
+      "TestPageBlobInputStream.txt");
+
+  private long testFileLength;
+
+  /**
+   * Long test timeout.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(10 * 60 * 1000);
+  private FileStatus testFileStatus;
+  private Path hugefile;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    createTestAccount();
+
+    hugefile = fs.makeQualified(TEST_FILE_PATH);
+    try {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+    } catch (FileNotFoundException e) {
+      // file doesn't exist
+      testFileLength = 0;
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is
+    // a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    return AzureBlobStorageTestAccount.create(
+        "testpageblobinputstream",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf,
+        true);
+  }
+
+  /**
+   * Create a test file by repeatedly writing the byte values 0..255.
+   * @throws IOException
+   */
+  private void createTestFileAndSetLength() throws IOException {
+    // To reduce test run time, the test file can be reused.
+    if (fs.exists(TEST_FILE_PATH)) {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+      LOG.info("Reusing test file: {}", testFileStatus);
+      return;
+    }
+
+    byte[] buffer = new byte[256];
+    for (int i = 0; i < buffer.length; i++) {
+      buffer[i] = (byte) i;
+    }
+
+    LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
+        TEST_FILE_SIZE);
+
+    try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+      int bytesWritten = 0;
+      while (bytesWritten < TEST_FILE_SIZE) {
+        outputStream.write(buffer);
+        bytesWritten += buffer.length;
+      }
+      // try-with-resources closes the stream.
+      LOG.info("Closing stream {}", outputStream);
+    }
+    testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
+  }
+
+  void assumeHugeFileExists() throws IOException {
+    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+  }
+
+  @Test
+  public void test_0100_CreateHugeFile() throws IOException {
+    createTestFileAndSetLength();
+  }
+
+  @Test
+  public void test_0200_BasicReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] buffer = new byte[3 * MEGABYTE];
+
+      // forward seek and read a kilobyte into first kilobyte of buffer
+      long position = 5 * MEGABYTE;
+      inputStream.seek(position);
+      int numBytesRead = inputStream.read(buffer, 0, KILOBYTE);
+      assertEquals(KILOBYTE, numBytesRead);
+
+      byte[] expected = new byte[3 * MEGABYTE];
+
+      for (int i = 0; i < KILOBYTE; i++) {
+        expected[i] = (byte) ((i + position) % 256);
+      }
+
+      assertArrayEquals(expected, buffer);
+
+      int len = MEGABYTE;
+      int offset = buffer.length - len;
+
+      // reverse seek and read a megabyte into last megabyte of buffer
+      position = 3 * MEGABYTE;
+      inputStream.seek(position);
+      numBytesRead = inputStream.read(buffer, offset, len);
+      assertEquals(len, numBytesRead);
+
+      for (int i = offset; i < offset + len; i++) {
+        expected[i] = (byte) ((i + position) % 256);
+      }
+
+      assertArrayEquals(expected, buffer);
+    }
+  }
+
+  @Test
+  public void test_0201_RandomReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      final int bufferSize = 4 * KILOBYTE;
+      byte[] buffer = new byte[bufferSize];
+      long position = 0;
+
+      verifyConsistentReads(inputStream, buffer, position);
+
+      inputStream.seek(0);
+
+      verifyConsistentReads(inputStream, buffer, position);
+
+      int seekPosition = 2 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      inputStream.seek(0);
+      position = 0;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      seekPosition = 5 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      seekPosition = 10 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      seekPosition = 4100 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      for (int i = 4 * 1024 * 1023; i < 5000; i++) {
+        seekPosition = i;
+        inputStream.seek(seekPosition);
+        position = seekPosition;
+        verifyConsistentReads(inputStream, buffer, position);
+      }
+
+      inputStream.seek(0);
+      position = 0;
+      buffer = new byte[1];
+
+      for (int i = 0; i < 5000; i++) {
+        assertEquals(1, inputStream.skip(1));
+        position++;
+        verifyConsistentReads(inputStream, buffer, position);
+        position++;
+      }
+    }
+  }
+
+  private void verifyConsistentReads(FSDataInputStream inputStream,
+      byte[] buffer, long position) throws IOException {
+    int size = buffer.length;
+    final int numBytesRead = inputStream.read(buffer, 0, size);
+    assertEquals("Bytes read from stream", size, numBytesRead);
+
+    byte[] expected = new byte[size];
+    for (int i = 0; i < expected.length; i++) {
+      expected[i] = (byte) ((position + i) % 256);
+    }
+
+    assertArrayEquals("Mismatch", expected, buffer);
+  }
+
+  /**
+   * Validates the implementation of InputStream.markSupported.
+   * @throws IOException
+   */
+  @Test
+  public void test_0301_MarkSupported() throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertTrue("mark is not supported", inputStream.markSupported());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.mark and reset
+   * for the page blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0303_MarkAndResetV1() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      inputStream.mark(KILOBYTE - 1);
+
+      byte[] buffer = new byte[KILOBYTE];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      inputStream.reset();
+      assertEquals("reset -> pos 0", 0, inputStream.getPos());
+
+      inputStream.mark(8 * KILOBYTE - 1);
+
+      buffer = new byte[8 * KILOBYTE];
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      intercept(IOException.class,
+          "Resetting to invalid mark",
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.reset();
+              return inputStream;
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seekToNewSource, which should
+   * return false for the page blob input stream.
+   * @throws IOException
+   */
+  @Test
+  public void test_0305_SeekToNewSourceV1() throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertFalse(inputStream.seekToNewSource(0));
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip and ensures there is no
+   * network I/O for the page blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0307_SkipBounds() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      long skipped = inputStream.skip(-1);
+      assertEquals(0, skipped);
+
+      skipped = inputStream.skip(0);
+      assertEquals(0, skipped);
+
+      assertTrue(testFileLength > 0);
+
+      skipped = inputStream.skip(testFileLength);
+      assertEquals(testFileLength, skipped);
+
+      intercept(EOFException.class,
+          new Callable<Long>() {
+            @Override
+            public Long call() throws Exception {
+              return inputStream.skip(1);
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek and ensures there is no
+   * network I/O for forward seek.
+   * @throws Exception
+   */
+  @Test
+  public void test_0309_SeekBounds() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      inputStream.seek(0);
+      assertEquals(0, inputStream.getPos());
+
+      intercept(EOFException.class,
+          FSExceptionMessages.NEGATIVE_SEEK,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(-1);
+              return inputStream;
+            }
+          }
+      );
+
+      assertTrue("Test file length only " + testFileLength,
+          testFileLength > 0);
+      inputStream.seek(testFileLength);
+      assertEquals(testFileLength, inputStream.getPos());
+
+      intercept(EOFException.class,
+          FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(testFileLength + 1);
+              return inputStream;
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek, Seekable.getPos,
+   * and InputStream.available.
+   * @throws Exception
+   */
+  @Test
+  public void test_0311_SeekAndAvailableAndPosition() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] expected1 = {0, 1, 2};
+      byte[] expected2 = {3, 4, 5};
+      byte[] expected3 = {1, 2, 3};
+      byte[] expected4 = {6, 7, 8};
+      byte[] buffer = new byte[3];
+
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(2 * buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      int seekPos = 0;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // forward seek
+      seekPos = 6;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip, Seekable.getPos,
+   * and InputStream.available.
+   * @throws IOException
+   */
+  @Test
+  public void test_0313_SkipAndAvailableAndPosition() throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] expected1 = {0, 1, 2};
+      byte[] expected2 = {3, 4, 5};
+      byte[] expected3 = {1, 2, 3};
+      byte[] expected4 = {6, 7, 8};
+      assertEquals(testFileLength, inputStream.available());
+      assertEquals(0, inputStream.getPos());
+
+      int n = 3;
+      long skipped = inputStream.skip(n);
+
+      assertEquals(skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(n, skipped);
+
+      byte[] buffer = new byte[3];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(buffer.length + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // does skip still work after seek?
+      int seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      long currentPosition = inputStream.getPos();
+      n = 2;
+      skipped = inputStream.skip(n);
+
+      assertEquals(currentPosition + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(n, skipped);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + skipped + currentPosition,
+          inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  @Test
+  public void test_999_DeleteHugeFiles() throws IOException {
+    fs.delete(TEST_FILE_PATH, false);
+  }
+}
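-- 
Reviewer note, placed after the signature delimiter so git am ignores it.
The sketch below is not part of the patch; it illustrates how a client
observes the corrected skip() contract that test_0307_SkipBounds asserts:
skip(len) lands exactly at EOF with a consistent getPos(), and skipping
past EOF now throws EOFException instead of silently mispositioning the
stream (the HBase replication failure mode this patch fixes). The wasb
account, container, and file path are hypothetical placeholders.

// SkipContractDemo.java -- illustrative sketch only, not part of the patch.
import java.io.EOFException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SkipContractDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical page blob path; substitute a real account/container.
    Path file = new Path(
        "wasb://mycontainer@myaccount.blob.core.windows.net/hbase/wal-00001");
    FileSystem fs = file.getFileSystem(conf);
    long len = fs.getFileStatus(file).getLen();

    try (FSDataInputStream in = fs.open(file)) {
      // Skipping exactly to EOF returns the full count, and getPos()
      // agrees with the number of bytes skipped after this patch.
      long skipped = in.skip(len);
      System.out.println("skipped=" + skipped + " pos=" + in.getPos());

      // Skipping beyond EOF now fails fast with EOFException.
      try {
        in.skip(1);
      } catch (EOFException expected) {
        System.out.println("skip past EOF rejected: "
            + expected.getMessage());
      }
    }
  }
}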