diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index a8bf7c16eec..91274289f54 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -208,16 +208,15 @@ public class AzureBlobFileSystem extends FileSystem } private FSDataInputStream open(final Path path, - final Optional options) throws IOException { + final Optional parameters) throws IOException { statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); try { TracingContext tracingContext = new TracingContext(clientCorrelationId, - fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, - listener); - InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, - options, statistics, tracingContext); + fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener); + InputStream inputStream = abfsStore + .openFileForRead(qualifiedPath, parameters, statistics, tracingContext); return new FSDataInputStream(inputStream); } catch(AzureBlobFileSystemException ex) { checkException(path, ex); @@ -225,6 +224,15 @@ public class AzureBlobFileSystem extends FileSystem } } + /** + * Takes config and other options through + * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that + * FileStatus entered is up-to-date, as it will be used to create the + * InputStream (with info such as contentLength, eTag) + * @param path The location of file to be opened + * @param parameters OpenFileParameters instance; can hold FileStatus, + * Configuration, bufferSize and mandatoryKeys + */ @Override protected CompletableFuture openFileWithOptions( final Path path, final OpenFileParameters parameters) throws IOException { @@ -235,7 +243,7 @@ public class AzureBlobFileSystem extends FileSystem "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> - open(path, Optional.of(parameters.getOptions()))); + open(path, Optional.of(parameters))); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 3a527f7f0c3..de6f676bab5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -115,6 +115,7 @@ import org.apache.hadoop.fs.azurebfs.utils.CRC64; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -129,6 +130,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYP import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; @@ -669,44 +672,64 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics, TracingContext tracingContext) - throws AzureBlobFileSystemException { - return openFileForRead(path, Optional.empty(), statistics, tracingContext); + throws IOException { + return openFileForRead(path, Optional.empty(), statistics, + tracingContext); } - public AbfsInputStream openFileForRead(final Path path, - final Optional options, + public AbfsInputStream openFileForRead(Path path, + final Optional parameters, final FileSystem.Statistics statistics, TracingContext tracingContext) - throws AzureBlobFileSystemException { - try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { + throws IOException { + try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", + "getPathStatus")) { LOG.debug("openFileForRead filesystem: {} path: {}", - client.getFileSystem(), - path); + client.getFileSystem(), path); + FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus) + .orElse(null); String relativePath = getRelativePath(path); - - final AbfsRestOperation op = client - .getPathStatus(relativePath, false, tracingContext); - perfInfo.registerResult(op.getResult()); - - final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + String resourceType, eTag; + long contentLength; + if (fileStatus instanceof VersionedFileStatus) { + path = path.makeQualified(this.uri, path); + Preconditions.checkArgument(fileStatus.getPath().equals(path), + String.format( + "Filestatus path [%s] does not match with given path [%s]", + fileStatus.getPath(), path)); + resourceType = fileStatus.isFile() ? FILE : DIRECTORY; + contentLength = fileStatus.getLen(); + eTag = ((VersionedFileStatus) fileStatus).getVersion(); + } else { + if (fileStatus != null) { + LOG.warn( + "Fallback to getPathStatus REST call as provided filestatus " + + "is not of type VersionedFileStatus"); + } + AbfsHttpOperation op = client.getPathStatus(relativePath, false, + tracingContext).getResult(); + resourceType = op.getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + contentLength = Long.parseLong( + op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); + } if (parseIsDirectory(resourceType)) { throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); } perfInfo.registerSuccess(true); // Add statistics for InputStream - return new AbfsInputStream(client, statistics, - relativePath, contentLength, - populateAbfsInputStreamContext(options), - eTag, tracingContext); + return new AbfsInputStream(client, statistics, relativePath, + contentLength, populateAbfsInputStreamContext( + parameters.map(OpenFileParameters::getOptions)), + eTag, tracingContext); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index ae24cf4a107..d4e44c37f63 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; @@ -429,6 +430,15 @@ public abstract class AbstractAbfsIntegrationTest extends return fs.getAbfsStore(); } + public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) { + return abfsStore.getClient(); + } + + public void setAbfsClient(AzureBlobFileSystemStore abfsStore, + AbfsClient client) { + abfsStore.setClient(client); + } + public Path makeQualified(Path path) throws java.io.IOException { return getFileSystem().makeQualified(path); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 62326e0dbb3..b5ae9b73784 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,31 +19,40 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; - -import org.junit.Assert; -import org.junit.Test; import java.util.Arrays; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.OpenFileParameters; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -192,6 +201,106 @@ public class TestAbfsInputStream extends ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } + private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(testFile); + FSDataOutputStream out = fs.append(testFile); + out.write(buffer); + out.close(); + } + + private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, + byte[] buf, AbfsRestOperationType source) + throws IOException, ExecutionException, InterruptedException { + byte[] readBuf = new byte[buf.length]; + AzureBlobFileSystem fs = getFileSystem(); + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.withFileStatus(fileStatus); + FSDataInputStream in = builder.build().get(); + assertEquals(String.format( + "Open with fileStatus [from %s result]: Incorrect number of bytes read", + source), buf.length, in.read(readBuf)); + assertArrayEquals(String + .format("Open with fileStatus [from %s result]: Incorrect read data", + source), readBuf, buf); + } + + private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, + AbfsRestOperationType source, TracingContext tracingContext) + throws IOException { + + // verify GetPathStatus not invoked when FileStatus is provided + abfsStore.openFileForRead(testFile, Optional + .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext); + verify(mockClient, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", + source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + // verify GetPathStatus invoked when FileStatus not provided + abfsStore.openFileForRead(testFile, + Optional.empty(), null, + tracingContext); + verify(mockClient, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + Mockito.reset(mockClient); //clears invocation count for next test case + } + + @Test + public void testOpenFileWithOptions() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String testFolder = "/testFolder"; + Path smallTestFile = new Path(testFolder + "/testFile0"); + Path largeTestFile = new Path(testFolder + "/testFile1"); + fs.mkdirs(new Path(testFolder)); + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] smallBuffer = new byte[5]; + byte[] largeBuffer = new byte[readBufferSize + 5]; + new Random().nextBytes(smallBuffer); + new Random().nextBytes(largeBuffer); + writeBufferToNewFile(smallTestFile, smallBuffer); + writeBufferToNewFile(largeTestFile, largeBuffer); + + FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile), + fs.getFileStatus(largeTestFile)}; + FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder)); + + // open with fileStatus from GetPathStatus + verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], + smallBuffer, AbfsRestOperationType.GetPathStatus); + verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], + largeBuffer, AbfsRestOperationType.GetPathStatus); + + // open with fileStatus from ListStatus + verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, + AbfsRestOperationType.ListPaths); + verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, + AbfsRestOperationType.ListPaths); + + // verify number of GetPathStatus invocations + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getAbfsClient(abfsStore)); + setAbfsClient(abfsStore, mockClient); + TracingContext tracingContext = getTestTracingContext(fs, false); + checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(smallTestFile, listStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + checkGetPathStatusCalls(largeTestFile, listStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + + // Verify with incorrect filestatus + getFileStatusResults[0].setPath(new Path("wrongPath")); + intercept(ExecutionException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, + getFileStatusResults[0], smallBuffer, + AbfsRestOperationType.GetPathStatus)); + } + /** * This test expects AbfsInputStream to throw the exception that readAhead * thread received on read. The readAhead thread must be initiated from the