HADOOP-16396. Allow authoritative mode on a subdirectory. (#1043)
This commit is contained in:
parent
a2a8be18cb
commit
34747c373f
|
@ -363,6 +363,10 @@ public final class Constants {
|
|||
|
||||
public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
|
||||
|
||||
/** Whether or not to allow MetadataStore to be source of truth for a path prefix */
|
||||
public static final String AUTHORITATIVE_PATH = "fs.s3a.authoritative.path";
|
||||
public static final String[] DEFAULT_AUTHORITATIVE_PATH = {};
|
||||
|
||||
/** Whether or not to allow MetadataStore to be source of truth. */
|
||||
public static final String METADATASTORE_AUTHORITATIVE =
|
||||
"fs.s3a.metadatastore.authoritative";
|
||||
|
|
|
@ -237,7 +237,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private volatile boolean isClosed = false;
|
||||
private MetadataStore metadataStore;
|
||||
private boolean allowAuthoritative;
|
||||
private boolean allowAuthoritativeMetadataStore;
|
||||
private Collection<String> allowAuthoritativePaths;
|
||||
|
||||
/** Delegation token integration; non-empty when DT support is enabled. */
|
||||
private Optional<S3ADelegationTokens> delegationTokens = Optional.empty();
|
||||
|
@ -397,11 +398,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
|
||||
|
||||
setMetadataStore(S3Guard.getMetadataStore(this));
|
||||
allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
|
||||
allowAuthoritativeMetadataStore = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
|
||||
DEFAULT_METADATASTORE_AUTHORITATIVE);
|
||||
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);
|
||||
|
||||
if (hasMetadataStore()) {
|
||||
LOG.debug("Using metadata store {}, authoritative={}",
|
||||
getMetadataStore(), allowAuthoritative);
|
||||
LOG.debug("Using metadata store {}, authoritative store={}, authoritative path={}",
|
||||
getMetadataStore(), allowAuthoritativeMetadataStore, allowAuthoritativePaths);
|
||||
}
|
||||
initMultipartUploads(conf);
|
||||
} catch (AmazonClientException e) {
|
||||
|
@ -840,7 +843,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* @param key s3 key or ""
|
||||
* @return the with a trailing "/", or, if it is the root key, "",
|
||||
*/
|
||||
private String maybeAddTrailingSlash(String key) {
|
||||
@InterfaceAudience.Private
|
||||
public String maybeAddTrailingSlash(String key) {
|
||||
if (!key.isEmpty() && !key.endsWith("/")) {
|
||||
return key + '/';
|
||||
} else {
|
||||
|
@ -1446,7 +1450,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
*/
|
||||
@VisibleForTesting
|
||||
boolean hasAuthoritativeMetadataStore() {
|
||||
return hasMetadataStore() && allowAuthoritative;
|
||||
return hasMetadataStore() && allowAuthoritativeMetadataStore;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2398,6 +2402,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
|
||||
DirListingMetadata dirMeta =
|
||||
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
|
||||
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
|
||||
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
|
||||
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
|
||||
return S3Guard.dirMetaToStatuses(dirMeta);
|
||||
}
|
||||
|
@ -2415,6 +2421,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
result.add(files.next());
|
||||
}
|
||||
// merge the results. This will update the store as needed
|
||||
|
||||
return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
|
||||
allowAuthoritative, ttlTimeProvider);
|
||||
} else {
|
||||
|
@ -2629,6 +2636,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
// dest is also a directory, there's no difference.
|
||||
// TODO After HADOOP-16085 the modification detection can be done with
|
||||
// etags or object version instead of modTime
|
||||
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
|
||||
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
|
||||
if (!pm.getFileStatus().isDirectory() &&
|
||||
!allowAuthoritative) {
|
||||
LOG.debug("Metadata for {} found in the non-auth metastore.", path);
|
||||
|
@ -3554,7 +3563,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
sb.append(", blockFactory=").append(blockFactory);
|
||||
}
|
||||
sb.append(", metastore=").append(metadataStore);
|
||||
sb.append(", authoritative=").append(allowAuthoritative);
|
||||
sb.append(", authoritativeStore=").append(allowAuthoritativeMetadataStore);
|
||||
sb.append(", authoritativePath=").append(allowAuthoritativePaths);
|
||||
sb.append(", useListV1=").append(useListV1);
|
||||
if (committerIntegration != null) {
|
||||
sb.append(", magicCommitter=").append(isMagicCommitEnabled());
|
||||
|
@ -3794,10 +3804,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
key, delimiter);
|
||||
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
|
||||
final Set<Path> tombstones;
|
||||
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
|
||||
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
|
||||
if (recursive) {
|
||||
final PathMetadata pm = metadataStore.get(path, true);
|
||||
// shouldn't need to check pm.isDeleted() because that will have
|
||||
// been caught by getFileStatus above.
|
||||
|
||||
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
|
||||
new MetadataStoreListFilesIterator(metadataStore, pm,
|
||||
allowAuthoritative);
|
||||
|
@ -3886,6 +3899,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
|
||||
listing.createProvidedFileStatusIterator(
|
||||
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
|
||||
boolean allowAuthoritative = S3Guard.allowAuthoritative(f, this,
|
||||
allowAuthoritativeMetadataStore, allowAuthoritativePaths);
|
||||
return (allowAuthoritative && meta != null
|
||||
&& meta.isAuthoritative())
|
||||
? listing.createLocatedFileStatusIterator(
|
||||
|
|
|
@ -399,6 +399,8 @@ public class CommitOperations {
|
|||
conf.getTrimmed(S3_METADATA_STORE_IMPL, ""));
|
||||
successData.addDiagnostic(METADATASTORE_AUTHORITATIVE,
|
||||
conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
|
||||
successData.addDiagnostic(AUTHORITATIVE_PATH,
|
||||
conf.getTrimmed(AUTHORITATIVE_PATH, ""));
|
||||
successData.addDiagnostic(MAGIC_COMMITTER_ENABLED,
|
||||
conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false"));
|
||||
|
||||
|
|
|
@ -1321,7 +1321,6 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
|||
final DirListingMetadata meta,
|
||||
@Nullable final BulkOperationState operationState) throws IOException {
|
||||
LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
|
||||
|
||||
// directory path
|
||||
Path path = meta.getPath();
|
||||
DDBPathMetadata ddbPathMeta =
|
||||
|
|
|
@ -34,6 +34,7 @@ import javax.annotation.Nullable;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -49,9 +50,8 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
|||
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AUTHORITATIVE_PATH;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.createUploadFileStatus;
|
||||
|
@ -772,4 +772,33 @@ public final class S3Guard {
|
|||
return dlm;
|
||||
}
|
||||
|
||||
public static Collection<String> getAuthoritativePaths(S3AFileSystem fs) {
|
||||
String[] rawAuthoritativePaths =
|
||||
fs.getConf().getTrimmedStrings(AUTHORITATIVE_PATH, DEFAULT_AUTHORITATIVE_PATH);
|
||||
Collection<String> authoritativePaths = new ArrayList<>();
|
||||
if (rawAuthoritativePaths.length > 0) {
|
||||
for (int i = 0; i < rawAuthoritativePaths.length; i++) {
|
||||
Path qualified = fs.qualify(new Path(rawAuthoritativePaths[i]));
|
||||
authoritativePaths.add(fs.maybeAddTrailingSlash(qualified.toString()));
|
||||
}
|
||||
}
|
||||
return authoritativePaths;
|
||||
}
|
||||
|
||||
public static boolean allowAuthoritative(Path p, S3AFileSystem fs,
|
||||
boolean authMetadataStore, Collection<String> authPaths) {
|
||||
String haystack = fs.maybeAddTrailingSlash(p.toString());
|
||||
if (authMetadataStore) {
|
||||
return true;
|
||||
}
|
||||
if (!authPaths.isEmpty()) {
|
||||
for (String needle : authPaths) {
|
||||
|
||||
if (haystack.startsWith(needle)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1177,8 +1177,10 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
|||
if (usingS3Guard) {
|
||||
out.printf("Filesystem %s is using S3Guard with store %s%n",
|
||||
fsUri, store.toString());
|
||||
printOption(out, "Authoritative S3Guard",
|
||||
printOption(out, "Authoritative Metadata Store",
|
||||
METADATASTORE_AUTHORITATIVE, "false");
|
||||
printOption(out, "Authoritative Path",
|
||||
AUTHORITATIVE_PATH, "");
|
||||
authMode = conf.getBoolean(METADATASTORE_AUTHORITATIVE, false);
|
||||
printStoreDiagnostics(out, store);
|
||||
} else {
|
||||
|
|
|
@ -113,6 +113,9 @@ two different reasons:
|
|||
stored in metadata store.
|
||||
* This mode can be set as a configuration property
|
||||
`fs.s3a.metadatastore.authoritative`
|
||||
* It can also be set only on specific directories by setting
|
||||
`fs.s3a.authoritative.path` to one or more prefixes, for example
|
||||
`s3a://bucket/path` or "/auth1,/auth2".
|
||||
* All interactions with the S3 bucket(s) must be through S3A clients sharing
|
||||
the same metadata store.
|
||||
* This is independent from which metadata store implementation is used.
|
||||
|
|
|
@ -0,0 +1,301 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
public class ITestAuthoritativePath extends AbstractS3ATestBase {
|
||||
|
||||
public Path testRoot;
|
||||
|
||||
private S3AFileSystem fullyAuthFS;
|
||||
private S3AFileSystem rawFS;
|
||||
|
||||
private MetadataStore ms;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
|
||||
long timestamp = System.currentTimeMillis();
|
||||
testRoot = path("" + timestamp);
|
||||
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
// These test will fail if no ms
|
||||
assumeTrue("FS needs to have a metadatastore.",
|
||||
fs.hasMetadataStore());
|
||||
assumeTrue("Metadatastore should persist authoritative bit",
|
||||
metadataStorePersistsAuthoritativeBit(fs.getMetadataStore()));
|
||||
|
||||
// This test setup shares a single metadata store across instances,
|
||||
// so that test runs with a local FS work.
|
||||
// but this needs to be addressed in teardown, where the Auth fs
|
||||
// needs to be detached from the metadata store before it is closed,
|
||||
ms = fs.getMetadataStore();
|
||||
|
||||
fullyAuthFS = createFullyAuthFS();
|
||||
assertTrue("No S3Guard store for fullyAuthFS",
|
||||
fullyAuthFS.hasMetadataStore());
|
||||
assertTrue("Authoritative mode off in fullyAuthFS",
|
||||
fullyAuthFS.hasAuthoritativeMetadataStore());
|
||||
|
||||
rawFS = createRawFS();
|
||||
assertFalse("UnguardedFS still has S3Guard",
|
||||
rawFS.hasMetadataStore());
|
||||
}
|
||||
|
||||
private void cleanUpFS(S3AFileSystem fs) {
|
||||
// detach from the (shared) metadata store.
|
||||
fs.setMetadataStore(new NullMetadataStore());
|
||||
|
||||
IOUtils.cleanupWithLogger(LOG, fs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void teardown() throws Exception {
|
||||
fullyAuthFS.delete(testRoot, true);
|
||||
|
||||
cleanUpFS(fullyAuthFS);
|
||||
cleanUpFS(rawFS);
|
||||
super.teardown();
|
||||
}
|
||||
|
||||
private S3AFileSystem createFullyAuthFS()
|
||||
throws Exception {
|
||||
S3AFileSystem testFS = getFileSystem();
|
||||
Configuration config = new Configuration(testFS.getConf());
|
||||
URI uri = testFS.getUri();
|
||||
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
config.setBoolean(METADATASTORE_AUTHORITATIVE, true);
|
||||
final S3AFileSystem newFS = createFS(uri, config);
|
||||
// set back the same metadata store instance
|
||||
newFS.setMetadataStore(ms);
|
||||
return newFS;
|
||||
}
|
||||
|
||||
private S3AFileSystem createSinglePathAuthFS(String authPath)
|
||||
throws Exception {
|
||||
S3AFileSystem testFS = getFileSystem();
|
||||
Configuration config = new Configuration(testFS.getConf());
|
||||
URI uri = testFS.getUri();
|
||||
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
config.set(AUTHORITATIVE_PATH, authPath.toString());
|
||||
final S3AFileSystem newFS = createFS(uri, config);
|
||||
// set back the same metadata store instance
|
||||
newFS.setMetadataStore(ms);
|
||||
return newFS;
|
||||
}
|
||||
|
||||
private S3AFileSystem createMultiPathAuthFS(String first, String middle, String last)
|
||||
throws Exception {
|
||||
S3AFileSystem testFS = getFileSystem();
|
||||
Configuration config = new Configuration(testFS.getConf());
|
||||
URI uri = testFS.getUri();
|
||||
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
config.set(AUTHORITATIVE_PATH, first + "," + middle + "," + last);
|
||||
final S3AFileSystem newFS = createFS(uri, config);
|
||||
// set back the same metadata store instance
|
||||
newFS.setMetadataStore(ms);
|
||||
return newFS;
|
||||
}
|
||||
|
||||
private S3AFileSystem createRawFS() throws Exception {
|
||||
S3AFileSystem testFS = getFileSystem();
|
||||
Configuration config = new Configuration(testFS.getConf());
|
||||
URI uri = testFS.getUri();
|
||||
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
S3_METADATA_STORE_IMPL);
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
return createFS(uri, config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and initialize a new filesystem.
|
||||
* This filesystem MUST be closed in test teardown.
|
||||
* @param uri FS URI
|
||||
* @param config config.
|
||||
* @return new instance
|
||||
* @throws IOException failure
|
||||
*/
|
||||
private S3AFileSystem createFS(final URI uri, final Configuration config)
|
||||
throws IOException {
|
||||
S3AFileSystem fs2 = new S3AFileSystem();
|
||||
fs2.initialize(uri, config);
|
||||
return fs2;
|
||||
}
|
||||
|
||||
private void runTestOutsidePath(S3AFileSystem partiallyAuthFS, Path nonAuthPath) throws Exception {
|
||||
Path inBandPath = new Path(nonAuthPath, "out-of-path-in-band");
|
||||
Path outOfBandPath = new Path(nonAuthPath, "out-of-path-out-of-band");
|
||||
|
||||
touch(fullyAuthFS, inBandPath);
|
||||
|
||||
// trigger an authoritative write-back
|
||||
fullyAuthFS.listStatus(inBandPath.getParent());
|
||||
|
||||
touch(rawFS, outOfBandPath);
|
||||
|
||||
// listing lacks outOfBandPath => short-circuited by auth mode
|
||||
checkListingDoesNotContainPath(fullyAuthFS, outOfBandPath);
|
||||
|
||||
// partiallyAuthFS differs from fullyAuthFS because we're outside the path
|
||||
checkListingContainsPath(partiallyAuthFS, outOfBandPath);
|
||||
|
||||
// sanity check that in-band operations are always visible
|
||||
checkListingContainsPath(fullyAuthFS, inBandPath);
|
||||
checkListingContainsPath(partiallyAuthFS, inBandPath);
|
||||
|
||||
}
|
||||
|
||||
private void runTestInsidePath(S3AFileSystem partiallyAuthFS, Path authPath) throws Exception {
|
||||
Path inBandPath = new Path(authPath, "in-path-in-band");
|
||||
Path outOfBandPath = new Path(authPath, "in-path-out-of-band");
|
||||
|
||||
touch(fullyAuthFS, inBandPath);
|
||||
|
||||
// trigger an authoritative write-back
|
||||
fullyAuthFS.listStatus(inBandPath.getParent());
|
||||
|
||||
touch(rawFS, outOfBandPath);
|
||||
|
||||
// listing lacks outOfBandPath => short-circuited by auth mode
|
||||
checkListingDoesNotContainPath(fullyAuthFS, outOfBandPath);
|
||||
checkListingDoesNotContainPath(partiallyAuthFS, outOfBandPath);
|
||||
|
||||
// sanity check that in-band operations are always successful
|
||||
checkListingContainsPath(fullyAuthFS, inBandPath);
|
||||
checkListingContainsPath(partiallyAuthFS, inBandPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleAuthPath() throws Exception {
|
||||
Path authPath = new Path(testRoot, "testSingleAuthPath-auth");
|
||||
Path nonAuthPath = new Path(testRoot, "testSingleAuthPath");
|
||||
S3AFileSystem fs = createSinglePathAuthFS(authPath.toString());
|
||||
try {
|
||||
assertTrue("No S3Guard store for partially authoritative FS",
|
||||
fs.hasMetadataStore());
|
||||
|
||||
runTestInsidePath(fs, authPath);
|
||||
runTestOutsidePath(fs, nonAuthPath);
|
||||
} finally {
|
||||
cleanUpFS(fs);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiAuthPath() throws Exception {
|
||||
Path authPath;
|
||||
Path nonAuthPath;
|
||||
S3AFileSystem fs = null;
|
||||
String decoy1 = "/decoy1";
|
||||
String decoy2 = "/decoy2";
|
||||
|
||||
try {
|
||||
authPath = new Path(testRoot, "testMultiAuthPath-first");
|
||||
nonAuthPath = new Path(testRoot, "nonAuth-1");
|
||||
fs = createMultiPathAuthFS(authPath.toString(), decoy1, decoy2);
|
||||
assertTrue("No S3Guard store for partially authoritative FS",
|
||||
fs.hasMetadataStore());
|
||||
|
||||
runTestInsidePath(fs, authPath);
|
||||
runTestOutsidePath(fs, nonAuthPath);
|
||||
} finally {
|
||||
cleanUpFS(fs);
|
||||
}
|
||||
|
||||
try {
|
||||
authPath = new Path(testRoot, "testMultiAuthPath-middle");
|
||||
nonAuthPath = new Path(testRoot, "nonAuth-2");
|
||||
fs = createMultiPathAuthFS(decoy1, authPath.toString(), decoy2);
|
||||
assertTrue("No S3Guard store for partially authoritative FS",
|
||||
fs.hasMetadataStore());
|
||||
|
||||
runTestInsidePath(fs, authPath);
|
||||
runTestOutsidePath(fs, nonAuthPath);
|
||||
} finally {
|
||||
cleanUpFS(fs);
|
||||
}
|
||||
|
||||
try {
|
||||
authPath = new Path(testRoot, "testMultiAuthPath-last");
|
||||
nonAuthPath = new Path(testRoot, "nonAuth-3");
|
||||
fs = createMultiPathAuthFS(decoy1, decoy2, authPath.toString());
|
||||
assertTrue("No S3Guard store for partially authoritative FS",
|
||||
fs.hasMetadataStore());
|
||||
|
||||
runTestInsidePath(fs, authPath);
|
||||
runTestOutsidePath(fs, nonAuthPath);
|
||||
} finally {
|
||||
cleanUpFS(fs);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrefixVsDirectory() throws Exception {
|
||||
S3AFileSystem fs = createSinglePathAuthFS("/auth");
|
||||
Collection<String> authPaths = S3Guard.getAuthoritativePaths(fs);
|
||||
|
||||
try{
|
||||
Path totalMismatch = new Path(testRoot, "/non-auth");
|
||||
assertFalse(S3Guard.allowAuthoritative(totalMismatch, fs,
|
||||
false, authPaths));
|
||||
|
||||
Path prefixMatch = new Path(testRoot, "/authoritative");
|
||||
assertFalse(S3Guard.allowAuthoritative(prefixMatch, fs,
|
||||
false, authPaths));
|
||||
|
||||
Path directoryMatch = new Path(testRoot, "/auth/oritative");
|
||||
assertTrue(S3Guard.allowAuthoritative(directoryMatch, fs,
|
||||
false, authPaths));
|
||||
} finally {
|
||||
cleanUpFS(fs);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -47,21 +47,21 @@ import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
|||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -538,41 +538,6 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
|
||||
throws IOException {
|
||||
final RemoteIterator<LocatedFileStatus> listIter =
|
||||
fs.listFiles(filePath.getParent(), false);
|
||||
while (listIter.hasNext()) {
|
||||
final LocatedFileStatus lfs = listIter.next();
|
||||
assertNotEquals("The tombstone has not been expired, so must not be"
|
||||
+ " listed.", filePath, lfs.getPath());
|
||||
}
|
||||
LOG.info("{}; file omitted from listFiles listing as expected.", filePath);
|
||||
|
||||
final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
assertNotEquals("The tombstone has not been expired, so must not be"
|
||||
+ " listed.", filePath, fileStatus.getPath());
|
||||
}
|
||||
LOG.info("{}; file omitted from listStatus as expected.", filePath);
|
||||
}
|
||||
|
||||
private void checkListingContainsPath(S3AFileSystem fs, Path filePath)
|
||||
throws IOException {
|
||||
final RemoteIterator<LocatedFileStatus> listIter =
|
||||
fs.listFiles(filePath.getParent(), false);
|
||||
|
||||
while (listIter.hasNext()) {
|
||||
final LocatedFileStatus lfs = listIter.next();
|
||||
assertEquals(filePath, lfs.getPath());
|
||||
}
|
||||
|
||||
final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
|
||||
for (FileStatus fileStatus : fileStatuses)
|
||||
assertEquals("The file should be listed in fs.listStatus",
|
||||
filePath, fileStatus.getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform an out-of-band delete.
|
||||
* @param testFilePath filename
|
||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
|
||||
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
|
||||
|
@ -1216,4 +1218,53 @@ public final class S3ATestUtils {
|
|||
}
|
||||
return Boolean.valueOf(persists);
|
||||
}
|
||||
|
||||
public static void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
|
||||
throws IOException {
|
||||
final RemoteIterator<LocatedFileStatus> listIter =
|
||||
fs.listFiles(filePath.getParent(), false);
|
||||
while (listIter.hasNext()) {
|
||||
final LocatedFileStatus lfs = listIter.next();
|
||||
assertNotEquals("Listing was not supposed to include " + filePath,
|
||||
filePath, lfs.getPath());
|
||||
}
|
||||
LOG.info("{}; file omitted from listFiles listing as expected.", filePath);
|
||||
|
||||
final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
assertNotEquals("Listing was not supposed to include " + filePath,
|
||||
filePath, fileStatus.getPath());
|
||||
}
|
||||
LOG.info("{}; file omitted from listStatus as expected.", filePath);
|
||||
}
|
||||
|
||||
public static void checkListingContainsPath(S3AFileSystem fs, Path filePath)
|
||||
throws IOException {
|
||||
|
||||
boolean listFilesHasIt = false;
|
||||
boolean listStatusHasIt = false;
|
||||
|
||||
final RemoteIterator<LocatedFileStatus> listIter =
|
||||
fs.listFiles(filePath.getParent(), false);
|
||||
|
||||
|
||||
while (listIter.hasNext()) {
|
||||
final LocatedFileStatus lfs = listIter.next();
|
||||
if (filePath.equals(lfs.getPath())) {
|
||||
listFilesHasIt = true;
|
||||
}
|
||||
}
|
||||
|
||||
final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
if (filePath.equals(fileStatus.getPath())) {
|
||||
listStatusHasIt = true;
|
||||
}
|
||||
}
|
||||
assertTrue("fs.listFiles didn't include " + filePath,
|
||||
listFilesHasIt);
|
||||
assertTrue("fs.listStatus didn't include " + filePath,
|
||||
listStatusHasIt);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue