HADOOP-14154 Persist isAuthoritative bit in DynamoDBMetaStore (Contributed by Gabor Bota)
This commit is contained in:
parent
8d7c93186e
commit
d7232857d8
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.s3a.Tristate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@code DDBPathMetadata} wraps {@link PathMetadata} and adds the
|
||||||
|
* isAuthoritativeDir flag to provide support for authoritative directory
|
||||||
|
* listings in {@link DynamoDBMetadataStore}.
|
||||||
|
*/
|
||||||
|
public class DDBPathMetadata extends PathMetadata {
|
||||||
|
|
||||||
|
private boolean isAuthoritativeDir;
|
||||||
|
|
||||||
|
public DDBPathMetadata(PathMetadata pmd, boolean isAuthoritativeDir) {
|
||||||
|
super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted());
|
||||||
|
this.isAuthoritativeDir = isAuthoritativeDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DDBPathMetadata(PathMetadata pmd) {
|
||||||
|
super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted());
|
||||||
|
this.isAuthoritativeDir = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DDBPathMetadata(FileStatus fileStatus) {
|
||||||
|
super(fileStatus);
|
||||||
|
this.isAuthoritativeDir = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
|
||||||
|
boolean isDeleted) {
|
||||||
|
super(fileStatus, isEmptyDir, isDeleted);
|
||||||
|
this.isAuthoritativeDir = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
|
||||||
|
boolean isDeleted, boolean isAuthoritativeDir) {
|
||||||
|
super(fileStatus, isEmptyDir, isDeleted);
|
||||||
|
this.isAuthoritativeDir = isAuthoritativeDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAuthoritativeDir() {
|
||||||
|
return isAuthoritativeDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAuthoritativeDir(boolean authoritativeDir) {
|
||||||
|
isAuthoritativeDir = authoritativeDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
return super.equals(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int hashCode() {
|
||||||
|
return super.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -28,11 +28,16 @@ import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
@ -422,7 +427,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
|
boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
|
||||||
if (tombstone) {
|
if (tombstone) {
|
||||||
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
|
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
|
||||||
PathMetadata.tombstone(path));
|
new DDBPathMetadata(PathMetadata.tombstone(path)));
|
||||||
invoker.retry("Put tombstone", path.toString(), idempotent,
|
invoker.retry("Put tombstone", path.toString(), idempotent,
|
||||||
() -> table.putItem(item));
|
() -> table.putItem(item));
|
||||||
} else {
|
} else {
|
||||||
|
@ -461,13 +466,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Retries.OnceTranslated
|
@Retries.OnceTranslated
|
||||||
public PathMetadata get(Path path) throws IOException {
|
public DDBPathMetadata get(Path path) throws IOException {
|
||||||
return get(path, false);
|
return get(path, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Retries.OnceTranslated
|
@Retries.OnceTranslated
|
||||||
public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
|
public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkPath(path);
|
checkPath(path);
|
||||||
LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
|
LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
|
||||||
|
@ -485,12 +490,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
* @throws AmazonClientException dynamo DB level problem
|
* @throws AmazonClientException dynamo DB level problem
|
||||||
*/
|
*/
|
||||||
@Retries.OnceRaw
|
@Retries.OnceRaw
|
||||||
private PathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag)
|
private DDBPathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final PathMetadata meta;
|
final DDBPathMetadata meta;
|
||||||
if (path.isRoot()) {
|
if (path.isRoot()) {
|
||||||
// Root does not persist in the table
|
// Root does not persist in the table
|
||||||
meta = new PathMetadata(makeDirStatus(username, path));
|
meta =
|
||||||
|
new DDBPathMetadata(makeDirStatus(username, path));
|
||||||
} else {
|
} else {
|
||||||
final Item item = getConsistentItem(pathToKey(path));
|
final Item item = getConsistentItem(pathToKey(path));
|
||||||
meta = itemToPathMetadata(item, username);
|
meta = itemToPathMetadata(item, username);
|
||||||
|
@ -550,15 +556,22 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
|
|
||||||
final List<PathMetadata> metas = new ArrayList<>();
|
final List<PathMetadata> metas = new ArrayList<>();
|
||||||
for (Item item : items) {
|
for (Item item : items) {
|
||||||
PathMetadata meta = itemToPathMetadata(item, username);
|
DDBPathMetadata meta = itemToPathMetadata(item, username);
|
||||||
metas.add(meta);
|
metas.add(meta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DDBPathMetadata dirPathMeta = get(path);
|
||||||
|
boolean isAuthoritative = false;
|
||||||
|
if(dirPathMeta != null) {
|
||||||
|
isAuthoritative = dirPathMeta.isAuthoritativeDir();
|
||||||
|
}
|
||||||
|
|
||||||
LOG.trace("Listing table {} in region {} for {} returning {}",
|
LOG.trace("Listing table {} in region {} for {} returning {}",
|
||||||
tableName, region, path, metas);
|
tableName, region, path, metas);
|
||||||
|
|
||||||
return (metas.isEmpty() && get(path) == null)
|
return (metas.isEmpty() && dirPathMeta == null)
|
||||||
? null
|
? null
|
||||||
: new DirListingMetadata(path, metas, false);
|
: new DirListingMetadata(path, metas, isAuthoritative);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,24 +580,25 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
* @param pathsToCreate paths to create
|
* @param pathsToCreate paths to create
|
||||||
* @return the full ancestry paths
|
* @return the full ancestry paths
|
||||||
*/
|
*/
|
||||||
Collection<PathMetadata> completeAncestry(
|
Collection<DDBPathMetadata> completeAncestry(
|
||||||
Collection<PathMetadata> pathsToCreate) {
|
Collection<DDBPathMetadata> pathsToCreate) {
|
||||||
// Key on path to allow fast lookup
|
// Key on path to allow fast lookup
|
||||||
Map<Path, PathMetadata> ancestry = new HashMap<>();
|
Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
|
||||||
|
|
||||||
for (PathMetadata meta : pathsToCreate) {
|
for (DDBPathMetadata meta : pathsToCreate) {
|
||||||
Preconditions.checkArgument(meta != null);
|
Preconditions.checkArgument(meta != null);
|
||||||
Path path = meta.getFileStatus().getPath();
|
Path path = meta.getFileStatus().getPath();
|
||||||
if (path.isRoot()) {
|
if (path.isRoot()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
ancestry.put(path, meta);
|
ancestry.put(path, new DDBPathMetadata(meta));
|
||||||
Path parent = path.getParent();
|
Path parent = path.getParent();
|
||||||
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
|
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
|
||||||
LOG.debug("auto-create ancestor path {} for child path {}",
|
LOG.debug("auto-create ancestor path {} for child path {}",
|
||||||
parent, path);
|
parent, path);
|
||||||
final FileStatus status = makeDirStatus(parent, username);
|
final FileStatus status = makeDirStatus(parent, username);
|
||||||
ancestry.put(parent, new PathMetadata(status, Tristate.FALSE, false));
|
ancestry.put(parent, new DDBPathMetadata(status, Tristate.FALSE,
|
||||||
|
false));
|
||||||
parent = parent.getParent();
|
parent = parent.getParent();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -611,13 +625,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
// Following code is to maintain this invariant by putting all ancestor
|
// Following code is to maintain this invariant by putting all ancestor
|
||||||
// directories of the paths to create.
|
// directories of the paths to create.
|
||||||
// ancestor paths that are not explicitly added to paths to create
|
// ancestor paths that are not explicitly added to paths to create
|
||||||
Collection<PathMetadata> newItems = new ArrayList<>();
|
Collection<DDBPathMetadata> newItems = new ArrayList<>();
|
||||||
if (pathsToCreate != null) {
|
if (pathsToCreate != null) {
|
||||||
newItems.addAll(completeAncestry(pathsToCreate));
|
newItems.addAll(completeAncestry(pathMetaToDDBPathMeta(pathsToCreate)));
|
||||||
}
|
}
|
||||||
if (pathsToDelete != null) {
|
if (pathsToDelete != null) {
|
||||||
for (Path meta : pathsToDelete) {
|
for (Path meta : pathsToDelete) {
|
||||||
newItems.add(PathMetadata.tombstone(meta));
|
newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -725,7 +739,11 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
@Override
|
@Override
|
||||||
@Retries.OnceRaw
|
@Retries.OnceRaw
|
||||||
public void put(Collection<PathMetadata> metas) throws IOException {
|
public void put(Collection<PathMetadata> metas) throws IOException {
|
||||||
|
innerPut(pathMetaToDDBPathMeta(metas));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Retries.OnceRaw
|
||||||
|
private void innerPut(Collection<DDBPathMetadata> metas) throws IOException {
|
||||||
Item[] items = pathMetadataToItem(completeAncestry(metas));
|
Item[] items = pathMetadataToItem(completeAncestry(metas));
|
||||||
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
|
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
|
||||||
tableName, region);
|
tableName, region);
|
||||||
|
@ -736,10 +754,10 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
* Helper method to get full path of ancestors that are nonexistent in table.
|
* Helper method to get full path of ancestors that are nonexistent in table.
|
||||||
*/
|
*/
|
||||||
@Retries.OnceRaw
|
@Retries.OnceRaw
|
||||||
private Collection<PathMetadata> fullPathsToPut(PathMetadata meta)
|
private Collection<DDBPathMetadata> fullPathsToPut(DDBPathMetadata meta)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkPathMetadata(meta);
|
checkPathMetadata(meta);
|
||||||
final Collection<PathMetadata> metasToPut = new ArrayList<>();
|
final Collection<DDBPathMetadata> metasToPut = new ArrayList<>();
|
||||||
// root path is not persisted
|
// root path is not persisted
|
||||||
if (!meta.getFileStatus().getPath().isRoot()) {
|
if (!meta.getFileStatus().getPath().isRoot()) {
|
||||||
metasToPut.add(meta);
|
metasToPut.add(meta);
|
||||||
|
@ -752,7 +770,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
final Item item = getConsistentItem(pathToKey(path));
|
final Item item = getConsistentItem(pathToKey(path));
|
||||||
if (!itemExists(item)) {
|
if (!itemExists(item)) {
|
||||||
final FileStatus status = makeDirStatus(path, username);
|
final FileStatus status = makeDirStatus(path, username);
|
||||||
metasToPut.add(new PathMetadata(status, Tristate.FALSE, false));
|
metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false,
|
||||||
|
meta.isAuthoritativeDir()));
|
||||||
path = path.getParent();
|
path = path.getParent();
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -793,16 +812,17 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
|
|
||||||
// directory path
|
// directory path
|
||||||
Path path = meta.getPath();
|
Path path = meta.getPath();
|
||||||
PathMetadata p = new PathMetadata(makeDirStatus(path, username),
|
DDBPathMetadata ddbPathMeta =
|
||||||
meta.isEmpty(), false);
|
new DDBPathMetadata(makeDirStatus(path, username), meta.isEmpty(),
|
||||||
|
false, meta.isAuthoritative());
|
||||||
|
|
||||||
// First add any missing ancestors...
|
// First add any missing ancestors...
|
||||||
final Collection<PathMetadata> metasToPut = invoker.retry(
|
final Collection<DDBPathMetadata> metasToPut = invoker.retry(
|
||||||
"paths to put", path.toString(), true,
|
"paths to put", path.toString(), true,
|
||||||
() -> fullPathsToPut(p));
|
() -> fullPathsToPut(ddbPathMeta));
|
||||||
|
|
||||||
// next add all children of the directory
|
// next add all children of the directory
|
||||||
metasToPut.addAll(meta.getListing());
|
metasToPut.addAll(pathMetaToDDBPathMeta(meta.getListing()));
|
||||||
|
|
||||||
Invoker.once("put", path.toString(),
|
Invoker.once("put", path.toString(),
|
||||||
() -> processBatchWriteRequest(null, pathMetadataToItem(metasToPut)));
|
() -> processBatchWriteRequest(null, pathMetadataToItem(metasToPut)));
|
||||||
|
@ -880,21 +900,38 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
|
new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
|
||||||
int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
|
int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
|
||||||
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
|
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
|
||||||
|
Set<Path> parentPathSet = new HashSet<>();
|
||||||
for (Item item : expiredFiles(modTime, keyPrefix)) {
|
for (Item item : expiredFiles(modTime, keyPrefix)) {
|
||||||
PathMetadata md = PathMetadataDynamoDBTranslation
|
DDBPathMetadata md = PathMetadataDynamoDBTranslation
|
||||||
.itemToPathMetadata(item, username);
|
.itemToPathMetadata(item, username);
|
||||||
Path path = md.getFileStatus().getPath();
|
Path path = md.getFileStatus().getPath();
|
||||||
deletionBatch.add(path);
|
deletionBatch.add(path);
|
||||||
|
|
||||||
|
// add parent path of what we remove
|
||||||
|
Path parentPath = path.getParent();
|
||||||
|
if (parentPath != null) {
|
||||||
|
parentPathSet.add(parentPath);
|
||||||
|
}
|
||||||
|
|
||||||
itemCount++;
|
itemCount++;
|
||||||
if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
|
if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
|
||||||
Thread.sleep(delay);
|
Thread.sleep(delay);
|
||||||
processBatchWriteRequest(pathToKey(deletionBatch), null);
|
processBatchWriteRequest(pathToKey(deletionBatch), null);
|
||||||
|
|
||||||
|
// set authoritative false for each pruned dir listing
|
||||||
|
removeAuthoritativeDirFlag(parentPathSet);
|
||||||
|
parentPathSet.clear();
|
||||||
|
|
||||||
deletionBatch.clear();
|
deletionBatch.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (deletionBatch.size() > 0) {
|
if (deletionBatch.size() > 0) {
|
||||||
Thread.sleep(delay);
|
Thread.sleep(delay);
|
||||||
processBatchWriteRequest(pathToKey(deletionBatch), null);
|
processBatchWriteRequest(pathToKey(deletionBatch), null);
|
||||||
|
|
||||||
|
// set authoritative false for each pruned dir listing
|
||||||
|
removeAuthoritativeDirFlag(parentPathSet);
|
||||||
|
parentPathSet.clear();
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -904,6 +941,43 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
|
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeAuthoritativeDirFlag(Set<Path> pathSet)
|
||||||
|
throws IOException {
|
||||||
|
AtomicReference<IOException> rIOException = new AtomicReference<>();
|
||||||
|
|
||||||
|
Set<DDBPathMetadata> metas = pathSet.stream().map(path -> {
|
||||||
|
try {
|
||||||
|
DDBPathMetadata ddbPathMetadata = get(path);
|
||||||
|
if(ddbPathMetadata == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
LOG.debug("Setting false isAuthoritativeDir on {}", ddbPathMetadata);
|
||||||
|
ddbPathMetadata.setAuthoritativeDir(false);
|
||||||
|
return ddbPathMetadata;
|
||||||
|
} catch (IOException e) {
|
||||||
|
String msg = String.format("IOException while getting PathMetadata "
|
||||||
|
+ "on path: %s.", path);
|
||||||
|
LOG.error(msg, e);
|
||||||
|
rIOException.set(e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).filter(Objects::nonNull).collect(Collectors.toSet());
|
||||||
|
|
||||||
|
try {
|
||||||
|
LOG.debug("innerPut on metas: {}", metas);
|
||||||
|
innerPut(metas);
|
||||||
|
} catch (IOException e) {
|
||||||
|
String msg = String.format("IOException while setting false "
|
||||||
|
+ "authoritative directory flag on: %s.", metas);
|
||||||
|
LOG.error(msg, e);
|
||||||
|
rIOException.set(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rIOException.get() != null) {
|
||||||
|
throw rIOException.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + '{'
|
return getClass().getSimpleName() + '{'
|
||||||
|
@ -1197,7 +1271,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
map.put(WRITE_CAPACITY, throughput.getWriteCapacityUnits().toString());
|
map.put(WRITE_CAPACITY, throughput.getWriteCapacityUnits().toString());
|
||||||
map.put(TABLE, desc.toString());
|
map.put(TABLE, desc.toString());
|
||||||
map.put(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT,
|
map.put(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT,
|
||||||
Boolean.toString(false));
|
Boolean.toString(true));
|
||||||
} else {
|
} else {
|
||||||
map.put("name", "DynamoDB Metadata Store");
|
map.put("name", "DynamoDB Metadata Store");
|
||||||
map.put(TABLE, "none");
|
map.put(TABLE, "none");
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.document.Item;
|
import com.amazonaws.services.dynamodbv2.document.Item;
|
||||||
import com.amazonaws.services.dynamodbv2.document.KeyAttribute;
|
import com.amazonaws.services.dynamodbv2.document.KeyAttribute;
|
||||||
|
@ -64,6 +66,7 @@ final class PathMetadataDynamoDBTranslation {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final String BLOCK_SIZE = "block_size";
|
static final String BLOCK_SIZE = "block_size";
|
||||||
static final String IS_DELETED = "is_deleted";
|
static final String IS_DELETED = "is_deleted";
|
||||||
|
static final String IS_AUTHORITATIVE = "is_authoritative";
|
||||||
|
|
||||||
/** Table version field {@value} in version marker item. */
|
/** Table version field {@value} in version marker item. */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -99,12 +102,27 @@ final class PathMetadataDynamoDBTranslation {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts a DynamoDB item to a {@link PathMetadata}.
|
* Converts a DynamoDB item to a {@link DDBPathMetadata}.
|
||||||
*
|
*
|
||||||
* @param item DynamoDB item to convert
|
* @param item DynamoDB item to convert
|
||||||
* @return {@code item} converted to a {@link PathMetadata}
|
* @return {@code item} converted to a {@link DDBPathMetadata}
|
||||||
*/
|
*/
|
||||||
static PathMetadata itemToPathMetadata(Item item, String username)
|
static DDBPathMetadata itemToPathMetadata(Item item, String username)
|
||||||
|
throws IOException {
|
||||||
|
return itemToPathMetadata(item, username, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a DynamoDB item to a {@link DDBPathMetadata}.
|
||||||
|
* Can ignore {@code IS_AUTHORITATIVE} flag if {@code ignoreIsAuthFlag} is
|
||||||
|
* true.
|
||||||
|
*
|
||||||
|
* @param item DynamoDB item to convert
|
||||||
|
* @param ignoreIsAuthFlag if true, ignore the authoritative flag on item
|
||||||
|
* @return {@code item} converted to a {@link DDBPathMetadata}
|
||||||
|
*/
|
||||||
|
static DDBPathMetadata itemToPathMetadata(Item item, String username,
|
||||||
|
boolean ignoreIsAuthFlag)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -125,8 +143,13 @@ final class PathMetadataDynamoDBTranslation {
|
||||||
Path path = new Path(parent, childStr);
|
Path path = new Path(parent, childStr);
|
||||||
|
|
||||||
boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
|
boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
|
||||||
|
boolean isAuthoritativeDir = false;
|
||||||
final FileStatus fileStatus;
|
final FileStatus fileStatus;
|
||||||
if (isDir) {
|
if (isDir) {
|
||||||
|
if (!ignoreIsAuthFlag) {
|
||||||
|
isAuthoritativeDir = item.hasAttribute(IS_AUTHORITATIVE)
|
||||||
|
&& item.getBoolean(IS_AUTHORITATIVE);
|
||||||
|
}
|
||||||
fileStatus = DynamoDBMetadataStore.makeDirStatus(path, username);
|
fileStatus = DynamoDBMetadataStore.makeDirStatus(path, username);
|
||||||
} else {
|
} else {
|
||||||
long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
|
long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
|
||||||
|
@ -138,21 +161,40 @@ final class PathMetadataDynamoDBTranslation {
|
||||||
boolean isDeleted =
|
boolean isDeleted =
|
||||||
item.hasAttribute(IS_DELETED) && item.getBoolean(IS_DELETED);
|
item.hasAttribute(IS_DELETED) && item.getBoolean(IS_DELETED);
|
||||||
|
|
||||||
return new PathMetadata(fileStatus, Tristate.UNKNOWN, isDeleted);
|
return new DDBPathMetadata(fileStatus, Tristate.UNKNOWN, isDeleted,
|
||||||
|
isAuthoritativeDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts a {@link PathMetadata} to a DynamoDB item.
|
* Converts a {@link DDBPathMetadata} to a DynamoDB item.
|
||||||
*
|
*
|
||||||
* @param meta {@link PathMetadata} to convert
|
* @param meta {@link DDBPathMetadata} to convert
|
||||||
* @return {@code meta} converted to DynamoDB item
|
* @return {@code meta} converted to DynamoDB item
|
||||||
*/
|
*/
|
||||||
static Item pathMetadataToItem(PathMetadata meta) {
|
static Item pathMetadataToItem(DDBPathMetadata meta) {
|
||||||
|
return pathMetadataToItem(meta, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a {@link DDBPathMetadata} to a DynamoDB item.
|
||||||
|
*
|
||||||
|
* Can ignore {@code IS_AUTHORITATIVE} flag if {@code ignoreIsAuthFlag} is
|
||||||
|
* true.
|
||||||
|
*
|
||||||
|
* @param meta {@link DDBPathMetadata} to convert
|
||||||
|
* @param ignoreIsAuthFlag if true, ignore the authoritative flag on item
|
||||||
|
* @return {@code meta} converted to DynamoDB item
|
||||||
|
*/
|
||||||
|
static Item pathMetadataToItem(DDBPathMetadata meta,
|
||||||
|
boolean ignoreIsAuthFlag) {
|
||||||
Preconditions.checkNotNull(meta);
|
Preconditions.checkNotNull(meta);
|
||||||
final FileStatus status = meta.getFileStatus();
|
final FileStatus status = meta.getFileStatus();
|
||||||
final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
|
final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
|
||||||
if (status.isDirectory()) {
|
if (status.isDirectory()) {
|
||||||
item.withBoolean(IS_DIR, true);
|
item.withBoolean(IS_DIR, true);
|
||||||
|
if (!ignoreIsAuthFlag) {
|
||||||
|
item.withBoolean(IS_AUTHORITATIVE, meta.isAuthoritativeDir());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
item.withLong(FILE_LENGTH, status.getLen())
|
item.withLong(FILE_LENGTH, status.getLen())
|
||||||
.withLong(MOD_TIME, status.getModificationTime())
|
.withLong(MOD_TIME, status.getModificationTime())
|
||||||
|
@ -214,18 +256,19 @@ final class PathMetadataDynamoDBTranslation {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts a collection {@link PathMetadata} to a collection DynamoDB items.
|
* Converts a collection {@link DDBPathMetadata} to a collection DynamoDB
|
||||||
|
* items.
|
||||||
*
|
*
|
||||||
* @see #pathMetadataToItem(PathMetadata)
|
* @see #pathMetadataToItem(DDBPathMetadata)
|
||||||
*/
|
*/
|
||||||
static Item[] pathMetadataToItem(Collection<PathMetadata> metas) {
|
static Item[] pathMetadataToItem(Collection<DDBPathMetadata> metas) {
|
||||||
if (metas == null) {
|
if (metas == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Item[] items = new Item[metas.size()];
|
final Item[] items = new Item[metas.size()];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (PathMetadata meta : metas) {
|
for (DDBPathMetadata meta : metas) {
|
||||||
items[i++] = pathMetadataToItem(meta);
|
items[i++] = pathMetadataToItem(meta);
|
||||||
}
|
}
|
||||||
return items;
|
return items;
|
||||||
|
@ -301,4 +344,10 @@ final class PathMetadataDynamoDBTranslation {
|
||||||
private PathMetadataDynamoDBTranslation() {
|
private PathMetadataDynamoDBTranslation() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static List<DDBPathMetadata> pathMetaToDDBPathMeta(
|
||||||
|
Collection<PathMetadata> pathMetadatas) {
|
||||||
|
return pathMetadatas.stream().map(p -> new DDBPathMetadata(p))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,6 +235,10 @@ public final class S3Guard {
|
||||||
changed = changed || updated;
|
changed = changed || updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If dirMeta is not authoritative, but isAuthoritative is true the
|
||||||
|
// directory metadata should be updated. Treat it as a change.
|
||||||
|
changed = changed || (!dirMeta.isAuthoritative() && isAuthoritative);
|
||||||
|
|
||||||
if (changed && isAuthoritative) {
|
if (changed && isAuthoritative) {
|
||||||
dirMeta.setAuthoritative(true); // This is the full directory contents
|
dirMeta.setAuthoritative(true); // This is the full directory contents
|
||||||
ms.put(dirMeta);
|
ms.put(dirMeta);
|
||||||
|
|
|
@ -122,8 +122,9 @@ two different reasons:
|
||||||
(`DirListingMetadata`) is full, and complete.
|
(`DirListingMetadata`) is full, and complete.
|
||||||
* If set to `FALSE` the listing may not be complete.
|
* If set to `FALSE` the listing may not be complete.
|
||||||
* Metadata store may persist the isAuthoritative bit on the metadata store.
|
* Metadata store may persist the isAuthoritative bit on the metadata store.
|
||||||
* Currently only `org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore`
|
* Currently `org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore` and
|
||||||
implementation supports authoritative bit.
|
`org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore` implementation
|
||||||
|
supports authoritative bit.
|
||||||
|
|
||||||
More on Authoritative S3Guard:
|
More on Authoritative S3Guard:
|
||||||
|
|
||||||
|
|
|
@ -727,6 +727,13 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
new FileStatus(0, false, 0, 0, time + 1, strToPath(freshFile)),
|
new FileStatus(0, false, 0, 0, time + 1, strToPath(freshFile)),
|
||||||
Tristate.FALSE, false));
|
Tristate.FALSE, false));
|
||||||
|
|
||||||
|
// set parent dir as authoritative
|
||||||
|
if (!allowMissing()) {
|
||||||
|
DirListingMetadata parentDirMd = ms.listChildren(strToPath(parentDir));
|
||||||
|
parentDirMd.setAuthoritative(true);
|
||||||
|
ms.put(parentDirMd);
|
||||||
|
}
|
||||||
|
|
||||||
ms.prune(time);
|
ms.prune(time);
|
||||||
DirListingMetadata listing;
|
DirListingMetadata listing;
|
||||||
for (String directory : directories) {
|
for (String directory : directories) {
|
||||||
|
@ -738,6 +745,48 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrunePreservesAuthoritative() throws Exception {
|
||||||
|
String rootDir = "/unpruned-root-dir";
|
||||||
|
String grandparentDir = rootDir + "/pruned-grandparent-dir";
|
||||||
|
String parentDir = grandparentDir + "/pruned-parent-dir";
|
||||||
|
String staleFile = parentDir + "/stale-file";
|
||||||
|
String freshFile = rootDir + "/fresh-file";
|
||||||
|
String[] directories = {rootDir, grandparentDir, parentDir};
|
||||||
|
|
||||||
|
// create dirs
|
||||||
|
createNewDirs(rootDir, grandparentDir, parentDir);
|
||||||
|
long time = System.currentTimeMillis();
|
||||||
|
ms.put(new PathMetadata(
|
||||||
|
new FileStatus(0, false, 0, 0, time + 1, strToPath(staleFile)),
|
||||||
|
Tristate.FALSE, false));
|
||||||
|
ms.put(new PathMetadata(
|
||||||
|
new FileStatus(0, false, 0, 0, time + 1, strToPath(freshFile)),
|
||||||
|
Tristate.FALSE, false));
|
||||||
|
|
||||||
|
if (!allowMissing()) {
|
||||||
|
// set parent dir as authoritative
|
||||||
|
DirListingMetadata parentDirMd = ms.listChildren(strToPath(parentDir));
|
||||||
|
parentDirMd.setAuthoritative(true);
|
||||||
|
ms.put(parentDirMd);
|
||||||
|
|
||||||
|
// prune the ms
|
||||||
|
ms.prune(time);
|
||||||
|
|
||||||
|
// get the directory listings
|
||||||
|
DirListingMetadata rootDirMd = ms.listChildren(strToPath(rootDir));
|
||||||
|
DirListingMetadata grandParentDirMd =
|
||||||
|
ms.listChildren(strToPath(grandparentDir));
|
||||||
|
parentDirMd = ms.listChildren(strToPath(parentDir));
|
||||||
|
|
||||||
|
// assert that parent dir is still authoritative (no removed elements
|
||||||
|
// during prune)
|
||||||
|
assertFalse(rootDirMd.isAuthoritative());
|
||||||
|
assertFalse(grandParentDirMd.isAuthoritative());
|
||||||
|
assertTrue(parentDirMd.isAuthoritative());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutDirListingMetadataPutsFileMetadata()
|
public void testPutDirListingMetadataPutsFileMetadata()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static com.amazonaws.services.dynamodbv2.model.KeyType.HASH;
|
import static com.amazonaws.services.dynamodbv2.model.KeyType.HASH;
|
||||||
import static com.amazonaws.services.dynamodbv2.model.KeyType.RANGE;
|
import static com.amazonaws.services.dynamodbv2.model.KeyType.RANGE;
|
||||||
|
@ -50,6 +51,7 @@ import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
|
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
|
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
|
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the PathMetadataDynamoDBTranslation is able to translate between domain
|
* Test the PathMetadataDynamoDBTranslation is able to translate between domain
|
||||||
|
@ -59,28 +61,30 @@ public class TestPathMetadataDynamoDBTranslation extends Assert {
|
||||||
|
|
||||||
private static final Path TEST_DIR_PATH = new Path("s3a://test-bucket/myDir");
|
private static final Path TEST_DIR_PATH = new Path("s3a://test-bucket/myDir");
|
||||||
private static final Item TEST_DIR_ITEM = new Item();
|
private static final Item TEST_DIR_ITEM = new Item();
|
||||||
private static PathMetadata testDirPathMetadata;
|
private static DDBPathMetadata testDirPathMetadata;
|
||||||
|
|
||||||
private static final long TEST_FILE_LENGTH = 100;
|
private static final long TEST_FILE_LENGTH = 100;
|
||||||
private static final long TEST_MOD_TIME = 9999;
|
private static final long TEST_MOD_TIME = 9999;
|
||||||
private static final long TEST_BLOCK_SIZE = 128;
|
private static final long TEST_BLOCK_SIZE = 128;
|
||||||
private static final Path TEST_FILE_PATH = new Path(TEST_DIR_PATH, "myFile");
|
private static final Path TEST_FILE_PATH = new Path(TEST_DIR_PATH, "myFile");
|
||||||
private static final Item TEST_FILE_ITEM = new Item();
|
private static final Item TEST_FILE_ITEM = new Item();
|
||||||
private static PathMetadata testFilePathMetadata;
|
private static DDBPathMetadata testFilePathMetadata;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws IOException {
|
public static void setUpBeforeClass() throws IOException {
|
||||||
String username = UserGroupInformation.getCurrentUser().getShortUserName();
|
String username = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
|
||||||
testDirPathMetadata =
|
testDirPathMetadata = new DDBPathMetadata(new S3AFileStatus(false,
|
||||||
new PathMetadata(new S3AFileStatus(false, TEST_DIR_PATH, username));
|
TEST_DIR_PATH, username));
|
||||||
|
|
||||||
TEST_DIR_ITEM
|
TEST_DIR_ITEM
|
||||||
.withPrimaryKey(PARENT, "/test-bucket", CHILD, TEST_DIR_PATH.getName())
|
.withPrimaryKey(PARENT, "/test-bucket", CHILD, TEST_DIR_PATH.getName())
|
||||||
.withBoolean(IS_DIR, true);
|
.withBoolean(IS_DIR, true);
|
||||||
|
|
||||||
testFilePathMetadata = new PathMetadata(
|
testFilePathMetadata = new DDBPathMetadata(
|
||||||
new S3AFileStatus(TEST_FILE_LENGTH, TEST_MOD_TIME, TEST_FILE_PATH,
|
new S3AFileStatus(TEST_FILE_LENGTH, TEST_MOD_TIME, TEST_FILE_PATH,
|
||||||
TEST_BLOCK_SIZE, username));
|
TEST_BLOCK_SIZE, username));
|
||||||
|
|
||||||
TEST_FILE_ITEM
|
TEST_FILE_ITEM
|
||||||
.withPrimaryKey(PARENT, pathToParentKey(TEST_FILE_PATH.getParent()),
|
.withPrimaryKey(PARENT, pathToParentKey(TEST_FILE_PATH.getParent()),
|
||||||
CHILD, TEST_FILE_PATH.getName())
|
CHILD, TEST_FILE_PATH.getName())
|
||||||
|
@ -235,4 +239,37 @@ public class TestPathMetadataDynamoDBTranslation extends Assert {
|
||||||
itemToPathMetadata(marker, "alice"));
|
itemToPathMetadata(marker, "alice"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test when translating an {@link Item} to {@link DDBPathMetadata} works
|
||||||
|
* if {@code IS_AUTHORITATIVE} flag is ignored.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIsAuthoritativeCompatibilityItemToPathMetadata()
|
||||||
|
throws Exception {
|
||||||
|
Item item = Mockito.spy(TEST_DIR_ITEM);
|
||||||
|
item.withBoolean(IS_AUTHORITATIVE, true);
|
||||||
|
|
||||||
|
final String user =
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
DDBPathMetadata meta = itemToPathMetadata(item, user, true);
|
||||||
|
|
||||||
|
Mockito.verify(item, Mockito.never()).getBoolean(IS_AUTHORITATIVE);
|
||||||
|
assertFalse(meta.isAuthoritativeDir());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test when translating an {@link DDBPathMetadata} to {@link Item} works
|
||||||
|
* if {@code IS_AUTHORITATIVE} flag is ignored.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIsAuthoritativeCompatibilityPathMetadataToItem() {
|
||||||
|
DDBPathMetadata meta = Mockito.spy(testFilePathMetadata);
|
||||||
|
meta.setAuthoritativeDir(true);
|
||||||
|
|
||||||
|
Item item = pathMetadataToItem(meta, true);
|
||||||
|
|
||||||
|
Mockito.verify(meta, never()).isAuthoritativeDir();
|
||||||
|
assertFalse(item.hasAttribute(IS_AUTHORITATIVE));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue