HADOOP-17199. S3A Directory Marker HADOOP-13230 backport

This backports the listing-side changes of HADOOP-13230.

With this patch in, this branch of Hadoop is compatible with S3A clients
which do not delete directory markers when files are created underneath.

It does not allow this version to disable marker deletion; if the
fs.s3a.directory.marker.retention option is changed to request this, a message
is printed at INFO and the policy remains at "delete".

The s3guard bucket-info command has been extended to support
probing for marker retention, as has the hasPathCapability method on
S3AFileSystem.

Read the documentation!

Change-Id: Ief23c39256b194abcc445231b5f7a018185ea9f6
This commit is contained in:
Steve Loughran 2020-09-03 18:29:30 +01:00
parent ecf88b972f
commit a75f2915f9
19 changed files with 2490 additions and 112 deletions

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.fs.contract;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -410,7 +412,8 @@ public abstract class AbstractContractGetFileStatusTest extends
Path path = getContract().getTestPath();
fs.delete(path, true);
// create a - non-qualified - Path for a subdir
Path subfolder = path.suffix('/' + this.methodName.getMethodName());
Path subfolder = path.suffix('/' + this.methodName.getMethodName()
+ "-" + UUID.randomUUID());
mkdirs(subfolder);
return subfolder;
}
@ -535,7 +538,8 @@ public abstract class AbstractContractGetFileStatusTest extends
Path path,
PathFilter filter) throws IOException {
FileStatus[] result = getFileSystem().listStatus(path, filter);
assertEquals("length of listStatus(" + path + ", " + filter + " )",
assertEquals("length of listStatus(" + path + ", " + filter + " ) " +
Arrays.toString(result),
expected, result.length);
return result;
}

View File

@ -472,4 +472,95 @@ public final class Constants {
public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY =
"fs.s3a.failinject.inconsistency.probability";
/**
* Policy for directory markers.
* This is a new feature of HADOOP-13230 which addresses
* some scale, performance and permissions issues -but
* at the risk of backwards compatibility.
* <p></p>
* This Hadoop release only supports the original "delete"
* policy.
*/
public static final String DIRECTORY_MARKER_POLICY =
"fs.s3a.directory.marker.retention";
/**
* Delete directory markers. This is the backwards compatible option.
* Value: {@value}.
*/
public static final String DIRECTORY_MARKER_POLICY_DELETE =
"delete";
/**
* Retain directory markers (unsupported in this release).
* Value: {@value}.
*/
public static final String DIRECTORY_MARKER_POLICY_KEEP =
"keep";
/**
* Retain directory markers in authoritative directory trees only
* (unsupported in this release).
* Value: {@value}.
*/
public static final String DIRECTORY_MARKER_POLICY_AUTHORITATIVE =
"authoritative";
/**
* Default retention policy: {@value}.
*/
public static final String DEFAULT_DIRECTORY_MARKER_POLICY =
DIRECTORY_MARKER_POLICY_DELETE;
/**
* {@code PathCapabilities} probe to verify that an S3A Filesystem
* has the changes needed to safely work with buckets where
* directory markers have not been deleted.
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_AWARE
= "fs.s3a.capability.directory.marker.aware";
/**
* {@code PathCapabilities} probe to indicate that the filesystem
* keeps directory markers.
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP
= "fs.s3a.capability.directory.marker.policy.keep";
/**
* {@code PathCapabilities} probe to indicate that the filesystem
* deletes directory markers.
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE
= "fs.s3a.capability.directory.marker.policy.delete";
/**
* {@code PathCapabilities} probe to indicate that the filesystem
* keeps directory markers in authoritative paths only.
* Value: {@value}.
*/
public static final String
STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE =
"fs.s3a.capability.directory.marker.policy.authoritative";
/**
* {@code PathCapabilities} probe to indicate that a path/S3GuardTool
* keeps directory markers.
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP
= "fs.s3a.capability.directory.marker.action.keep";
/**
* {@code PathCapabilities} probe to indicate that a path
* deletes directory markers.
* Value: {@value}.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
= "fs.s3a.capability.directory.marker.action.delete";
}

View File

@ -264,7 +264,9 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
} else {
Path actualParentPath = new Path(child).getParent();
Path expectedParentPath = new Path(parent);
return actualParentPath.equals(expectedParentPath);
// children which are directory markers are excluded here
return actualParentPath.equals(expectedParentPath)
&& !child.endsWith("/");
}
}

View File

@ -31,6 +31,7 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
@ -81,6 +82,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -131,7 +136,7 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem {
public class S3AFileSystem extends FileSystem implements StreamCapabilities {
/**
* Default blocksize as used in blocksize and FS status queries.
*/
@ -170,6 +175,11 @@ public class S3AFileSystem extends FileSystem {
private S3ADataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
/**
* Directory policy.
*/
private DirectoryPolicy directoryPolicy;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@ -298,6 +308,9 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("Using metadata store {}, authoritative={}",
getMetadataStore(), allowAuthoritative);
}
// directory policy, which may look at authoritative paths
directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf);
LOG.debug("Directory marker retention policy is {}", directoryPolicy);
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
@ -413,6 +426,19 @@ public class S3AFileSystem extends FileSystem {
return s3;
}
/**
* Returns the S3 client used by this filesystem.
* <i>Warning: this must only be used for testing, as it bypasses core
* S3A operations. </i>
* @param reason a justification for requesting access.
* @return AmazonS3Client
*/
@VisibleForTesting
public AmazonS3 getAmazonS3ClientForTesting(String reason) {
LOG.warn("Access to S3A client requested, reason {}", reason);
return s3;
}
/**
* Get the region of a bucket.
* @return the region in which a bucket is located
@ -846,6 +872,10 @@ public class S3AFileSystem extends FileSystem {
}
// TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
// TODO S3Guard: performance: mark destination dirs as authoritative
// The path to whichever file or directory is created by the
// rename. When deleting markers all parents of
// this path will need their markers pruned.
Path destCreated = dst;
// Ok! Time to start
if (srcStatus.isFile()) {
@ -859,9 +889,11 @@ public class S3AFileSystem extends FileSystem {
String filename =
srcKey.substring(pathToKey(src.getParent()).length()+1);
newDstKey = newDstKey + filename;
destCreated = keyToQualifiedPath(newDstKey);
copyFile(srcKey, newDstKey, length);
S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
destCreated, length, getDefaultBlockSize(dst),
username);
} else {
copyFile(srcKey, dstKey, srcStatus.getLen());
@ -948,9 +980,10 @@ public class S3AFileSystem extends FileSystem {
metadataStore.move(srcPaths, dstMetas);
if (src.getParent() != dst.getParent()) {
deleteUnnecessaryFakeDirectories(dst.getParent());
createFakeDirectoryIfNecessary(src.getParent());
if (!src.getParent().equals(destCreated.getParent())) {
LOG.debug("source & dest parents are different; fix up dir markers");
deleteUnnecessaryFakeDirectories(destCreated.getParent());
maybeCreateFakeParentDirectory(src);
}
return true;
}
@ -1547,6 +1580,21 @@ public class S3AFileSystem extends FileSystem {
}
}
/**
 * Create a fake directory marker for the parent of a path, if one is needed.
 * The parent is resolved with {@code Path.getParent()}; when that returns
 * null there is no parent to work on and the call is a no-op.
 * Otherwise the parent is handed to
 * {@code createFakeDirectoryIfNecessary(Path)}, which is presumably where
 * the "does it already exist" decision is made (not visible here).
 * @param path path whose parent is created if needed.
 * @throws IOException IO problem
 * @throws AmazonClientException untranslated AWS client problem
 */
void maybeCreateFakeParentDirectory(Path path)
    throws IOException, AmazonClientException {
  final Path parentDir = path.getParent();
  if (parentDir == null) {
    // nothing above this path: no marker to create
    return;
  }
  createFakeDirectoryIfNecessary(parentDir);
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
@ -1792,6 +1840,8 @@ public class S3AFileSystem extends FileSystem {
FileStatus msStatus = pm.getFileStatus();
if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
// the caller needs to know if a directory is empty,
// and that this is a directory.
if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
// We have a definitive true / false from MetadataStore, we are done.
return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
@ -1800,28 +1850,33 @@ public class S3AFileSystem extends FileSystem {
if (children != null) {
tombstones = children.listTombstones();
}
LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
LOG.debug("MetadataStore doesn't know if {} is empty, using S3.",
path);
}
} else {
// Either this is not a directory, or we don't care if it is empty
return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
}
// If the metadata store has no children for it and it's not listed in
// S3 yet, we'll assume the empty directory is true;
S3AFileStatus s3FileStatus;
// now issue the S3 getFileStatus call.
try {
s3FileStatus = s3GetFileStatus(path, key, tombstones);
S3AFileStatus s3FileStatus = s3GetFileStatus(path, key,
StatusProbeEnum.ALL,
tombstones,
true);
// entry was found, so save in S3Guard and return the final value.
return S3Guard.putAndReturn(metadataStore, s3FileStatus,
instrumentation);
} catch (FileNotFoundException e) {
return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
}
// entry was found, save in S3Guard
return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
} else {
// there was no entry in S3Guard
// retrieve the data and update the metadata store in the process.
return S3Guard.putAndReturn(metadataStore,
s3GetFileStatus(path, key, tombstones), instrumentation);
s3GetFileStatus(path, key, StatusProbeEnum.ALL,
tombstones, needEmptyDirectoryFlag),
instrumentation);
}
}
@ -1831,101 +1886,120 @@ public class S3AFileSystem extends FileSystem {
* and for direct management of empty directory blobs.
* @param path Qualified path
* @param key Key string for the path
* @param probes probes to make
* @param tombstones tombstones to filter
* @param needEmptyDirectoryFlag if true, implementation will calculate
* a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
* @return Status
* @throws FileNotFoundException when the path does not exist
* @throws FileNotFoundException the supplied probes failed.
* @throws IOException on other problems.
*/
private S3AFileStatus s3GetFileStatus(final Path path, String key,
Set<Path> tombstones) throws IOException {
if (!key.isEmpty()) {
try {
ObjectMetadata meta = getObjectMetadata(key);
private S3AFileStatus s3GetFileStatus(final Path path,
final String key,
final Set<StatusProbeEnum> probes,
final Set<Path> tombstones,
final boolean needEmptyDirectoryFlag) throws IOException {
LOG.debug("S3GetFileStatus {}", path);
Preconditions.checkArgument(!needEmptyDirectoryFlag
|| probes.contains(StatusProbeEnum.List),
"s3GetFileStatus(%s) wants to know if a directory is empty but"
+ " does not request a list probe", path);
if (objectRepresentsDirectory(key, meta.getContentLength())) {
LOG.debug("Found exact file: fake directory");
return new S3AFileStatus(Tristate.TRUE, path, username);
} else {
LOG.debug("Found exact file: normal file");
return new S3AFileStatus(meta.getContentLength(),
dateToLong(meta.getLastModified()),
path,
getDefaultBlockSize(path),
username);
}
if (!key.isEmpty() && !key.endsWith("/")
&& probes.contains(StatusProbeEnum.Head)) {
try {
// look for the simple file
ObjectMetadata meta = getObjectMetadata(key);
LOG.debug("Found exact file: normal file {}", key);
return new S3AFileStatus(meta.getContentLength(),
dateToLong(meta.getLastModified()),
path,
getDefaultBlockSize(path),
username);
} catch (AmazonServiceException e) {
// if the response is a 404 error, it just means that there is
// no file at that path...the remaining checks will be needed.
if (e.getStatusCode() != 404) {
throw translateException("getFileStatus", path, e);
}
} catch (AmazonClientException e) {
throw translateException("getFileStatus", path, e);
}
// Necessary?
if (!key.endsWith("/")) {
String newKey = key + "/";
try {
ObjectMetadata meta = getObjectMetadata(newKey);
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
LOG.debug("Found file (with /): fake directory");
return new S3AFileStatus(Tristate.TRUE, path, username);
} else {
LOG.warn("Found file (with /): real file? should not happen: {}",
key);
return new S3AFileStatus(meta.getContentLength(),
dateToLong(meta.getLastModified()),
path,
getDefaultBlockSize(path),
username);
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
throw translateException("getFileStatus", newKey, e);
}
} catch (AmazonClientException e) {
throw translateException("getFileStatus", newKey, e);
}
}
}
try {
key = maybeAddTrailingSlash(key);
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
request.setPrefix(key);
request.setDelimiter("/");
request.setMaxKeys(1);
// execute the list
if (probes.contains(StatusProbeEnum.List)) {
// this will find a marker dir / as well as an entry.
// When making a simple "is this a dir check" all is good.
// but when looking for an empty dir, we need to verify there are no
// children, so ask for two entries, so as to find
// a child
String dirKey = maybeAddTrailingSlash(key);
// list size is dir marker + at least one non-tombstone entry
// there's a corner case: more tombstones than you have in a
// single page list. We assume that if you have been deleting
// that many files, then the AWS listing will have purged some
// by the time of listing so that the response includes some
// which have not.
ObjectListing objects = listObjects(request);
Collection<String> prefixes = objects.getCommonPrefixes();
Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
if (!isEmptyOfKeys(prefixes, tombstones) ||
!isEmptyOfObjects(summaries, tombstones)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as directory (with /): {}/{}",
prefixes.size(), summaries.size());
for (S3ObjectSummary summary : summaries) {
LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
}
for (String prefix : prefixes) {
LOG.debug("Prefix: {}", prefix);
}
}
return new S3AFileStatus(Tristate.FALSE, path, username);
} else if (key.isEmpty()) {
LOG.debug("Found root directory");
return new S3AFileStatus(Tristate.TRUE, path, username);
int listSize;
if (tombstones == null) {
// no tombstones so look for a marker and at least one child.
listSize = 2;
} else {
// build a listing > tombstones. If the caller has many thousands
// of tombstones this won't work properly, which is why pruning
// of expired tombstones matters.
listSize = Math.min(2 + tombstones.size(), Math.max(2, maxKeys));
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
try {
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
request.setPrefix(dirKey);
request.setDelimiter("/");
request.setMaxKeys(listSize);
ObjectListing objects = listObjects(request);
Collection<String> prefixes = objects.getCommonPrefixes();
Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
if (!isEmptyOfKeys(prefixes, tombstones) ||
!isEmptyOfObjects(summaries, tombstones)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as directory (with /): {}/{}",
prefixes.size(), summaries.size());
for (S3ObjectSummary summary : summaries) {
LOG.debug("Summary: {} {}", summary.getKey(),
summary.getSize());
}
for (String prefix : prefixes) {
LOG.debug("Prefix: {}", prefix);
}
}
// At least one entry has been found.
// If looking for an empty directory, the marker must exist but no children.
// So the listing must contain the marker entry only.
if (needEmptyDirectoryFlag
&& representsEmptyDirectory(objects,
dirKey, tombstones)) {
return new S3AFileStatus(Tristate.TRUE, path, username);
}
// either an empty directory is not needed, or the
// listing does not meet the requirements.
return new S3AFileStatus(Tristate.FALSE, path, username);
} else if (key.isEmpty()) {
LOG.debug("Found root directory");
return new S3AFileStatus(Tristate.TRUE, path, username);
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
throw translateException("getFileStatus", key, e);
}
} catch (AmazonClientException e) {
throw translateException("getFileStatus", key, e);
}
} catch (AmazonClientException e) {
throw translateException("getFileStatus", key, e);
}
LOG.debug("Not Found: {}", path);
@ -1967,11 +2041,41 @@ public class S3AFileSystem extends FileSystem {
if (tombstones == null) {
return summaries.isEmpty();
}
Collection<String> stringCollection = objectSummaryKeys(summaries);
return isEmptyOfKeys(stringCollection, tombstones);
}
/**
* Get the list of keys in the object summary.
* @return a possibly empty list
*/
private Collection<String> objectSummaryKeys(
final Collection<S3ObjectSummary> summaries) {
Collection<String> stringCollection = new ArrayList<>(summaries.size());
for (S3ObjectSummary summary : summaries) {
stringCollection.add(summary.getKey());
}
return isEmptyOfKeys(stringCollection, tombstones);
return stringCollection;
}
/**
 * Does this listing represent an empty directory?
 * A listing qualifies only when all of the following hold:
 * <ol>
 *   <li>it contains exactly one object summary;</li>
 *   <li>the key of that summary is the directory marker key;</li>
 *   <li>the collection of common prefixes is empty.</li>
 * </ol>
 * The tombstones argument is accepted but is not consulted by this
 * implementation.
 * @param result listing
 * @param dirKey directory key
 * @param tombstones Set of tombstone markers, or null if not applicable.
 * @return true if the list is considered empty.
 */
public boolean representsEmptyDirectory(
    ObjectListing result,
    final String dirKey,
    final Set<Path> tombstones) {
  final Collection<String> listedKeys =
      objectSummaryKeys(result.getObjectSummaries());
  if (listedKeys.size() != 1) {
    // zero entries: no marker; more than one: the directory has children
    return false;
  }
  if (!listedKeys.contains(dirKey)) {
    // the single entry is a child object, not the marker itself
    return false;
  }
  // any common prefix is a child "directory" under this path
  return result.getCommonPrefixes().isEmpty();
}
/**
@ -1983,7 +2087,8 @@ public class S3AFileSystem extends FileSystem {
Path path = qualify(f);
String key = pathToKey(path);
try {
s3GetFileStatus(path, key, null);
s3GetFileStatus(path, key, StatusProbeEnum.ALL,
null, false);
return true;
} catch (FileNotFoundException e) {
return false;
@ -2422,6 +2527,14 @@ public class S3AFileSystem extends FileSystem {
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
}
/**
* Get the directory marker policy of this filesystem.
* @return the marker policy.
*/
public DirectoryPolicy getDirectoryMarkerPolicy() {
return directoryPolicy;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@ -2450,6 +2563,7 @@ public class S3AFileSystem extends FileSystem {
sb.append(", authoritative=").append(allowAuthoritative);
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", ").append(directoryPolicy);
sb.append(", statistics {")
.append(statistics)
.append("}");
@ -2911,4 +3025,47 @@ public class S3AFileSystem extends FileSystem {
}
}
/**
 * Probe for a capability of this filesystem through a method with the
 * {@code PathCapabilities} signature.
 * This Hadoop version does not support PathCapabilities; declaring the
 * method directly means that code which introspects to find it can
 * still probe for the capabilities of the store.
 * The probe is answered by {@link #hasCapability(String)}.
 * @param path path (unused)
 * @param capability capability to probe for.
 * @return true if the FS has the specific capability.
 * @throws IOException failure
 */
public boolean hasPathCapability(final Path path, final String capability)
    throws IOException {
  // the path plays no part in the answer: capabilities are store-wide here
  final boolean supported = hasCapability(capability);
  return supported;
}
/**
 * Return the capabilities of this filesystem instance.
 * The capability name is converted to lower case (English locale) before
 * it is compared, so the probe is case insensitive.
 * <ul>
 *   <li>The "marker aware" probe is always true.</li>
 *   <li>The marker policy and marker action probes are handed off to the
 *   directory marker policy, evaluated against the root path.</li>
 *   <li>Anything else is not supported.</li>
 * </ul>
 * @param capability string to query the stream support for.
 * @return whether the FS instance has the capability.
 */
@Override
public boolean hasCapability(String capability) {
  final String probe = capability.toLowerCase(Locale.ENGLISH);
  if (STORE_CAPABILITY_DIRECTORY_MARKER_AWARE.equals(probe)) {
    return true;
  }
  final boolean markerPolicyProbe =
      STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP.equals(probe)
      || STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE.equals(probe)
      || STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE.equals(probe)
      || STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP.equals(probe)
      || STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE.equals(probe);
  if (!markerPolicyProbe) {
    // not a capability this filesystem knows about
    return false;
  }
  // marker policy capabilities are handed off.
  return getDirectoryMarkerPolicy().hasPathCapability(new Path("/"), probe);
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_KEEP;
/**
* Interface for Directory Marker policies to implement.
* A policy answers whether markers are kept for a path, identifies itself
* through a {@link MarkerPolicy} value and a description, and answers the
* marker-related capability probes.
*/
public interface DirectoryPolicy {
/**
* Should a directory marker be retained?
* @param path path a file/directory is being created with.
* @return true if the marker MAY be kept, false if it MUST be deleted.
*/
boolean keepDirectoryMarkers(Path path);
/**
* Get the marker policy.
* @return policy.
*/
MarkerPolicy getMarkerPolicy();
/**
* Describe the policy for marker tools and logs.
* @return description of the current policy.
*/
String describe();
/**
* Does a specific path have the relevant option?
* This is to be forwarded from the S3AFileSystem.hasPathCapability,
* but only for those capabilities related to markers.
* @param path path
* @param capability capability
* @return true if the capability is supported, false if not
* @throws IllegalArgumentException if the capability is unknown.
*/
boolean hasPathCapability(Path path, String capability);
/**
* Supported retention policies.
* Each value carries the option name under which it appears in
* configuration files and marker-aware tooling.
*/
enum MarkerPolicy {
/**
* Delete markers.
* <p></p>
* This is the classic S3A policy.
*/
Delete(DIRECTORY_MARKER_POLICY_DELETE),
/**
* Keep markers.
* <p></p>
* This is <i>Not backwards compatible</i>.
*/
Keep(DIRECTORY_MARKER_POLICY_KEEP),
/**
* Keep markers in authoritative paths only.
* <p></p>
* This is <i>Not backwards compatible</i> within the
* auth paths, but is outside these.
*/
Authoritative(DIRECTORY_MARKER_POLICY_AUTHORITATIVE);
/**
* The name of the option as allowed in configuration files
* and marker-aware tooling.
*/
private final String optionName;
// binds the enum value to its configuration option name
MarkerPolicy(final String optionName) {
this.optionName = optionName;
}
/**
* Get the option name.
* @return name of the option
*/
public String getOptionName() {
return optionName;
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_DIRECTORY_MARKER_POLICY;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_AWARE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP;
/**
 * Implementation of directory policy.
 * <p> </p>
 * As only the DELETE policy is supported, the policy logic here is relatively
 * straightforward: markers are never kept, and the capability probes
 * report the "delete" policy and the "delete" action.
 */
public final class DirectoryPolicyImpl
    implements DirectoryPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(
      DirectoryPolicyImpl.class);

  /**
   * Error string when unable to parse the marker policy option.
   */
  public static final String UNKNOWN_MARKER_POLICY =
      "Unknown policy in "
      + DIRECTORY_MARKER_POLICY + ": ";

  /**
   * All available policies.
   */
  private static final Set<MarkerPolicy> AVAILABLE_POLICIES =
      EnumSet.of(MarkerPolicy.Delete);

  /**
   * Delete all markers.
   */
  public static final DirectoryPolicy DELETE = new DirectoryPolicyImpl(
      MarkerPolicy.Delete);

  /**
   * Chosen marker policy.
   */
  private final MarkerPolicy markerPolicy;

  /**
   * Constructor.
   * @param markerPolicy marker policy
   *
   */
  public DirectoryPolicyImpl(final MarkerPolicy markerPolicy) {
    this.markerPolicy = markerPolicy;
  }

  /**
   * Markers are never retained by this implementation.
   * @param path path a file/directory is being created with (unused).
   * @return false, always.
   */
  @Override
  public boolean keepDirectoryMarkers(final Path path) {
    return false;
  }

  @Override
  public MarkerPolicy getMarkerPolicy() {
    return markerPolicy;
  }

  /**
   * Describe the policy by its configuration option name.
   * @return the option name of the chosen marker policy.
   */
  @Override
  public String describe() {
    return markerPolicy.getOptionName();
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "DirectoryMarkerRetention{");
    sb.append("policy='").append(markerPolicy.getOptionName()).append('\'');
    sb.append('}');
    return sb.toString();
  }

  /**
   * Return path policy for store and paths.
   * As markers are always deleted, the "policy.delete" and
   * "action.delete" probes succeed while the "keep" and "authoritative"
   * probes fail, whatever the path.
   * @param path path (unused)
   * @param capability capability
   * @return true if a capability is active
   * @throws IllegalArgumentException if the capability is unknown.
   */
  @Override
  public boolean hasPathCapability(final Path path, final String capability) {
    switch (capability) {
    case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
      // this release can always read buckets with retained markers.
      return true;

    // the delete policy is the one in force, and deletion is the action
    // taken on every path.
    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
      return true;

    // known policies/actions which are not available in this release.
    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
      return false;

    default:
      throw new IllegalArgumentException("Unknown capability " + capability);
    }
  }

  /**
   * Create/Get the policy for this configuration.
   * The option is trimmed and lower-cased before being matched.
   * "delete" is accepted; "keep" and "authoritative" are recognized but
   * unsupported, so are logged at INFO and replaced by "delete".
   * @param conf config
   * @return a policy; always {@link #DELETE}.
   * @throws IllegalArgumentException if the option value is not a known
   * policy name.
   */
  public static DirectoryPolicy getDirectoryPolicy(
      final Configuration conf) {
    String option = conf.getTrimmed(DIRECTORY_MARKER_POLICY,
        DEFAULT_DIRECTORY_MARKER_POLICY)
        .toLowerCase(Locale.ENGLISH);
    switch (option) {
    case DIRECTORY_MARKER_POLICY_DELETE:
      // only supported policy
      LOG.debug("Directory markers will be deleted");
      break;
    case DIRECTORY_MARKER_POLICY_KEEP:
    case DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
      // known but not available.
      LOG.info("Directory marker policy \"{}\" is unsupported,"
          + " using \"delete\"", option);
      break;
    default:
      throw new IllegalArgumentException(UNKNOWN_MARKER_POLICY + option);
    }
    return DELETE;
  }

  /**
   * Enumerate all available policies.
   * @return set of the policies.
   */
  public static Set<MarkerPolicy> availablePolicies() {
    return AVAILABLE_POLICIES;
  }
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.EnumSet;
import java.util.Set;
/**
 * Enum of probes which can be made of S3.
 * <p></p>
 * The predefined probe sets are shared constants and are unmodifiable:
 * attempts to add or remove probes raise
 * {@code UnsupportedOperationException}.
 */
public enum StatusProbeEnum {

  /** The actual path. */
  Head,
  /** HEAD of the path + /. */
  DirMarker,
  /** LIST under the path. */
  List;

  /** Look for files and directories. */
  public static final Set<StatusProbeEnum> ALL =
      java.util.Collections.unmodifiableSet(EnumSet.of(Head, List));

  /** We only want the HEAD. */
  public static final Set<StatusProbeEnum> HEAD_ONLY =
      java.util.Collections.unmodifiableSet(EnumSet.of(Head));

  /** List operation only. */
  public static final Set<StatusProbeEnum> LIST_ONLY =
      java.util.Collections.unmodifiableSet(EnumSet.of(List));

  /** Look for files only: the same probes as {@link #HEAD_ONLY}. */
  public static final Set<StatusProbeEnum> FILE =
      HEAD_ONLY;

  /** Skip the HEAD and only look for directories. */
  public static final Set<StatusProbeEnum> DIRECTORIES =
      LIST_ONLY;
}

View File

@ -37,6 +37,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@ -47,6 +49,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
@ -59,6 +62,8 @@ import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
/**
* CLI to manage S3Guard Metadata Store.
*/
@InterfaceAudience.LimitedPrivate("management tools")
@InterfaceStability.Evolving
public abstract class S3GuardTool extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(S3GuardTool.class);
@ -960,6 +965,9 @@ public abstract class S3GuardTool extends Configured implements Tool {
public static final String AUTH_FLAG = "auth";
public static final String NONAUTH_FLAG = "nonauth";
public static final String ENCRYPTION_FLAG = "encryption";
public static final String MAGIC_FLAG = "magic";
public static final String MARKERS_FLAG = "markers";
public static final String MARKERS_AWARE = "aware";
public static final String PURPOSE = "provide/check S3Guard information"
+ " about a specific bucket";
@ -968,12 +976,22 @@ public abstract class S3GuardTool extends Configured implements Tool {
+ "Common options:\n"
+ " -" + GUARDED_FLAG + " - Require S3Guard\n"
+ " -" + ENCRYPTION_FLAG
+ " -require {none, sse-s3, sse-kms} - Require encryption policy";
+ " (none, sse-s3, sse-kms) - Require encryption policy\n"
+ " -" + MARKERS_FLAG
+ " (aware, keep, delete, authoritative) - directory markers policy\n";
BucketInfo(Configuration conf) {
/**
 * Text printed by the marker option processing when the
 * "aware" probe is requested on the command line.
 * It is passed to {@code println()} as the format string, hence the
 * {@code %n} line separators.
 */
@VisibleForTesting
public static final String IS_MARKER_AWARE =
"The S3A connector can read data in S3 buckets where"
+ " directory markers%n"
+ "are not deleted (optional with later hadoop releases),%n"
+ "and with buckets where they are.%n";
/**
 * Create the command: the flag names are handed to the superclass,
 * then the options which take a value are added to the command format.
 * @param conf configuration to use
 */
public BucketInfo(Configuration conf) {
  super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG);
  final CommandFormat options = getCommandFormat();
  // both -encryption and -markers are followed by a value
  options.addOptionWithValue(ENCRYPTION_FLAG);
  options.addOptionWithValue(MARKERS_FLAG);
}
@Override
@ -1053,10 +1071,53 @@ public abstract class S3GuardTool extends Configured implements Tool {
fsUri, desiredEncryption, encryption);
}
// directory markers
processMarkerOption(out, fs,
getCommandFormat().getOptValue(MARKERS_FLAG));
// and finally flush the output and report a success.
out.flush();
return SUCCESS;
}
/**
 * Report the directory marker policy of the filesystem and, when a
 * marker option was supplied, validate it.
 * <p></p>
 * The value "aware" (any case) only prints the awareness message and
 * the available policies; any other non-empty value must match the
 * option name of the active policy, ignoring case.
 * A null or blank option means there is nothing to validate.
 * @param out output stream
 * @param fs filesystem
 * @param marker desired marker option -may be null.
 */
private void processMarkerOption(final PrintStream out,
    final S3AFileSystem fs,
    final String marker) {
  final DirectoryPolicy policy = fs.getDirectoryMarkerPolicy();
  println(out, "%nThe directory marker policy is \"%s\"%n",
      policy.describe());

  final DirectoryPolicy.MarkerPolicy activePolicy = policy.getMarkerPolicy();
  final String requested = (marker != null) ? marker.trim() : "";
  final String activeOption = activePolicy.getOptionName();

  if (requested.isEmpty()) {
    // no probe requested: reporting the policy was all there is to do.
    return;
  }
  if (MARKERS_AWARE.equalsIgnoreCase(requested)) {
    // simple awareness test -a way to validate compatibility
    // from the command line
    println(out, IS_MARKER_AWARE);
    println(out, "Available Policies: delete");
    return;
  }
  // anything else is a policy name: it must be the active one
  if (!activeOption.equalsIgnoreCase(requested)) {
    throw badState("Bucket %s: required marker policy is \"%s\""
        + " but actual policy is \"%s\"",
        fs.getUri(), requested, activeOption);
  }
}
private String printOption(PrintStream out,
String description, String key, String defVal) {
String t = getFilesystem().getConf().getTrimmed(key, defVal);

View File

@ -0,0 +1,293 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# Controlling the S3A Directory Marker Behavior
From Hadoop 3.3.1 onwards, the S3A client can be configured to skip deleting
directory markers when creating files under paths. This removes all scalability
problems caused by deleting these markers -however, it is achieved at the expense
of backwards compatibility.
_This Hadoop release is compatible with versions of Hadoop which
can be configured to retain directory markers above files._
_It does not support any options to change the marker retention
policy to anything other than the default `delete` policy._
If the S3A filesystem is configured via
`fs.s3a.directory.marker.retention` to use a different policy
(i.e. `keep` or `authoritative`),
a message will be logged at INFO and the connector will
use the "delete" policy.
The `s3guard bucket-info` tool [can be used to verify support](#bucket-info).
This allows for a command line check of compatibility, including
in scripts.
_For details on alternative marker retention policies and strategies
for safe usage, consult the documentation of a Hadoop release which
supports the ability to change the marker policy._
## <a name="bucket-info"></a> Verifying marker policy with `s3guard bucket-info`
The `bucket-info` command has been enhanced to support verification from the command
line of bucket policies via the `-markers` option.
| option | verifies |
|--------|--------|
| `-markers aware` | the hadoop release is "aware" of directory markers |
| `-markers delete` | directory markers are deleted |
| `-markers keep` | directory markers are kept (will always fail) |
| `-markers authoritative` | directory markers are kept in authoritative paths (will always fail) |
All releases of Hadoop which have been updated to be marker aware will support the `-markers aware` option.
1. Updated releases which do not support switching marker retention policy will also support the
`-markers delete` option.
1. As this is such a release, the other marker options
(`-markers keep` and `-markers authoritative`) will always fail.
Probing for marker awareness: `s3guard bucket-info -markers aware`
```
> bin/hadoop s3guard bucket-info -markers aware s3a://landsat-pds/
Filesystem s3a://landsat-pds
Location: us-west-2
Filesystem s3a://landsat-pds is not using S3Guard
S3A Client
Endpoint: fs.s3a.endpoint=s3.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
The directory marker policy is "delete"
The S3A connector can read data in S3 buckets where directory markers
are not deleted (optional with later hadoop releases),
and with buckets where they are.
Available Policies: delete
```
The same command will fail on older releases, because the `-markers` option
is unknown
```
> hadoop s3guard bucket-info -markers aware s3a://landsat-pds/
Illegal option -markers
Usage: hadoop bucket-info [OPTIONS] s3a://BUCKET
provide/check S3Guard information about a specific bucket
Common options:
-guarded - Require S3Guard
-unguarded - Force S3Guard to be disabled
-auth - Require the S3Guard mode to be "authoritative"
-nonauth - Require the S3Guard mode to be "non-authoritative"
-encryption -require {none, sse-s3, sse-kms} - Require encryption policy
When possible and not overridden by more specific options, metadata
repository information will be inferred from the S3A URL (if provided)
Generic options supported are:
-conf <config file> - specify an application configuration file
-D <property=value> - define a value for a given property
2020-08-12 16:47:16,579 [main] INFO util.ExitUtil (ExitUtil.java:terminate(210)) - Exiting with status 42:
Illegal option -markers
```
The `-markers delete` option will verify that this release will delete directory markers.
```
> hadoop s3guard bucket-info -markers delete s3a://landsat-pds/
Filesystem s3a://landsat-pds
Location: us-west-2
Filesystem s3a://landsat-pds is not using S3Guard
The "magic" committer is not supported
S3A Client
Endpoint: fs.s3a.endpoint=s3.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
The directory marker policy is "delete"
```
As noted: the sole option available on this Hadoop release is `delete`. Other policy
probes will fail, returning error code 46, "unsupported".
```
> hadoop s3guard bucket-info -markers keep s3a://landsat-pds/
Filesystem s3a://landsat-pds
Location: us-west-2
Filesystem s3a://landsat-pds is not using S3Guard
The "magic" committer is not supported
S3A Client
Endpoint: fs.s3a.endpoint=s3.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
The directory marker policy is "delete"
2020-08-25 12:20:18,805 [main] INFO util.ExitUtil (ExitUtil.java:terminate(210)) - Exiting with status 46:
46: Bucket s3a://landsat-pds: required marker policy is "keep" but actual policy is "delete"
```
Even if the bucket configuration attempts to change the marker policy, probes for `keep` and `authoritative`
will fail.
Take, for example, a configuration for a specific bucket which asks for markers to be retained under the authoritative path `/tables`:
```xml
<property>
<name>fs.s3a.bucket.s3-london.directory.marker.retention</name>
<value>authoritative</value>
</property>
<property>
<name>fs.s3a.bucket.s3-london.authoritative.path</name>
<value>/tables</value>
</property>
```
A message about the marker setting will be logged at INFO on filesystem creation, and the marker policy will remain `delete`.
Thus a check for `-markers authoritative` will fail.
```
> hadoop s3guard bucket-info -markers authoritative s3a://s3-london/
2020-08-25 12:33:52,682 [main] INFO impl.DirectoryPolicyImpl (DirectoryPolicyImpl.java:getDirectoryPolicy(163)) -
Directory marker policy "authoritative" is unsupported, using "delete"
Filesystem s3a://s3-london
Location: eu-west-2
Filesystem s3a://s3-london is not using S3Guard
The "magic" committer is supported
S3A Client
Endpoint: fs.s3a.endpoint=s3.eu-west-2.amazonaws.com
Encryption: fs.s3a.server-side-encryption-algorithm=none
Input seek policy: fs.s3a.experimental.input.fadvise=normal
The directory marker policy is "delete"
2020-08-25 12:33:52,746 [main] INFO util.ExitUtil (ExitUtil.java:terminate(210)) - Exiting with status 46:
46: Bucket s3a://s3-london: required marker policy is "authoritative" but actual policy is "delete"
```
### <a name="pathcapabilities"></a> Probing for retention via `PathCapabilities` and `StreamCapabilities`
An instance of the filesystem can be probed for its directory marker retention ability/policy
through the `org.apache.hadoop.fs.PathCapabilities` interface,
which all FileSystem classes have supported since Hadoop 3.2.
| Probe | Meaning |
|-------------------------|-------------------------|
| `fs.s3a.capability.directory.marker.aware` | Does the filesystem support surplus directory markers? |
| `fs.s3a.capability.directory.marker.policy.delete` | Is the bucket policy "delete"? |
| `fs.s3a.capability.directory.marker.policy.keep` | Is the bucket policy "keep"? |
| `fs.s3a.capability.directory.marker.policy.authoritative` | Is the bucket policy "authoritative"? |
| `fs.s3a.capability.directory.marker.action.delete` | If a file was created at this path, would directory markers be deleted? |
| `fs.s3a.capability.directory.marker.action.keep` | If a file was created at this path, would directory markers be retained? |
The probe `fs.s3a.capability.directory.marker.aware` allows for a filesystem to be
probed to determine if its file listing policy is "aware" of directory marker retention
-that is: can this s3a client safely work with S3 buckets where markers have not been deleted.
The `fs.s3a.capability.directory.marker.policy.` probes return the active policy for the bucket.
The two `fs.s3a.capability.directory.marker.action.` probes dynamically query the marker
retention behavior of a specific path.
That is: if a file was created at that location, would ancestor directory markers
be kept or deleted?
The `S3AFileSystem` class also implements the `org.apache.hadoop.fs.StreamCapabilities` interface, which
can be used to probe for marker awareness via the `fs.s3a.capability.directory.marker.aware` capability.
Again, this will be true if-and-only-if the S3A connector is safe to work with S3A buckets/paths where
directory markers are retained.
*If an S3A instance is probed through `PathCapabilities` or `StreamCapabilities` for the capability
`fs.s3a.capability.directory.marker.aware` and it returns false, it is not safe to be used with
S3A paths where markers have been retained*.
## <a name="glossary"></a> Glossary
#### Directory Marker
An object in an S3 bucket with a trailing "/", used to indicate that there is a directory at that location.
These are necessary to maintain expectations about directories in an object store:
1. After `mkdirs(path)`, `exists(path)` holds.
1. After `rm(path/*)`, `exists(path)` holds.
In previous releases of Hadoop, the marker created by a `mkdirs()` operation was deleted after a file was created.
Rather than make a slow HEAD probe + optional marker DELETE of every parent path element, HADOOP-13164 switched
to enumerating all parent paths and issuing a single bulk DELETE request.
This is faster under light load, but
as each row in the delete consumes one write operation on the allocated IOPs of that bucket partition, creates
load issues when many worker threads/processes are writing to files.
This problem is bad on Apache Hive as:
* The hive partition structure places all files within the same S3 partition.
* As they are deep structures, there are many parent entries to include in the bulk delete calls.
* It's creating a lot of temporary files, and still uses rename to commit output.
Apache Spark has less of an issue when an S3A committer is used -although the partition structure
is the same, the delayed manifestation of output files reduces load.
#### Leaf Marker
A directory marker which has no files or directory marker objects underneath.
It genuinely represents an empty directory.
#### Surplus Marker
A directory marker which is above one or more files, and so is superfluous.
These are the markers which were traditionally deleted; now it is optional.
Older versions of Hadoop mistake such surplus markers for Leaf Markers.
#### Versioned Bucket
An S3 Bucket which has Object Versioning enabled.
This provides a backup and recovery mechanism for data within the same
bucket: older objects can be listed and restored through the AWS S3 console
and some applications.
## References
<!-- if extending, keep JIRAs separate, have them in numerical order; the rest in lexical. -->
* [HADOOP-13164](https://issues.apache.org/jira/browse/HADOOP-13164). _Optimize S3AFileSystem::deleteUnnecessaryFakeDirectories._
* [HADOOP-13230](https://issues.apache.org/jira/browse/HADOOP-13230). _S3A to optionally retain directory markers_
* [HADOOP-16090](https://issues.apache.org/jira/browse/HADOOP-16090). _S3A Client to add explicit support for versioned stores._
* [HADOOP-16823](https://issues.apache.org/jira/browse/HADOOP-16823). _Large DeleteObject requests are their own Thundering Herd_
* [Object Versioning](https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html). _Using versioning_
* [Optimizing Performance](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html). _Best Practices Design Patterns: Optimizing Amazon S3 Performance_

View File

@ -15,7 +15,23 @@
# Hadoop-AWS module: Integration with Amazon Web Services
<!-- MACRO{toc|fromDepth=0|toDepth=5} -->
<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
## <a name="compatibility"></a> Compatibility
### <a name="directory-marker-compatibility"></a> Directory Marker Compatibility
1. This release can safely list/index/read S3 buckets where "empty directory"
markers are retained.
1. This release does not support switching to the directory marker
policies of "keep" and "authoritative" -the policy is always "delete".
Consult [Controlling the S3A Directory Marker Behavior](directory_markers.html) for
full details.
## Overview
@ -42,6 +58,7 @@ See also:
* [Testing](testing.html)
* [Troubleshooting S3a](troubleshooting_s3a.html)
* [S3Guard](s3guard.html)
* [Controlling the S3A Directory Marker Behavior](directory_markers.html).
### Warning #1: Object Stores are not filesystems

View File

@ -60,6 +60,10 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
}
/**
 * Get the name of the test method, as returned by
 * {@code methodName.getMethodName()}.
 * @return the method name.
 */
protected String getMethodName() {
return methodName.getMethodName();
}
@Override
protected int getTestTimeoutMillis() {
return S3A_TEST_TIMEOUT;

View File

@ -33,6 +33,7 @@ import java.net.URI;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
import static org.apache.hadoop.test.GenericTestUtils.getTestDir;
import static org.junit.Assume.assumeFalse;
@ -87,9 +88,9 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
status.isEmptyDirectory() == Tristate.TRUE);
if (!fs.hasMetadataStore()) {
metadataRequests.assertDiffEquals(2);
metadataRequests.assertDiffEquals(GET_FILE_STATUS_ON_EMPTY_DIR.head());
}
listRequests.assertDiffEquals(0);
listRequests.assertDiffEquals(GET_FILE_STATUS_ON_EMPTY_DIR.list());
}
@Test
@ -104,8 +105,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
} catch (FileNotFoundException expected) {
// expected
}
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(1);
metadataRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.head());
listRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.list());
}
@Test
@ -120,8 +121,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
} catch (FileNotFoundException expected) {
// expected
}
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(1);
metadataRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.head());
listRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.list());
}
@Test
@ -142,8 +143,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
+ "\n" + fsState);
}
if (!fs.hasMetadataStore()) {
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(1);
metadataRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.head());
listRequests.assertDiffEquals(GET_FILE_STATUS_FNFE.list());
}
}

View File

@ -41,6 +41,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.Callable;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
@ -471,6 +472,19 @@ public final class S3ATestUtils {
}});
}
/**
 * Work out the name of the test bucket: the host of the URI stored
 * in the configuration under {@code TEST_FS_S3A_NAME}.
 * @param conf configuration to scan.
 * @return the host of the test filesystem URI.
 * @throws NullPointerException if the option is not set: no test bucket.
 */
public static String getTestBucketName(final Configuration conf) {
  final String fsName = checkNotNull(conf.get(TEST_FS_S3A_NAME),
      "No test bucket");
  final URI fsUri = URI.create(fsName);
  return fsUri.getHost();
}
/**
* Remove any values from a bucket.
* @param bucket bucket whose overrides are to be removed. Can be null/empty

View File

@ -68,11 +68,15 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
String key = path.toUri().getPath().substring(1);
when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
.thenThrow(NOT_FOUND);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(0L);
when(s3.getObjectMetadata(argThat(
correctGetMetadataRequest(BUCKET, key + "/"))
)).thenReturn(meta);
String keyDir = key + "/";
ObjectListing listResult = new ObjectListing();
S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setKey(keyDir);
objectSummary.setSize(0L);
listResult.getObjectSummaries().add(objectSummary);
when(s3.listObjects(argThat(
matchListRequest(BUCKET, keyDir))
)).thenReturn(listResult);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
@ -162,4 +166,28 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
}
};
}
/**
 * Create a matcher of list requests against a specific bucket and prefix.
 * <p></p>
 * The comparison is made from the expected values, so a request which
 * has no bucket name or no prefix is a mismatch rather than the
 * trigger of a NullPointerException inside the matcher.
 * @param bucket expected bucket name; must not be null.
 * @param key expected prefix of the listing; must not be null.
 * @return a matcher which accepts only {@code ListObjectsRequest}
 * instances with the given bucket and prefix.
 */
