From 35ad9b1dd279b769381ea1625d9bf776c309c5cb Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 18 Dec 2017 21:18:52 +0000 Subject: [PATCH] HADOOP-13974. S3Guard CLI to support list/purge of pending multipart commits. Contributed by Aaron Fabbri --- .../org/apache/hadoop/security/KDiag.java | 30 +- .../org/apache/hadoop/fs/s3a/Invoker.java | 7 +- .../apache/hadoop/fs/s3a/MultipartUtils.java | 214 +++++++++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 30 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 3 +- .../hadoop/fs/s3a/WriteOperationHelper.java | 5 +- .../fs/s3a/commit/CommitOperations.java | 2 +- .../fs/s3a/commit/MagicCommitIntegration.java | 2 +- .../hadoop/fs/s3a/s3guard/S3GuardTool.java | 287 ++++++++++++++++-- .../site/markdown/tools/hadoop-aws/index.md | 7 +- .../site/markdown/tools/hadoop-aws/s3guard.md | 35 ++- .../hadoop/fs/s3a/ITestS3AMultipartUtils.java | 126 ++++++++ .../hadoop/fs/s3a/MockS3AFileSystem.java | 7 + .../hadoop/fs/s3a/MultipartTestUtils.java | 184 +++++++++++ .../apache/hadoop/fs/s3a/S3ATestUtils.java | 21 +- .../fs/s3a/commit/AbstractCommitITest.java | 3 +- .../magic/ITestS3AHugeMagicCommits.java | 2 +- .../fs/s3a/s3guard/ITestS3GuardToolLocal.java | 187 ++++++++++++ 18 files changed, 1082 insertions(+), 70 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java index c8d0b334d36..b4e535cc002 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java @@ -81,6 +81,11 @@ public class KDiag extends Configured implements Tool, Closeable { * variable. This is what kinit will use by default: {@value} */ public static final String KRB5_CCNAME = "KRB5CCNAME"; + /** + * Location of main kerberos configuration file as passed down via an + * environment variable. + */ + public static final String KRB5_CONFIG = "KRB5_CONFIG"; public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; public static final String JAVA_SECURITY_KRB5_REALM @@ -321,14 +326,15 @@ public class KDiag extends Configured implements Tool, Closeable { title("Environment Variables"); for (String env : new String[]{ - HADOOP_JAAS_DEBUG, - KRB5_CCNAME, - HADOOP_USER_NAME, - HADOOP_PROXY_USER, - HADOOP_TOKEN_FILE_LOCATION, - "HADOOP_SECURE_LOG", - "HADOOP_OPTS", - "HADOOP_CLIENT_OPTS", + HADOOP_JAAS_DEBUG, + KRB5_CCNAME, + KRB5_CONFIG, + HADOOP_USER_NAME, + HADOOP_PROXY_USER, + HADOOP_TOKEN_FILE_LOCATION, + "HADOOP_SECURE_LOG", + "HADOOP_OPTS", + "HADOOP_CLIENT_OPTS", }) { printEnv(env); } @@ -562,14 +568,14 @@ public class KDiag extends Configured implements Tool, Closeable { krbPath = jvmKrbPath; } - String krb5name = System.getenv(KRB5_CCNAME); + String krb5name = System.getenv(KRB5_CONFIG); if (krb5name != null) { println("Setting kerberos path from environment variable %s: \"%s\"", - KRB5_CCNAME, krb5name); + KRB5_CONFIG, krb5name); krbPath = krb5name; if (jvmKrbPath != null) { println("Warning - both %s and %s were set - %s takes priority", - JAVA_SECURITY_KRB5_CONF, KRB5_CCNAME, KRB5_CCNAME); + JAVA_SECURITY_KRB5_CONF, KRB5_CONFIG, KRB5_CONFIG); } } @@ -919,7 +925,7 @@ public class KDiag extends Configured implements Tool, Closeable { private void dump(File file) throws IOException { try (FileInputStream in = new FileInputStream(file)) { for (String line : IOUtils.readLines(in)) { - println(line); + println("%s", line); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java index 107a247a27e..875948eb858 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Optional; +import javax.annotation.Nullable; import com.amazonaws.AmazonClientException; import com.amazonaws.SdkBaseException; @@ -222,7 +223,7 @@ public class Invoker { */ @Retries.RetryTranslated public T retry(String action, - String path, + @Nullable String path, boolean idempotent, Operation operation) throws IOException { @@ -247,7 +248,7 @@ public class Invoker { @Retries.RetryTranslated public T retry( String action, - String path, + @Nullable String path, boolean idempotent, Retried retrying, Operation operation) @@ -413,7 +414,7 @@ public class Invoker { * @param path path (may be null or empty) * @return string for logs */ - private static String toDescription(String action, String path) { + private static String toDescription(String action, @Nullable String path) { return action + (StringUtils.isNotEmpty(path) ? (" on " + path) : ""); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java new file mode 100644 index 00000000000..6eb490f2df4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.MultipartUploadListing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.RemoteIterator; + + +/** + * MultipartUtils upload-specific functions for use by S3AFileSystem and Hadoop + * CLI. + */ +public final class MultipartUtils { + + private static final Logger LOG = + LoggerFactory.getLogger(MultipartUtils.class); + + /** Not instantiated. */ + private MultipartUtils() { } + + /** + * List outstanding multipart uploads. + * Package private: S3AFileSystem and tests are the users of this. + * @param s3 AmazonS3 client to use. + * @param bucketName name of S3 bucket to use. + * @param maxKeys maximum batch size to request at a time from S3. + * @param prefix optional key prefix to narrow search. If null then whole + * bucket will be searched. + * @return an iterator of matching uploads + */ + static MultipartUtils.UploadIterator listMultipartUploads(AmazonS3 s3, + Invoker invoker, String bucketName, int maxKeys, @Nullable String prefix) + throws IOException { + return new MultipartUtils.UploadIterator(s3, invoker, bucketName, maxKeys, + prefix); + } + + /** + * Simple RemoteIterator wrapper for AWS `listMultipartUpload` API. + * Iterates over batches of multipart upload metadata listings. + */ + static class ListingIterator implements + RemoteIterator { + + private final String bucketName; + private final String prefix; + private final int maxKeys; + private final AmazonS3 s3; + private final Invoker invoker; + + /** + * Most recent listing results. + */ + private MultipartUploadListing listing; + + /** + * Indicator that this is the first listing. + */ + private boolean firstListing = true; + + private int listCount = 1; + + ListingIterator(AmazonS3 s3, Invoker invoker, String bucketName, + int maxKeys, @Nullable String prefix) throws IOException { + this.s3 = s3; + this.bucketName = bucketName; + this.maxKeys = maxKeys; + this.prefix = prefix; + this.invoker = invoker; + + requestNextBatch(); + } + + /** + * Iterator has data if it is either is the initial iteration, or + * the last listing obtained was incomplete. + * @throws IOException not thrown by this implementation. + */ + @Override + public boolean hasNext() throws IOException { + if (listing == null) { + // shouldn't happen, but don't trust AWS SDK + return false; + } else { + return firstListing || listing.isTruncated(); + } + } + + /** + * Get next listing. First call, this returns initial set (possibly + * empty) obtained from S3. Subsequent calls my block on I/O or fail. + * @return next upload listing. + * @throws IOException if S3 operation fails. + * @throws NoSuchElementException if there are no more uploads. + */ + @Override + @Retries.RetryTranslated + public MultipartUploadListing next() throws IOException { + if (firstListing) { + firstListing = false; + } else { + if (listing == null || !listing.isTruncated()) { + // nothing more to request: fail. + throw new NoSuchElementException("No more uploads under " + prefix); + } + // need to request a new set of objects. + requestNextBatch(); + } + return listing; + } + + @Override + public String toString() { + return "Upload iterator: prefix " + prefix + "; list count " + + listCount + "; isTruncated=" + listing.isTruncated(); + } + + @Retries.RetryTranslated + private void requestNextBatch() throws IOException { + ListMultipartUploadsRequest req = + new ListMultipartUploadsRequest(bucketName); + if (prefix != null) { + req.setPrefix(prefix); + } + if (!firstListing) { + req.setKeyMarker(listing.getNextKeyMarker()); + req.setUploadIdMarker(listing.getNextUploadIdMarker()); + } + req.setMaxUploads(listCount); + + LOG.debug("[{}], Requesting next {} uploads prefix {}, " + + "next key {}, next upload id {}", listCount, maxKeys, prefix, + req.getKeyMarker(), req.getUploadIdMarker()); + listCount++; + + listing = invoker.retry("listMultipartUploads", prefix, true, + () -> s3.listMultipartUploads(req)); + LOG.debug("New listing state: {}", this); + } + } + + /** + * Iterator over multipart uploads. Similar to + * {@link org.apache.hadoop.fs.s3a.Listing.FileStatusListingIterator}, but + * iterates over pending uploads instead of existing objects. + */ + public static class UploadIterator + implements RemoteIterator { + + private ListingIterator lister; + /** Current listing: the last upload listing we fetched. */ + private MultipartUploadListing listing; + /** Iterator over the current listing. */ + private ListIterator batchIterator; + + @Retries.RetryTranslated + public UploadIterator(AmazonS3 s3, Invoker invoker, String bucketName, + int maxKeys, @Nullable String prefix) + throws IOException { + + lister = new ListingIterator(s3, invoker, bucketName, maxKeys, prefix); + requestNextBatch(); + } + + @Override + public boolean hasNext() throws IOException { + return (batchIterator.hasNext() || requestNextBatch()); + } + + @Override + public MultipartUpload next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return batchIterator.next(); + } + + private boolean requestNextBatch() throws IOException { + if (lister.hasNext()) { + listing = lister.next(); + batchIterator = listing.getMultipartUploads().listIterator(); + return batchIterator.hasNext(); + } + return false; + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e9277586e4b..9431f1790de 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -43,6 +43,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; @@ -194,6 +195,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { private String blockOutputBuffer; private S3ADataBlocks.BlockFactory blockFactory; private int blockOutputActiveBlocks; + private WriteOperationHelper writeHelper; private boolean useListV1; private MagicCommitIntegration committerIntegration; @@ -247,6 +249,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) .createS3Client(name); invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); + writeHelper = new WriteOperationHelper(this, getConf()); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); listing = new Listing(this); @@ -753,13 +756,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { partSize, blockFactory, instrumentation.newOutputStreamStatistics(statistics), - createWriteOperationHelper(), + getWriteOperationHelper(), putTracker), null); } /** - * Create a new {@code WriteOperationHelper} instance. + * Get a {@code WriteOperationHelper} instance. * * This class permits other low-level operations against the store. * It is unstable and @@ -768,8 +771,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { * @return a new helper. */ @InterfaceAudience.Private - public WriteOperationHelper createWriteOperationHelper() { - return new WriteOperationHelper(this); + public WriteOperationHelper getWriteOperationHelper() { + return writeHelper; } /** @@ -3078,8 +3081,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { : null); } + /** + * List any pending multipart uploads whose keys begin with prefix, using + * an iterator that can handle an unlimited number of entries. + * See {@link #listMultipartUploads(String)} for a non-iterator version of + * this. + * + * @param prefix optional key prefix to search + * @return Iterator over multipart uploads. + * @throws IOException on failure + */ + public MultipartUtils.UploadIterator listUploads(@Nullable String prefix) + throws IOException { + return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys, + prefix); + } + /** * Listing all multipart uploads; limited to the first few hundred. + * See {@link #listUploads(String)} for an iterator-based version that does + * not limit the number of entries returned. * Retry policy: retry, translated. * @return a listing of multipart uploads. * @param prefix prefix to scan for, "" for none @@ -3166,5 +3187,4 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { return false; } } - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 23ceafa0555..245721796cd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -51,6 +51,7 @@ import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -149,7 +150,7 @@ public final class S3AUtils { * @return an IOE which wraps the caught exception. */ @SuppressWarnings("ThrowableInstanceNeverThrown") - public static IOException translateException(String operation, + public static IOException translateException(@Nullable String operation, String path, SdkBaseException exception) { String message = String.format("%s%s: %s", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 477200e8104..c611b941513 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -38,6 +38,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.model.UploadResult; import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,9 +84,9 @@ public class WriteOperationHelper { * @param owner owner FS creating the helper * */ - protected WriteOperationHelper(S3AFileSystem owner) { + protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) { this.owner = owner; - this.invoker = new Invoker(new S3ARetryPolicy(owner.getConf()), + this.invoker = new Invoker(new S3ARetryPolicy(conf), this::operationRetried); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java index 1338d2e0803..f6e12f47b3d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java @@ -101,7 +101,7 @@ public class CommitOperations { Preconditions.checkArgument(fs != null, "null fs"); this.fs = fs; statistics = fs.newCommitterStatistics(); - writeOperations = fs.createWriteOperationHelper(); + writeOperations = fs.getWriteOperationHelper(); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java index a07b5c954a7..7f9dadf06f3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -101,7 +101,7 @@ public class MagicCommitIntegration { key, destKey, pendingsetPath, - owner.createWriteOperationHelper()); + owner.getWriteOperationHelper()); LOG.debug("Created {}", tracker); } else { LOG.warn("File being created has a \"magic\" path, but the filesystem" diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index ace043be396..e7640211594 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -23,14 +23,17 @@ import java.io.IOException; import java.io.PrintStream; import java.net.URI; import java.net.URISyntaxException; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Scanner; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.s3.model.MultipartUpload; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -44,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.MultipartUtils; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AUtils; @@ -55,6 +59,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT; import static org.apache.hadoop.service.launcher.LauncherExitCodes.*; /** @@ -79,6 +84,7 @@ public abstract class S3GuardTool extends Configured implements Tool { "\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" + "\t" + Import.NAME + " - " + Import.PURPOSE + "\n" + "\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" + + "\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n" + "\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" + "\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" + "\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n"; @@ -100,10 +106,14 @@ public abstract class S3GuardTool extends Configured implements Tool { private final CommandFormat commandFormat; public static final String META_FLAG = "meta"; + + // These are common to prune, upload subcommands. public static final String DAYS_FLAG = "days"; public static final String HOURS_FLAG = "hours"; public static final String MINUTES_FLAG = "minutes"; public static final String SECONDS_FLAG = "seconds"; + public static final String AGE_OPTIONS_USAGE = "[-days ] " + + "[-hours ] [-minutes ] [-seconds ]"; public static final String REGION_FLAG = "region"; public static final String READ_FLAG = "read"; @@ -177,6 +187,36 @@ public abstract class S3GuardTool extends Configured implements Tool { "config, or S3 bucket"); } + private long getDeltaComponent(TimeUnit unit, String arg) { + String raw = getCommandFormat().getOptValue(arg); + if (raw == null || raw.isEmpty()) { + return 0; + } + Long parsed = Long.parseLong(raw); + return unit.toMillis(parsed); + } + + /** + * Convert all age options supplied to total milliseconds of time. + * @return Sum of all age options, or zero if none were given. + */ + long ageOptionsToMsec() { + long cliDelta = 0; + cliDelta += getDeltaComponent(TimeUnit.DAYS, DAYS_FLAG); + cliDelta += getDeltaComponent(TimeUnit.HOURS, HOURS_FLAG); + cliDelta += getDeltaComponent(TimeUnit.MINUTES, MINUTES_FLAG); + cliDelta += getDeltaComponent(TimeUnit.SECONDS, SECONDS_FLAG); + return cliDelta; + } + + protected void addAgeOptions() { + CommandFormat format = getCommandFormat(); + format.addOptionWithValue(DAYS_FLAG); + format.addOptionWithValue(HOURS_FLAG); + format.addOptionWithValue(MINUTES_FLAG); + format.addOptionWithValue(SECONDS_FLAG); + } + /** * Parse metadata store from command line option or HDFS configuration. * @@ -867,7 +907,8 @@ public abstract class S3GuardTool extends Configured implements Tool { "Common options:\n" + " -" + META_FLAG + " URL - Metadata repository details " + "(implementation-specific)\n" + - "\n" + + "Age options. Any combination of these integer-valued options:\n" + + AGE_OPTIONS_USAGE + "\n" + "Amazon DynamoDB-specific options:\n" + " -" + REGION_FLAG + " REGION - Service region for connections\n" + "\n" + @@ -877,12 +918,7 @@ public abstract class S3GuardTool extends Configured implements Tool { Prune(Configuration conf) { super(conf); - - CommandFormat format = getCommandFormat(); - format.addOptionWithValue(DAYS_FLAG); - format.addOptionWithValue(HOURS_FLAG); - format.addOptionWithValue(MINUTES_FLAG); - format.addOptionWithValue(SECONDS_FLAG); + addAgeOptions(); } @VisibleForTesting @@ -901,15 +937,6 @@ public abstract class S3GuardTool extends Configured implements Tool { return USAGE; } - private long getDeltaComponent(TimeUnit unit, String arg) { - String raw = getCommandFormat().getOptValue(arg); - if (raw == null || raw.isEmpty()) { - return 0; - } - Long parsed = Long.parseLong(raw); - return unit.toMillis(parsed); - } - public int run(String[] args, PrintStream out) throws InterruptedException, IOException { List paths = parseArgs(args); @@ -924,11 +951,7 @@ public abstract class S3GuardTool extends Configured implements Tool { Configuration conf = getConf(); long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0); - long cliDelta = 0; - cliDelta += getDeltaComponent(TimeUnit.DAYS, "days"); - cliDelta += getDeltaComponent(TimeUnit.HOURS, "hours"); - cliDelta += getDeltaComponent(TimeUnit.MINUTES, "minutes"); - cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds"); + long cliDelta = ageOptionsToMsec(); if (confDelta <= 0 && cliDelta <= 0) { errorln("You must specify a positive age for metadata to prune."); @@ -1080,6 +1103,214 @@ public abstract class S3GuardTool extends Configured implements Tool { } + /** + * Command to list / abort pending multipart uploads. + */ + static class Uploads extends S3GuardTool { + public static final String NAME = "uploads"; + public static final String ABORT = "abort"; + public static final String LIST = "list"; + public static final String EXPECT = "expect"; + public static final String VERBOSE = "verbose"; + public static final String FORCE = "force"; + + public static final String PURPOSE = "list or abort pending " + + "multipart uploads"; + private static final String USAGE = NAME + " [OPTIONS] " + + "s3a://BUCKET[/path]\n" + + "\t" + PURPOSE + "\n\n" + + "Common options:\n" + + " (-" + LIST + " | -" + EXPECT +" | -" + ABORT + + ") [-" + VERBOSE +"] " + + "[] [-force]\n" + + "\t - Under given path, list or delete all uploads," + + " or only those \n" + + "older than specified by \n" + + " are any combination of the integer-valued options:\n" + + "\t" + AGE_OPTIONS_USAGE + "\n" + + "-" + EXPECT + " is similar to list, except no output is printed,\n" + + "\tbut the exit code will be an error if the provided number\n" + + "\tis different that the number of uploads found by the command.\n" + + "-" + FORCE + " option prevents the \"Are you sure\" prompt when\n" + + "\tusing -" + ABORT; + + /** Constant used for output and parsed by tests. */ + public static final String TOTAL = "Total"; + + /** Runs in one of three modes. */ + private enum Mode { LIST, EXPECT, ABORT }; + private Mode mode = null; + + /** For Mode == EXPECT, expected listing size. */ + private int expectedCount; + + /** List/abort uploads older than this many milliseconds. */ + private long ageMsec = 0; + + /** Verbose output flag. */ + private boolean verbose = false; + + /** Whether to delete with out "are you sure" prompt. */ + private boolean force = false; + + /** Path prefix to use when searching multipart uploads. */ + private String prefix; + + Uploads(Configuration conf) { + super(conf, ABORT, LIST, VERBOSE, FORCE); + addAgeOptions(); + getCommandFormat().addOptionWithValue(EXPECT); + } + + @Override + String getName() { + return NAME; + } + + @Override + public String getUsage() { + return USAGE; + } + + public int run(String[] args, PrintStream out) + throws InterruptedException, IOException { + List paths = parseArgs(args); + if (paths.isEmpty()) { + errorln(getUsage()); + throw invalidArgs("No options specified"); + } + processArgs(paths, out); + promptBeforeAbort(out); + processUploads(out); + + out.flush(); + return SUCCESS; + } + + private void promptBeforeAbort(PrintStream out) throws IOException { + if (mode != Mode.ABORT || force) { + return; + } + Scanner scanner = new Scanner(System.in, "UTF-8"); + out.println("Are you sure you want to delete any pending " + + "uploads? (yes/no) >"); + String response = scanner.nextLine(); + if (!"yes".equalsIgnoreCase(response)) { + throw S3GuardTool.userAborted("User did not answer yes, quitting."); + } + } + + private void processUploads(PrintStream out) throws IOException { + MultipartUtils.UploadIterator uploads; + uploads = getFilesystem().listUploads(prefix); + + int count = 0; + while (uploads.hasNext()) { + MultipartUpload upload = uploads.next(); + if (!olderThan(upload, ageMsec)) { + continue; + } + count++; + if (mode == Mode.ABORT || mode == Mode.LIST || verbose) { + println(out, "%s%s %s", mode == Mode.ABORT ? "Deleting: " : "", + upload.getKey(), upload.getUploadId()); + } + if (mode == Mode.ABORT) { + getFilesystem().getWriteOperationHelper() + .abortMultipartUpload(upload.getKey(), upload.getUploadId(), + LOG_EVENT); + } + } + if (mode != Mode.EXPECT || verbose) { + println(out, "%s %d uploads %s.", TOTAL, count, + mode == Mode.ABORT ? "deleted" : "found"); + } + if (mode == Mode.EXPECT) { + if (count != expectedCount) { + throw badState("Expected %d uploads, found %d", expectedCount, count); + } + } + } + + /** + * Check if upload is at least as old as given age. + * @param u upload to check + * @param msec age in milliseconds + * @return true iff u was created at least age milliseconds ago. + */ + private boolean olderThan(MultipartUpload u, long msec) { + Date ageDate = new Date(System.currentTimeMillis() - msec); + return ageDate.compareTo(u.getInitiated()) >= 0; + } + + private void processArgs(List args, PrintStream out) + throws IOException { + CommandFormat commands = getCommandFormat(); + String err = "Can only specify one of -" + LIST + ", " + + " -" + ABORT + ", and " + EXPECT; + + // Three mutually-exclusive options + if (commands.getOpt(LIST)) { + mode = Mode.LIST; + } + if (commands.getOpt(ABORT)) { + if (mode != null) { + throw invalidArgs(err); + } + mode = Mode.ABORT; + } + + String expectVal = commands.getOptValue(EXPECT); + if (expectVal != null) { + if (mode != null) { + throw invalidArgs(err); + } + mode = Mode.EXPECT; + expectedCount = Integer.parseInt(expectVal); + } + + // Default to list + if (mode == null) { + vprintln(out, "No mode specified, defaulting to -" + LIST); + mode = Mode.LIST; + } + + // Other flags + if (commands.getOpt(VERBOSE)) { + verbose = true; + } + if (commands.getOpt(FORCE)) { + force = true; + } + ageMsec = ageOptionsToMsec(); + + String s3Path = args.get(0); + URI uri = S3GuardTool.toUri(s3Path); + prefix = uri.getPath(); + if (prefix.length() > 0) { + prefix = prefix.substring(1); + } + vprintln(out, "Command: %s, age %d msec, path %s (prefix \"%s\")", + mode.name(), ageMsec, s3Path, prefix); + + initS3AFileSystem(s3Path); + } + + /** + * If verbose flag is set, print a formatted string followed by a newline + * to the output stream. + * @param out destination + * @param format format string + * @param args optional arguments + */ + private void vprintln(PrintStream out, String format, Object... + args) { + if (verbose) { + out.println(String.format(format, args)); + } + } + } + private static S3GuardTool command; /** @@ -1182,6 +1413,17 @@ public abstract class S3GuardTool extends Configured implements Tool { String.format(format, args)); } + /** + * Build the exception to raise on user-aborted action. + * @param format string format + * @param args optional arguments for the string + * @return a new exception to throw + */ + protected static ExitUtil.ExitException userAborted( + String format, Object...args) { + return new ExitUtil.ExitException(ERROR, String.format(format, args)); + } + /** * Execute the command with the given arguments. * @@ -1224,6 +1466,9 @@ public abstract class S3GuardTool extends Configured implements Tool { case SetCapacity.NAME: command = new SetCapacity(conf); break; + case Uploads.NAME: + command = new Uploads(conf); + break; default: printHelp(); throw new ExitUtil.ExitException(E_USAGE, diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index a8d2e483c94..fbcd54ababa 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1490,8 +1490,13 @@ from VMs running on EC2. ``` -### Cleaning up after partial Upload Failures: `fs.s3a.multipart.purge` +### Cleaning up after partial Upload Failures +There are two mechanisms for cleaning up after leftover multipart +uploads: +- Hadoop s3guard CLI commands for listing and deleting uploads by their +age. Doumented in the [S3Guard](./s3guard.html) section. +- The configuration parameter `fs.s3a.multipart.purge`, covered below. If an large stream writeoperation is interrupted, there may be intermediate partitions uploaded to S3 —data which will be billed for. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index e2cb5499faa..1050f8a16ce 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -515,10 +515,43 @@ hadoop s3guard bucket-info -guarded -auth s3a://landsat-pds Require the bucket to be using S3Guard in authoritative mode. This will normally fail against this specific bucket. +### List or Delete Leftover Multipart Uploads: `s3guard uploads` + +Lists or deletes all pending (uncompleted) multipart uploads older than +given age. + +```bash +hadoop s3guard uploads (-list | -abort | -expect ) [-verbose] \ + [-days ] [-hours ] [-minutes ] [-seconds ] \ + [-force] s3a://bucket/prefix +``` + +The command lists or deletes all multipart uploads which are older than +the given age, and that match the prefix supplied, if any. + +For example, to delete all uncompleted multipart uploads older than two +days in the folder at `s3a://my-bucket/path/to/stuff`, use the following +command: + +```bash +hadoop s3guard uploads -abort -days 2 s3a://my-bucket/path/to/stuff +``` + +We recommend running with `-list` first to confirm the parts shown +are those that you wish to delete. Note that the command will prompt +you with a "Are you sure?" prompt unless you specify the `-force` +option. This is to safeguard against accidental deletion of data, which +is especially risky without a long age parameter as it can affect +in-fight uploads. + +The `-expect` option is similar to `-list`, except it is silent by +default, and terminates with a success or failure exit code depending +on whether or not the supplied number matches the number of uploads +found that match the given options (path, age). + ### Delete a table: `s3guard destroy` - Deletes a metadata store. With DynamoDB as the store, this means the specific DynamoDB table use to store the metadata. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java new file mode 100644 index 00000000000..4746ad5588b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.MultipartUpload; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + + +/** + * Tests for {@link MultipartUtils}. + */ +public class ITestS3AMultipartUtils extends AbstractS3ATestBase { + + private static final int UPLOAD_LEN = 1024; + private static final String PART_FILENAME_BASE = "pending-part"; + private static final int LIST_BATCH_SIZE = 2; + private static final int NUM_KEYS = 5; + + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + // Forces listings to come back in multiple batches to test that part of + // the iterators. + conf.setInt(Constants.MAX_PAGING_KEYS, LIST_BATCH_SIZE); + return conf; + } + + /** + * Main test case for upload part listing and iterator paging. + * @throws Exception on failure. + */ + @Test + public void testListMultipartUploads() throws Exception { + S3AFileSystem fs = getFileSystem(); + Set keySet = new HashSet<>(); + try { + // 1. Create NUM_KEYS pending upload parts + for (int i = 0; i < NUM_KEYS; i++) { + Path filePath = getPartFilename(i); + String key = fs.pathToKey(filePath); + describe("creating upload part with key %s", key); + // create a multipart upload + MultipartTestUtils.IdKey idKey = MultipartTestUtils + .createPartUpload(fs, key, UPLOAD_LEN, + 1); + keySet.add(idKey); + } + + // 2. Verify all uploads are found listing by prefix + describe("Verifying upload list by prefix"); + MultipartUtils.UploadIterator uploads = fs.listUploads(getPartPrefix(fs)); + assertUploadsPresent(uploads, keySet); + + // 3. Verify all uploads are found listing without prefix + describe("Verifying list all uploads"); + uploads = fs.listUploads(null); + assertUploadsPresent(uploads, keySet); + + } finally { + // 4. Delete all uploads we created + MultipartTestUtils.cleanupParts(fs, keySet); + } + } + + /** + * Assert that all provided multipart uploads are contained in the upload + * iterator's results. + * @param list upload iterator + * @param ourUploads set up uploads that should be present + * @throws IOException on I/O error + */ + private void assertUploadsPresent(MultipartUtils.UploadIterator list, + Set ourUploads) throws IOException { + + // Don't modify passed-in set, use copy. + Set uploads = new HashSet<>(ourUploads); + while (list.hasNext()) { + MultipartTestUtils.IdKey listing = toIdKey(list.next()); + if (uploads.contains(listing)) { + LOG.debug("Matched: {},{}", listing.getKey(), listing.getUploadId()); + uploads.remove(listing); + } else { + LOG.debug("Not our upload {},{}", listing.getKey(), + listing.getUploadId()); + } + } + assertTrue("Not all our uploads were listed", uploads.isEmpty()); + } + + private MultipartTestUtils.IdKey toIdKey(MultipartUpload mu) { + return new MultipartTestUtils.IdKey(mu.getKey(), mu.getUploadId()); + } + + private Path getPartFilename(int index) throws IOException { + return path(String.format("%s-%d", PART_FILENAME_BASE, index)); + } + + private String getPartPrefix(S3AFileSystem fs) throws IOException { + return fs.pathToKey(path("blah").getParent()); + } + +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java index 55e3e3789b3..49525802a9c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -78,6 +78,7 @@ public class MockS3AFileSystem extends S3AFileSystem { private final S3AInstrumentation instrumentation = new S3AInstrumentation(FS_URI); private Configuration conf; + private WriteOperationHelper writeHelper; public MockS3AFileSystem(S3AFileSystem mock, Pair outcome) { @@ -125,6 +126,12 @@ public class MockS3AFileSystem extends S3AFileSystem { public void initialize(URI name, Configuration originalConf) throws IOException { conf = originalConf; + writeHelper = new WriteOperationHelper(this, conf); + } + + @Override + public WriteOperationHelper getWriteOperationHelper() { + return writeHelper; } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java new file mode 100644 index 00000000000..8be3ff7dfda --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT; + +/** + * Utilities for S3A multipart upload tests. + */ +public final class MultipartTestUtils { + private static final Logger LOG = LoggerFactory.getLogger( + MultipartTestUtils.class); + + /** Not instantiated. */ + private MultipartTestUtils() { } + + /** + * Clean up all provided uploads. + * @param keySet set of uploads to abort + */ + static void cleanupParts(S3AFileSystem fs, Set keySet) { + boolean anyFailure = false; + for (IdKey ik : keySet) { + try { + LOG.debug("aborting upload id {}", ik.getUploadId()); + fs.abortMultipartUpload(ik.getKey(), ik.getUploadId()); + } catch (Exception e) { + LOG.error(String.format("Failure aborting upload %s, continuing.", + ik.getKey()), e); + anyFailure = true; + } + } + Assert.assertFalse("Failure aborting multipart upload(s), see log.", + anyFailure); + } + + public static IdKey createPartUpload(S3AFileSystem fs, String key, int len, + int partNo) throws IOException { + WriteOperationHelper writeHelper = fs.getWriteOperationHelper(); + byte[] data = dataset(len, 'a', 'z'); + InputStream in = new ByteArrayInputStream(data); + String uploadId = writeHelper.initiateMultiPartUpload(key); + UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId, + partNo, len, in, null, 0L); + PartETag partEtag = fs.uploadPart(req).getPartETag(); + LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId); + return new IdKey(key, uploadId); + } + + /** Delete any uploads under given path (recursive). Silent on failure. */ + public static void clearAnyUploads(S3AFileSystem fs, Path path) { + try { + String key = fs.pathToKey(path); + MultipartUtils.UploadIterator uploads = fs.listUploads(key); + while (uploads.hasNext()) { + MultipartUpload upload = uploads.next(); + fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(), + upload.getUploadId(), LOG_EVENT); + LOG.debug("Cleaning up upload: {} {}", upload.getKey(), + truncatedUploadId(upload.getUploadId())); + } + } catch (IOException ioe) { + LOG.info("Ignoring exception: ", ioe); + } + } + + /** Assert that there are not any upload parts at given path. */ + public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws + Exception { + String key = fs.pathToKey(path); + MultipartUtils.UploadIterator uploads = fs.listUploads(key); + while (uploads.hasNext()) { + MultipartUpload upload = uploads.next(); + Assert.fail("Found unexpected upload " + upload.getKey() + " " + + truncatedUploadId(upload.getUploadId())); + } + } + + /** Get number of part uploads under given path. */ + public static int countUploadsAt(S3AFileSystem fs, Path path) throws + IOException { + String key = fs.pathToKey(path); + MultipartUtils.UploadIterator uploads = fs.listUploads(key); + int count = 0; + while (uploads.hasNext()) { + MultipartUpload upload = uploads.next(); + count++; + } + return count; + } + + /** + * Get a list of all pending uploads under a prefix, one which can be printed. + * @param prefix prefix to look under + * @return possibly empty list + * @throws IOException IO failure. + */ + public static List listMultipartUploads(S3AFileSystem fs, + String prefix) throws IOException { + + return fs + .listMultipartUploads(prefix).stream() + .map(upload -> String.format("Upload to %s with ID %s; initiated %s", + upload.getKey(), + upload.getUploadId(), + S3ATestUtils.LISTING_FORMAT.format(upload.getInitiated()))) + .collect(Collectors.toList()); + } + + + private static String truncatedUploadId(String fullId) { + return fullId.substring(0, 12) + " ..."; + } + + /** Struct of object key, upload ID. */ + static class IdKey { + private String key; + private String uploadId; + + IdKey(String key, String uploadId) { + this.key = key; + this.uploadId = uploadId; + } + + public String getKey() { + return key; + } + + public String getUploadId() { + return uploadId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IdKey key1 = (IdKey) o; + return Objects.equals(key, key1.key) && + Objects.equals(uploadId, key1.uploadId); + } + + @Override + public int hashCode() { + return Objects.hash(key, uploadId); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 773c25af635..f4e7c689889 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -45,7 +45,6 @@ import java.net.URISyntaxException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.List; -import java.util.stream.Collectors; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*; @@ -822,27 +821,9 @@ public final class S3ATestUtils { /** * Date format used for mapping upload initiation time to human string. */ - private static final DateFormat LISTING_FORMAT = new SimpleDateFormat( + public static final DateFormat LISTING_FORMAT = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); - /** - * Get a list of all pending uploads under a prefix, one which can be printed. - * @param prefix prefix to look under - * @return possibly empty list - * @throws IOException IO failure. - */ - public static List listMultipartUploads(S3AFileSystem fs, - String prefix) throws IOException { - - return fs - .listMultipartUploads(prefix).stream() - .map(upload -> String.format("Upload to %s with ID %s; initiated %s", - upload.getKey(), - upload.getUploadId(), - LISTING_FORMAT.format(upload.getInitiated()))) - .collect(Collectors.toList()); - } - /** * Skip a test if the FS isn't marked as supporting magic commits. * @param fs filesystem diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 267d4df1e63..04676db0b88 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; @@ -247,7 +248,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase { S3AFileSystem fs = getFileSystem(); if (fs != null && path != null) { String key = fs.pathToKey(path); - WriteOperationHelper writeOps = fs.createWriteOperationHelper(); + WriteOperationHelper writeOps = fs.getWriteOperationHelper(); int count = writeOps.abortMultipartUploadsUnderPath(key); if (count > 0) { log().info("Multipart uploads deleted: {}", count); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java index e3a295ba589..072295962ce 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java @@ -40,8 +40,8 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java index 43cbe93330a..2178f47f0e5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java @@ -24,10 +24,15 @@ import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.StringUtils; import org.junit.Test; import org.apache.hadoop.fs.FSDataOutputStream; @@ -35,15 +40,20 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.*; import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test S3Guard related CLI commands against a LocalMetadataStore. + * Also responsible for testing the non s3guard-specific commands that, for + * now, live under the s3guard CLI command. */ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase { private static final String LOCAL_METADATA = "local://metadata"; + private static final String[] ABORT_FORCE_OPTIONS = new String[] {"-abort", + "-force", "-verbose"}; @Override protected MetadataStore newMetadataStore() { @@ -261,5 +271,182 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase { LOG.info("Exec output=\n{}", output); } + private final static String UPLOAD_PREFIX = "test-upload-prefix"; + private final static String UPLOAD_NAME = "test-upload"; + @Test + public void testUploads() throws Throwable { + S3AFileSystem fs = getFileSystem(); + Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME); + + describe("Cleaning up any leftover uploads from previous runs."); + // 1. Make sure key doesn't already exist + clearAnyUploads(fs, path); + + // 2. Confirm no uploads are listed via API + assertNoUploadsAt(fs, path.getParent()); + + // 3. Confirm no uploads are listed via CLI + describe("Confirming CLI lists nothing."); + assertNumUploads(path, 0); + + // 4. Create a upload part + describe("Uploading single part."); + createPartUpload(fs, fs.pathToKey(path), 128, 1); + + try { + // 5. Confirm it exists via API.. + LambdaTestUtils.eventually(5000, /* 5 seconds until failure */ + 1000, /* one second retry interval */ + () -> { + assertEquals("Should be one upload", 1, countUploadsAt(fs, path)); + }); + + // 6. Confirm part exists via CLI, direct path and parent path + describe("Confirming CLI lists one part"); + LambdaTestUtils.eventually(5000, 1000, + () -> assertNumUploads(path, 1)); + LambdaTestUtils.eventually(5000, 1000, + () -> assertNumUploads(path.getParent(), 1)); + + // 7. Use CLI to delete part, assert it worked + describe("Deleting part via CLI"); + assertNumDeleted(fs, path, 1); + + // 8. Confirm deletion via API + describe("Confirming deletion via API"); + assertEquals("Should be no uploads", 0, countUploadsAt(fs, path)); + + // 9. Confirm no uploads are listed via CLI + describe("Confirming CLI lists nothing."); + assertNumUploads(path, 0); + + } catch (Throwable t) { + // Clean up on intermediate failure + clearAnyUploads(fs, path); + throw t; + } + } + + @Test + public void testUploadListByAge() throws Throwable { + S3AFileSystem fs = getFileSystem(); + Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME); + + describe("Cleaning up any leftover uploads from previous runs."); + // 1. Make sure key doesn't already exist + clearAnyUploads(fs, path); + + // 2. Create a upload part + describe("Uploading single part."); + createPartUpload(fs, fs.pathToKey(path), 128, 1); + + try { + // 3. Confirm it exists via API.. may want to wrap with + // LambdaTestUtils.eventually() ? + LambdaTestUtils.eventually(5000, 1000, + () -> { + assertEquals("Should be one upload", 1, countUploadsAt(fs, path)); + }); + + // 4. Confirm part does appear in listing with long age filter + describe("Confirming CLI older age doesn't list"); + assertNumUploadsAge(path, 0, 600); + + // 5. Confirm part does not get deleted with long age filter + describe("Confirming CLI older age doesn't delete"); + uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 0, + 600); + + // 6. Wait a second and then assert the part is in listing of things at + // least a second old + describe("Sleeping 1 second then confirming upload still there"); + Thread.sleep(1000); + LambdaTestUtils.eventually(5000, 1000, + () -> assertNumUploadsAge(path, 1, 1)); + + // 7. Assert deletion works when age filter matches + describe("Doing aged deletion"); + uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 1, 1); + describe("Confirming age deletion happened"); + assertEquals("Should be no uploads", 0, countUploadsAt(fs, path)); + } catch (Throwable t) { + // Clean up on intermediate failure + clearAnyUploads(fs, path); + throw t; + } + } + + @Test + public void testUploadNegativeExpect() throws Throwable { + runToFailure(E_BAD_STATE, Uploads.NAME, "-expect", "1", + path("/we/are/almost/postive/this/doesnt/exist/fhfsadfoijew") + .toString()); + } + + private void assertNumUploads(Path path, int numUploads) throws Exception { + assertNumUploadsAge(path, numUploads, 0); + } + + private void assertNumUploadsAge(Path path, int numUploads, int ageSeconds) + throws Exception { + if (ageSeconds > 0) { + run(Uploads.NAME, "-expect", String.valueOf(numUploads), "-seconds", + String.valueOf(ageSeconds), path.toString()); + } else { + run(Uploads.NAME, "-expect", String.valueOf(numUploads), path.toString()); + } + } + + private void assertNumDeleted(S3AFileSystem fs, Path path, int numDeleted) + throws Exception { + uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, + numDeleted, 0); + } + + /** + * Run uploads cli command and assert the reported count (listed or + * deleted) matches. + * @param fs S3AFileSystem + * @param options main command options + * @param path path of part(s) + * @param numUploads expected number of listed/deleted parts + * @param ageSeconds optional seconds of age to specify to CLI, or zero to + * search all parts + * @throws Exception on failure + */ + private void uploadCommandAssertCount(S3AFileSystem fs, String[] options, + Path path, int numUploads, int ageSeconds) + throws Exception { + List allOptions = new ArrayList<>(); + List output = new ArrayList<>(); + S3GuardTool.Uploads cmd = new S3GuardTool.Uploads(fs.getConf()); + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + allOptions.add(cmd.getName()); + allOptions.addAll(Arrays.asList(options)); + if (ageSeconds > 0) { + allOptions.add("-" + Uploads.SECONDS_FLAG); + allOptions.add(String.valueOf(ageSeconds)); + } + allOptions.add(path.toString()); + exec(cmd, buf, allOptions.toArray(new String[0])); + + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(new ByteArrayInputStream(buf.toByteArray())))) { + String line; + while ((line = reader.readLine()) != null) { + String[] fields = line.split("\\s"); + if (fields.length == 4 && fields[0].equals(Uploads.TOTAL)) { + int parsedUploads = Integer.valueOf(fields[1]); + LOG.debug("Matched CLI output: {} {} {} {}", fields); + assertEquals("Unexpected number of uploads", numUploads, + parsedUploads); + return; + } + LOG.debug("Not matched: {}", line); + output.add(line); + } + } + fail("Command output did not match: \n" + StringUtils.join("\n", output)); + } }