HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on EOF. Contributed by Ivan Mitic.

This commit is contained in:
cnauroth 2015-06-08 22:42:14 -07:00
parent 927577c87c
commit c45784bc90
6 changed files with 204 additions and 12 deletions

View File

@ -843,6 +843,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12054. RPC client should not retry for InvalidToken exceptions. HADOOP-12054. RPC client should not retry for InvalidToken exceptions.
(Varun Saxena via Arpit Agarwal) (Varun Saxena via Arpit Agarwal)
HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on
EOF. (Ivan Mitic via cnauroth)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -2301,7 +2301,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
throws AzureException { throws AzureException {
if (blob instanceof CloudPageBlobWrapper) { if (blob instanceof CloudPageBlobWrapper) {
try { try {
return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob, return PageBlobInputStream.getPageBlobDataSize((CloudPageBlobWrapper) blob,
getInstrumentedContext( getInstrumentedContext(
isConcurrentOOBAppendAllowed())); isConcurrentOOBAppendAllowed()));
} catch (Exception e) { } catch (Exception e) {

View File

@ -80,7 +80,7 @@ final class PageBlobInputStream extends InputStream {
* @throws IOException If the format is corrupt. * @throws IOException If the format is corrupt.
* @throws StorageException If anything goes wrong in the requests. * @throws StorageException If anything goes wrong in the requests.
*/ */
public static long getPageBlobSize(CloudPageBlobWrapper blob, public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
OperationContext opContext) throws IOException, StorageException { OperationContext opContext) throws IOException, StorageException {
// Get the page ranges for the blob. There should be one range starting // Get the page ranges for the blob. There should be one range starting
// at byte 0, but we tolerate (and ignore) ranges after the first one. // at byte 0, but we tolerate (and ignore) ranges after the first one.
@ -156,7 +156,7 @@ final class PageBlobInputStream extends InputStream {
} }
if (pageBlobSize == -1) { if (pageBlobSize == -1) {
try { try {
pageBlobSize = getPageBlobSize(blob, opContext); pageBlobSize = getPageBlobDataSize(blob, opContext);
} catch (StorageException e) { } catch (StorageException e) {
throw new IOException("Unable to get page blob size.", e); throw new IOException("Unable to get page blob size.", e);
} }
@ -179,7 +179,13 @@ final class PageBlobInputStream extends InputStream {
/** /**
* Check our buffer and download more from the server if needed. * Check our buffer and download more from the server if needed.
* @return true if there's more data in the buffer, false if we're done. * If data is not available in the buffer, method downloads maximum
* page blob download size (4MB) or if there is less then 4MB left,
* all remaining pages.
* If we are on the last page, method will return true even if
* we reached the end of stream.
* @return true if there's more data in the buffer, false if buffer is empty
* and we reached the end of the blob.
* @throws IOException * @throws IOException
*/ */
private synchronized boolean ensureDataInBuffer() throws IOException { private synchronized boolean ensureDataInBuffer() throws IOException {
@ -257,11 +263,15 @@ final class PageBlobInputStream extends InputStream {
@Override @Override
public synchronized int read(byte[] outputBuffer, int offset, int len) public synchronized int read(byte[] outputBuffer, int offset, int len)
throws IOException { throws IOException {
// If len is zero return 0 per the InputStream contract
if (len == 0) {
return 0;
}
int numberOfBytesRead = 0; int numberOfBytesRead = 0;
while (len > 0) { while (len > 0) {
if (!ensureDataInBuffer()) { if (!ensureDataInBuffer()) {
filePosition += numberOfBytesRead; break;
return numberOfBytesRead;
} }
int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage(); int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage); int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
@ -277,6 +287,13 @@ final class PageBlobInputStream extends InputStream {
currentOffsetInBuffer += numBytesToRead; currentOffsetInBuffer += numBytesToRead;
} }
} }
// if outputBuffer len is > 0 and zero bytes were read, we reached
// an EOF
if (numberOfBytesRead == 0) {
return -1;
}
filePosition += numberOfBytesRead; filePosition += numberOfBytesRead;
return numberOfBytesRead; return numberOfBytesRead;
} }
@ -284,8 +301,9 @@ final class PageBlobInputStream extends InputStream {
@Override @Override
public int read() throws IOException { public int read() throws IOException {
byte[] oneByte = new byte[1]; byte[] oneByte = new byte[1];
if (read(oneByte) == 0) { int result = read(oneByte);
return -1; if (result < 0) {
return result;
} }
return oneByte[0]; return oneByte[0];
} }

View File

@ -117,6 +117,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
// The last task given to the ioThreadPool to execute, to allow // The last task given to the ioThreadPool to execute, to allow
// waiting until it's done. // waiting until it's done.
private WriteRequest lastQueuedTask; private WriteRequest lastQueuedTask;
// Whether the stream has been closed.
private boolean closed = false;
public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class); public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
@ -201,7 +203,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
* service. * service.
*/ */
@Override @Override
public void close() throws IOException { public synchronized void close() throws IOException {
if (closed) {
return;
}
LOG.debug("Closing page blob output stream."); LOG.debug("Closing page blob output stream.");
flush(); flush();
checkStreamState(); checkStreamState();
@ -221,7 +227,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
this.lastError = new IOException("Stream is already closed."); closed = true;
} }
// Log the stacks of all threads. // Log the stacks of all threads.

View File

@ -41,7 +41,6 @@ import java.util.TimeZone;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -54,7 +53,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.fs.azure.AzureException; import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending; import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
@ -472,6 +470,83 @@ public abstract class NativeAzureFileSystemBaseTest {
} }
} }
@Test
public void testInputStreamReadWithZeroSizeBuffer() throws Exception {
Path newFile = new Path("zeroSizeRead");
OutputStream output = fs.create(newFile);
output.write(10);
output.close();
InputStream input = fs.open(newFile);
int result = input.read(new byte[2], 0, 0);
assertEquals(0, result);
}
@Test
public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws Exception {
Path newFile = new Path("eofRead");
OutputStream output = fs.create(newFile);
output.write(10);
output.close();
// Read first byte back
InputStream input = fs.open(newFile);
byte[] buff = new byte[1];
int result = input.read(buff, 0, 1);
assertEquals(1, result);
assertEquals(10, buff[0]);
// Issue another read and make sure it returns -1
buff[0] = 2;
result = input.read(buff, 0, 1);
assertEquals(-1, result);
// Buffer is intact
assertEquals(2, buff[0]);
}
@Test
public void testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws Exception {
Path newFile = new Path("eofRead2");
OutputStream output = fs.create(newFile);
byte[] outputBuff = new byte[97331];
for(int i = 0; i < outputBuff.length; ++i) {
outputBuff[i] = (byte)(Math.random() * 255);
}
output.write(outputBuff);
output.close();
// Read the content of the file
InputStream input = fs.open(newFile);
byte[] buff = new byte[131072];
int result = input.read(buff, 0, buff.length);
assertEquals(outputBuff.length, result);
for(int i = 0; i < outputBuff.length; ++i) {
assertEquals(outputBuff[i], buff[i]);
}
// Issue another read and make sure it returns -1
buff = new byte[131072];
result = input.read(buff, 0, buff.length);
assertEquals(-1, result);
}
@Test
public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception {
Path newFile = new Path("eofRead3");
OutputStream output = fs.create(newFile);
output.write(10);
output.close();
// Read first byte back
InputStream input = fs.open(newFile);
int value = input.read();
assertEquals(10, value);
// Issue another read and make sure it returns -1
value = input.read();
assertEquals(-1, value);
}
@Test @Test
public void testSetPermissionOnFile() throws Exception { public void testSetPermissionOnFile() throws Exception {
Path newFile = new Path("testPermission"); Path newFile = new Path("testPermission");

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.junit.Ignore;
public class TestNativeAzureFileSystemContractPageBlobLive extends
FileSystemContractBaseTest {
private AzureBlobStorageTestAccount testAccount;
private AzureBlobStorageTestAccount createTestAccount()
throws Exception {
Configuration conf = new Configuration();
// Configure the page blob directories key so every file created is a page blob.
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
// Configure the atomic rename directories key so every folder will have
// atomic rename applied.
conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
return AzureBlobStorageTestAccount.create(conf);
}
@Override
protected void setUp() throws Exception {
testAccount = createTestAccount();
if (testAccount != null) {
fs = testAccount.getFileSystem();
}
}
@Override
protected void tearDown() throws Exception {
if (testAccount != null) {
testAccount.cleanup();
testAccount = null;
fs = null;
}
}
@Override
protected void runTest() throws Throwable {
if (testAccount != null) {
super.runTest();
}
}
/**
* The following tests are failing on Azure and the Azure
* file system code needs to be modified to make them pass.
* A separate work item has been opened for this.
*/
@Ignore
public void testMoveFileUnderParent() throws Throwable {
}
@Ignore
public void testRenameFileToSelf() throws Throwable {
}
@Ignore
public void testRenameChildDirForbidden() throws Exception {
}
@Ignore
public void testMoveDirUnderParent() throws Throwable {
}
@Ignore
public void testRenameDirToSelf() throws Throwable {
}
}