private Matcher<ListObjectsRequest> matchListRequest(
    final String bucket, final String key) {
  return new BaseMatcher<ListObjectsRequest>() {

    @Override
    public void describeTo(Description description) {
      description.appendText("bucket and key match");
    }

    @Override
    public boolean matches(Object o) {
      if (!(o instanceof ListObjectsRequest)) {
        // covers null as well as arguments of any other type
        return false;
      }
      ListObjectsRequest request = (ListObjectsRequest) o;
      return bucket.equals(request.getBucketName())
          && key.equals(request.getPrefix());
    }
  };
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP;
import static org.junit.Assert.assertEquals;
/**
 * Unit tests of the directory marker policy.
 * <p></p>
 * This filesystem only supports "delete", so the suite is smaller than
 * the one in later versions: whichever marker policy a run is
 * parameterized with, every path is expected to have its markers deleted.
 * <p></p>
 * Its role is to catch unexpected problems when the site
 * configuration asks for retention of some form.
 */
@RunWith(Parameterized.class)
public class TestDirectoryMarkerPolicy {

  /** Expected retention outcome of every path: markers are not kept. */
  private static final boolean EXPECT_MARKER_RETENTION = false;

  private final Path nonAuthPath = new Path("s3a://bucket/nonauth/data");

  private final Path authPath = new Path("s3a://bucket/auth/data1");

  private final Path deepAuth = new Path("s3a://bucket/auth/d1/d2/data2");

  /** The policy under test, created from the parameterized option. */
  private final DirectoryPolicy directoryPolicy;

  /**
   * The marker policies to run the suite against.
   * @return one single-element parameter array per marker policy.
   */
  @Parameterized.Parameters(name = "{0}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][]{
        {DirectoryPolicy.MarkerPolicy.Delete},
        {DirectoryPolicy.MarkerPolicy.Keep},
        {DirectoryPolicy.MarkerPolicy.Authoritative}
    });
  }

  /**
   * Create the test case with a new policy instance.
   * @param markerPolicy policy option
   */
  public TestDirectoryMarkerPolicy(
      final DirectoryPolicy.MarkerPolicy markerPolicy) {
    this.directoryPolicy = new DirectoryPolicyImpl(markerPolicy);
  }

  /**
   * Assert that a path has a retention outcome.
   * @param path path
   * @param retain should the marker be retained
   */
  private void assertMarkerRetention(Path path, boolean retain) {
    final boolean actual = directoryPolicy.keepDirectoryMarkers(path);
    assertEquals("Retention of path " + path + " by " + directoryPolicy,
        retain, actual);
  }

  /**
   * Assert that a path has, or lacks, a capability.
   * @param path path
   * @param capability capability to probe for
   * @param outcome expected result of the probe
   */
  private void assertPathCapability(Path path,
      String capability,
      boolean outcome) {
    final String message = String.format(
        "%s support for capability %s by path %s expected as %s",
        directoryPolicy, capability, path, outcome);
    assertEquals(message,
        outcome,
        directoryPolicy.hasPathCapability(path, capability));
  }

  /**
   * The checks shared by all the path tests: the retention outcome,
   * then the "delete" and "keep" marker action capabilities.
   * @param path path to check
   */
  private void verifyExpectedOutcome(Path path) {
    assertMarkerRetention(path, EXPECT_MARKER_RETENTION);
    assertPathCapability(path,
        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE,
        !EXPECT_MARKER_RETENTION);
    assertPathCapability(path,
        STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP,
        EXPECT_MARKER_RETENTION);
  }

  @Test
  public void testNonAuthPath() throws Throwable {
    verifyExpectedOutcome(nonAuthPath);
  }

  @Test
  public void testAuthPath() throws Throwable {
    verifyExpectedOutcome(authPath);
  }

  @Test
  public void testDeepAuthPath() throws Throwable {
    verifyExpectedOutcome(deepAuth);
  }

  /**
   * Whatever option name is set in the configuration, the policy
   * created from it is expected to be the DELETE policy.
   */
  @Test
  public void testInstantiate() throws Throwable {
    final Configuration conf = new Configuration(false);
    final String optionName =
        directoryPolicy.getMarkerPolicy().getOptionName();
    conf.set(DIRECTORY_MARKER_POLICY, optionName);
    final DirectoryPolicy created =
        DirectoryPolicyImpl.getDirectoryPolicy(conf);
    assertEquals(DirectoryPolicyImpl.DELETE, created);
  }
}

View File

@ -0,0 +1,847 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.base.Joiner;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* This is a test suite designed to verify that directory markers do
* not get misconstrued as empty directories during operations
* which explicitly or implicitly list directory trees.
* <p></p>
* It is also intended to be backported to all releases
* which are enhanced to read directory trees where markers have
* been retained.
* Hence: it does not use any of the new helper classes to
* measure the cost of operations or attempt to create markers
* through the FS APIs.
* <p></p>
* Instead, the directory structure to test is created through
* low-level S3 SDK API calls.
* We also skip any probes to measure/assert metrics.
* We're testing the semantics here, not the cost of the operations.
* Doing that makes it a lot easier to backport.
*
* <p></p>
* Similarly: JUnit assertions over AssertJ.
* <p></p>
* The tests work with unguarded buckets only -the bucket settings are changed
* appropriately.
*/
public class ITestDirectoryMarkerListing extends AbstractS3ATestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ITestDirectoryMarkerListing.class);
private static final String FILENAME = "fileUnderMarker";
private static final String HELLO = "hello";
private static final String MARKER = "marker";
private static final String MARKER_PEER = "markerpeer";
/**
* Does rename copy markers?
* Value: {@value}
* <p></p>
* Older releases: yes.
* <p></p>
* The full marker-optimized releases: no.
*/
private static final boolean RENAME_COPIES_MARKERS = true;
/**
* Path to a directory which has a marker.
*/
private Path markerDir;
/**
* Key to the object representing {@link #markerDir}.
*/
private String markerKey;
/**
* Key to the object representing {@link #markerDir} with
* a trailing / added. This references the actual object
* which has been created.
*/
private String markerKeySlash;
/**
* bucket of tests.
*/
private String bucket;
/**
* S3 Client of the FS.
*/
private AmazonS3 s3client;
/**
* Path to a file under the marker.
*/
private Path filePathUnderMarker;
/**
* Key to a file under the marker.
*/
private String fileKeyUnderMarker;
/**
* base path for the test files; the marker dir goes under this.
*/
private Path basePath;
/**
* Path to a file a peer of markerDir.
*/
private Path markerPeer;
/**
* Key to a file a peer of markerDir.
*/
private String markerPeerKey;
/**
 * No-argument constructor. It does nothing: the fields of the
 * test case are assigned in {@code setup()} and
 * {@code createTestObjects()}.
 */
public ITestDirectoryMarkerListing() {
}
/**
 * Create the test configuration, then remove the metadata store
 * options from it so that S3Guard is turned off for the test bucket.
 * @return the patched configuration of the superclass.
 */
@Override
protected Configuration createConfiguration() {
  final Configuration testConf = super.createConfiguration();
  // Turn off S3Guard
  removeBaseAndBucketOverrides(
      getTestBucketName(testConf),
      testConf,
      S3_METADATA_STORE_IMPL,
      METADATASTORE_AUTHORITATIVE);
  return testConf;
}
/**
 * Set up the test case: after the superclass setup, require an
 * unguarded filesystem, save its S3 client and bucket, then create
 * the test objects under "base" in the path of the test method.
 */
@Override
public void setup() throws Exception {
  super.setup();
  final S3AFileSystem filesystem = getFileSystem();
  final boolean unguarded = !filesystem.hasMetadataStore();
  assume("unguarded FS only", unguarded);
  s3client = filesystem.getAmazonS3ClientForTesting("markers");
  bucket = filesystem.getBucket();
  createTestObjects(new Path(methodPath(), "base"));
}
/**
 * Tear down the test case. The objects created for the test are
 * deleted first, then the test directory, and only then is the
 * superclass teardown invoked.
 * Nothing is deleted by key if the S3 client was never set.
 */
@Override
public void teardown() throws Exception {
  if (s3client != null) {
    final String[] createdKeys = {
        markerKey,
        markerKeySlash,
        markerPeerKey,
        fileKeyUnderMarker,
    };
    for (String createdKey : createdKeys) {
      deleteObject(createdKey);
    }
  }
  // the directory is deleted here so that the audit performed in the
  // superclass teardown does not fail when surplus markers are found
  deleteTestDirInTeardown();
  super.teardown();
}
/**
 * Get the path for the current test method: the result of
 * {@code path()} applied to the method name.
 * @return the path.
 * @throws IOException if {@code path()} raises it.
 */
public Path methodPath() throws IOException {
  final String testMethod = getMethodName();
  return path(testMethod);
}
/**
 * Populate the path and key fields of the test case for a base path,
 * then create the objects: the marker directory, its peer file and
 * the file underneath the marker.
 * @param path parent path of everything
 * @throws Exception failure to create any of the objects
 */
private void createTestObjects(final Path path) throws Exception {
  final S3AFileSystem filesystem = getFileSystem();

  // paths
  basePath = path;
  markerDir = new Path(path, MARKER);
  // the peer shares the start of the marker name, to verify that
  // the two do not get confused.
  markerPeer = new Path(path, MARKER_PEER);

  // keys
  markerPeerKey = filesystem.pathToKey(markerPeer);
  markerKey = filesystem.pathToKey(markerDir);
  markerKeySlash = markerKey + "/";
  fileKeyUnderMarker = markerKeySlash + FILENAME;
  filePathUnderMarker = new Path(markerDir, FILENAME);

  // the empty dir comes first, then the peer, then the file under the dir
  filesystem.mkdirs(markerDir);
  touch(filesystem, markerPeer);
  put(fileKeyUnderMarker, HELLO);
}
/*
=================================================================
Basic probes
=================================================================
*/
/**
 * Verify the marker object exists and that the filesystem
 * reports the marker path as a directory.
 */
@Test
public void testMarkerExists() throws Throwable {
describe("Verify the marker exists");
// HEAD of the key with the trailing slash
head(markerKeySlash);
assertIsDirectory(markerDir);
}
/**
 * Verify the file under the marker directory is visible both through
 * the filesystem API and through a HEAD request on its key.
 */
@Test
public void testObjectUnderMarker() throws Throwable {
describe("verify the file under the marker dir exists");
assertIsFile(filePathUnderMarker);
head(fileKeyUnderMarker);
}
/*
=================================================================
The listing operations
=================================================================
*/
/**
 * listStatus(markerDir) must return the file under the marker
 * and nothing else.
 */
@Test
public void testListStatusMarkerDir() throws Throwable {
  describe("list the marker directory and expect to see the file");
  final FileStatus[] listing = getFileSystem().listStatus(markerDir);
  final List<FileStatus> statuses = toList(listing);
  assertContainsFileUnderMarkerOnly(statuses);
}
/**
 * A non-recursive listFiles(markerDir) must return the file under
 * the marker and nothing else.
 */
@Test
public void testListFilesMarkerDirFlat() throws Throwable {
  final List<FileStatus> statuses = toList(
      getFileSystem().listFiles(markerDir, false));
  assertContainsFileUnderMarkerOnly(statuses);
}
/**
 * A recursive listFiles(markerDir) must return the file under
 * the marker and nothing else.
 */
@Test
public void testListFilesMarkerDirRecursive() throws Throwable {
  assertContainsFileUnderMarkerOnly(
      toList(getFileSystem().listFiles(markerDir, true)));
}
/**
 * A recursive listFiles() of the base dir MUST find, in order, the
 * file under the marker and then the peer file; the marker itself
 * must not be listed.
 */
@Test
public void testListStatusBaseDirRecursive() throws Throwable {
  assertContainsExactlyStatusOfPaths(
      toList(getFileSystem().listFiles(basePath, true)),
      filePathUnderMarker,
      markerPeer);
}
/**
 * Glob "*" under the (escaped) base path: expect the marker directory
 * followed by the peer, with the peer entry being a file.
 */
@Test
public void testGlobStatusBaseDirRecursive() throws Throwable {
  final Path escapedBase = new Path(escape(basePath.toUri().getPath()));
  final Callable<List<FileStatus>> globOperation =
      new Callable<List<FileStatus>>() {
        @Override
        public List<FileStatus> call() throws Exception {
          FileStatus[] matches = getFileSystem()
              .globStatus(new Path(escapedBase, "*"));
          return toList(matches);
        }
      };
  final List<FileStatus> statuses = exec("glob", globOperation);
  assertContainsExactlyStatusOfPaths(statuses, markerDir, markerPeer);
  // second entry is the peer, which must be a file
  assertIsFileAtPath(markerPeer, statuses.get(1));
}
/**
 * Glob "*" under the (escaped) marker directory: expect only the
 * file under the marker.
 */
@Test
public void testGlobStatusMarkerDir() throws Throwable {
  final Path escapedMarkerDir =
      new Path(escape(markerDir.toUri().getPath()));
  final Callable<List<FileStatus>> globOperation =
      new Callable<List<FileStatus>>() {
        @Override
        public List<FileStatus> call() throws Exception {
          FileStatus[] matches = getFileSystem()
              .globStatus(new Path(escapedMarkerDir, "*"));
          return toList(matches);
        }
      };
  assertContainsFileUnderMarkerOnly(exec("glob", globOperation));
}
/**
 * Call {@code listLocatedStatus(basePath)}.
 * <p></p>
 * The listing is expected to return the marker peer ahead of the
 * marker dir. Reason: the listing iterators return
 * the objects before the common prefixes, and the
 * marker dir is coming back as a prefix.
 */
@Test
public void testListLocatedStatusBaseDir() throws Throwable {
  final Callable<List<FileStatus>> listOperation =
      new Callable<List<FileStatus>>() {
        @Override
        public List<FileStatus> call() throws Exception {
          return toList(getFileSystem().listLocatedStatus(basePath));
        }
      };
  final List<FileStatus> statuses = exec("listLocatedStatus", listOperation);
  assertContainsExactlyStatusOfPaths(statuses, markerPeer, markerDir);
}
/**
 * Call {@code listLocatedStatus(markerDir)}; the only entry
 * expected back is the file under the marker.
 */
@Test
public void testListLocatedStatusMarkerDir() throws Throwable {
  final Callable<List<FileStatus>> listOperation =
      new Callable<List<FileStatus>>() {
        @Override
        public List<FileStatus> call() throws Exception {
          return toList(getFileSystem().listLocatedStatus(markerDir));
        }
      };
  assertContainsFileUnderMarkerOnly(
      exec("listLocatedStatus", listOperation));
}
/*
=================================================================
Creation Rejection
=================================================================
*/
/**
 * create(markerDir, overwrite=false) must be rejected with a
 * FileAlreadyExistsException, and the marker must survive the attempt.
 */
@Test
public void testCreateNoOverwriteMarkerDir() throws Throwable {
  describe("create no-overwrite over the marker dir fails");
  head(markerKeySlash);
  // the operation expected to fail
  final Callable<Object> createCall = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return getFileSystem().create(markerDir, false);
    }
  };
  intercept(FileAlreadyExistsException.class,
      new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          return exec("create", createCall);
        }
      });
  // dir is still there.
  head(markerKeySlash);
}
/**
 * create(file, overwrite=false) on the existing file under the marker
 * must be rejected with a FileAlreadyExistsException, leaving the
 * test objects in place.
 */
@Test
public void testCreateNoOverwriteFile() throws Throwable {
  describe("create-no-overwrite on the file fails");
  head(fileKeyUnderMarker);
  // the operation expected to fail
  final Callable<Object> createCall = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return getFileSystem().create(filePathUnderMarker, false);
    }
  };
  intercept(FileAlreadyExistsException.class,
      new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          return exec("create", createCall);
        }
      });
  assertTestObjectsExist();
}
/**
 * The createFile() builder API with overwrite(false) must also be
 * rejected with a FileAlreadyExistsException, leaving the
 * test objects in place.
 */
@Test
public void testCreateFileNoOverwrite() throws Throwable {
  describe("verify the createFile() API also fails");
  head(fileKeyUnderMarker);
  // the operation expected to fail
  final Callable<Object> createFileCall = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return getFileSystem()
          .createFile(filePathUnderMarker)
          .overwrite(false)
          .build();
    }
  };
  intercept(FileAlreadyExistsException.class, "",
      new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          return exec("create", createFileCall);
        }
      });
  assertTestObjectsExist();
}
/*
=================================================================
Delete.
=================================================================
*/
/**
 * Delete the marker directory: the non-recursive delete must be
 * rejected as the directory is not empty; the recursive delete must
 * remove both the file and the marker.
 */
@Test
public void testDelete() throws Throwable {
  final S3AFileSystem fs = getFileSystem();
  // a non recursive delete MUST fail because
  // it is not empty
  final Callable<Object> nonRecursiveDelete = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return fs.delete(markerDir, false);
    }
  };
  intercept(PathIsNotEmptyDirectoryException.class, nonRecursiveDelete);
  // file is still there
  head(fileKeyUnderMarker);

  // recursive delete MUST succeed
  fs.delete(markerDir, true);
  // and the file and the marker are gone
  head404(fileKeyUnderMarker);
  head404(markerKeySlash);

  // just for completeness
  fs.delete(basePath, true);
}
/*
=================================================================
Rename.
=================================================================
*/
/**
 * Rename the base directory and expect the source files to move
 * with it.
 * <p></p>
 * Whether or not the marker itself is copied depends on whether
 * the release's rename operation explicitly skips
 * markers on renames; RENAME_COPIES_MARKERS selects which outcome
 * is probed for.
 */
@Test
public void testRenameBase() throws Throwable {
  describe("rename base directory");
  final Path source = basePath;
  final Path target = new Path(methodPath(), "dest");
  assertRenamed(source, target);

  assertPathDoesNotExist("source", source);
  assertPathDoesNotExist("source", filePathUnderMarker);
  assertPathExists("dest not found", target);

  // every expected path and key, relative to the destination
  final Path targetMarkerDir = new Path(target, MARKER);
  // peer path has the same initial name to make sure there
  // is no confusion there.
  final Path targetPeer = new Path(target, MARKER_PEER);
  final String targetMarkerKeySlash = toKey(targetMarkerDir) + "/";
  final String targetFileKey = targetMarkerKeySlash + FILENAME;
  final Path targetFile = new Path(targetMarkerDir, FILENAME);

  assertIsFile(targetFile);
  assertIsFile(targetPeer);
  head(targetFileKey);

  // probe for the marker based on expected rename
  // behavior
  if (!RENAME_COPIES_MARKERS) {
    head404(targetMarkerKeySlash);
  } else {
    head(targetMarkerKeySlash);
  }
}
/**
 * Rename a file under a marker by passing in the marker
 * directory as the destination; the final path is derived
 * from the original filename.
 * <p></p>
 * Checked after the rename:
 * <ol>
 *   <li>The data is at the derived destination path.</li>
 *   <li>The parent dir of the source file still exists.</li>
 *   <li>The marker above the destination file no longer exists.</li>
 * </ol>
 */
@Test
public void testRenameUnderMarkerDir() throws Throwable {
  describe("directory rename under an existing marker");
  final String filename = "sourceFile";
  final Path sourceDir = new Path(basePath, "srcdir");
  mkdirs(sourceDir);
  final Path sourceFile = new Path(sourceDir, filename);
  final String sourceKey = toKey(sourceFile);
  put(sourceKey, filename);
  head(sourceKey);

  // the destination is the marker directory itself, so the
  // source file is renamed under it.
  assertRenamed(sourceFile, markerDir);
  assertIsFile(new Path(markerDir, filename));
  assertIsDirectory(sourceDir);
  head404(markerKeySlash);
}
/**
 * Rename a file under a marker, giving the full path of the
 * destination file.
 * <p></p>
 * Checked after the rename:
 * <ol>
 *   <li>The data is at the explicit destination path.</li>
 *   <li>The parent dir of the source file still exists.</li>
 *   <li>The marker above the destination file no longer exists.</li>
 * </ol>
 */
