HADOOP-16746. mkdirs and s3guard Authoritative mode.
Contributed by Steve Loughran. This fixes two problems with S3Guard authoritative mode and the auth directory flags which are stored in DynamoDB. 1. mkdirs was creating dir markers without the auth bit, forcing needless scans on newly created directories and files subsequently added; it was only with the first listStatus call on that directory that the dir would be marked as authoritative — even though it would be complete already. 2. listStatus(path) would reset the authoritative status bit of all child directories even if they were already marked as authoritative. Issue #2 is possibly the most expensive, as any treewalk using listStatus (e.g globfiles) would clear the auth bit for all child directories before listing them. And this would happen every single time... essentially you weren't getting authoritative directory listings. For the curious, the major bug was actually found during testing — we'd all missed it during reviews. A lesson there: the better the tests the fewer the bugs. Maybe also: something obvious and significant can get by code reviews.
modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java modified: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreAuthoritativeMode.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardFsck.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java modified: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java Change-Id: Ic3ffda13f2af2430afedd50fd657b595c83e90a7
This commit is contained in:
parent
1afd54fbbb
commit
7f40e6688a
|
@ -3528,7 +3528,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
// information gleaned from addAncestors is preserved into the
|
// information gleaned from addAncestors is preserved into the
|
||||||
// subsequent put.
|
// subsequent put.
|
||||||
stateToClose = S3Guard.initiateBulkWrite(metadataStore,
|
stateToClose = S3Guard.initiateBulkWrite(metadataStore,
|
||||||
BulkOperationState.OperationType.Put,
|
BulkOperationState.OperationType.Mkdir,
|
||||||
keyToPath(key));
|
keyToPath(key));
|
||||||
activeState = stateToClose;
|
activeState = stateToClose;
|
||||||
}
|
}
|
||||||
|
@ -3537,13 +3537,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
S3AFileStatus status = createUploadFileStatus(p,
|
S3AFileStatus status = createUploadFileStatus(p,
|
||||||
isDir, length,
|
isDir, length,
|
||||||
getDefaultBlockSize(p), username, eTag, versionId);
|
getDefaultBlockSize(p), username, eTag, versionId);
|
||||||
if (!isDir) {
|
boolean authoritative = false;
|
||||||
|
if (isDir) {
|
||||||
|
// this is a directory marker so put it as such.
|
||||||
|
status.setIsEmptyDirectory(Tristate.TRUE);
|
||||||
|
// and maybe mark as auth
|
||||||
|
authoritative = allowAuthoritative(p);
|
||||||
|
}
|
||||||
|
if (!authoritative) {
|
||||||
|
// for files and non-auth directories
|
||||||
S3Guard.putAndReturn(metadataStore, status,
|
S3Guard.putAndReturn(metadataStore, status,
|
||||||
ttlTimeProvider,
|
ttlTimeProvider,
|
||||||
activeState);
|
activeState);
|
||||||
} else {
|
} else {
|
||||||
// this is a directory marker so put it as such.
|
// authoritative directory
|
||||||
status.setIsEmptyDirectory(Tristate.TRUE);
|
|
||||||
S3Guard.putAuthDirectoryMarker(metadataStore, status,
|
S3Guard.putAuthDirectoryMarker(metadataStore, status,
|
||||||
ttlTimeProvider,
|
ttlTimeProvider,
|
||||||
activeState);
|
activeState);
|
||||||
|
|
|
@ -98,5 +98,9 @@ public class BulkOperationState implements Closeable {
|
||||||
* Listing update.
|
* Listing update.
|
||||||
*/
|
*/
|
||||||
Listing,
|
Listing,
|
||||||
|
/**
|
||||||
|
* Mkdir operation.
|
||||||
|
*/
|
||||||
|
Mkdir,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,6 +202,7 @@ import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
|
||||||
* same region. The region may also be set explicitly by setting the config
|
* same region. The region may also be set explicitly by setting the config
|
||||||
* parameter {@code fs.s3a.s3guard.ddb.region} to the corresponding region.
|
* parameter {@code fs.s3a.s3guard.ddb.region} to the corresponding region.
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class DynamoDBMetadataStore implements MetadataStore,
|
public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
|
@ -899,19 +900,18 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
List<DDBPathMetadata> sortedPaths = new ArrayList<>(pathsToCreate);
|
List<DDBPathMetadata> sortedPaths = new ArrayList<>(pathsToCreate);
|
||||||
sortedPaths.sort(PathOrderComparators.TOPMOST_PM_FIRST);
|
sortedPaths.sort(PathOrderComparators.TOPMOST_PM_FIRST);
|
||||||
// iterate through the paths.
|
// iterate through the paths.
|
||||||
for (DDBPathMetadata meta : sortedPaths) {
|
for (DDBPathMetadata entry : sortedPaths) {
|
||||||
Preconditions.checkArgument(meta != null);
|
Preconditions.checkArgument(entry != null);
|
||||||
Path path = meta.getFileStatus().getPath();
|
Path path = entry.getFileStatus().getPath();
|
||||||
LOG.debug("Adding entry {}", path);
|
LOG.debug("Adding entry {}", path);
|
||||||
if (path.isRoot()) {
|
if (path.isRoot()) {
|
||||||
// this is a root entry: do not add it.
|
// this is a root entry: do not add it.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// create the new entry
|
|
||||||
DDBPathMetadata entry = new DDBPathMetadata(meta);
|
|
||||||
// add it to the ancestor state, failing if it is already there and
|
// add it to the ancestor state, failing if it is already there and
|
||||||
// of a different type.
|
// of a different type.
|
||||||
DDBPathMetadata oldEntry = ancestorState.put(path, entry);
|
DDBPathMetadata oldEntry = ancestorState.put(path, entry);
|
||||||
|
boolean addAncestors = true;
|
||||||
if (oldEntry != null) {
|
if (oldEntry != null) {
|
||||||
if (!oldEntry.getFileStatus().isDirectory()
|
if (!oldEntry.getFileStatus().isDirectory()
|
||||||
|| !entry.getFileStatus().isDirectory()) {
|
|| !entry.getFileStatus().isDirectory()) {
|
||||||
|
@ -928,12 +928,18 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
// a directory is already present. Log and continue.
|
// a directory is already present. Log and continue.
|
||||||
LOG.debug("Directory at {} being updated with value {}",
|
LOG.debug("Directory at {} being updated with value {}",
|
||||||
path, entry);
|
path, entry);
|
||||||
|
// and we skip the the subsequent parent scan as we've already been
|
||||||
|
// here
|
||||||
|
addAncestors = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// add the entry to the ancestry map as an explicitly requested entry.
|
// add the entry to the ancestry map as an explicitly requested entry.
|
||||||
ancestry.put(path, Pair.of(EntryOrigin.Requested, entry));
|
ancestry.put(path, Pair.of(EntryOrigin.Requested, entry));
|
||||||
|
// now scan up the ancestor tree to see if there are any
|
||||||
|
// immediately missing entries.
|
||||||
Path parent = path.getParent();
|
Path parent = path.getParent();
|
||||||
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
|
while (addAncestors
|
||||||
|
&& !parent.isRoot() && !ancestry.containsKey(parent)) {
|
||||||
if (!ancestorState.findEntry(parent, true)) {
|
if (!ancestorState.findEntry(parent, true)) {
|
||||||
// there is no entry in the ancestor state.
|
// there is no entry in the ancestor state.
|
||||||
// look in the store
|
// look in the store
|
||||||
|
@ -947,6 +953,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
md = itemToPathMetadata(item, username);
|
md = itemToPathMetadata(item, username);
|
||||||
LOG.debug("Found existing entry for parent: {}", md);
|
LOG.debug("Found existing entry for parent: {}", md);
|
||||||
newEntry = Pair.of(EntryOrigin.Retrieved, md);
|
newEntry = Pair.of(EntryOrigin.Retrieved, md);
|
||||||
|
// and we break, assuming that if there is an entry, its parents
|
||||||
|
// are valid too.
|
||||||
|
addAncestors = false;
|
||||||
} else {
|
} else {
|
||||||
// A directory entry was not found in the DB. Create one.
|
// A directory entry was not found in the DB. Create one.
|
||||||
LOG.debug("auto-create ancestor path {} for child path {}",
|
LOG.debug("auto-create ancestor path {} for child path {}",
|
||||||
|
@ -1439,6 +1448,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
* {@link #processBatchWriteRequest(DynamoDBMetadataStore.AncestorState, PrimaryKey[], Item[])}
|
* {@link #processBatchWriteRequest(DynamoDBMetadataStore.AncestorState, PrimaryKey[], Item[])}
|
||||||
* is only tried once.
|
* is only tried once.
|
||||||
* @param meta Directory listing metadata.
|
* @param meta Directory listing metadata.
|
||||||
|
* @param unchangedEntries unchanged child entry paths
|
||||||
* @param operationState operational state for a bulk update
|
* @param operationState operational state for a bulk update
|
||||||
* @throws IOException IO problem
|
* @throws IOException IO problem
|
||||||
*/
|
*/
|
||||||
|
@ -1446,6 +1456,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
@Retries.RetryTranslated
|
@Retries.RetryTranslated
|
||||||
public void put(
|
public void put(
|
||||||
final DirListingMetadata meta,
|
final DirListingMetadata meta,
|
||||||
|
final List<Path> unchangedEntries,
|
||||||
@Nullable final BulkOperationState operationState) throws IOException {
|
@Nullable final BulkOperationState operationState) throws IOException {
|
||||||
LOG.debug("Saving {} dir meta for {} to table {} in region {}: {}",
|
LOG.debug("Saving {} dir meta for {} to table {} in region {}: {}",
|
||||||
meta.isAuthoritative() ? "auth" : "nonauth",
|
meta.isAuthoritative() ? "auth" : "nonauth",
|
||||||
|
@ -1463,8 +1474,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
|
||||||
final List<DDBPathMetadata> metasToPut = fullPathsToPut(ddbPathMeta,
|
final List<DDBPathMetadata> metasToPut = fullPathsToPut(ddbPathMeta,
|
||||||
ancestorState);
|
ancestorState);
|
||||||
|
|
||||||
// next add all children of the directory
|
// next add all changed children of the directory
|
||||||
metasToPut.addAll(pathMetaToDDBPathMeta(meta.getListing()));
|
// ones that came from the previous listing are left as-is
|
||||||
|
final Collection<PathMetadata> children = meta.getListing()
|
||||||
|
.stream()
|
||||||
|
.filter(e -> !unchangedEntries.contains(e.getFileStatus().getPath()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
metasToPut.addAll(pathMetaToDDBPathMeta(children));
|
||||||
|
|
||||||
// sort so highest-level entries are written to the store first.
|
// sort so highest-level entries are written to the store first.
|
||||||
// if a sequence fails, no orphan entries will have been written.
|
// if a sequence fails, no orphan entries will have been written.
|
||||||
|
|
|
@ -45,6 +45,7 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -341,13 +342,14 @@ public class LocalMetadataStore implements MetadataStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void put(DirListingMetadata meta,
|
public synchronized void put(DirListingMetadata meta,
|
||||||
|
final List<Path> unchangedEntries,
|
||||||
final BulkOperationState operationState) throws IOException {
|
final BulkOperationState operationState) throws IOException {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("put dirMeta {}", meta.prettyPrint());
|
LOG.debug("put dirMeta {}", meta.prettyPrint());
|
||||||
}
|
}
|
||||||
LocalMetadataEntry entry =
|
LocalMetadataEntry entry =
|
||||||
localCache.getIfPresent(standardize(meta.getPath()));
|
localCache.getIfPresent(standardize(meta.getPath()));
|
||||||
if(entry == null){
|
if (entry == null) {
|
||||||
localCache.put(standardize(meta.getPath()), new LocalMetadataEntry(meta));
|
localCache.put(standardize(meta.getPath()), new LocalMetadataEntry(meta));
|
||||||
} else {
|
} else {
|
||||||
entry.setDirListingMetadata(meta);
|
entry.setDirListingMetadata(meta);
|
||||||
|
|
|
@ -22,6 +22,7 @@ import javax.annotation.Nullable;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -265,11 +266,19 @@ public interface MetadataStore extends Closeable {
|
||||||
* missing metadata updates (create, delete) made to the same path by
|
* missing metadata updates (create, delete) made to the same path by
|
||||||
* another process.
|
* another process.
|
||||||
*
|
*
|
||||||
|
* To optimize updates and avoid overwriting existing entries which
|
||||||
|
* may contain extra data, entries in the list of unchangedEntries may
|
||||||
|
* be excluded. That is: the listing metadata has the full list of
|
||||||
|
* what it believes are children, but implementations can opt to ignore
|
||||||
|
* some.
|
||||||
* @param meta Directory listing metadata.
|
* @param meta Directory listing metadata.
|
||||||
|
* @param unchangedEntries list of entries in the dir listing which have
|
||||||
|
* not changed since the directory was list scanned on s3guard.
|
||||||
* @param operationState operational state for a bulk update
|
* @param operationState operational state for a bulk update
|
||||||
* @throws IOException if there is an error
|
* @throws IOException if there is an error
|
||||||
*/
|
*/
|
||||||
void put(DirListingMetadata meta,
|
void put(DirListingMetadata meta,
|
||||||
|
final List<Path> unchangedEntries,
|
||||||
@Nullable BulkOperationState operationState) throws IOException;
|
@Nullable BulkOperationState operationState) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -113,6 +114,7 @@ public class NullMetadataStore implements MetadataStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(DirListingMetadata meta,
|
public void put(DirListingMetadata meta,
|
||||||
|
final List<Path> unchangedEntries,
|
||||||
final BulkOperationState operationState) throws IOException {
|
final BulkOperationState operationState) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -75,6 +76,16 @@ public final class S3Guard {
|
||||||
DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
|
DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
|
||||||
private static final S3AFileStatus[] EMPTY_LISTING = new S3AFileStatus[0];
|
private static final S3AFileStatus[] EMPTY_LISTING = new S3AFileStatus[0];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hard-coded policy : {@value}.
|
||||||
|
* If true, when merging an S3 LIST with S3Guard in non-auth mode,
|
||||||
|
* only updated entries are added; new entries are left out.
|
||||||
|
* This policy choice reduces the amount of data stored in Dynamo,
|
||||||
|
* and hence the complexity of the merge in a non-auth listing.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static final boolean DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH = false;
|
||||||
|
|
||||||
// Utility class. All static functions.
|
// Utility class. All static functions.
|
||||||
private S3Guard() { }
|
private S3Guard() { }
|
||||||
|
|
||||||
|
@ -203,6 +214,7 @@ public final class S3Guard {
|
||||||
final PathMetadata fileMeta = authoritativeEmptyDirectoryMarker(status);
|
final PathMetadata fileMeta = authoritativeEmptyDirectoryMarker(status);
|
||||||
putWithTtl(ms, fileMeta, timeProvider, operationState);
|
putWithTtl(ms, fileMeta, timeProvider, operationState);
|
||||||
} finally {
|
} finally {
|
||||||
|
ms.getInstrumentation().directoryMarkedAuthoritative();
|
||||||
ms.getInstrumentation().entryAdded((System.nanoTime() - startTimeNano));
|
ms.getInstrumentation().entryAdded((System.nanoTime() - startTimeNano));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -291,36 +303,82 @@ public final class S3Guard {
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<Path> deleted = dirMeta.listTombstones();
|
|
||||||
|
|
||||||
// Since we treat the MetadataStore as a "fresher" or "consistent" view
|
// Since we treat the MetadataStore as a "fresher" or "consistent" view
|
||||||
// of metadata, we always use its metadata first.
|
// of metadata, we always use its metadata first.
|
||||||
|
|
||||||
// Since the authoritative case is already handled outside this function,
|
// Since the authoritative case is already handled outside this function,
|
||||||
// we will basically start with the set of directory entries in the
|
// we will basically start with the set of directory entries in the
|
||||||
// DirListingMetadata, and add any that only exist in the backingStatuses.
|
// DirListingMetadata, and add any that only exist in the backingStatuses.
|
||||||
boolean changed = false;
|
//
|
||||||
final Map<Path, FileStatus> dirMetaMap = dirMeta.getListing().stream()
|
// We try to avoid writing any more child entries than need be to :-
|
||||||
.collect(Collectors.toMap(
|
// (a) save time and money.
|
||||||
pm -> pm.getFileStatus().getPath(), PathMetadata::getFileStatus)
|
// (b) avoid overwriting the authoritative bit of children (HADOOP-16746).
|
||||||
);
|
// For auth mode updates, we supply the full listing and a list of which
|
||||||
|
// child entries have not been changed; the store gets to optimize its
|
||||||
|
// update however it chooses.
|
||||||
|
//
|
||||||
|
// for non-auth-mode S3Guard, we just build a list of entries to add and
|
||||||
|
// submit them in a batch; this is more efficient than trickling out the
|
||||||
|
// updates one-by-one.
|
||||||
|
|
||||||
BulkOperationState operationState = ms.initiateBulkWrite(
|
BulkOperationState operationState = ms.initiateBulkWrite(
|
||||||
BulkOperationState.OperationType.Listing,
|
BulkOperationState.OperationType.Listing,
|
||||||
path);
|
path);
|
||||||
|
if (isAuthoritative) {
|
||||||
|
authoritativeUnion(ms, path, backingStatuses, dirMeta,
|
||||||
|
timeProvider, operationState);
|
||||||
|
} else {
|
||||||
|
nonAuthoritativeUnion(ms, path, backingStatuses, dirMeta,
|
||||||
|
timeProvider, operationState);
|
||||||
|
}
|
||||||
|
IOUtils.cleanupWithLogger(LOG, operationState);
|
||||||
|
|
||||||
|
return dirMetaToStatuses(dirMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform the authoritative union operation.
|
||||||
|
* Here all updated/missing entries are added back; we take care
|
||||||
|
* not to overwrite unchanged entries as that will lose their
|
||||||
|
* isAuthoritative bit (HADOOP-16746).
|
||||||
|
* @param ms MetadataStore to use.
|
||||||
|
* @param path path to directory
|
||||||
|
* @param backingStatuses Directory listing from the backing store.
|
||||||
|
* @param dirMeta Directory listing from MetadataStore. May be null.
|
||||||
|
* @param timeProvider Time provider to use when updating entries
|
||||||
|
* @param operationState ongoing operation
|
||||||
|
* @throws IOException if metadata store update failed
|
||||||
|
*/
|
||||||
|
private static void authoritativeUnion(
|
||||||
|
final MetadataStore ms,
|
||||||
|
final Path path,
|
||||||
|
final List<S3AFileStatus> backingStatuses,
|
||||||
|
final DirListingMetadata dirMeta,
|
||||||
|
final ITtlTimeProvider timeProvider,
|
||||||
|
final BulkOperationState operationState) throws IOException {
|
||||||
|
// track all unchanged entries; used so the metastore can identify entries
|
||||||
|
// it doesn't need to update
|
||||||
|
List<Path> unchangedEntries = new ArrayList<>(dirMeta.getListing().size());
|
||||||
|
boolean changed = !dirMeta.isAuthoritative();
|
||||||
|
Set<Path> deleted = dirMeta.listTombstones();
|
||||||
|
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
|
||||||
|
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
|
||||||
for (S3AFileStatus s : backingStatuses) {
|
for (S3AFileStatus s : backingStatuses) {
|
||||||
if (deleted.contains(s.getPath())) {
|
final Path statusPath = s.getPath();
|
||||||
|
if (deleted.contains(statusPath)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final PathMetadata pathMetadata = new PathMetadata(s);
|
// this is built up to be whatever entry is to be added to the dirMeta
|
||||||
|
// collection
|
||||||
|
PathMetadata pathMetadata = dirMetaMap.get(statusPath);
|
||||||
|
|
||||||
if (!isAuthoritative){
|
if (pathMetadata == null) {
|
||||||
FileStatus status = dirMetaMap.get(s.getPath());
|
// there's no entry in the listing, so create one.
|
||||||
if (status != null
|
pathMetadata = new PathMetadata(s);
|
||||||
&& s.getModificationTime() > status.getModificationTime()) {
|
} else {
|
||||||
LOG.debug("Update ms with newer metadata of: {}", status);
|
// no change -add the path to the list of unchangedEntries
|
||||||
S3Guard.putWithTtl(ms, pathMetadata, timeProvider, operationState);
|
unchangedEntries.add(statusPath);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Minor race condition here. Multiple threads could add to this
|
// Minor race condition here. Multiple threads could add to this
|
||||||
|
@ -330,27 +388,87 @@ public final class S3Guard {
|
||||||
// Any FileSystem has similar race conditions, but we could persist
|
// Any FileSystem has similar race conditions, but we could persist
|
||||||
// a stale entry longer. We could expose an atomic
|
// a stale entry longer. We could expose an atomic
|
||||||
// DirListingMetadata#putIfNotPresent()
|
// DirListingMetadata#putIfNotPresent()
|
||||||
boolean updated = dirMeta.put(pathMetadata);
|
changed |= dirMeta.put(pathMetadata);
|
||||||
changed = changed || updated;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If dirMeta is not authoritative, but isAuthoritative is true the
|
if (changed) {
|
||||||
// directory metadata should be updated. Treat it as a change.
|
// in an authoritative update, we pass in the full list of entries,
|
||||||
changed = changed || (!dirMeta.isAuthoritative() && isAuthoritative);
|
// but do declare which have not changed to avoid needless and potentially
|
||||||
|
// destructive overwrites.
|
||||||
if (changed && isAuthoritative) {
|
|
||||||
LOG.debug("Marking the directory {} as authoritative", path);
|
LOG.debug("Marking the directory {} as authoritative", path);
|
||||||
final MetastoreInstrumentation instrumentation
|
ms.getInstrumentation().directoryMarkedAuthoritative();
|
||||||
= ms.getInstrumentation();
|
|
||||||
if (instrumentation != null) {
|
|
||||||
instrumentation.directoryMarkedAuthoritative();
|
|
||||||
}
|
|
||||||
dirMeta.setAuthoritative(true); // This is the full directory contents
|
dirMeta.setAuthoritative(true); // This is the full directory contents
|
||||||
S3Guard.putWithTtl(ms, dirMeta, timeProvider, operationState);
|
// write the updated dir entry and any changed children.
|
||||||
|
S3Guard.putWithTtl(ms, dirMeta, unchangedEntries, timeProvider, operationState);
|
||||||
}
|
}
|
||||||
IOUtils.cleanupWithLogger(LOG, operationState);
|
}
|
||||||
|
|
||||||
return dirMetaToStatuses(dirMeta);
|
/**
|
||||||
|
* Perform the authoritative union operation.
|
||||||
|
* @param ms MetadataStore to use.
|
||||||
|
* @param path path to directory
|
||||||
|
* @param backingStatuses Directory listing from the backing store.
|
||||||
|
* @param dirMeta Directory listing from MetadataStore. May be null.
|
||||||
|
* @param timeProvider Time provider to use when updating entries
|
||||||
|
* @param operationState ongoing operation
|
||||||
|
* @throws IOException if metadata store update failed
|
||||||
|
*/
|
||||||
|
private static void nonAuthoritativeUnion(
|
||||||
|
final MetadataStore ms,
|
||||||
|
final Path path,
|
||||||
|
final List<S3AFileStatus> backingStatuses,
|
||||||
|
final DirListingMetadata dirMeta,
|
||||||
|
final ITtlTimeProvider timeProvider,
|
||||||
|
final BulkOperationState operationState) throws IOException {
|
||||||
|
List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
|
||||||
|
Set<Path> deleted = dirMeta.listTombstones();
|
||||||
|
|
||||||
|
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
|
||||||
|
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
|
||||||
|
for (S3AFileStatus s : backingStatuses) {
|
||||||
|
final Path statusPath = s.getPath();
|
||||||
|
if (deleted.contains(statusPath)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this is the record in dynamo
|
||||||
|
PathMetadata pathMetadata = dirMetaMap.get(statusPath);
|
||||||
|
|
||||||
|
// in non-auth listings, we compare the file status of the metastore
|
||||||
|
// list with those in the FS, and overwrite the MS entry if
|
||||||
|
// either of two conditions are met
|
||||||
|
// - there is no entry in the metastore and
|
||||||
|
// DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH is compiled to true
|
||||||
|
// - there is an entry in the metastore the FS entry is newer.
|
||||||
|
boolean shouldUpdate;
|
||||||
|
if (pathMetadata != null) {
|
||||||
|
// entry is in DDB; check modification time
|
||||||
|
shouldUpdate = s.getModificationTime() > (pathMetadata.getFileStatus())
|
||||||
|
.getModificationTime();
|
||||||
|
// create an updated record.
|
||||||
|
pathMetadata = new PathMetadata(s);
|
||||||
|
} else {
|
||||||
|
// entry is not present. Create for insertion into dirMeta
|
||||||
|
pathMetadata = new PathMetadata(s);
|
||||||
|
// use hard-coded policy about updating
|
||||||
|
shouldUpdate = DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH;
|
||||||
|
}
|
||||||
|
if (shouldUpdate) {
|
||||||
|
// we do want to update DDB and the listing with a new entry.
|
||||||
|
LOG.debug("Update ms with newer metadata of: {}", s);
|
||||||
|
// ensure it gets into the dirListing
|
||||||
|
// add to the list of entries to add later,
|
||||||
|
entriesToAdd.add(pathMetadata);
|
||||||
|
}
|
||||||
|
// add the entry to the union; no-op if it was already there.
|
||||||
|
dirMeta.put(pathMetadata);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!entriesToAdd.isEmpty()) {
|
||||||
|
// non-auth, just push out the updated entry list
|
||||||
|
LOG.debug("Adding {} entries under directory {}", entriesToAdd.size(), path);
|
||||||
|
putWithTtl(ms, entriesToAdd, timeProvider, operationState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -433,7 +551,7 @@ public final class S3Guard {
|
||||||
children.add(new PathMetadata(prevStatus));
|
children.add(new PathMetadata(prevStatus));
|
||||||
}
|
}
|
||||||
dirMeta = new DirListingMetadata(f, children, authoritative);
|
dirMeta = new DirListingMetadata(f, children, authoritative);
|
||||||
S3Guard.putWithTtl(ms, dirMeta, timeProvider, null);
|
S3Guard.putWithTtl(ms, dirMeta, Collections.emptyList(), timeProvider, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
pathMetas.add(new PathMetadata(status));
|
pathMetas.add(new PathMetadata(status));
|
||||||
|
@ -662,10 +780,13 @@ public final class S3Guard {
|
||||||
* directory and its children.
|
* directory and its children.
|
||||||
* @param ms metastore
|
* @param ms metastore
|
||||||
* @param dirMeta directory
|
* @param dirMeta directory
|
||||||
|
* @param unchangedEntries list of unchanged entries from the listing
|
||||||
* @param timeProvider nullable time provider
|
* @param timeProvider nullable time provider
|
||||||
* @throws IOException failure.
|
* @throws IOException failure.
|
||||||
*/
|
*/
|
||||||
public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta,
|
public static void putWithTtl(MetadataStore ms,
|
||||||
|
DirListingMetadata dirMeta,
|
||||||
|
final List<Path> unchangedEntries,
|
||||||
final ITtlTimeProvider timeProvider,
|
final ITtlTimeProvider timeProvider,
|
||||||
@Nullable final BulkOperationState operationState)
|
@Nullable final BulkOperationState operationState)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -673,7 +794,7 @@ public final class S3Guard {
|
||||||
dirMeta.setLastUpdated(now);
|
dirMeta.setLastUpdated(now);
|
||||||
dirMeta.getListing()
|
dirMeta.getListing()
|
||||||
.forEach(pm -> pm.setLastUpdated(now));
|
.forEach(pm -> pm.setLastUpdated(now));
|
||||||
ms.put(dirMeta, operationState);
|
ms.put(dirMeta, unchangedEntries, operationState);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,7 +24,8 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
||||||
import org.junit.Assume;
|
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -32,6 +33,7 @@ import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test cases that validate S3Guard's behavior for writing things like
|
* Test cases that validate S3Guard's behavior for writing things like
|
||||||
|
@ -39,6 +41,13 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
*/
|
*/
|
||||||
public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
assumeTrue("dirListingUnion always writes back records",
|
||||||
|
!S3Guard.DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH);
|
||||||
|
super.setup();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In listStatus(), when S3Guard is enabled, the full listing for a
|
* In listStatus(), when S3Guard is enabled, the full listing for a
|
||||||
* directory is "written back" to the MetadataStore before the listing is
|
* directory is "written back" to the MetadataStore before the listing is
|
||||||
|
@ -49,7 +58,7 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testListStatusWriteBack() throws Exception {
|
public void testListStatusWriteBack() throws Exception {
|
||||||
Assume.assumeTrue(getFileSystem().hasMetadataStore());
|
assumeTrue(getFileSystem().hasMetadataStore());
|
||||||
|
|
||||||
Path directory = path("ListStatusWriteBack");
|
Path directory = path("ListStatusWriteBack");
|
||||||
|
|
||||||
|
|
|
@ -53,13 +53,13 @@ import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
|
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeS3GuardState;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeS3GuardState;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isS3GuardTestPropertySet;
|
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
|
||||||
|
@ -221,13 +221,10 @@ public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
|
||||||
public Configuration createConfiguration() {
|
public Configuration createConfiguration() {
|
||||||
Configuration conf = super.createConfiguration();
|
Configuration conf = super.createConfiguration();
|
||||||
String bucketName = getTestBucketName(conf);
|
String bucketName = getTestBucketName(conf);
|
||||||
// is s3guard enabled?
|
|
||||||
boolean guardedTestRun = isS3GuardTestPropertySet(conf);
|
|
||||||
|
|
||||||
// in a guarded test run, except for the special case of raw,
|
|
||||||
// all DDB settings are left alone.
|
|
||||||
removeBaseAndBucketOverrides(bucketName, conf,
|
removeBaseAndBucketOverrides(bucketName, conf,
|
||||||
METADATASTORE_AUTHORITATIVE);
|
METADATASTORE_AUTHORITATIVE,
|
||||||
|
AUTHORITATIVE_PATH);
|
||||||
removeBucketOverrides(bucketName, conf,
|
removeBucketOverrides(bucketName, conf,
|
||||||
S3_METADATA_STORE_IMPL);
|
S3_METADATA_STORE_IMPL);
|
||||||
if (!s3guard) {
|
if (!s3guard) {
|
||||||
|
@ -317,8 +314,10 @@ public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
|
||||||
verifyS3GuardSettings(realFS, "real filesystem");
|
verifyS3GuardSettings(realFS, "real filesystem");
|
||||||
|
|
||||||
// avoiding the parameterization to steer clear of accidentally creating
|
// avoiding the parameterization to steer clear of accidentally creating
|
||||||
// patterns
|
// patterns; a timestamp is used to ensure tombstones from previous runs
|
||||||
basePath = path("testNoReadAccess-" + name);
|
// do not interfere
|
||||||
|
basePath = path("testNoReadAccess-" + name
|
||||||
|
+ "-" + System.currentTimeMillis() / 1000);
|
||||||
|
|
||||||
// define the paths and create them.
|
// define the paths and create them.
|
||||||
describe("Creating test directories and files");
|
describe("Creating test directories and files");
|
||||||
|
@ -628,7 +627,7 @@ public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
|
||||||
* Do some cleanup to see what happens with delete calls.
|
* Do some cleanup to see what happens with delete calls.
|
||||||
* Cleanup happens in test teardown anyway; doing it here
|
* Cleanup happens in test teardown anyway; doing it here
|
||||||
* just makes use of the delete calls to see how delete failures
|
* just makes use of the delete calls to see how delete failures
|
||||||
* change with permissions and S3Guard stettings.
|
* change with permissions and S3Guard settings.
|
||||||
*/
|
*/
|
||||||
public void checkDeleteOperations() throws Throwable {
|
public void checkDeleteOperations() throws Throwable {
|
||||||
describe("Testing delete operations");
|
describe("Testing delete operations");
|
||||||
|
|
|
@ -310,6 +310,7 @@ public class TestPartialDeleteFailures {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(final DirListingMetadata meta,
|
public void put(final DirListingMetadata meta,
|
||||||
|
final List<Path> unchangedEntries,
|
||||||
final BulkOperationState operationState) {
|
final BulkOperationState operationState) {
|
||||||
created.add(meta.getPath());
|
created.add(meta.getPath());
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.net.URI;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -133,6 +134,8 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
|
|
||||||
private static String testDynamoDBTableName;
|
private static String testDynamoDBTableName;
|
||||||
|
|
||||||
|
private static final List<Path> UNCHANGED_ENTRIES = Collections.emptyList();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a path under the test path provided by
|
* Create a path under the test path provided by
|
||||||
* the FS contract.
|
* the FS contract.
|
||||||
|
@ -593,7 +596,8 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
Collection<Path> pathsToDelete = null;
|
Collection<Path> pathsToDelete = null;
|
||||||
if (oldMetas != null) {
|
if (oldMetas != null) {
|
||||||
// put all metadata of old paths and verify
|
// put all metadata of old paths and verify
|
||||||
ms.put(new DirListingMetadata(oldDir, oldMetas, false), putState);
|
ms.put(new DirListingMetadata(oldDir, oldMetas, false), UNCHANGED_ENTRIES,
|
||||||
|
putState);
|
||||||
assertEquals("Child count",
|
assertEquals("Child count",
|
||||||
0, ms.listChildren(newDir).withoutTombstones().numEntries());
|
0, ms.listChildren(newDir).withoutTombstones().numEntries());
|
||||||
Assertions.assertThat(ms.listChildren(oldDir).getListing())
|
Assertions.assertThat(ms.listChildren(oldDir).getListing())
|
||||||
|
@ -960,13 +964,13 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
grandchildPath,
|
grandchildPath,
|
||||||
new ArrayList<>(), false);
|
new ArrayList<>(), false);
|
||||||
intercept(PathIOException.class, E_INCONSISTENT_UPDATE,
|
intercept(PathIOException.class, E_INCONSISTENT_UPDATE,
|
||||||
() -> ddbms.put(grandchildListing, bulkWrite));
|
() -> ddbms.put(grandchildListing, UNCHANGED_ENTRIES, bulkWrite));
|
||||||
|
|
||||||
// but a directory update under another path is fine
|
// but a directory update under another path is fine
|
||||||
DirListingMetadata grandchild2Listing = new DirListingMetadata(
|
DirListingMetadata grandchild2Listing = new DirListingMetadata(
|
||||||
grandchild2Path,
|
grandchild2Path,
|
||||||
new ArrayList<>(), false);
|
new ArrayList<>(), false);
|
||||||
ddbms.put(grandchild2Listing, bulkWrite);
|
ddbms.put(grandchild2Listing, UNCHANGED_ENTRIES, bulkWrite);
|
||||||
// and it creates a new entry for its parent
|
// and it creates a new entry for its parent
|
||||||
verifyInAncestor(bulkWrite, child2, true);
|
verifyInAncestor(bulkWrite, child2, true);
|
||||||
}
|
}
|
||||||
|
@ -1097,7 +1101,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
assertEquals(auth, dlm.isAuthoritative());
|
assertEquals(auth, dlm.isAuthoritative());
|
||||||
|
|
||||||
// Test with non-authoritative listing, empty dir
|
// Test with non-authoritative listing, empty dir
|
||||||
ms.put(dlm, null);
|
ms.put(dlm, UNCHANGED_ENTRIES, null);
|
||||||
final PathMetadata pmdResultEmpty = ms.get(dirToPut, true);
|
final PathMetadata pmdResultEmpty = ms.get(dirToPut, true);
|
||||||
if(auth){
|
if(auth){
|
||||||
assertEquals(Tristate.TRUE, pmdResultEmpty.isEmptyDirectory());
|
assertEquals(Tristate.TRUE, pmdResultEmpty.isEmptyDirectory());
|
||||||
|
@ -1107,7 +1111,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
|
|
||||||
// Test with non-authoritative listing, non-empty dir
|
// Test with non-authoritative listing, non-empty dir
|
||||||
dlm.put(new PathMetadata(basicFileStatus(fileToPut, 1, false)));
|
dlm.put(new PathMetadata(basicFileStatus(fileToPut, 1, false)));
|
||||||
ms.put(dlm, null);
|
ms.put(dlm, UNCHANGED_ENTRIES, null);
|
||||||
final PathMetadata pmdResultNotEmpty = ms.get(dirToPut, true);
|
final PathMetadata pmdResultNotEmpty = ms.get(dirToPut, true);
|
||||||
assertEquals(Tristate.FALSE, pmdResultNotEmpty.isEmptyDirectory());
|
assertEquals(Tristate.FALSE, pmdResultNotEmpty.isEmptyDirectory());
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,8 +45,6 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.Tristate;
|
import org.apache.hadoop.fs.s3a.Tristate;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY;
|
||||||
|
@ -56,6 +54,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
|
||||||
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_LIST_REQUESTS;
|
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_LIST_REQUESTS;
|
||||||
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_AUTHORITATIVE_DIRECTORIES_UPDATED;
|
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_AUTHORITATIVE_DIRECTORIES_UPDATED;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_RECORD_WRITES;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.AuthoritativeAuditOperation.ERROR_PATH_NOT_AUTH_IN_FS;
|
import static org.apache.hadoop.fs.s3a.s3guard.AuthoritativeAuditOperation.ERROR_PATH_NOT_AUTH_IN_FS;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.authoritativeEmptyDirectoryMarker;
|
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.authoritativeEmptyDirectoryMarker;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Authoritative.CHECK_FLAG;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Authoritative.CHECK_FLAG;
|
||||||
|
@ -139,8 +138,14 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
*/
|
*/
|
||||||
private AuthoritativeAuditOperation auditor;
|
private AuthoritativeAuditOperation auditor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Path {@code $methodAuthPath/dir}.
|
||||||
|
*/
|
||||||
private Path dir;
|
private Path dir;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Path {@code $methodAuthPath/dir/file}.
|
||||||
|
*/
|
||||||
private Path dirFile;
|
private Path dirFile;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -148,6 +153,11 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
*/
|
*/
|
||||||
private final List<S3GuardTool> toolsToClose = new ArrayList<>();
|
private final List<S3GuardTool> toolsToClose = new ArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The metastore of the auth filesystem.
|
||||||
|
*/
|
||||||
|
private DynamoDBMetadataStore metastore;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* After all tests have run, close the filesystems.
|
* After all tests have run, close the filesystems.
|
||||||
*/
|
*/
|
||||||
|
@ -205,9 +215,10 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
S3_METADATA_STORE_IMPL);
|
S3_METADATA_STORE_IMPL);
|
||||||
unguardedFS = (S3AFileSystem) FileSystem.newInstance(uri, unguardedConf);
|
unguardedFS = (S3AFileSystem) FileSystem.newInstance(uri, unguardedConf);
|
||||||
}
|
}
|
||||||
|
metastore = (DynamoDBMetadataStore) authFS.getMetadataStore();
|
||||||
auditor = new AuthoritativeAuditOperation(
|
auditor = new AuthoritativeAuditOperation(
|
||||||
authFS.createStoreContext(),
|
authFS.createStoreContext(),
|
||||||
(DynamoDBMetadataStore) authFS.getMetadataStore(),
|
metastore,
|
||||||
true,
|
true,
|
||||||
true);
|
true);
|
||||||
|
|
||||||
|
@ -272,7 +283,6 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore("HADOOP-16697. Needs mkdir to be authoritative")
|
|
||||||
public void testMkDirAuth() throws Throwable {
|
public void testMkDirAuth() throws Throwable {
|
||||||
describe("create an empty dir and assert it is tagged as authoritative");
|
describe("create an empty dir and assert it is tagged as authoritative");
|
||||||
authFS.mkdirs(dir);
|
authFS.mkdirs(dir);
|
||||||
|
@ -282,10 +292,11 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
@Test
|
@Test
|
||||||
public void testListStatusMakesEmptyDirAuth() throws Throwable {
|
public void testListStatusMakesEmptyDirAuth() throws Throwable {
|
||||||
describe("Verify listStatus marks an Empty dir as auth");
|
describe("Verify listStatus marks an Empty dir as auth");
|
||||||
authFS.mkdirs(dir);
|
mkNonauthDir(dir);
|
||||||
expectNonauthRecursive(dir);
|
// initial dir is non-auth
|
||||||
|
expectNonauthNonRecursive(dir);
|
||||||
authFS.listStatus(dir);
|
authFS.listStatus(dir);
|
||||||
// dir is auth; subdir is not
|
// dir is auth;
|
||||||
expectAuthRecursive(dir);
|
expectAuthRecursive(dir);
|
||||||
// Next list will not go to s3
|
// Next list will not go to s3
|
||||||
assertListDoesNotUpdateAuth(dir);
|
assertListDoesNotUpdateAuth(dir);
|
||||||
|
@ -299,12 +310,36 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
mkAuthDir(dir);
|
mkAuthDir(dir);
|
||||||
expectAuthRecursive(dir);
|
expectAuthRecursive(dir);
|
||||||
authFS.mkdirs(subdir);
|
authFS.mkdirs(subdir);
|
||||||
// dir is auth; subdir is not
|
// dir and subdirs are auth
|
||||||
expectAuthNonRecursive(dir);
|
expectAuthRecursive(dir);
|
||||||
expectNonauthRecursive(dir);
|
expectAuthRecursive(subdir);
|
||||||
assertListDoesNotUpdateAuth(dir);
|
// now mark the dir as nonauth
|
||||||
// Subdir list makes it auth
|
markDirNonauth(dir);
|
||||||
assertListUpdatesAuth(subdir);
|
expectNonauthNonRecursive(dir);
|
||||||
|
expectAuthRecursive(subdir);
|
||||||
|
|
||||||
|
// look at the MD & make sure that the dir and subdir are auth
|
||||||
|
final DirListingMetadata listing = metastore.listChildren(dir);
|
||||||
|
Assertions.assertThat(listing)
|
||||||
|
.describedAs("metadata of %s", dir)
|
||||||
|
.matches(d -> !d.isAuthoritative(), "is not auth");
|
||||||
|
Assertions.assertThat(listing.getListing())
|
||||||
|
.describedAs("listing of %s", dir)
|
||||||
|
.hasSize(1)
|
||||||
|
.allMatch(md -> ((DDBPathMetadata) md).isAuthoritativeDir(),
|
||||||
|
"is auth");
|
||||||
|
|
||||||
|
// directory list makes the dir auth and leaves the child auth
|
||||||
|
assertListUpdatesAuth(dir);
|
||||||
|
|
||||||
|
// and afterwards, a followup list does not write anything to DDB
|
||||||
|
// (as the dir is auth, its not going to go near the FS to update...)
|
||||||
|
expectOperationUpdatesDDB(0, () -> authFS.listStatus(dir));
|
||||||
|
// mark the dir nonauth again
|
||||||
|
markDirNonauth(dir);
|
||||||
|
// and only one record is written to DDB, the dir marker as auth
|
||||||
|
// the subdir is not overwritten
|
||||||
|
expectOperationUpdatesDDB(1, () -> authFS.listStatus(dir));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -322,7 +357,6 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
* marker is added. This must be auth.
|
* marker is added. This must be auth.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
@Ignore("HADOOP-16697. Needs mkdir to be authoritative")
|
|
||||||
public void testDeleteSingleFileLeavesMarkersAlone() throws Throwable {
|
public void testDeleteSingleFileLeavesMarkersAlone() throws Throwable {
|
||||||
describe("Deleting a file with no peers makes no changes to ancestors");
|
describe("Deleting a file with no peers makes no changes to ancestors");
|
||||||
mkAuthDir(methodAuthPath);
|
mkAuthDir(methodAuthPath);
|
||||||
|
@ -345,6 +379,16 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
expectAuthRecursive(methodAuthPath);
|
expectAuthRecursive(methodAuthPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteEmptyDirLeavesParentAuth() throws Throwable {
|
||||||
|
describe("Deleting a directory retains the auth status "
|
||||||
|
+ "of the parent directory");
|
||||||
|
mkAuthDir(dir);
|
||||||
|
mkAuthDir(dirFile);
|
||||||
|
expectAuthRecursive(dir);
|
||||||
|
authFS.delete(dirFile, false);
|
||||||
|
expectAuthRecursive(dir);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert the number of pruned files matches expectations.
|
* Assert the number of pruned files matches expectations.
|
||||||
|
@ -417,50 +461,23 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
@Test
|
@Test
|
||||||
public void testRenameDirMarksDestAsAuth() throws Throwable {
|
public void testRenameDirMarksDestAsAuth() throws Throwable {
|
||||||
describe("renaming a dir must mark dest tree as auth");
|
describe("renaming a dir must mark dest tree as auth");
|
||||||
final Path d = methodAuthPath;
|
final Path base = methodAuthPath;
|
||||||
final Path source = new Path(d, "source");
|
mkAuthDir(base);
|
||||||
final Path dest = new Path(d, "dest");
|
final Path source = new Path(base, "source");
|
||||||
|
final Path dest = new Path(base, "dest");
|
||||||
mkAuthDir(source);
|
mkAuthDir(source);
|
||||||
Path f = new Path(source, "subdir/file");
|
expectAuthRecursive(base);
|
||||||
|
Path subdir = new Path(source, "subdir");
|
||||||
|
Path f = new Path(subdir, "file");
|
||||||
touchFile(f);
|
touchFile(f);
|
||||||
|
expectNonauthRecursive(base);
|
||||||
|
// list the source directories so everything is
|
||||||
|
// marked as auth
|
||||||
|
authFS.listStatus(source);
|
||||||
|
authFS.listStatus(subdir);
|
||||||
|
expectAuthRecursive(base);
|
||||||
authFS.rename(source, dest);
|
authFS.rename(source, dest);
|
||||||
expectNonauthRecursive(d);
|
expectAuthRecursive(base);
|
||||||
expectAuthRecursive(dest);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRenameWithNonEmptySubDir() throws Throwable {
|
|
||||||
final Path renameTestDir = methodAuthPath;
|
|
||||||
final Path srcDir = new Path(renameTestDir, "src1");
|
|
||||||
final Path srcSubDir = new Path(srcDir, "sub");
|
|
||||||
final Path finalDir = new Path(renameTestDir, "dest");
|
|
||||||
FileSystem fs = authFS;
|
|
||||||
rm(fs, renameTestDir, true, false);
|
|
||||||
|
|
||||||
fs.mkdirs(srcDir);
|
|
||||||
fs.mkdirs(finalDir);
|
|
||||||
writeTextFile(fs, new Path(srcDir, "source.txt"),
|
|
||||||
"this is the file in src dir", false);
|
|
||||||
writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
|
|
||||||
"this is the file in src/sub dir", false);
|
|
||||||
|
|
||||||
assertPathExists("not created in src dir",
|
|
||||||
new Path(srcDir, "source.txt"));
|
|
||||||
assertPathExists("not created in src/sub dir",
|
|
||||||
new Path(srcSubDir, "subfile.txt"));
|
|
||||||
|
|
||||||
boolean rename = fs.rename(srcDir, finalDir);
|
|
||||||
Assertions.assertThat(rename)
|
|
||||||
.describedAs("rename(%s, %s)", srcDir, finalDir)
|
|
||||||
.isTrue();
|
|
||||||
|
|
||||||
// POSIX rename behavior
|
|
||||||
assertPathExists("not renamed into dest dir",
|
|
||||||
new Path(finalDir, "source.txt"));
|
|
||||||
assertPathExists("not renamed into dest/sub dir",
|
|
||||||
new Path(finalDir, "sub/subfile.txt"));
|
|
||||||
assertPathDoesNotExist("not deleted",
|
|
||||||
new Path(srcDir, "source.txt"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -538,7 +555,7 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
@Test
|
@Test
|
||||||
public void testAuditS3GuardTool() throws Throwable {
|
public void testAuditS3GuardTool() throws Throwable {
|
||||||
describe("Test the s3guard audit CLI");
|
describe("Test the s3guard audit CLI");
|
||||||
authFS.mkdirs(methodAuthPath);
|
mkNonauthDir(methodAuthPath);
|
||||||
final String path = methodAuthPath.toString();
|
final String path = methodAuthPath.toString();
|
||||||
// this is non-auth, so the scan is rejected
|
// this is non-auth, so the scan is rejected
|
||||||
expectExecResult(EXIT_NOT_ACCEPTABLE,
|
expectExecResult(EXIT_NOT_ACCEPTABLE,
|
||||||
|
@ -645,7 +662,8 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoke an operation expecting the meta store to be updated{@code updates}
|
* Invoke an operation expecting the meta store to have its
|
||||||
|
* directoryMarkedAuthoritative count to be be updated {@code updates}
|
||||||
* times and S3 LIST requests made {@code lists} times.
|
* times and S3 LIST requests made {@code lists} times.
|
||||||
* @param <T> Return type
|
* @param <T> Return type
|
||||||
* @param updates Expected count
|
* @param updates Expected count
|
||||||
|
@ -669,6 +687,25 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
return call;
|
return call;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke an operation expecting {@code writes} records written to DDB.
|
||||||
|
* @param <T> Return type
|
||||||
|
* @param writes Expected count
|
||||||
|
* @param fn Function to invoke
|
||||||
|
* @return Result of the function call
|
||||||
|
* @throws Exception Failure
|
||||||
|
*/
|
||||||
|
private <T> T expectOperationUpdatesDDB(
|
||||||
|
int writes,
|
||||||
|
Callable<T> fn)
|
||||||
|
throws Exception {
|
||||||
|
S3ATestUtils.MetricDiff writeDiff = new S3ATestUtils.MetricDiff(authFS,
|
||||||
|
S3GUARD_METADATASTORE_RECORD_WRITES);
|
||||||
|
final T call = fn.call();
|
||||||
|
writeDiff.assertDiffEquals(writes);
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert that a listStatus call increments the
|
* Assert that a listStatus call increments the
|
||||||
* "s3guard_metadatastore_authoritative_directories_updated" counter.
|
* "s3guard_metadatastore_authoritative_directories_updated" counter.
|
||||||
|
@ -695,9 +732,40 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
*/
|
*/
|
||||||
private void mkAuthDir(Path path) throws IOException {
|
private void mkAuthDir(Path path) throws IOException {
|
||||||
authFS.mkdirs(path);
|
authFS.mkdirs(path);
|
||||||
authFS.listStatus(path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a non-auth directory.
|
||||||
|
* @param path dir
|
||||||
|
*/
|
||||||
|
private void mkNonauthDir(Path path) throws IOException {
|
||||||
|
authFS.mkdirs(path);
|
||||||
|
// overwrite entry with a nonauth one
|
||||||
|
markDirNonauth(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark a directory as nonauth.
|
||||||
|
* @param path path to the directory
|
||||||
|
* @throws IOException failure
|
||||||
|
*/
|
||||||
|
private void markDirNonauth(final Path path) throws IOException {
|
||||||
|
S3Guard.putWithTtl(metastore,
|
||||||
|
nonAuthEmptyDirectoryMarker((S3AFileStatus) authFS.getFileStatus(path)),
|
||||||
|
null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an empty dir marker which, when passed to the
|
||||||
|
* DDB metastore, is considered authoritative.
|
||||||
|
* @param status file status
|
||||||
|
* @return path metadata.
|
||||||
|
*/
|
||||||
|
private PathMetadata nonAuthEmptyDirectoryMarker(
|
||||||
|
final S3AFileStatus status) {
|
||||||
|
return new DDBPathMetadata(status, Tristate.TRUE,
|
||||||
|
false, false, 0);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Performed a recursive audit of the directory
|
* Performed a recursive audit of the directory
|
||||||
* -require everything to be authoritative.
|
* -require everything to be authoritative.
|
||||||
|
@ -729,4 +797,17 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
||||||
.getPath();
|
.getPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performed a recursive audit of the directory
|
||||||
|
* -expect a failure.
|
||||||
|
* @param path directory
|
||||||
|
* @return the path returned by the exception
|
||||||
|
*/
|
||||||
|
private Path expectNonauthNonRecursive(Path path) throws Exception {
|
||||||
|
return intercept(
|
||||||
|
AuthoritativeAuditOperation.NonAuthoritativeDirException.class,
|
||||||
|
() -> auditor.executeAudit(path, true, true))
|
||||||
|
.getPath();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -377,7 +378,7 @@ public class ITestDynamoDBMetadataStoreScale
|
||||||
execute("list",
|
execute("list",
|
||||||
OPERATIONS_PER_THREAD,
|
OPERATIONS_PER_THREAD,
|
||||||
expectThrottling(),
|
expectThrottling(),
|
||||||
() -> ddbms.put(children, state));
|
() -> ddbms.put(children, Collections.emptyList(), state));
|
||||||
} finally {
|
} finally {
|
||||||
retryingDelete(path);
|
retryingDelete(path);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@ -271,8 +272,8 @@ public class ITestS3GuardFsck extends AbstractS3ATestBase {
|
||||||
final DirListingMetadata dlmIc = metadataStore.listChildren(cwdIncorrect);
|
final DirListingMetadata dlmIc = metadataStore.listChildren(cwdIncorrect);
|
||||||
dlmC.setAuthoritative(true);
|
dlmC.setAuthoritative(true);
|
||||||
dlmIc.setAuthoritative(true);
|
dlmIc.setAuthoritative(true);
|
||||||
metadataStore.put(dlmC, null);
|
metadataStore.put(dlmC, Collections.emptyList(), null);
|
||||||
metadataStore.put(dlmIc, null);
|
metadataStore.put(dlmIc, Collections.emptyList(), null);
|
||||||
|
|
||||||
// add a file raw so the listing will be different.
|
// add a file raw so the listing will be different.
|
||||||
touchRawAndWaitRaw(fileIc2);
|
touchRawAndWaitRaw(fileIc2);
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -74,6 +75,8 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
private final long accessTime = 0;
|
private final long accessTime = 0;
|
||||||
private static ITtlTimeProvider ttlTimeProvider;
|
private static ITtlTimeProvider ttlTimeProvider;
|
||||||
|
|
||||||
|
private static final List<Path> EMPTY_LIST = Collections.emptyList();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Each test should override this. Will use a new Configuration instance.
|
* Each test should override this. Will use a new Configuration instance.
|
||||||
* @return Contract which specifies the MetadataStore under test plus config.
|
* @return Contract which specifies the MetadataStore under test plus config.
|
||||||
|
@ -558,7 +561,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
dirMeta.setAuthoritative(true);
|
dirMeta.setAuthoritative(true);
|
||||||
dirMeta.put(new PathMetadata(
|
dirMeta.put(new PathMetadata(
|
||||||
makeFileStatus("/a1/b1/file_new", 100)));
|
makeFileStatus("/a1/b1/file_new", 100)));
|
||||||
ms.put(dirMeta, null);
|
ms.put(dirMeta, EMPTY_LIST, null);
|
||||||
|
|
||||||
dirMeta = ms.listChildren(strToPath("/a1/b1"));
|
dirMeta = ms.listChildren(strToPath("/a1/b1"));
|
||||||
assertListingsEqual(dirMeta.getListing(), "/a1/b1/file1", "/a1/b1/file2",
|
assertListingsEqual(dirMeta.getListing(), "/a1/b1/file1", "/a1/b1/file2",
|
||||||
|
@ -759,7 +762,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
if (!allowMissing()) {
|
if (!allowMissing()) {
|
||||||
DirListingMetadata parentDirMd = ms.listChildren(strToPath(parentDir));
|
DirListingMetadata parentDirMd = ms.listChildren(strToPath(parentDir));
|
||||||
parentDirMd.setAuthoritative(true);
|
parentDirMd.setAuthoritative(true);
|
||||||
ms.put(parentDirMd, null);
|
ms.put(parentDirMd, EMPTY_LIST, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
|
ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
|
||||||
|
@ -798,7 +801,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
// set parent dir as authoritative
|
// set parent dir as authoritative
|
||||||
DirListingMetadata parentDirMd = ms.listChildren(strToPath(parentDir));
|
DirListingMetadata parentDirMd = ms.listChildren(strToPath(parentDir));
|
||||||
parentDirMd.setAuthoritative(true);
|
parentDirMd.setAuthoritative(true);
|
||||||
ms.put(parentDirMd, null);
|
ms.put(parentDirMd, EMPTY_LIST, null);
|
||||||
|
|
||||||
// prune the ms
|
// prune the ms
|
||||||
ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
|
ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
|
||||||
|
@ -830,7 +833,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
}
|
}
|
||||||
DirListingMetadata dirMeta =
|
DirListingMetadata dirMeta =
|
||||||
new DirListingMetadata(strToPath(dirPath), metas, authoritative);
|
new DirListingMetadata(strToPath(dirPath), metas, authoritative);
|
||||||
ms.put(dirMeta, null);
|
ms.put(dirMeta, EMPTY_LIST, null);
|
||||||
|
|
||||||
if (!allowMissing()) {
|
if (!allowMissing()) {
|
||||||
assertDirectorySize(dirPath, filenames.length);
|
assertDirectorySize(dirPath, filenames.length);
|
||||||
|
@ -1011,7 +1014,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
}
|
}
|
||||||
DirListingMetadata dirMeta =
|
DirListingMetadata dirMeta =
|
||||||
new DirListingMetadata(strToPath(dirPath), metas, authoritative);
|
new DirListingMetadata(strToPath(dirPath), metas, authoritative);
|
||||||
ms.put(dirMeta, null);
|
ms.put(dirMeta, EMPTY_LIST, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void createNewDirs(String... dirs)
|
protected void createNewDirs(String... dirs)
|
||||||
|
|
|
@ -18,15 +18,20 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a.s3guard;
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -51,38 +56,180 @@ import static org.mockito.Mockito.when;
|
||||||
*/
|
*/
|
||||||
public class TestS3Guard extends Assert {
|
public class TestS3Guard extends Assert {
|
||||||
|
|
||||||
|
public static final String MS_FILE_1 = "s3a://bucket/dir/ms-file1";
|
||||||
|
|
||||||
|
public static final String MS_FILE_2 = "s3a://bucket/dir/ms-file2";
|
||||||
|
|
||||||
|
public static final String S3_FILE_3 = "s3a://bucket/dir/s3-file3";
|
||||||
|
|
||||||
|
public static final String S3_DIR_4 = "s3a://bucket/dir/s3-dir4";
|
||||||
|
|
||||||
|
public static final Path DIR_PATH = new Path("s3a://bucket/dir");
|
||||||
|
|
||||||
|
private MetadataStore ms;
|
||||||
|
|
||||||
|
private ITtlTimeProvider timeProvider;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
final Configuration conf = new Configuration(false);
|
||||||
|
ms = new LocalMetadataStore();
|
||||||
|
ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
|
||||||
|
timeProvider = new S3Guard.TtlTimeProvider(
|
||||||
|
DEFAULT_METADATASTORE_METADATA_TTL);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (ms != null) {
|
||||||
|
ms.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Basic test to ensure results from S3 and MetadataStore are merged
|
* Basic test to ensure results from S3 and MetadataStore are merged
|
||||||
* correctly.
|
* correctly.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDirListingUnion() throws Exception {
|
public void testDirListingUnionNonauth() throws Exception {
|
||||||
MetadataStore ms = new LocalMetadataStore();
|
|
||||||
|
|
||||||
Path dirPath = new Path("s3a://bucket/dir");
|
|
||||||
|
|
||||||
// Two files in metadata store listing
|
// Two files in metadata store listing
|
||||||
PathMetadata m1 = makePathMeta("s3a://bucket/dir/ms-file1", false);
|
PathMetadata m1 = makePathMeta(MS_FILE_1, false);
|
||||||
PathMetadata m2 = makePathMeta("s3a://bucket/dir/ms-file2", false);
|
PathMetadata m2 = makePathMeta(MS_FILE_2, false);
|
||||||
DirListingMetadata dirMeta = new DirListingMetadata(dirPath,
|
DirListingMetadata dirMeta = new DirListingMetadata(DIR_PATH,
|
||||||
Arrays.asList(m1, m2), false);
|
Arrays.asList(m1, m2), false);
|
||||||
|
|
||||||
// Two other files in s3
|
// Two other entries in s3
|
||||||
|
final S3AFileStatus s1Status = makeFileStatus(S3_FILE_3, false);
|
||||||
|
final S3AFileStatus s2Status = makeFileStatus(S3_DIR_4, true);
|
||||||
List<S3AFileStatus> s3Listing = Arrays.asList(
|
List<S3AFileStatus> s3Listing = Arrays.asList(
|
||||||
makeFileStatus("s3a://bucket/dir/s3-file3", false),
|
s1Status,
|
||||||
makeFileStatus("s3a://bucket/dir/s3-file4", false)
|
s2Status);
|
||||||
);
|
|
||||||
|
|
||||||
ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
|
FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
|
||||||
DEFAULT_METADATASTORE_METADATA_TTL);
|
|
||||||
FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
|
|
||||||
dirMeta, false, timeProvider);
|
dirMeta, false, timeProvider);
|
||||||
|
|
||||||
assertEquals("listing length", 4, result.length);
|
assertEquals("listing length", 4, result.length);
|
||||||
assertContainsPath(result, "s3a://bucket/dir/ms-file1");
|
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
|
||||||
assertContainsPath(result, "s3a://bucket/dir/ms-file2");
|
|
||||||
assertContainsPath(result, "s3a://bucket/dir/s3-file3");
|
// check the MS doesn't contain the s3 entries as nonauth
|
||||||
assertContainsPath(result, "s3a://bucket/dir/s3-file4");
|
// unions should block them
|
||||||
|
assertNoRecord(ms, S3_FILE_3);
|
||||||
|
assertNoRecord(ms, S3_DIR_4);
|
||||||
|
|
||||||
|
// for entries which do exist, when updated in S3, the metastore is updated
|
||||||
|
S3AFileStatus f1Status2 = new S3AFileStatus(
|
||||||
|
200, System.currentTimeMillis(), new Path(MS_FILE_1),
|
||||||
|
1, null, "tag2", "ver2");
|
||||||
|
FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
|
||||||
|
Arrays.asList(f1Status2),
|
||||||
|
dirMeta, false, timeProvider);
|
||||||
|
// the listing returns the new status
|
||||||
|
Assertions.assertThat(find(result2, MS_FILE_1))
|
||||||
|
.describedAs("Entry in listing results for %s", MS_FILE_1)
|
||||||
|
.isSameAs(f1Status2);
|
||||||
|
// as does a query of the MS
|
||||||
|
final PathMetadata updatedMD = verifyRecord(ms, MS_FILE_1);
|
||||||
|
Assertions.assertThat(updatedMD.getFileStatus())
|
||||||
|
.describedAs("Entry in metastore for %s: %s", MS_FILE_1, updatedMD)
|
||||||
|
.isEqualTo(f1Status2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Auth mode unions are different.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDirListingUnionAuth() throws Exception {
|
||||||
|
|
||||||
|
// Two files in metadata store listing
|
||||||
|
PathMetadata m1 = makePathMeta(MS_FILE_1, false);
|
||||||
|
PathMetadata m2 = makePathMeta(MS_FILE_2, false);
|
||||||
|
DirListingMetadata dirMeta = new DirListingMetadata(DIR_PATH,
|
||||||
|
Arrays.asList(m1, m2), true);
|
||||||
|
|
||||||
|
// Two other entries in s3
|
||||||
|
S3AFileStatus s1Status = makeFileStatus(S3_FILE_3, false);
|
||||||
|
S3AFileStatus s2Status = makeFileStatus(S3_DIR_4, true);
|
||||||
|
List<S3AFileStatus> s3Listing = Arrays.asList(
|
||||||
|
s1Status,
|
||||||
|
s2Status);
|
||||||
|
|
||||||
|
ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
|
||||||
|
DEFAULT_METADATASTORE_METADATA_TTL);
|
||||||
|
FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
|
||||||
|
dirMeta, true, timeProvider);
|
||||||
|
|
||||||
|
assertEquals("listing length", 4, result.length);
|
||||||
|
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
|
||||||
|
|
||||||
|
// now verify an auth scan added the records
|
||||||
|
PathMetadata file3Meta = verifyRecord(ms, S3_FILE_3);
|
||||||
|
PathMetadata dir4Meta = verifyRecord(ms, S3_DIR_4);
|
||||||
|
|
||||||
|
// we can't check auth flag handling because local FS doesn't have one
|
||||||
|
// so do just check the dir status still all good.
|
||||||
|
Assertions.assertThat(dir4Meta)
|
||||||
|
.describedAs("Metastore entry for dir %s", dir4Meta)
|
||||||
|
.matches(m -> m.getFileStatus().isDirectory());
|
||||||
|
|
||||||
|
DirListingMetadata dirMeta2 = new DirListingMetadata(DIR_PATH,
|
||||||
|
Arrays.asList(m1, m2, file3Meta, dir4Meta), true);
|
||||||
|
// now s1 status is updated on S3
|
||||||
|
S3AFileStatus s1Status2 = new S3AFileStatus(
|
||||||
|
200, System.currentTimeMillis(), new Path(S3_FILE_3),
|
||||||
|
1, null, "tag2", "ver2");
|
||||||
|
|
||||||
|
// but the result of the listing contains the old entry
|
||||||
|
// because auth mode doesn't pick up changes in S3 which
|
||||||
|
// didn't go through s3guard
|
||||||
|
FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
|
||||||
|
Arrays.asList(s1Status2),
|
||||||
|
dirMeta2, true, timeProvider);
|
||||||
|
Assertions.assertThat(find(result2, S3_FILE_3))
|
||||||
|
.describedAs("Entry in listing results for %s", S3_FILE_3)
|
||||||
|
.isSameAs(file3Meta.getFileStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert there is no record in the store.
|
||||||
|
* @param ms metastore
|
||||||
|
* @param path path
|
||||||
|
* @throws IOException IOError
|
||||||
|
*/
|
||||||
|
private void assertNoRecord(MetadataStore ms, String path)
|
||||||
|
throws IOException {
|
||||||
|
Assertions.assertThat(lookup(ms, path))
|
||||||
|
.describedAs("Metastore entry for %s", path)
|
||||||
|
.isNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert there is arecord in the store, then return it.
|
||||||
|
* @param ms metastore
|
||||||
|
* @param path path
|
||||||
|
* @return the record.
|
||||||
|
* @throws IOException IO Error
|
||||||
|
*/
|
||||||
|
private PathMetadata verifyRecord(MetadataStore ms, String path)
|
||||||
|
throws IOException {
|
||||||
|
final PathMetadata md = lookup(ms, path);
|
||||||
|
Assertions.assertThat(md)
|
||||||
|
.describedAs("Metastore entry for %s", path)
|
||||||
|
.isNotNull();
|
||||||
|
return md;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look up a record.
|
||||||
|
* @param ms store
|
||||||
|
* @param path path
|
||||||
|
* @return the record or null
|
||||||
|
* @throws IOException IO Error
|
||||||
|
*/
|
||||||
|
private PathMetadata lookup(final MetadataStore ms, final String path)
|
||||||
|
throws IOException {
|
||||||
|
return ms.get(new Path(path));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -96,12 +243,12 @@ public class TestS3Guard extends Assert {
|
||||||
when(timeProvider.getNow()).thenReturn(100L);
|
when(timeProvider.getNow()).thenReturn(100L);
|
||||||
|
|
||||||
// act
|
// act
|
||||||
S3Guard.putWithTtl(ms, dlm, timeProvider, null);
|
S3Guard.putWithTtl(ms, dlm, Collections.emptyList(), timeProvider, null);
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
|
assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
|
||||||
verify(timeProvider, times(1)).getNow();
|
verify(timeProvider, times(1)).getNow();
|
||||||
verify(ms, times(1)).put(dlm, null);
|
verify(ms, times(1)).put(dlm, Collections.emptyList(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -290,18 +437,32 @@ public class TestS3Guard extends Assert {
|
||||||
localLogger, "FOO_BAR_LEVEL", "bucket"));
|
localLogger, "FOO_BAR_LEVEL", "bucket"));
|
||||||
}
|
}
|
||||||
|
|
||||||
void assertContainsPath(FileStatus[] statuses, String pathStr) {
|
void assertContainsPaths(FileStatus[] statuses, String...pathStr) {
|
||||||
assertTrue("listing doesn't contain " + pathStr,
|
for (String s :pathStr) {
|
||||||
containsPath(statuses, pathStr));
|
assertContainsPath(statuses, s);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean containsPath(FileStatus[] statuses, String pathStr) {
|
void assertContainsPath(FileStatus[] statuses, String pathStr) {
|
||||||
|
find(statuses, pathStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look up an entry or raise an assertion
|
||||||
|
* @param statuses list of statuses
|
||||||
|
* @param pathStr path to search
|
||||||
|
* @return the entry if found
|
||||||
|
*/
|
||||||
|
private FileStatus find(FileStatus[] statuses, String pathStr) {
|
||||||
for (FileStatus s : statuses) {
|
for (FileStatus s : statuses) {
|
||||||
if (s.getPath().toString().equals(pathStr)) {
|
if (s.getPath().toString().equals(pathStr)) {
|
||||||
return true;
|
return s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
// no match, fail meaningfully
|
||||||
|
Assertions.assertThat(statuses)
|
||||||
|
.anyMatch(s -> s.getPath().toString().equals(pathStr));
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private PathMetadata makePathMeta(String pathStr, boolean isDir) {
|
private PathMetadata makePathMeta(String pathStr, boolean isDir) {
|
||||||
|
|
Loading…
Reference in New Issue