diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7bfc5faf360..d16262aec14 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -356,6 +356,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12054. RPC client should not retry for InvalidToken exceptions. (Varun Saxena via Arpit Agarwal) + HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on + EOF. (Ivan Mitic via cnauroth) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index 69bda06c4e7..7741f17b7f3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -2301,7 +2301,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { throws AzureException { if (blob instanceof CloudPageBlobWrapper) { try { - return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob, + return PageBlobInputStream.getPageBlobDataSize((CloudPageBlobWrapper) blob, getInstrumentedContext( isConcurrentOOBAppendAllowed())); } catch (Exception e) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java index 468ac65d76b..097201b2af3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java @@ -80,7 +80,7 @@ final class PageBlobInputStream extends InputStream { * @throws IOException If the format is corrupt. * @throws StorageException If anything goes wrong in the requests. */ - public static long getPageBlobSize(CloudPageBlobWrapper blob, + public static long getPageBlobDataSize(CloudPageBlobWrapper blob, OperationContext opContext) throws IOException, StorageException { // Get the page ranges for the blob. There should be one range starting // at byte 0, but we tolerate (and ignore) ranges after the first one. @@ -156,7 +156,7 @@ final class PageBlobInputStream extends InputStream { } if (pageBlobSize == -1) { try { - pageBlobSize = getPageBlobSize(blob, opContext); + pageBlobSize = getPageBlobDataSize(blob, opContext); } catch (StorageException e) { throw new IOException("Unable to get page blob size.", e); } @@ -179,7 +179,13 @@ final class PageBlobInputStream extends InputStream { /** * Check our buffer and download more from the server if needed. - * @return true if there's more data in the buffer, false if we're done. + * If data is not available in the buffer, method downloads maximum + * page blob download size (4MB) or if there is less then 4MB left, + * all remaining pages. + * If we are on the last page, method will return true even if + * we reached the end of stream. + * @return true if there's more data in the buffer, false if buffer is empty + * and we reached the end of the blob. * @throws IOException */ private synchronized boolean ensureDataInBuffer() throws IOException { @@ -257,11 +263,15 @@ final class PageBlobInputStream extends InputStream { @Override public synchronized int read(byte[] outputBuffer, int offset, int len) throws IOException { + // If len is zero return 0 per the InputStream contract + if (len == 0) { + return 0; + } + int numberOfBytesRead = 0; while (len > 0) { if (!ensureDataInBuffer()) { - filePosition += numberOfBytesRead; - return numberOfBytesRead; + break; } int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage(); int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage); @@ -277,6 +287,13 @@ final class PageBlobInputStream extends InputStream { currentOffsetInBuffer += numBytesToRead; } } + + // if outputBuffer len is > 0 and zero bytes were read, we reached + // an EOF + if (numberOfBytesRead == 0) { + return -1; + } + filePosition += numberOfBytesRead; return numberOfBytesRead; } @@ -284,8 +301,9 @@ final class PageBlobInputStream extends InputStream { @Override public int read() throws IOException { byte[] oneByte = new byte[1]; - if (read(oneByte) == 0) { - return -1; + int result = read(oneByte); + if (result < 0) { + return result; } return oneByte[0]; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java index 2b8846c7f73..868937541c8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java @@ -117,6 +117,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable { // The last task given to the ioThreadPool to execute, to allow // waiting until it's done. private WriteRequest lastQueuedTask; + // Whether the stream has been closed. + private boolean closed = false; public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class); @@ -201,7 +203,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable { * service. */ @Override - public void close() throws IOException { + public synchronized void close() throws IOException { + if (closed) { + return; + } + LOG.debug("Closing page blob output stream."); flush(); checkStreamState(); @@ -221,7 +227,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable { Thread.currentThread().interrupt(); } - this.lastError = new IOException("Stream is already closed."); + closed = true; } // Log the stacks of all threads. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java index 9ce6cc9b81b..6989a700f7e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java @@ -41,7 +41,6 @@ import java.util.TimeZone; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -54,7 +53,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.apache.hadoop.fs.azure.AzureException; import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending; @@ -472,6 +470,83 @@ public abstract class NativeAzureFileSystemBaseTest { } } + @Test + public void testInputStreamReadWithZeroSizeBuffer() throws Exception { + Path newFile = new Path("zeroSizeRead"); + OutputStream output = fs.create(newFile); + output.write(10); + output.close(); + + InputStream input = fs.open(newFile); + int result = input.read(new byte[2], 0, 0); + assertEquals(0, result); + } + + @Test + public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws Exception { + Path newFile = new Path("eofRead"); + OutputStream output = fs.create(newFile); + output.write(10); + output.close(); + + // Read first byte back + InputStream input = fs.open(newFile); + byte[] buff = new byte[1]; + int result = input.read(buff, 0, 1); + assertEquals(1, result); + assertEquals(10, buff[0]); + + // Issue another read and make sure it returns -1 + buff[0] = 2; + result = input.read(buff, 0, 1); + assertEquals(-1, result); + // Buffer is intact + assertEquals(2, buff[0]); + } + + @Test + public void testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws Exception { + Path newFile = new Path("eofRead2"); + OutputStream output = fs.create(newFile); + byte[] outputBuff = new byte[97331]; + for(int i = 0; i < outputBuff.length; ++i) { + outputBuff[i] = (byte)(Math.random() * 255); + } + output.write(outputBuff); + output.close(); + + // Read the content of the file + InputStream input = fs.open(newFile); + byte[] buff = new byte[131072]; + int result = input.read(buff, 0, buff.length); + assertEquals(outputBuff.length, result); + for(int i = 0; i < outputBuff.length; ++i) { + assertEquals(outputBuff[i], buff[i]); + } + + // Issue another read and make sure it returns -1 + buff = new byte[131072]; + result = input.read(buff, 0, buff.length); + assertEquals(-1, result); + } + + @Test + public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception { + Path newFile = new Path("eofRead3"); + OutputStream output = fs.create(newFile); + output.write(10); + output.close(); + + // Read first byte back + InputStream input = fs.open(newFile); + int value = input.read(); + assertEquals(10, value); + + // Issue another read and make sure it returns -1 + value = input.read(); + assertEquals(-1, value); + } + @Test public void testSetPermissionOnFile() throws Exception { Path newFile = new Path("testPermission"); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java new file mode 100644 index 00000000000..3c3b782dacb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.junit.Ignore; + +public class TestNativeAzureFileSystemContractPageBlobLive extends + FileSystemContractBaseTest { + private AzureBlobStorageTestAccount testAccount; + + private AzureBlobStorageTestAccount createTestAccount() + throws Exception { + Configuration conf = new Configuration(); + + // Configure the page blob directories key so every file created is a page blob. + conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/"); + + // Configure the atomic rename directories key so every folder will have + // atomic rename applied. + conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/"); + return AzureBlobStorageTestAccount.create(conf); + } + + @Override + protected void setUp() throws Exception { + testAccount = createTestAccount(); + if (testAccount != null) { + fs = testAccount.getFileSystem(); + } + } + + @Override + protected void tearDown() throws Exception { + if (testAccount != null) { + testAccount.cleanup(); + testAccount = null; + fs = null; + } + } + + @Override + protected void runTest() throws Throwable { + if (testAccount != null) { + super.runTest(); + } + } + + /** + * The following tests are failing on Azure and the Azure + * file system code needs to be modified to make them pass. + * A separate work item has been opened for this. + */ + @Ignore + public void testMoveFileUnderParent() throws Throwable { + } + + @Ignore + public void testRenameFileToSelf() throws Throwable { + } + + @Ignore + public void testRenameChildDirForbidden() throws Exception { + } + + @Ignore + public void testMoveDirUnderParent() throws Throwable { + } + + @Ignore + public void testRenameDirToSelf() throws Throwable { + } +}