@Test
public void testRenameUnderMarkerWithPath() throws Throwable {
  describe("directory rename under an existing marker");
  final String filename = "sourceFile";
  final Path sourceDir = new Path(basePath, "srcdir");
  mkdirs(sourceDir);
  final Path sourceFile = new Path(sourceDir, filename);
  final String sourceKey = toKey(sourceFile);
  put(sourceKey, filename);
  head(sourceKey);

  // the destination is the final file, not its parent directory
  final Path destFile = new Path(markerDir, "destFile");
  assertRenamed(sourceFile, destFile);
  assertIsFile(destFile);
  assertIsDirectory(sourceDir);
  head404(markerKeySlash);
}
/**
 * Create an empty directory and try to rename it over the marker
 * directory. The rename is required to return false, leaving the
 * source directory (still empty) and the test objects in place.
 * <p></p>
 * NOTE(review): presumably the rename is rejected because the marker
 * directory contains a file; confirm against the rename semantics.
 */
@Test
public void testRenameEmptyDirOverMarker() throws Throwable {
  describe("rename an empty directory over the marker");
  final S3AFileSystem s3a = getFileSystem();
  final Path sourceDir = new Path(basePath, "sourceDir");
  s3a.mkdirs(sourceDir);
  assertIsDirectory(sourceDir);
  final String sourceMarkerKey = toKey(sourceDir) + "/";
  head(sourceMarkerKey);

  // attempt the rename over the marker dir; it must be rejected
  final Path dest = markerDir;
  final boolean renamed = getFileSystem().rename(sourceDir, dest);
  assertFalse("rename(" + sourceDir + ", " + dest + ") should have failed",
      renamed);

  // source is still there
  assertIsDirectory(sourceDir);
  head(sourceMarkerKey);
  // and a non-recursive delete lets us verify it is considered
  // an empty dir
  assertDeleted(sourceDir, false);
  assertTestObjectsExist();
}
/*
=================================================================
Utility methods and assertions.
=================================================================
*/
/**
 * Assert the test objects exist by issuing HEAD requests for
 * the file under the marker and for the marker itself.
 * @throws Exception if either HEAD request fails
 */
private void assertTestObjectsExist() throws Exception {
head(fileKeyUnderMarker);
head(markerKeySlash);
}
/**
 * PUT a string to an object key, going directly through the S3 client.
 * @param key object key
 * @param content string to store
 * @throws Exception failure of the request
 */
private void put(final String key, final String content) throws Exception {
  final Callable<Object> upload = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return s3client.putObject(bucket, key, content);
    }
  };
  exec("PUT " + key, upload);
}
/**
 * Delete an object, going directly through the S3 client.
 * @param key key of the object to delete
 * @throws Exception failure of the request
 */
private void deleteObject(final String key) throws Exception {
  final Callable<Object> deletion = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      s3client.deleteObject(bucket, key);
      return "deleted " + key;
    }
  };
  exec("DELETE " + key, deletion);
}
/**
 * Issue a HEAD request for an object.
 * @param key object key
 * @return a description of the object: its key and length.
 * @throws Exception failure of the request
 */
private String head(final String key) throws Exception {
  final Callable<ObjectMetadata> request = new Callable<ObjectMetadata>() {
    @Override
    public ObjectMetadata call() throws Exception {
      return s3client.getObjectMetadata(bucket, key);
    }
  };
  final ObjectMetadata metadata = exec("HEAD " + key, request);
  return String.format("Object %s of length %d",
      key, metadata.getInstanceLength());
}
/**
 * Issue a HEAD request and expect it to fail with
 * a FileNotFoundException.
 * @param key object key
 * @throws Exception if the request succeeded or failed differently
 */
private void head404(final String key) throws Exception {
  final Callable<String> probe = new Callable<String>() {
    @Override
    public String call() throws Exception {
      return head(key);
    }
  };
  intercept(FileNotFoundException.class, "", probe);
}
/**
 * Execute an operation; translate AWS exceptions.
 * <p></p>
 * Any AmazonClientException raised by the call is passed through
 * S3AUtils.translateException() and the result thrown instead.
 * A timer is started before the call and ended, under the
 * operation name, whether or not the call succeeds.
 * @param op operation name, used for the timer and in exception translation
 * @param call call to make
 * @param <T> returned type
 * @return result of the call.
 * @throws Exception failure
 */
private <T> T exec(String op, Callable<T> call) throws Exception {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try {
return call.call();
} catch (AmazonClientException ex) {
// rethrow as the translated exception
throw S3AUtils.translateException(op, "", ex);
} finally {
timer.end(op);
}
}
/**
 * Assert that the listing holds exactly one entry: the status
 * of the file under the marker, and that it is a file.
 * @param statuses status objects
 */
private void assertContainsFileUnderMarkerOnly(
    final List<FileStatus> statuses) {
  assertContainsExactlyStatusOfPaths(statuses, filePathUnderMarker);
  // sole entry must be a file at the expected path
  assertIsFileAtPath(filePathUnderMarker, statuses.get(0));
}
/**
 * Assert that the paths of the status objects are exactly the expected
 * paths, in the same order.
 * @param statuses status object list
 * @param paths ordered varargs list of paths
 * @param <T> type of status objects
 */
private <T extends FileStatus> void assertContainsExactlyStatusOfPaths(
    List<T> statuses, Path... paths) {
  final String summary = "expected [" + Joiner.on(";").join(paths) + "]"
      + " actual = [" + Joiner.on(";").join(statuses) + "]";
  assertEquals("mismatch in size of listing " + summary,
      paths.length, statuses.size());
  // sizes match from here on, so the index is valid for both
  int index = 0;
  for (T status : statuses) {
    assertEquals("Path mismatch at element " + index + " in " + summary,
        paths[index], status.getPath());
    index++;
  }
}
/**
 * Assert the status object is a file whose path is that of the file
 * created under the marker.
 * @param stat status object
 */
private void assertIsFileUnderMarker(final FileStatus stat) {
assertIsFileAtPath(filePathUnderMarker, stat);
}
/**
 * Assert the status object is a file and that its path is
 * the one expected.
 * @param path expected path
 * @param stat status object
 */
private void assertIsFileAtPath(final Path path, final FileStatus stat) {
assertTrue("Is not file " + stat, stat.isFile());
assertPathEquals(path, stat);
}
/**
 * Assert a status object's path equals the expected path.
 * @param path path to expect
 * @param stat status object
 */
private void assertPathEquals(final Path path, final FileStatus stat) {
assertEquals("filename is not the expected path :" + stat,
path, stat.getPath());
}
/**
 * Given a remote iterator of status objects,
 * build a list of the values and log them.
 * <p></p>
 * The entries are collected straight into a {@code List<FileStatus>},
 * so no unchecked cast of the list is needed.
 * @param <T> actual type of the iterator's entries.
 * @param status iterator over the status entries
 * @return a new list of every entry, in iteration order.
 * @throws IOException failure while iterating
 */
private <T extends FileStatus> List<FileStatus> toList(
    RemoteIterator<T> status) throws IOException {
  List<FileStatus> l = new ArrayList<>();
  while (status.hasNext()) {
    l.add(status.next());
  }
  return dump(l);
}
/**
 * Given an array of status objects,
 * wrap it as a list and log the values.
 * @param status status array
 * @param <T> actual type.
 * @return a list view of the array.
 * @throws IOException declared for parity with the iterator variant
 */
private <T extends FileStatus> List<T> toList(
    T[] status) throws IOException {
  final List<T> listing = Arrays.asList(status);
  return dump(listing);
}
/**
 * Log every entry of a list at INFO, each prefixed with its
 * 1-based position, then hand the list back.
 * @param l source.
 * @param <T> source type
 * @return the list passed in
 */
private <T extends FileStatus> List<T> dump(List<T> l) {
  int position = 0;
  for (T entry : l) {
    position++;
    LOG.info("{}\t{}", position, entry);
  }
  return l;
}
/**
 * Rename a path and assert that rename() returned true.
 * @param src source path
 * @param dest dest path
 * @throws IOException failure of the rename call
 */
private void assertRenamed(final Path src, final Path dest)
    throws IOException {
  final boolean renamed = getFileSystem().rename(src, dest);
  assertTrue("rename(" + src + ", " + dest + ") failed", renamed);
}
/**
 * Convert a path to an object key using the filesystem's pathToKey();
 * does not add any trailing / .
 * @param path path in
 * @return key out
 */
private String toKey(final Path path) {
return getFileSystem().pathToKey(path);
}
/**
 * Escape paths before handing to globStatus; this is needed as
 * parameterized runs produce paths with [] in them.
 * <p></p>
 * A backslash is inserted in front of each of the characters
 * {@code ? * [ {}; every other character is copied unchanged.
 * @param pathstr source path string
 * @return an escaped path string
 */
