HADOOP-17074. S3A Listing to be fully asynchronous. (#2207)

Contributed by Mukund Thakur.

Change-Id: I1b0574a0c9ebc0805f285dd5280a00e5add081f1
This commit is contained in:
Mukund Thakur 2020-08-25 15:59:43 +05:30 committed by Steve Loughran
parent da129a67bb
commit 0840c0c1f3
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
5 changed files with 164 additions and 23 deletions

View File

@ -55,7 +55,9 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
@ -739,6 +741,16 @@ public class Listing extends AbstractStoreOperation {
*/
private int maxKeys;
/**
* Future to store current batch listing result.
*/
private CompletableFuture<S3ListResult> s3ListResultFuture;
/**
* Result of previous batch.
*/
private S3ListResult objectsPrev;
/**
* Constructor -calls `listObjects()` on the request to populate the
* initial set of results/fail if there was a problem talking to the bucket.
@ -752,8 +764,10 @@ public class Listing extends AbstractStoreOperation {
S3ListRequest request) throws IOException {
this.listPath = listPath;
this.maxKeys = listingOperationCallbacks.getMaxKeys();
this.objects = listingOperationCallbacks.listObjects(request);
this.s3ListResultFuture = listingOperationCallbacks
.listObjectsAsync(request);
this.request = request;
this.objectsPrev = null;
}
/**
@ -764,7 +778,8 @@ public class Listing extends AbstractStoreOperation {
*/
@Override
public boolean hasNext() throws IOException {
// NOTE(review): two return statements appear back to back here; this looks
// like a diff rendered without its +/- markers, with the line directly
// below being the older form -- confirm against the real file.
return firstListing || objects.isTruncated();
// There is more to return while the first batch has not been handed out
// yet, or when the batch most recently stored in objectsPrev was truncated.
return firstListing ||
(objectsPrev != null && objectsPrev.isTruncated());
}
/**
@ -780,29 +795,44 @@ public class Listing extends AbstractStoreOperation {
@Retries.RetryTranslated
public S3ListResult next() throws IOException {
// NOTE(review): this hunk appears to interleave pre-change and post-change
// statements (diff markers stripped), e.g. the duplicated truncation checks
// and the two return statements -- confirm against the real file.
if (firstListing) {
// on the first listing, don't request more data.
// Instead just clear the firstListing flag so that future calls
// will request new data.
// clear the firstListing flag for future calls.
firstListing = false;
// Block until the listing held in s3ListResultFuture has completed.
objects = awaitFuture(s3ListResultFuture);
// If that batch was truncated, start fetching the following one now.
fetchNextBatchAsyncIfPresent();
} else {
try {
if (!objects.isTruncated()) {
if (objectsPrev!= null && !objectsPrev.isTruncated()) {
// nothing more to request: fail.
throw new NoSuchElementException("No more results in listing of "
+ listPath);
+ listPath);
}
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
objects = listingOperationCallbacks
.continueListObjects(request, objects);
// Block until the listing held in s3ListResultFuture has completed.
objects = awaitFuture(s3ListResultFuture);
// Requesting next batch of results.
fetchNextBatchAsyncIfPresent();
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (AmazonClientException e) {
// Convert the SDK exception via translateException before rethrowing.
throw translateException("listObjects()", listPath, e);
}
}
return objects;
// Storing the current result to be used by hasNext() call.
objectsPrev = objects;
return objectsPrev;
}
/**
 * Start an asynchronous request for the following page of results, but
 * only when the current page reports that it was truncated. The future
 * for that request replaces {@code s3ListResultFuture}.
 * @throws IOException declared by the continuation callback.
 */
private void fetchNextBatchAsyncIfPresent() throws IOException {
  final boolean listingComplete = !objects.isTruncated();
  if (listingComplete) {
    // the current page is the last one: nothing left to prefetch.
    return;
  }
  LOG.debug("[{}], Requesting next {} objects under {}",
      listingCount, maxKeys, listPath);
  s3ListResultFuture = listingOperationCallbacks
      .continueListObjectsAsync(request, objects);
}
@Override

View File

@ -1649,19 +1649,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Override
@Retries.RetryRaw
// NOTE(review): the doubled signature and return lines below look like the
// old and new forms of a diff shown without +/- markers -- confirm.
public S3ListResult listObjects(
public CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request)
throws IOException {
return S3AFileSystem.this.listObjects(request);
// Submit the list call to unboundedThreadPool and return the resulting
// future instead of the result itself.
return submit(unboundedThreadPool,
() -> listObjects(request));
}
@Override
@Retries.RetryRaw
// NOTE(review): the doubled signature and return lines below look like the
// old and new forms of a diff shown without +/- markers -- confirm.
public S3ListResult continueListObjects(
public CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {
return S3AFileSystem.this.continueListObjects(request, prevResult);
// Submit the continuation call, which resumes from prevResult, to
// unboundedThreadPool and return the resulting future.
return submit(unboundedThreadPool,
() -> continueListObjects(request, prevResult));
}
@Override
@ -2279,6 +2281,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* not be saved to the metadata store and
* fs.s3a.metadatastore.fail.on.write.error=true
*/
@VisibleForTesting
@Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
throws AmazonClientException, MetadataPersistenceException {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@ -38,7 +39,7 @@ import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
public interface ListingOperationCallbacks {
/**
* Initiate a {@code listObjects} operation, incrementing metrics
* Initiate a {@code listObjectsAsync} operation, incrementing metrics
* in the process.
*
* Retry policy: retry untranslated.
@ -47,7 +48,7 @@ public interface ListingOperationCallbacks {
* @throws IOException if the retry invocation raises one (it shouldn't).
*/
@Retries.RetryRaw
S3ListResult listObjects(
CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request)
throws IOException;
@ -60,7 +61,7 @@ public interface ListingOperationCallbacks {
* @throws IOException none, just there for retryUntranslated.
*/
@Retries.RetryRaw
S3ListResult continueListObjects(
CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult)
throws IOException;

View File

@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -243,13 +244,14 @@ public class TestPartialDeleteFailures {
private static class MinimalListingOperationCallbacks
implements ListingOperationCallbacks {
@Override
public S3ListResult listObjects(S3ListRequest request)
public CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request)
throws IOException {
return null;
}
@Override
public S3ListResult continueListObjects(
public CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {

View File

@ -18,14 +18,33 @@
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.junit.Test;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
@ -137,6 +156,93 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
}
}
/**
 * Upload a set of empty files in parallel, then list the directory with
 * both {@code listFiles()} and {@code listStatus()} using a small page
 * size so that the listing spans many pages. Each listing is timed, with
 * a sleep per entry to simulate processing, and its result is checked
 * against the list of uploaded files.
 * @throws Throwable on any failure, including a failed upload.
 */
@Test
public void testMultiPagesListingPerformanceAndCorrectness()
    throws Throwable {
  describe("Check performance and correctness for multi page listing " +
      "using different listing api");
  final Path dir = methodPath();
  final int batchSize = 10;
  final int numOfPutRequests = 1000;
  final int eachFileProcessingTime = 10;
  final int numOfPutThreads = 50;
  final Configuration conf =
      getConfigurationWithConfiguredBatchSize(batchSize);
  // An always-empty stream: every uploaded object is zero bytes long.
  final InputStream im = new InputStream() {
    @Override
    public int read() throws IOException {
      return -1;
    }
  };
  final List<String> originalListOfFiles = new ArrayList<>();
  List<Callable<PutObjectResult>> putObjectRequests = new ArrayList<>();
  ExecutorService executorService = Executors
      .newFixedThreadPool(numOfPutThreads);

  NanoTimer uploadTimer = new NanoTimer();
  try (S3AFileSystem fs = (S3AFileSystem) FileSystem.get(dir.toUri(), conf)) {
    // Check the precondition before touching the store, so a skipped
    // test leaves nothing behind.
    assume("Test is only for raw fs", !fs.hasMetadataStore());
    // NOTE(review): the stream returned by create() is never closed;
    // presumably mkdirs() was intended -- confirm before changing.
    fs.create(dir);
    for (int i = 0; i < numOfPutRequests; i++) {
      Path file = new Path(dir, String.format("file-%03d", i));
      originalListOfFiles.add(file.toString());
      ObjectMetadata om = fs.newObjectMetadata(0L);
      PutObjectRequest put = new PutObjectRequest(fs.getBucket(),
          fs.pathToKey(file),
          im,
          om);
      putObjectRequests.add(() ->
          fs.getWriteOperationHelper().putObject(put));
    }
    // invokeAll() returns once every task has completed; get() is called
    // on each future so that a failed PUT fails the test here rather than
    // surfacing later as a listing mismatch.
    for (Future<PutObjectResult> upload
        : executorService.invokeAll(putObjectRequests)) {
      upload.get();
    }
    uploadTimer.end("uploading %d files with a parallelism of %d",
        numOfPutRequests, numOfPutThreads);

    RemoteIterator<LocatedFileStatus> resIterator = fs.listFiles(dir, true);
    List<String> listUsingListFiles = new ArrayList<>();
    NanoTimer timeUsingListFiles = new NanoTimer();
    while (resIterator.hasNext()) {
      listUsingListFiles.add(resIterator.next().getPath().toString());
      // Simulated per-file processing time.
      Thread.sleep(eachFileProcessingTime);
    }
    timeUsingListFiles.end("listing %d files using listFiles() api with " +
            "batch size of %d including %dms of processing time" +
            " for each file",
        numOfPutRequests, batchSize, eachFileProcessingTime);

    Assertions.assertThat(listUsingListFiles)
        .describedAs("Listing results using listFiles() must " +
            "match with original list of files")
        .hasSameElementsAs(originalListOfFiles)
        .hasSize(numOfPutRequests);

    List<String> listUsingListStatus = new ArrayList<>();
    NanoTimer timeUsingListStatus = new NanoTimer();
    FileStatus[] fileStatuses = fs.listStatus(dir);
    for (FileStatus fileStatus : fileStatuses) {
      listUsingListStatus.add(fileStatus.getPath().toString());
      // Simulated per-file processing time.
      Thread.sleep(eachFileProcessingTime);
    }
    timeUsingListStatus.end("listing %d files using listStatus() api with " +
            "batch size of %d including %dms of processing time" +
            " for each file",
        numOfPutRequests, batchSize, eachFileProcessingTime);
    Assertions.assertThat(listUsingListStatus)
        .describedAs("Listing results using listStatus() must " +
            "match with original list of files")
        .hasSameElementsAs(originalListOfFiles)
        .hasSize(numOfPutRequests);
  } finally {
    executorService.shutdown();
  }
}
/**
 * Build a copy of the test filesystem's configuration with filesystem
 * caching disabled and the listing page size set.
 * @param batchSize value to set for {@code Constants.MAX_PAGING_KEYS}.
 * @return the new configuration.
 */
private Configuration getConfigurationWithConfiguredBatchSize(int batchSize) {
  final Configuration pagedConf =
      new Configuration(getFileSystem().getConf());
  pagedConf.setInt(Constants.MAX_PAGING_KEYS, batchSize);
  S3ATestUtils.disableFilesystemCaching(pagedConf);
  return pagedConf;
}
@Test
public void testTimeToStatEmptyDirectory() throws Throwable {
describe("Time to stat an empty directory");
@ -188,5 +294,4 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
LOG.info("listObjects: {}", listRequests);
LOG.info("listObjects: per operation {}", listRequests.diff() / attempts);
}
}