diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 3751847c7f7..9440b4e050a 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -96,6 +96,17 @@ can be queried to find if the path has an ACL. `getFileStatus(Path p).isEncrypte can be queried to find if the path is encrypted. `getFileStatus(Path p).isErasureCoded()` will tell if the path is erasure coded or not. +YARN's distributed cache lets applications add paths to be cached across +containers and applications via `Job.addCacheFile()` and `Job.addCacheArchive()`. +The cache treats world-readable resource paths as shareable across +applications, and downloads them differently, unless they are declared as encrypted. + +To avoid failures during container launching, especially when delegation tokens +are used, filesystems and object stores which do not implement POSIX access permissions +for both files and directories MUST always return `true` from the `isEncrypted()` +predicate. This can be done by setting the `encrypted` flag to true when creating +the `FileStatus` instance. 
+ ### `Path getHomeDirectory()` The function `getHomeDirectory` returns the home directory for the FileSystem diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index c9283dcd522..b6e94a66416 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -80,10 +80,20 @@ public abstract class AbstractContractOpenTest final Path path = path("file"); createFile(getFileSystem(), path, false, new byte[0]); final FileStatus stat = getFileSystem().getFileStatus(path); - assertFalse("Expecting false for stat.isEncrypted()", + assertEquals("Result wrong for for isEncrypted() in " + stat, + areZeroByteFilesEncrypted(), stat.isEncrypted()); } + /** + * Are zero byte files encrypted. This is implicitly + * false for filesystems which do not encrypt. + * @return true iff zero byte files are encrypted. 
+ */ + protected boolean areZeroByteFilesEncrypted() { + return false; + } + @Test public void testOpenReadDir() throws Throwable { describe("create & read a directory"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index be08afe4b10..ca6ca908bec 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -54,7 +54,9 @@ public class S3AFileStatus extends FileStatus { public S3AFileStatus(Tristate isemptydir, Path path, String owner) { - super(0, true, 1, 0, 0, path); + super(0, true, 1, 0, 0, 0, + null, null, null, null, + path, false, true, false); isEmptyDirectory = isemptydir; setOwner(owner); setGroup(owner); @@ -70,7 +72,9 @@ public class S3AFileStatus extends FileStatus { */ public S3AFileStatus(long length, long modification_time, Path path, long blockSize, String owner) { - super(length, false, 1, blockSize, modification_time, path); + super(length, false, 1, blockSize, modification_time, 0, + null, null, null, null, + path, false, true, false); isEmptyDirectory = Tristate.FALSE; setOwner(owner); setGroup(owner); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java index 8e338b71d11..d78273b1471 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java @@ -45,4 +45,13 @@ public class ITestS3AContractOpen extends AbstractContractOpenTest { protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + + /** + * S3A always declares zero byte files as 
encrypted. + * @return true, always. + */ + @Override + protected boolean areZeroByteFilesEncrypted() { + return true; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/AbstractDelegationIT.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/AbstractDelegationIT.java index 7651e24a692..cd94d19c655 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/AbstractDelegationIT.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/AbstractDelegationIT.java @@ -70,7 +70,10 @@ public abstract class AbstractDelegationIT extends AbstractS3ATestBase { assertEquals("Kind of token " + token, kind, token.getKind()); - return token.decodeIdentifier(); + AbstractS3ATokenIdentifier tid + = token.decodeIdentifier(); + LOG.info("Found for URI {}, token {}", uri, tid); + return tid; } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java index 2170e53103c..75ea01c0724 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestDelegatedMRJob.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.fs.s3a.auth.delegation; +import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.WordCount; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -47,6 +49,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.conf.YarnConfiguration; 
import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; @@ -72,7 +75,12 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopClust * of org.apache.hadoop.mapreduce.protocol.ClientProtocol is mock. * * It's still an ITest though, as it does use S3A as the source and - * dest so as to collect URLs. + * dest so as to collect delegation tokens. + * + * It also uses the open street map open bucket, so that there's an extra + * S3 URL in job submission which can be added as a job resource. + * This is needed to verify that job resources have their tokens extracted + * too. */ @RunWith(Parameterized.class) public class ITestDelegatedMRJob extends AbstractDelegationIT { @@ -99,6 +107,11 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT { private Path destPath; + private static final Path EXTRA_JOB_RESOURCE_PATH + = new Path("s3a://osm-pds/planet/planet-latest.orc"); + + public static final URI jobResource = EXTRA_JOB_RESOURCE_PATH.toUri(); + /** * Test array for parameterized test runs. * @return a list of parameter tuples. 
@@ -149,6 +162,15 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT { conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 10_000); + // turn off DDB for the job resource bucket + String host = jobResource.getHost(); + conf.set( + String.format("fs.s3a.bucket.%s.metadatastore.impl", host), + S3GUARD_METASTORE_NULL); + // and fix to the main endpoint if the caller has moved + conf.set( + String.format("fs.s3a.bucket.%s.endpoint", host), ""); + // set up DTs enableDelegationTokens(conf, tokenBinding); return conf; @@ -210,6 +232,15 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT { return getTestTimeoutSeconds() * 1000; } + @Test + public void testCommonCrawlLookup() throws Throwable { + FileSystem resourceFS = EXTRA_JOB_RESOURCE_PATH.getFileSystem( + getConfiguration()); + FileStatus status = resourceFS.getFileStatus(EXTRA_JOB_RESOURCE_PATH); + LOG.info("Extra job resource is {}", status); + assertTrue("Not encrypted: " + status, status.isEncrypted()); + } + @Test public void testJobSubmissionCollectsTokens() throws Exception { describe("Mock Job test"); @@ -242,6 +273,14 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT { job.setMaxMapAttempts(1); job.setMaxReduceAttempts(1); + // and a file for a different store. + // This is to actually stress the terasort code for which + // the yarn ResourceLocalizationService was having problems with + // fetching resources from. 
+ URI partitionUri = new URI(EXTRA_JOB_RESOURCE_PATH.toString() + + "#_partition.lst"); + job.addCacheFile(partitionUri); + describe("Executing Mock Job Submission to %s", output); job.submit(); @@ -267,6 +306,8 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT { lookupToken(submittedCredentials, sourceFS.getUri(), tokenKind); // look up the destination token lookupToken(submittedCredentials, fs.getUri(), tokenKind); + lookupToken(submittedCredentials, + EXTRA_JOB_RESOURCE_PATH.getFileSystem(conf).getUri(), tokenKind); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/mapreduce/filecache/TestS3AResourceScope.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/mapreduce/filecache/TestS3AResourceScope.java new file mode 100644 index 00000000000..c9b1ddc97ee --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/mapreduce/filecache/TestS3AResourceScope.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.filecache; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +/** + * Test how S3A resources are scoped in YARN caching. + * In this package to make use of package-private methods of + * {@link ClientDistributedCacheManager}. + */ +public class TestS3AResourceScope extends HadoopTestBase { + + private static final Path PATH = new Path("s3a://example/path"); + + @Test + public void testS3AFilesArePrivate() throws Throwable { + S3AFileStatus status = new S3AFileStatus(false, PATH, "self"); + assertTrue("Not encrypted: " + status, status.isEncrypted()); + assertNotExecutable(status); + } + + @Test + public void testS3AFilesArePrivateOtherContstructor() throws Throwable { + S3AFileStatus status = new S3AFileStatus(0, 0, PATH, 1, "self"); + assertTrue("Not encrypted: " + status, status.isEncrypted()); + assertNotExecutable(status); + } + + private void assertNotExecutable(final S3AFileStatus status) + throws IOException { + Map cache = new HashMap<>(); + cache.put(PATH.toUri(), status); + assertFalse("Should not have been executable " + status, + ClientDistributedCacheManager.ancestorsHaveExecutePermissions( + null, PATH, cache)); + } +}