HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (#2975)

This commit is contained in:
sumangala-patki 2021-08-18 19:14:10 +05:30 committed by GitHub
parent ee07b90286
commit dcddc6a59f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 183 additions and 33 deletions

View File

@ -208,16 +208,15 @@ public class AzureBlobFileSystem extends FileSystem
} }
private FSDataInputStream open(final Path path, private FSDataInputStream open(final Path path,
final Optional<Configuration> options) throws IOException { final Optional<OpenFileParameters> parameters) throws IOException {
statIncrement(CALL_OPEN); statIncrement(CALL_OPEN);
Path qualifiedPath = makeQualified(path); Path qualifiedPath = makeQualified(path);
try { try {
TracingContext tracingContext = new TracingContext(clientCorrelationId, TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
listener); InputStream inputStream = abfsStore
InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, .openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
options, statistics, tracingContext);
return new FSDataInputStream(inputStream); return new FSDataInputStream(inputStream);
} catch(AzureBlobFileSystemException ex) { } catch(AzureBlobFileSystemException ex) {
checkException(path, ex); checkException(path, ex);
@ -225,6 +224,15 @@ public class AzureBlobFileSystem extends FileSystem
} }
} }
/**
* Takes config and other options through
* {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that
* FileStatus entered is up-to-date, as it will be used to create the
* InputStream (with info such as contentLength, eTag)
* @param path The location of file to be opened
* @param parameters OpenFileParameters instance; can hold FileStatus,
* Configuration, bufferSize and mandatoryKeys
*/
@Override @Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions( protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path, final OpenFileParameters parameters) throws IOException { final Path path, final OpenFileParameters parameters) throws IOException {
@ -235,7 +243,7 @@ public class AzureBlobFileSystem extends FileSystem
"for " + path); "for " + path);
return LambdaUtils.eval( return LambdaUtils.eval(
new CompletableFuture<>(), () -> new CompletableFuture<>(), () ->
open(path, Optional.of(parameters.getOptions()))); open(path, Optional.of(parameters)));
} }
@Override @Override

View File

@ -115,6 +115,7 @@ import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
@ -129,6 +130,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYP
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@ -669,44 +672,64 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
public AbfsInputStream openFileForRead(final Path path, public AbfsInputStream openFileForRead(final Path path,
final FileSystem.Statistics statistics, TracingContext tracingContext) final FileSystem.Statistics statistics, TracingContext tracingContext)
throws AzureBlobFileSystemException { throws IOException {
return openFileForRead(path, Optional.empty(), statistics, tracingContext); return openFileForRead(path, Optional.empty(), statistics,
tracingContext);
} }
public AbfsInputStream openFileForRead(final Path path, public AbfsInputStream openFileForRead(Path path,
final Optional<Configuration> options, final Optional<OpenFileParameters> parameters,
final FileSystem.Statistics statistics, TracingContext tracingContext) final FileSystem.Statistics statistics, TracingContext tracingContext)
throws AzureBlobFileSystemException { throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) { try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
"getPathStatus")) {
LOG.debug("openFileForRead filesystem: {} path: {}", LOG.debug("openFileForRead filesystem: {} path: {}",
client.getFileSystem(), client.getFileSystem(), path);
path);
FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path); String relativePath = getRelativePath(path);
String resourceType, eTag;
final AbfsRestOperation op = client long contentLength;
.getPathStatus(relativePath, false, tracingContext); if (fileStatus instanceof VersionedFileStatus) {
perfInfo.registerResult(op.getResult()); path = path.makeQualified(this.uri, path);
Preconditions.checkArgument(fileStatus.getPath().equals(path),
final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); String.format(
final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); "Filestatus path [%s] does not match with given path [%s]",
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); fileStatus.getPath(), path));
resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
contentLength = fileStatus.getLen();
eTag = ((VersionedFileStatus) fileStatus).getVersion();
} else {
if (fileStatus != null) {
LOG.warn(
"Fallback to getPathStatus REST call as provided filestatus "
+ "is not of type VersionedFileStatus");
}
AbfsHttpOperation op = client.getPathStatus(relativePath, false,
tracingContext).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}
if (parseIsDirectory(resourceType)) { if (parseIsDirectory(resourceType)) {
throw new AbfsRestOperationException( throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories", "openFileForRead must be used with files and not directories",
null); null);
} }
perfInfo.registerSuccess(true); perfInfo.registerSuccess(true);
// Add statistics for InputStream // Add statistics for InputStream
return new AbfsInputStream(client, statistics, return new AbfsInputStream(client, statistics, relativePath,
relativePath, contentLength, contentLength, populateAbfsInputStreamContext(
populateAbfsInputStreamContext(options), parameters.map(OpenFileParameters::getOptions)),
eTag, tracingContext); eTag, tracingContext);
} }
} }

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
@ -429,6 +430,15 @@ public abstract class AbstractAbfsIntegrationTest extends
return fs.getAbfsStore(); return fs.getAbfsStore();
} }
public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
return abfsStore.getClient();
}
public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
AbfsClient client) {
abfsStore.setClient(client);
}
public Path makeQualified(Path path) throws java.io.IOException { public Path makeQualified(Path path) throws java.io.IOException {
return getFileSystem().makeQualified(path); return getFileSystem().makeQualified(path);
} }