private String escape(String pathstr) {
  final String special = "?*[{";
  final StringBuilder escaped = new StringBuilder();
  for (int i = 0; i < pathstr.length(); i++) {
    final char c = pathstr.charAt(i);
    if (special.indexOf(c) >= 0) {
      escaped.append("\\");
    }
    escaped.append(c);
  }
  return escaped.toString();
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
/**
 * Declaration of the costs of head and list calls for various FS IO
 * operations.
 * <p></p>
 * An instance declares the number of head and list calls expected for
 * various operations -with a {@link #plus(OperationCost)}
 * method to add operation costs together to produce an
 * aggregate cost. These can then be validated in tests.
 * <p></p>
 * Instances are immutable and compare by value: two costs are equal
 * when their head and list counts are equal.
 */
public final class OperationCost {

  /** Head costs for getFileStatus() directory probe: {@value}. */
  public static final int FILESTATUS_DIR_PROBE_H = 0;

  /** List costs for getFileStatus() directory probe: {@value}. */
  public static final int FILESTATUS_DIR_PROBE_L = 1;

  /** Head cost getFileStatus() file probe only: {@value}. */
  public static final int FILESTATUS_FILE_PROBE_H = 1;

  /** List cost getFileStatus() file probe only: {@value}. */
  public static final int FILESTATUS_FILE_PROBE_L = 0;

  /**
   * Delete cost when deleting an object.
   */
  public static final int DELETE_OBJECT_REQUEST = 1;

  /**
   * Delete cost when deleting a marker.
   */
  public static final int DELETE_MARKER_REQUEST = DELETE_OBJECT_REQUEST;

  /**
   * No IO takes place.
   */
  public static final OperationCost NO_IO =
      new OperationCost(0, 0);

  /** A HEAD operation. */
  public static final OperationCost HEAD_OPERATION = new OperationCost(1, 0);

  /** A LIST operation. */
  public static final OperationCost LIST_OPERATION = new OperationCost(0, 1);

  /**
   * Cost of {@link org.apache.hadoop.fs.s3a.impl.StatusProbeEnum#DIRECTORIES}.
   */
  public static final OperationCost FILE_STATUS_DIR_PROBE = LIST_OPERATION;

  /**
   * Cost of {@link org.apache.hadoop.fs.s3a.impl.StatusProbeEnum#FILE}.
   */
  public static final OperationCost FILE_STATUS_FILE_PROBE = HEAD_OPERATION;

  /**
   * Cost of {@link org.apache.hadoop.fs.s3a.impl.StatusProbeEnum#ALL}.
   */
  public static final OperationCost FILE_STATUS_ALL_PROBES =
      FILE_STATUS_FILE_PROBE.plus(FILE_STATUS_DIR_PROBE);

  /** getFileStatus() on a file which exists. */
  public static final OperationCost GET_FILE_STATUS_ON_FILE =
      FILE_STATUS_FILE_PROBE;

  /** Cost of getFileStatus() on a non-empty directory. */
  public static final OperationCost GET_FILE_STATUS_ON_DIR =
      FILE_STATUS_FILE_PROBE.plus(FILE_STATUS_DIR_PROBE);

  /** Cost of getFileStatus() on an empty directory. */
  public static final OperationCost GET_FILE_STATUS_ON_EMPTY_DIR =
      GET_FILE_STATUS_ON_DIR;

  /** getFileStatus() directory marker which exists. */
  public static final OperationCost GET_FILE_STATUS_ON_DIR_MARKER =
      GET_FILE_STATUS_ON_EMPTY_DIR;

  /** getFileStatus() call which fails to find any entry. */
  public static final OperationCost GET_FILE_STATUS_FNFE =
      FILE_STATUS_ALL_PROBES;

  /** listLocatedStatus always does a LIST. */
  public static final OperationCost LIST_LOCATED_STATUS_LIST_OP =
      new OperationCost(0, 1);

  /** listFiles always does a LIST. */
  public static final OperationCost LIST_FILES_LIST_OP =
      new OperationCost(0, 1);

  /**
   * Metadata cost of a copy operation, as used during rename.
   * This happens even if the store is guarded.
   */
  public static final OperationCost COPY_OP =
      new OperationCost(1, 0);

  /**
   * Cost of renaming a file to a different directory.
   * <p></p>
   * LIST on dest not found, look for dest dir, and then, at
   * end of rename, whether a parent dir needs to be created.
   */
  public static final OperationCost RENAME_SINGLE_FILE_DIFFERENT_DIR =
      FILE_STATUS_FILE_PROBE              // source file probe
          .plus(GET_FILE_STATUS_FNFE)     // dest does not exist
          .plus(FILE_STATUS_DIR_PROBE)    // parent dir of dest
          .plus(FILE_STATUS_DIR_PROBE)    // recreate source parent dir?
          .plus(COPY_OP);                 // metadata read on copy

  /**
   * Cost of renaming a file to the same directory.
   * <p></p>
   * No need to look for parent directories, so only file
   * existence checks and the copy.
   */
  public static final OperationCost RENAME_SINGLE_FILE_SAME_DIR =
      FILE_STATUS_FILE_PROBE              // source file probe
          .plus(GET_FILE_STATUS_FNFE)     // dest must not exist
          .plus(COPY_OP);                 // metadata read on copy

  /**
   * create(overwrite = true) does not look for the file existing.
   */
  public static final OperationCost CREATE_FILE_OVERWRITE =
      FILE_STATUS_DIR_PROBE;

  /**
   * create(overwrite = false) runs all the checks.
   */
  public static final OperationCost CREATE_FILE_NO_OVERWRITE =
      FILE_STATUS_ALL_PROBES;

  /** Expected HEAD count. */
  private final int head;

  /** Expected LIST count. */
  private final int list;

  /**
   * Constructor.
   * @param head head requests.
   * @param list list requests.
   */
  public OperationCost(final int head,
      final int list) {
    this.head = head;
    this.list = list;
  }

  /**
   * Expected HEAD count.
   * @return the number of HEAD requests
   */
  public int head() {
    return head;
  }

  /**
   * Expected LIST count.
   * @return the number of LIST requests
   */
  public int list() {
    return list;
  }

  /**
   * Add to create a new cost; neither operand is modified.
   * @param that the other entry
   * @return cost of the combined operation.
   */
  public OperationCost plus(OperationCost that) {
    return new OperationCost(
        head + that.head,
        list + that.list);
  }

  /**
   * Value equality: two costs are equal when both their HEAD and
   * LIST counts match.
   * @param o object to compare with
   * @return true if {@code o} is an OperationCost with the same counts
   */
  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof OperationCost)) {
      return false;
    }
    final OperationCost that = (OperationCost) o;
    return head == that.head && list == that.list;
  }

  /**
   * Hash code consistent with {@link #equals(Object)}.
   * @return a hash of the two counts
   */
  @Override
  public int hashCode() {
    return 31 * head + list;
  }

  @Override
  public String toString() {
    return "OperationCost{" +
        "head=" + head +
        ", list=" + list +
        '}';
  }
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@ -288,4 +289,39 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
assertEquals("Command " + cmd + " failed\n"+ buf, 0, r);
}
/**
 * bucket-info with the markers option set to "aware" must succeed
 * against the landsat bucket.
 */
@Test
public void testLandsatBucketMarkerAware() throws Throwable {
  describe("verify that -markers aware succeeds");
  final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
  run(S3GuardTool.BucketInfo.NAME,
      markersOption,
      S3GuardTool.BucketInfo.MARKERS_AWARE,
      getLandsatCSVFile());
}
/**
 * bucket-info with the markers option set to "delete" must succeed
 * against the landsat bucket.
 */
@Test
public void testLandsatBucketMarkerDelete() throws Throwable {
  describe("verify that -markers delete succeeds");
  final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
  run(S3GuardTool.BucketInfo.NAME,
      markersOption,
      "delete",
      getLandsatCSVFile());
}
/**
 * bucket-info with the markers option set to "keep" must fail
 * with the exit code E_BAD_STATE.
 */
@Test
public void testLandsatBucketMarkerKeepFails() throws Throwable {
  describe("verify that -markers keep fails");
  final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
  runToFailure(E_BAD_STATE,
      S3GuardTool.BucketInfo.NAME,
      markersOption,
      "keep",
      getLandsatCSVFile());
}
/**
 * bucket-info with the markers option set to "authoritative" must fail
 * with the exit code E_BAD_STATE.
 */
@Test
public void testLandsatBucketMarkerAuthFails() throws Throwable {
  describe("verify that -markers authoritative fails");
  final String markersOption = "-" + S3GuardTool.BucketInfo.MARKERS_FLAG;
  runToFailure(E_BAD_STATE,
      S3GuardTool.BucketInfo.NAME,
      markersOption,
      "authoritative",
      getLandsatCSVFile());
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.test.costs;
/**
 * Declaration of the costs of head and list calls for various FS IO
 * operations. Constants ending in {@code _H} are HEAD request counts;
 * those ending in {@code _L} are LIST request counts.
 */
public class HeadListCosts {

  /** Head cost of the getFileStatus() directory probe: {@value}. */
  public static final int FILESTATUS_DIR_PROBE_H = 0;

  /** List cost of the getFileStatus() directory probe: {@value}. */
  public static final int FILESTATUS_DIR_PROBE_L = 1;

  /** Head cost of the getFileStatus() file probe only: {@value}. */
  public static final int FILESTATUS_FILE_PROBE_H = 1;

  /** List cost of the getFileStatus() file probe only: {@value}. */
  public static final int FILESTATUS_FILE_PROBE_L = 0;

  /** Head cost of getFileStatus() when no file or dir is found: {@value}. */
  public static final int GETFILESTATUS_FNFE_H = FILESTATUS_FILE_PROBE_H;

  /** List cost of getFileStatus() when no file or dir is found: {@value}. */
  public static final int GETFILESTATUS_FNFE_L = FILESTATUS_DIR_PROBE_L;

  /** Head cost of getFileStatus() on a non-empty directory: {@value}. */
  public static final int GETFILESTATUS_DIR_H = FILESTATUS_FILE_PROBE_H;

  /** List cost of getFileStatus() on a non-empty directory: {@value}. */
  public static final int GETFILESTATUS_DIR_L = FILESTATUS_DIR_PROBE_L;

  /** List cost of getFileStatus() on an empty directory: {@value}. */
  public static final int GETFILESTATUS_EMPTY_DIR_L = GETFILESTATUS_DIR_L;

  /** Head cost of getFileStatus() on an empty directory: {@value}. */
  public static final int GETFILESTATUS_EMPTY_DIR_H = GETFILESTATUS_DIR_H;

  /** Head cost of getFileStatus() on a directory marker which exists. */
  public static final int GETFILESTATUS_MARKER_H = FILESTATUS_FILE_PROBE_H;

  /** Head cost of getFileStatus() on a file which exists. */
  public static final int GETFILESTATUS_SINGLE_FILE_H = FILESTATUS_FILE_PROBE_H;

  /** List cost of getFileStatus() on a file which exists. */
  public static final int GETFILESTATUS_SINGLE_FILE_L = FILESTATUS_FILE_PROBE_L;

  /** Delete requests issued when deleting an object. */
  public static final int DELETE_OBJECT_REQUEST = 1;

  /** Delete requests issued when deleting a marker. */
  public static final int DELETE_MARKER_REQUEST = DELETE_OBJECT_REQUEST;

  /** listLocatedStatus always does a list. */
  public static final int LIST_LOCATED_STATUS_L = 1;

  /** List cost of listFiles. */
  public static final int LIST_FILES_L = 1;

  /**
   * List cost of renaming a file to a different directory.
   * <p></p>
   * LIST on dest not found, look for dest dir, and then, at
   * end of rename, whether a parent dir needs to be created.
   */
  public static final int RENAME_SINGLE_FILE_RENAME_DIFFERENT_DIR_L =
      GETFILESTATUS_FNFE_L + 2 * GETFILESTATUS_DIR_L;

  /**
   * List cost of renaming a file within the same directory:
   * only the LIST on the destination not being found.
   */
  public static final int RENAME_SINGLE_FILE_RENAME_SAME_DIR_L =
      GETFILESTATUS_FNFE_L;

  /**
   * Head cost of renaming a single file.
   * <p></p>
   * source is found, dest not found, copy adds a
   * metadata request.
   */
  public static final int RENAME_SINGLE_FILE_RENAME_H =
      FILESTATUS_FILE_PROBE_H + GETFILESTATUS_FNFE_H + 1;

  /**
   * Create file with overwrite, head : {@value}.
   */
  public static final int CREATE_FILE_OVERWRITE_H = 0;

  /**
   * Create file with overwrite, list : {@value}.
   */
  public static final int CREATE_FILE_OVERWRITE_L = FILESTATUS_DIR_PROBE_L;

  /**
   * Create file no overwrite, head : {@value}.
   */
  public static final int CREATE_FILE_NO_OVERWRITE_H = FILESTATUS_FILE_PROBE_H;

  /**
   * Create file no overwrite, list : {@value}.
   */
  public static final int CREATE_FILE_NO_OVERWRITE_L = FILESTATUS_DIR_PROBE_L;
}