HADOOP-13704. Optimized S3A getContentSummary()

Optimize the scan for s3 by performing a deep tree listing,
inferring directory counts from the paths returned.

Contributed by Ahmar Suhail.

Change-Id: I26ffa8c6f65fd11c68a88d6e2243b0eac6ffd024
Steve Loughran 2022-03-22 13:20:37 +00:00
parent 4153c16324
commit 105e0dbd92
9 changed files with 288 additions and 45 deletions


@@ -453,6 +453,26 @@ The function `getLocatedFileStatus(FS, d)` is as defined in
The atomicity and consistency constraints are as for
`listStatus(Path, PathFilter)`.
### `ContentSummary getContentSummary(Path path)`
Given a path, return its content summary.
`getContentSummary()` first checks whether the given path is a file; if it is, it returns a summary with a directory count of 0 and a file count of 1.
#### Preconditions
    exists(FS, path) else raise FileNotFoundException
#### Postconditions
Returns a `ContentSummary` object with information such as directory count
and file count for a given path.
The atomicity and consistency constraints are as for
`listStatus(Path, PathFilter)`.
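A minimal usage sketch (illustrative only; `conf` stands for any valid `Configuration` and `/base` for an existing directory):

    FileSystem fs = FileSystem.get(conf);
    ContentSummary summary = fs.getContentSummary(new Path("/base"));
    long dirs  = summary.getDirectoryCount();  // counts /base itself plus all subdirectories
    long files = summary.getFileCount();
    long bytes = summary.getLength();          // sum of the lengths of all files under /base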
### `BlockLocation[] getFileBlockLocations(FileStatus f, int s, int l)`
#### Preconditions


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import java.io.FileNotFoundException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
public abstract class AbstractContractContentSummaryTest extends AbstractFSContractTestBase {
@Test
public void testGetContentSummary() throws Throwable {
FileSystem fs = getFileSystem();
Path parent = path("parent");
Path nested = path(parent + "/a/b/c");
Path filePath = new Path(nested, "file.txt");
fs.mkdirs(parent);
fs.mkdirs(nested);
touch(getFileSystem(), filePath);
ContentSummary summary = fs.getContentSummary(parent);
Assertions.assertThat(summary.getDirectoryCount()).as("Summary " + summary).isEqualTo(4);
Assertions.assertThat(summary.getFileCount()).as("Summary " + summary).isEqualTo(1);
}
@Test
public void testGetContentSummaryIncorrectPath() throws Throwable {
FileSystem fs = getFileSystem();
Path parent = path("parent");
Path nested = path(parent + "/a");
fs.mkdirs(parent);
intercept(FileNotFoundException.class, () -> fs.getContentSummary(nested));
}
}


@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.localfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractContentSummaryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestLocalFSContractContentSummary extends AbstractContractContentSummaryTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf);
}
}


@@ -3261,9 +3261,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
@Override
public RemoteIterator<S3AFileStatus> listStatusIterator(final Path path)
throws IOException {
return S3AFileSystem.this.innerListStatus(path);
public RemoteIterator<S3ALocatedFileStatus> listFilesIterator(final Path path,
final boolean recursive) throws IOException {
return S3AFileSystem.this.innerListFiles(path, recursive, Listing.ACCEPT_ALL_BUT_S3N, null);
}
}


@@ -66,7 +66,7 @@ public class S3ObjectAttributes {
/**
* Construct from the result of a copy and those parameters
* which aren't included in an AWS SDK response.
* @param path
* @param path path
* @param copyResult copy result.
* @param serverSideEncryptionAlgorithm current encryption algorithm
* @param serverSideEncryptionKey any server side encryption key?


@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
@@ -34,22 +35,15 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
/**
* GetContentSummary operation.
* This is based on {@code FileSystem.get#getContentSummary};
* it's still doing a sequential treewalk, with the attendant
* efficiency issues.
*
* Changes:
* 1. On the recursive calls there
* is no probe to see if the path is a file: we know the
* recursion only happens with a dir.
* 2. If a subdirectory is not found during the walk, that
* does not trigger an error. The directory is clearly
* not part of the content any more.
* It is optimized for s3 and performs a deep tree listing,
* inferring directory counts from the paths returned.
*
* The Operation serves up IOStatistics; this counts
* the cost of all the list operations, but not the
@@ -122,9 +116,7 @@ public class GetContentSummaryOperation extends
/**
* Return the {@link ContentSummary} of a given directory.
* This is a recursive operation (as the original is);
* it'd be more efficient in stack and heap usage if it managed
* its own stack.
*
* @param dir dir to scan
* @throws FileNotFoundException if the path does not resolve
* @throws IOException IO failure
@@ -133,34 +125,65 @@
* @throws IOException failure
*/
public ContentSummary getDirSummary(Path dir) throws IOException {
long totalLength = 0;
long fileCount = 0;
long dirCount = 1;
final RemoteIterator<S3AFileStatus> it
= callbacks.listStatusIterator(dir);
RemoteIterator<S3ALocatedFileStatus> it = callbacks.listFilesIterator(dir, true);
Set<Path> dirSet = new HashSet<>();
Set<Path> pathsTraversed = new HashSet<>();
while (it.hasNext()) {
final S3AFileStatus s = it.next();
if (s.isDirectory()) {
try {
ContentSummary c = getDirSummary(s.getPath());
totalLength += c.getLength();
fileCount += c.getFileCount();
dirCount += c.getDirectoryCount();
} catch (FileNotFoundException ignored) {
// path was deleted during the scan; exclude from
// summary.
}
} else {
totalLength += s.getLen();
S3ALocatedFileStatus fileStatus = it.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isDirectory() && !filePath.equals(dir)) {
dirSet.add(filePath);
buildDirectorySet(dirSet, pathsTraversed, dir, filePath.getParent());
} else if (!fileStatus.isDirectory()) {
fileCount += 1;
totalLength += fileStatus.getLen();
buildDirectorySet(dirSet, pathsTraversed, dir, filePath.getParent());
}
}
// Add the list's IOStatistics
iostatistics.aggregate(retrieveIOStatistics(it));
return new ContentSummary.Builder().length(totalLength).
fileCount(fileCount).directoryCount(dirCount).
spaceConsumed(totalLength).build();
fileCount(fileCount).directoryCount(dirCount + dirSet.size()).
spaceConsumed(totalLength).build();
}
/**
* This method builds the set of all directories found under the base path. We need to do this
* because if the directory structure /a/b/c was created with a single mkdirs() call, it is
* stored as 1 object in S3 and the list files iterator will only return a single entry /a/b/c.
*
* We keep track of paths traversed so far to prevent duplication of work. For example, given
* /a/b/c/file-1.txt and /a/b/c/file-2.txt, we recurse over the complete path only once and
* don't have to do anything for file-2.txt.
*
* @param dirSet Set of all directories found in the path
* @param pathsTraversed Set of all paths traversed so far
* @param basePath Path of directory to scan
* @param parentPath Parent path of the current file/directory in the iterator
*/
private void buildDirectorySet(Set<Path> dirSet, Set<Path> pathsTraversed, Path basePath,
Path parentPath) {
if (parentPath == null || pathsTraversed.contains(parentPath) || parentPath.equals(basePath)) {
return;
}
dirSet.add(parentPath);
buildDirectorySet(dirSet, pathsTraversed, basePath, parentPath.getParent());
pathsTraversed.add(parentPath);
}
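// Illustrative walk-through (editorial sketch, not part of the change):
// suppose listFilesIterator(base, true) returns two entries,
//   base/a/b/c        (a directory marker: a/b/c came from a single mkdirs())
//   base/d/e/file.txt
// The directory entry puts base/a/b/c in dirSet, and buildDirectorySet()
// climbs its parents, adding base/a/b and base/a before stopping at base.
// The file entry increments fileCount and adds base/d/e and base/d the same
// way, so getDirSummary() reports dirCount (1, for base itself) +
// dirSet.size() (5) = 6 directories and 1 file.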
/**
@@ -186,23 +209,24 @@ public class GetContentSummaryOperation extends
/**
* Get the status of a path.
* @param path path to probe.
*
* @param path path to probe.
* @param probes probes to exec
* @return the status
* @throws IOException failure
*/
@Retries.RetryTranslated
S3AFileStatus probePathStatus(Path path,
Set<StatusProbeEnum> probes) throws IOException;
S3AFileStatus probePathStatus(Path path, Set<StatusProbeEnum> probes) throws IOException;
/**
* Incremental list of all entries in a directory.
* @param path path of dir
* @return an iterator
/**
* List all entries under a path.
*
* @param path path to list.
* @param recursive if the subdirectories need to be traversed recursively
* @return an iterator over the listing.
* @throws IOException failure
*/
RemoteIterator<S3AFileStatus> listStatusIterator(Path path)
RemoteIterator<S3ALocatedFileStatus> listFilesIterator(Path path, boolean recursive)
throws IOException;
}
}


@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractContentSummaryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
public class ITestS3AContractContentSummary extends AbstractContractContentSummaryTest {
@Test
public void testGetContentSummaryDir() throws Throwable {
describe("getContentSummary on test dir with children");
S3AFileSystem fs = getFileSystem();
Path baseDir = methodPath();
// Nested folders created separately will return as separate objects in listFiles()
fs.mkdirs(new Path(baseDir, "a"));
fs.mkdirs(new Path(baseDir, "a/b"));
fs.mkdirs(new Path(baseDir, "a/b/a"));
// Will return as one object
fs.mkdirs(new Path(baseDir, "d/e/f"));
Path filePath = new Path(baseDir, "a/b/file");
touch(fs, filePath);
// getContentSummary() first probes the path to see if it is a file;
// it is not, so a LIST follows
final ContentSummary summary = fs.getContentSummary(baseDir);
Assertions.assertThat(summary.getDirectoryCount()).as("Summary " + summary).isEqualTo(7);
Assertions.assertThat(summary.getFileCount()).as("Summary " + summary).isEqualTo(1);
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Override
public S3AFileSystem getFileSystem() {
return (S3AFileSystem) super.getFileSystem();
}
}


@@ -144,8 +144,8 @@ public class ITestS3AMiscOperationCost extends AbstractS3ACostTest {
with(INVOCATION_GET_CONTENT_SUMMARY, 1),
withAuditCount(1),
always(FILE_STATUS_FILE_PROBE // look at path to see if it is a file
.plus(LIST_OPERATION) // it is not: so LIST
.plus(LIST_OPERATION))); // and a LIST on the child dir
.plus(LIST_OPERATION))); // it is not: so LIST
Assertions.assertThat(summary.getDirectoryCount())
.as("Summary " + summary)
.isEqualTo(2);


@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -154,6 +155,38 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
listStatusCalls,
getFileStatusCalls);
describe("Get content summary for directory");
NanoTimer getContentSummaryTimer = new NanoTimer();
ContentSummary rootPathSummary = fs.getContentSummary(scaleTestDir);
ContentSummary testPathSummary = fs.getContentSummary(listDir);
getContentSummaryTimer.end("getContentSummary of %s", created);
// only two list operations should have taken place
print(LOG,
metadataRequests,
listRequests,
listContinueRequests,
listStatusCalls,
getFileStatusCalls);
assertEquals(listRequests.toString(), 2, listRequests.diff());
reset(metadataRequests,
listRequests,
listContinueRequests,
listStatusCalls,
getFileStatusCalls);
assertTrue("Root directory count should be > test path",
rootPathSummary.getDirectoryCount() > testPathSummary.getDirectoryCount());
assertTrue("Root file count should be >= to test path",
rootPathSummary.getFileCount() >= testPathSummary.getFileCount());
assertEquals("Incorrect directory count", created.getDirCount() + 1,
testPathSummary.getDirectoryCount());
assertEquals("Incorrect file count", created.getFileCount(),
testPathSummary.getFileCount());
} finally {
describe("deletion");
// deletion at the end of the run