View File

@ -19,31 +19,40 @@
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException; import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -192,6 +201,106 @@ public class TestAbfsInputStream extends
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
} }
private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
AzureBlobFileSystem fs = getFileSystem();
fs.create(testFile);
FSDataOutputStream out = fs.append(testFile);
out.write(buffer);
out.close();
}
private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
byte[] buf, AbfsRestOperationType source)
throws IOException, ExecutionException, InterruptedException {
byte[] readBuf = new byte[buf.length];
AzureBlobFileSystem fs = getFileSystem();
FutureDataInputStreamBuilder builder = fs.openFile(path);
builder.withFileStatus(fileStatus);
FSDataInputStream in = builder.build().get();
assertEquals(String.format(
"Open with fileStatus [from %s result]: Incorrect number of bytes read",
source), buf.length, in.read(readBuf));
assertArrayEquals(String
.format("Open with fileStatus [from %s result]: Incorrect read data",
source), readBuf, buf);
}
private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
AbfsRestOperationType source, TracingContext tracingContext)
throws IOException {
// verify GetPathStatus not invoked when FileStatus is provided
abfsStore.openFileForRead(testFile, Optional
.ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
verify(mockClient, times(0).description((String.format(
"FileStatus [from %s result] provided, GetFileStatus should not be invoked",
source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
// verify GetPathStatus invoked when FileStatus not provided
abfsStore.openFileForRead(testFile,
Optional.empty(), null,
tracingContext);
verify(mockClient, times(1).description(
"GetPathStatus should be invoked when FileStatus not provided"))
.getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
Mockito.reset(mockClient); //clears invocation count for next test case
}
@Test
public void testOpenFileWithOptions() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
String testFolder = "/testFolder";
Path smallTestFile = new Path(testFolder + "/testFile0");
Path largeTestFile = new Path(testFolder + "/testFile1");
fs.mkdirs(new Path(testFolder));
int readBufferSize = getConfiguration().getReadBufferSize();
byte[] smallBuffer = new byte[5];
byte[] largeBuffer = new byte[readBufferSize + 5];
new Random().nextBytes(smallBuffer);
new Random().nextBytes(largeBuffer);
writeBufferToNewFile(smallTestFile, smallBuffer);
writeBufferToNewFile(largeTestFile, largeBuffer);
FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
fs.getFileStatus(largeTestFile)};
FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));
// open with fileStatus from GetPathStatus
verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
smallBuffer, AbfsRestOperationType.GetPathStatus);
verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
largeBuffer, AbfsRestOperationType.GetPathStatus);
// open with fileStatus from ListStatus
verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
AbfsRestOperationType.ListPaths);
verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
AbfsRestOperationType.ListPaths);
// verify number of GetPathStatus invocations
AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
AbfsClient mockClient = spy(getAbfsClient(abfsStore));
setAbfsClient(abfsStore, mockClient);
TracingContext tracingContext = getTestTracingContext(fs, false);
checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
// Verify with incorrect filestatus
getFileStatusResults[0].setPath(new Path("wrongPath"));
intercept(ExecutionException.class,
() -> verifyOpenWithProvidedStatus(smallTestFile,
getFileStatusResults[0], smallBuffer,
AbfsRestOperationType.GetPathStatus));
}
/** /**
* This test expects AbfsInputStream to throw the exception that readAhead * This test expects AbfsInputStream to throw the exception that readAhead
* thread received on read. The readAhead thread must be initiated from the * thread received on read. The readAhead thread must be initiated from the