HADOOP-13974. S3Guard CLI to support list/purge of pending multipart commits.

Contributed by Aaron Fabbri
This commit is contained in:
Steve Loughran 2017-12-18 21:18:52 +00:00
parent 94576b17fb
commit 35ad9b1dd2
18 changed files with 1082 additions and 70 deletions

View File

@ -81,6 +81,11 @@ public class KDiag extends Configured implements Tool, Closeable {
* variable. This is what kinit will use by default: {@value} * variable. This is what kinit will use by default: {@value}
*/ */
public static final String KRB5_CCNAME = "KRB5CCNAME"; public static final String KRB5_CCNAME = "KRB5CCNAME";
/**
* Location of main kerberos configuration file as passed down via an
* environment variable.
*/
public static final String KRB5_CONFIG = "KRB5_CONFIG";
public static final String JAVA_SECURITY_KRB5_CONF public static final String JAVA_SECURITY_KRB5_CONF
= "java.security.krb5.conf"; = "java.security.krb5.conf";
public static final String JAVA_SECURITY_KRB5_REALM public static final String JAVA_SECURITY_KRB5_REALM
@ -323,6 +328,7 @@ public class KDiag extends Configured implements Tool, Closeable {
for (String env : new String[]{ for (String env : new String[]{
HADOOP_JAAS_DEBUG, HADOOP_JAAS_DEBUG,
KRB5_CCNAME, KRB5_CCNAME,
KRB5_CONFIG,
HADOOP_USER_NAME, HADOOP_USER_NAME,
HADOOP_PROXY_USER, HADOOP_PROXY_USER,
HADOOP_TOKEN_FILE_LOCATION, HADOOP_TOKEN_FILE_LOCATION,
@ -562,14 +568,14 @@ public class KDiag extends Configured implements Tool, Closeable {
krbPath = jvmKrbPath; krbPath = jvmKrbPath;
} }
String krb5name = System.getenv(KRB5_CCNAME); String krb5name = System.getenv(KRB5_CONFIG);
if (krb5name != null) { if (krb5name != null) {
println("Setting kerberos path from environment variable %s: \"%s\"", println("Setting kerberos path from environment variable %s: \"%s\"",
KRB5_CCNAME, krb5name); KRB5_CONFIG, krb5name);
krbPath = krb5name; krbPath = krb5name;
if (jvmKrbPath != null) { if (jvmKrbPath != null) {
println("Warning - both %s and %s were set - %s takes priority", println("Warning - both %s and %s were set - %s takes priority",
JAVA_SECURITY_KRB5_CONF, KRB5_CCNAME, KRB5_CCNAME); JAVA_SECURITY_KRB5_CONF, KRB5_CONFIG, KRB5_CONFIG);
} }
} }
@ -919,7 +925,7 @@ public class KDiag extends Configured implements Tool, Closeable {
private void dump(File file) throws IOException { private void dump(File file) throws IOException {
try (FileInputStream in = new FileInputStream(file)) { try (FileInputStream in = new FileInputStream(file)) {
for (String line : IOUtils.readLines(in)) { for (String line : IOUtils.readLines(in)) {
println(line); println("%s", line);
} }
} }
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Optional; import java.util.Optional;
import javax.annotation.Nullable;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkBaseException; import com.amazonaws.SdkBaseException;
@ -222,7 +223,7 @@ public class Invoker {
*/ */
@Retries.RetryTranslated @Retries.RetryTranslated
public <T> T retry(String action, public <T> T retry(String action,
String path, @Nullable String path,
boolean idempotent, boolean idempotent,
Operation<T> operation) Operation<T> operation)
throws IOException { throws IOException {
@ -247,7 +248,7 @@ public class Invoker {
@Retries.RetryTranslated @Retries.RetryTranslated
public <T> T retry( public <T> T retry(
String action, String action,
String path, @Nullable String path,
boolean idempotent, boolean idempotent,
Retried retrying, Retried retrying,
Operation<T> operation) Operation<T> operation)
@ -413,7 +414,7 @@ public class Invoker {
* @param path path (may be null or empty) * @param path path (may be null or empty)
* @return string for logs * @return string for logs
*/ */
private static String toDescription(String action, String path) { private static String toDescription(String action, @Nullable String path) {
return action + return action +
(StringUtils.isNotEmpty(path) ? (" on " + path) : ""); (StringUtils.isNotEmpty(path) ? (" on " + path) : "");
} }

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.RemoteIterator;
/**
* MultipartUtils upload-specific functions for use by S3AFileSystem and Hadoop
* CLI.
*/
public final class MultipartUtils {
private static final Logger LOG =
LoggerFactory.getLogger(MultipartUtils.class);
/** Not instantiated. */
private MultipartUtils() { }
/**
* List outstanding multipart uploads.
* Package private: S3AFileSystem and tests are the users of this.
* @param s3 AmazonS3 client to use.
* @param bucketName name of S3 bucket to use.
* @param maxKeys maximum batch size to request at a time from S3.
* @param prefix optional key prefix to narrow search. If null then whole
* bucket will be searched.
* @return an iterator of matching uploads
*/
static MultipartUtils.UploadIterator listMultipartUploads(AmazonS3 s3,
Invoker invoker, String bucketName, int maxKeys, @Nullable String prefix)
throws IOException {
return new MultipartUtils.UploadIterator(s3, invoker, bucketName, maxKeys,
prefix);
}
/**
* Simple RemoteIterator wrapper for AWS `listMultipartUpload` API.
* Iterates over batches of multipart upload metadata listings.
*/
static class ListingIterator implements
RemoteIterator<MultipartUploadListing> {
private final String bucketName;
private final String prefix;
private final int maxKeys;
private final AmazonS3 s3;
private final Invoker invoker;
/**
* Most recent listing results.
*/
private MultipartUploadListing listing;
/**
* Indicator that this is the first listing.
*/
private boolean firstListing = true;
private int listCount = 1;
ListingIterator(AmazonS3 s3, Invoker invoker, String bucketName,
int maxKeys, @Nullable String prefix) throws IOException {
this.s3 = s3;
this.bucketName = bucketName;
this.maxKeys = maxKeys;
this.prefix = prefix;
this.invoker = invoker;
requestNextBatch();
}
/**
* Iterator has data if it is either is the initial iteration, or
* the last listing obtained was incomplete.
* @throws IOException not thrown by this implementation.
*/
@Override
public boolean hasNext() throws IOException {
if (listing == null) {
// shouldn't happen, but don't trust AWS SDK
return false;
} else {
return firstListing || listing.isTruncated();
}
}
/**
* Get next listing. First call, this returns initial set (possibly
* empty) obtained from S3. Subsequent calls my block on I/O or fail.
* @return next upload listing.
* @throws IOException if S3 operation fails.
* @throws NoSuchElementException if there are no more uploads.
*/
@Override
@Retries.RetryTranslated
public MultipartUploadListing next() throws IOException {
if (firstListing) {
firstListing = false;
} else {
if (listing == null || !listing.isTruncated()) {
// nothing more to request: fail.
throw new NoSuchElementException("No more uploads under " + prefix);
}
// need to request a new set of objects.
requestNextBatch();
}
return listing;
}
@Override
public String toString() {
return "Upload iterator: prefix " + prefix + "; list count " +
listCount + "; isTruncated=" + listing.isTruncated();
}
@Retries.RetryTranslated
private void requestNextBatch() throws IOException {
ListMultipartUploadsRequest req =
new ListMultipartUploadsRequest(bucketName);
if (prefix != null) {
req.setPrefix(prefix);
}
if (!firstListing) {
req.setKeyMarker(listing.getNextKeyMarker());
req.setUploadIdMarker(listing.getNextUploadIdMarker());
}
req.setMaxUploads(listCount);
LOG.debug("[{}], Requesting next {} uploads prefix {}, " +
"next key {}, next upload id {}", listCount, maxKeys, prefix,
req.getKeyMarker(), req.getUploadIdMarker());
listCount++;
listing = invoker.retry("listMultipartUploads", prefix, true,
() -> s3.listMultipartUploads(req));
LOG.debug("New listing state: {}", this);
}
}
/**
* Iterator over multipart uploads. Similar to
* {@link org.apache.hadoop.fs.s3a.Listing.FileStatusListingIterator}, but
* iterates over pending uploads instead of existing objects.
*/
public static class UploadIterator
implements RemoteIterator<MultipartUpload> {
private ListingIterator lister;
/** Current listing: the last upload listing we fetched. */
private MultipartUploadListing listing;
/** Iterator over the current listing. */
private ListIterator<MultipartUpload> batchIterator;
@Retries.RetryTranslated
public UploadIterator(AmazonS3 s3, Invoker invoker, String bucketName,
int maxKeys, @Nullable String prefix)
throws IOException {
lister = new ListingIterator(s3, invoker, bucketName, maxKeys, prefix);
requestNextBatch();
}
@Override
public boolean hasNext() throws IOException {
return (batchIterator.hasNext() || requestNextBatch());
}
@Override
public MultipartUpload next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
return batchIterator.next();
}
private boolean requestNextBatch() throws IOException {
if (lister.hasNext()) {
listing = lister.next();
batchIterator = listing.getMultipartUploads().listIterator();
return batchIterator.hasNext();
}
return false;
}
}
}

View File

@ -43,6 +43,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonServiceException;
@ -194,6 +195,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private String blockOutputBuffer; private String blockOutputBuffer;
private S3ADataBlocks.BlockFactory blockFactory; private S3ADataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks; private int blockOutputActiveBlocks;
private WriteOperationHelper writeHelper;
private boolean useListV1; private boolean useListV1;
private MagicCommitIntegration committerIntegration; private MagicCommitIntegration committerIntegration;
@ -247,6 +249,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3Client(name); .createS3Client(name);
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
writeHelper = new WriteOperationHelper(this, getConf());
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this); listing = new Listing(this);
@ -753,13 +756,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
partSize, partSize,
blockFactory, blockFactory,
instrumentation.newOutputStreamStatistics(statistics), instrumentation.newOutputStreamStatistics(statistics),
createWriteOperationHelper(), getWriteOperationHelper(),
putTracker), putTracker),
null); null);
} }
/** /**
* Create a new {@code WriteOperationHelper} instance. * Get a {@code WriteOperationHelper} instance.
* *
* This class permits other low-level operations against the store. * This class permits other low-level operations against the store.
* It is unstable and * It is unstable and
@ -768,8 +771,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
* @return a new helper. * @return a new helper.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public WriteOperationHelper createWriteOperationHelper() { public WriteOperationHelper getWriteOperationHelper() {
return new WriteOperationHelper(this); return writeHelper;
} }
/** /**
@ -3078,8 +3081,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
: null); : null);
} }
/**
* List any pending multipart uploads whose keys begin with prefix, using
* an iterator that can handle an unlimited number of entries.
* See {@link #listMultipartUploads(String)} for a non-iterator version of
* this.
*
* @param prefix optional key prefix to search
* @return Iterator over multipart uploads.
* @throws IOException on failure
*/
public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
throws IOException {
return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys,
prefix);
}
/** /**
* Listing all multipart uploads; limited to the first few hundred. * Listing all multipart uploads; limited to the first few hundred.
* See {@link #listUploads(String)} for an iterator-based version that does
* not limit the number of entries returned.
* Retry policy: retry, translated. * Retry policy: retry, translated.
* @return a listing of multipart uploads. * @return a listing of multipart uploads.
* @param prefix prefix to scan for, "" for none * @param prefix prefix to scan for, "" for none
@ -3166,5 +3187,4 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
return false; return false;
} }
} }
} }

View File

@ -51,6 +51,7 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -149,7 +150,7 @@ public final class S3AUtils {
* @return an IOE which wraps the caught exception. * @return an IOE which wraps the caught exception.
*/ */
@SuppressWarnings("ThrowableInstanceNeverThrown") @SuppressWarnings("ThrowableInstanceNeverThrown")
public static IOException translateException(String operation, public static IOException translateException(@Nullable String operation,
String path, String path,
SdkBaseException exception) { SdkBaseException exception) {
String message = String.format("%s%s: %s", String message = String.format("%s%s: %s",

View File

@ -38,6 +38,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.model.UploadResult; import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -83,9 +84,9 @@ public class WriteOperationHelper {
* @param owner owner FS creating the helper * @param owner owner FS creating the helper
* *
*/ */
protected WriteOperationHelper(S3AFileSystem owner) { protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
this.owner = owner; this.owner = owner;
this.invoker = new Invoker(new S3ARetryPolicy(owner.getConf()), this.invoker = new Invoker(new S3ARetryPolicy(conf),
this::operationRetried); this::operationRetried);
} }

View File

@ -101,7 +101,7 @@ public class CommitOperations {
Preconditions.checkArgument(fs != null, "null fs"); Preconditions.checkArgument(fs != null, "null fs");
this.fs = fs; this.fs = fs;
statistics = fs.newCommitterStatistics(); statistics = fs.newCommitterStatistics();
writeOperations = fs.createWriteOperationHelper(); writeOperations = fs.getWriteOperationHelper();
} }
/** /**

View File

@ -101,7 +101,7 @@ public class MagicCommitIntegration {
key, key,
destKey, destKey,
pendingsetPath, pendingsetPath,
owner.createWriteOperationHelper()); owner.getWriteOperationHelper());
LOG.debug("Created {}", tracker); LOG.debug("Created {}", tracker);
} else { } else {
LOG.warn("File being created has a \"magic\" path, but the filesystem" LOG.warn("File being created has a \"magic\" path, but the filesystem"

View File

@ -23,14 +23,17 @@ import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Scanner;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -44,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.S3AUtils;
@ -55,6 +59,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.*; import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
/** /**
@ -79,6 +84,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
"\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" + "\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" +
"\t" + Import.NAME + " - " + Import.PURPOSE + "\n" + "\t" + Import.NAME + " - " + Import.PURPOSE + "\n" +
"\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" + "\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
"\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n" +
"\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" + "\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
"\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" + "\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
"\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n"; "\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n";
@ -100,10 +106,14 @@ public abstract class S3GuardTool extends Configured implements Tool {
private final CommandFormat commandFormat; private final CommandFormat commandFormat;
public static final String META_FLAG = "meta"; public static final String META_FLAG = "meta";
// These are common to prune, upload subcommands.
public static final String DAYS_FLAG = "days"; public static final String DAYS_FLAG = "days";
public static final String HOURS_FLAG = "hours"; public static final String HOURS_FLAG = "hours";
public static final String MINUTES_FLAG = "minutes"; public static final String MINUTES_FLAG = "minutes";
public static final String SECONDS_FLAG = "seconds"; public static final String SECONDS_FLAG = "seconds";
public static final String AGE_OPTIONS_USAGE = "[-days <days>] "
+ "[-hours <hours>] [-minutes <minutes>] [-seconds <seconds>]";
public static final String REGION_FLAG = "region"; public static final String REGION_FLAG = "region";
public static final String READ_FLAG = "read"; public static final String READ_FLAG = "read";
@ -177,6 +187,36 @@ public abstract class S3GuardTool extends Configured implements Tool {
"config, or S3 bucket"); "config, or S3 bucket");
} }
private long getDeltaComponent(TimeUnit unit, String arg) {
String raw = getCommandFormat().getOptValue(arg);
if (raw == null || raw.isEmpty()) {
return 0;
}
Long parsed = Long.parseLong(raw);
return unit.toMillis(parsed);
}
/**
* Convert all age options supplied to total milliseconds of time.
* @return Sum of all age options, or zero if none were given.
*/
long ageOptionsToMsec() {
long cliDelta = 0;
cliDelta += getDeltaComponent(TimeUnit.DAYS, DAYS_FLAG);
cliDelta += getDeltaComponent(TimeUnit.HOURS, HOURS_FLAG);
cliDelta += getDeltaComponent(TimeUnit.MINUTES, MINUTES_FLAG);
cliDelta += getDeltaComponent(TimeUnit.SECONDS, SECONDS_FLAG);
return cliDelta;
}
protected void addAgeOptions() {
CommandFormat format = getCommandFormat();
format.addOptionWithValue(DAYS_FLAG);
format.addOptionWithValue(HOURS_FLAG);
format.addOptionWithValue(MINUTES_FLAG);
format.addOptionWithValue(SECONDS_FLAG);
}
/** /**
* Parse metadata store from command line option or HDFS configuration. * Parse metadata store from command line option or HDFS configuration.
* *
@ -867,7 +907,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
"Common options:\n" + "Common options:\n" +
" -" + META_FLAG + " URL - Metadata repository details " + " -" + META_FLAG + " URL - Metadata repository details " +
"(implementation-specific)\n" + "(implementation-specific)\n" +
"\n" + "Age options. Any combination of these integer-valued options:\n" +
AGE_OPTIONS_USAGE + "\n" +
"Amazon DynamoDB-specific options:\n" + "Amazon DynamoDB-specific options:\n" +
" -" + REGION_FLAG + " REGION - Service region for connections\n" + " -" + REGION_FLAG + " REGION - Service region for connections\n" +
"\n" + "\n" +
@ -877,12 +918,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
Prune(Configuration conf) { Prune(Configuration conf) {
super(conf); super(conf);
addAgeOptions();
CommandFormat format = getCommandFormat();
format.addOptionWithValue(DAYS_FLAG);
format.addOptionWithValue(HOURS_FLAG);
format.addOptionWithValue(MINUTES_FLAG);
format.addOptionWithValue(SECONDS_FLAG);
} }
@VisibleForTesting @VisibleForTesting
@ -901,15 +937,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
return USAGE; return USAGE;
} }
private long getDeltaComponent(TimeUnit unit, String arg) {
String raw = getCommandFormat().getOptValue(arg);
if (raw == null || raw.isEmpty()) {
return 0;
}
Long parsed = Long.parseLong(raw);
return unit.toMillis(parsed);
}
public int run(String[] args, PrintStream out) throws public int run(String[] args, PrintStream out) throws
InterruptedException, IOException { InterruptedException, IOException {
List<String> paths = parseArgs(args); List<String> paths = parseArgs(args);
@ -924,11 +951,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
Configuration conf = getConf(); Configuration conf = getConf();
long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0); long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0);
long cliDelta = 0; long cliDelta = ageOptionsToMsec();
cliDelta += getDeltaComponent(TimeUnit.DAYS, "days");
cliDelta += getDeltaComponent(TimeUnit.HOURS, "hours");
cliDelta += getDeltaComponent(TimeUnit.MINUTES, "minutes");
cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds");
if (confDelta <= 0 && cliDelta <= 0) { if (confDelta <= 0 && cliDelta <= 0) {
errorln("You must specify a positive age for metadata to prune."); errorln("You must specify a positive age for metadata to prune.");
@ -1080,6 +1103,214 @@ public abstract class S3GuardTool extends Configured implements Tool {
} }
/**
* Command to list / abort pending multipart uploads.
*/
static class Uploads extends S3GuardTool {
public static final String NAME = "uploads";
public static final String ABORT = "abort";
public static final String LIST = "list";
public static final String EXPECT = "expect";
public static final String VERBOSE = "verbose";
public static final String FORCE = "force";
public static final String PURPOSE = "list or abort pending " +
"multipart uploads";
private static final String USAGE = NAME + " [OPTIONS] " +
"s3a://BUCKET[/path]\n"
+ "\t" + PURPOSE + "\n\n"
+ "Common options:\n"
+ " (-" + LIST + " | -" + EXPECT +" <num-uploads> | -" + ABORT
+ ") [-" + VERBOSE +"] "
+ "[<age-options>] [-force]\n"
+ "\t - Under given path, list or delete all uploads," +
" or only those \n"
+ "older than specified by <age-options>\n"
+ "<age-options> are any combination of the integer-valued options:\n"
+ "\t" + AGE_OPTIONS_USAGE + "\n"
+ "-" + EXPECT + " is similar to list, except no output is printed,\n"
+ "\tbut the exit code will be an error if the provided number\n"
+ "\tis different that the number of uploads found by the command.\n"
+ "-" + FORCE + " option prevents the \"Are you sure\" prompt when\n"
+ "\tusing -" + ABORT;
/** Constant used for output and parsed by tests. */
public static final String TOTAL = "Total";
/** Runs in one of three modes. */
private enum Mode { LIST, EXPECT, ABORT };
private Mode mode = null;
/** For Mode == EXPECT, expected listing size. */
private int expectedCount;
/** List/abort uploads older than this many milliseconds. */
private long ageMsec = 0;
/** Verbose output flag. */
private boolean verbose = false;
/** Whether to delete with out "are you sure" prompt. */
private boolean force = false;
/** Path prefix to use when searching multipart uploads. */
private String prefix;
Uploads(Configuration conf) {
super(conf, ABORT, LIST, VERBOSE, FORCE);
addAgeOptions();
getCommandFormat().addOptionWithValue(EXPECT);
}
@Override
String getName() {
return NAME;
}
@Override
public String getUsage() {
return USAGE;
}
public int run(String[] args, PrintStream out)
throws InterruptedException, IOException {
List<String> paths = parseArgs(args);
if (paths.isEmpty()) {
errorln(getUsage());
throw invalidArgs("No options specified");
}
processArgs(paths, out);
promptBeforeAbort(out);
processUploads(out);
out.flush();
return SUCCESS;
}
private void promptBeforeAbort(PrintStream out) throws IOException {
if (mode != Mode.ABORT || force) {
return;
}
Scanner scanner = new Scanner(System.in, "UTF-8");
out.println("Are you sure you want to delete any pending " +
"uploads? (yes/no) >");
String response = scanner.nextLine();
if (!"yes".equalsIgnoreCase(response)) {
throw S3GuardTool.userAborted("User did not answer yes, quitting.");
}
}
private void processUploads(PrintStream out) throws IOException {
MultipartUtils.UploadIterator uploads;
uploads = getFilesystem().listUploads(prefix);
int count = 0;
while (uploads.hasNext()) {
MultipartUpload upload = uploads.next();
if (!olderThan(upload, ageMsec)) {
continue;
}
count++;
if (mode == Mode.ABORT || mode == Mode.LIST || verbose) {
println(out, "%s%s %s", mode == Mode.ABORT ? "Deleting: " : "",
upload.getKey(), upload.getUploadId());
}
if (mode == Mode.ABORT) {
getFilesystem().getWriteOperationHelper()
.abortMultipartUpload(upload.getKey(), upload.getUploadId(),
LOG_EVENT);
}
}
if (mode != Mode.EXPECT || verbose) {
println(out, "%s %d uploads %s.", TOTAL, count,
mode == Mode.ABORT ? "deleted" : "found");
}
if (mode == Mode.EXPECT) {
if (count != expectedCount) {
throw badState("Expected %d uploads, found %d", expectedCount, count);
}
}
}
/**
* Check if upload is at least as old as given age.
* @param u upload to check
* @param msec age in milliseconds
* @return true iff u was created at least age milliseconds ago.
*/
private boolean olderThan(MultipartUpload u, long msec) {
Date ageDate = new Date(System.currentTimeMillis() - msec);
return ageDate.compareTo(u.getInitiated()) >= 0;
}
private void processArgs(List<String> args, PrintStream out)
throws IOException {
CommandFormat commands = getCommandFormat();
String err = "Can only specify one of -" + LIST + ", " +
" -" + ABORT + ", and " + EXPECT;
// Three mutually-exclusive options
if (commands.getOpt(LIST)) {
mode = Mode.LIST;
}
if (commands.getOpt(ABORT)) {
if (mode != null) {
throw invalidArgs(err);
}
mode = Mode.ABORT;
}
String expectVal = commands.getOptValue(EXPECT);
if (expectVal != null) {
if (mode != null) {
throw invalidArgs(err);
}
mode = Mode.EXPECT;
expectedCount = Integer.parseInt(expectVal);
}
// Default to list
if (mode == null) {
vprintln(out, "No mode specified, defaulting to -" + LIST);
mode = Mode.LIST;
}
// Other flags
if (commands.getOpt(VERBOSE)) {
verbose = true;
}
if (commands.getOpt(FORCE)) {
force = true;
}
ageMsec = ageOptionsToMsec();
String s3Path = args.get(0);
URI uri = S3GuardTool.toUri(s3Path);
prefix = uri.getPath();
if (prefix.length() > 0) {
prefix = prefix.substring(1);
}
vprintln(out, "Command: %s, age %d msec, path %s (prefix \"%s\")",
mode.name(), ageMsec, s3Path, prefix);
initS3AFileSystem(s3Path);
}
/**
* If verbose flag is set, print a formatted string followed by a newline
* to the output stream.
* @param out destination
* @param format format string
* @param args optional arguments
*/
private void vprintln(PrintStream out, String format, Object...
args) {
if (verbose) {
out.println(String.format(format, args));
}
}
}
private static S3GuardTool command; private static S3GuardTool command;
/** /**
@ -1182,6 +1413,17 @@ public abstract class S3GuardTool extends Configured implements Tool {
String.format(format, args)); String.format(format, args));
} }
/**
* Build the exception to raise on user-aborted action.
* @param format string format
* @param args optional arguments for the string
* @return a new exception to throw
*/
protected static ExitUtil.ExitException userAborted(
String format, Object...args) {
return new ExitUtil.ExitException(ERROR, String.format(format, args));
}
/** /**
* Execute the command with the given arguments. * Execute the command with the given arguments.
* *
@ -1224,6 +1466,9 @@ public abstract class S3GuardTool extends Configured implements Tool {
case SetCapacity.NAME: case SetCapacity.NAME:
command = new SetCapacity(conf); command = new SetCapacity(conf);
break; break;
case Uploads.NAME:
command = new Uploads(conf);
break;
default: default:
printHelp(); printHelp();
throw new ExitUtil.ExitException(E_USAGE, throw new ExitUtil.ExitException(E_USAGE,

View File

@ -1490,8 +1490,13 @@ from VMs running on EC2.
</property> </property>
``` ```
### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures: `fs.s3a.multipart.purge` ### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures
There are two mechanisms for cleaning up after leftover multipart
uploads:
- Hadoop s3guard CLI commands for listing and deleting uploads by their
age. Doumented in the [S3Guard](./s3guard.html) section.
- The configuration parameter `fs.s3a.multipart.purge`, covered below.
If an large stream writeoperation is interrupted, there may be If an large stream writeoperation is interrupted, there may be
intermediate partitions uploaded to S3 —data which will be billed for. intermediate partitions uploaded to S3 —data which will be billed for.

View File

@ -515,10 +515,43 @@ hadoop s3guard bucket-info -guarded -auth s3a://landsat-pds
Require the bucket to be using S3Guard in authoritative mode. This will normally Require the bucket to be using S3Guard in authoritative mode. This will normally
fail against this specific bucket. fail against this specific bucket.
### List or Delete Leftover Multipart Uploads: `s3guard uploads`
Lists or deletes all pending (uncompleted) multipart uploads older than
given age.
```bash
hadoop s3guard uploads (-list | -abort | -expect <num-uploads>) [-verbose] \
[-days <days>] [-hours <hours>] [-minutes <minutes>] [-seconds <seconds>] \
[-force] s3a://bucket/prefix
```
The command lists or deletes all multipart uploads which are older than
the given age, and that match the prefix supplied, if any.
For example, to delete all uncompleted multipart uploads older than two
days in the folder at `s3a://my-bucket/path/to/stuff`, use the following
command:
```bash
hadoop s3guard uploads -abort -days 2 s3a://my-bucket/path/to/stuff
```
We recommend running with `-list` first to confirm the parts shown
are those that you wish to delete. Note that the command will prompt
you with a "Are you sure?" prompt unless you specify the `-force`
option. This is to safeguard against accidental deletion of data, which
is especially risky without a long age parameter as it can affect
in-fight uploads.
The `-expect` option is similar to `-list`, except it is silent by
default, and terminates with a success or failure exit code depending
on whether or not the supplied number matches the number of uploads
found that match the given options (path, age).
### Delete a table: `s3guard destroy` ### Delete a table: `s3guard destroy`
Deletes a metadata store. With DynamoDB as the store, this means Deletes a metadata store. With DynamoDB as the store, this means
the specific DynamoDB table use to store the metadata. the specific DynamoDB table use to store the metadata.

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.MultipartUpload;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* Tests for {@link MultipartUtils}.
*/
public class ITestS3AMultipartUtils extends AbstractS3ATestBase {
private static final int UPLOAD_LEN = 1024;
private static final String PART_FILENAME_BASE = "pending-part";
private static final int LIST_BATCH_SIZE = 2;
private static final int NUM_KEYS = 5;
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
// Forces listings to come back in multiple batches to test that part of
// the iterators.
conf.setInt(Constants.MAX_PAGING_KEYS, LIST_BATCH_SIZE);
return conf;
}
/**
* Main test case for upload part listing and iterator paging.
* @throws Exception on failure.
*/
@Test
public void testListMultipartUploads() throws Exception {
S3AFileSystem fs = getFileSystem();
Set<MultipartTestUtils.IdKey> keySet = new HashSet<>();
try {
// 1. Create NUM_KEYS pending upload parts
for (int i = 0; i < NUM_KEYS; i++) {
Path filePath = getPartFilename(i);
String key = fs.pathToKey(filePath);
describe("creating upload part with key %s", key);
// create a multipart upload
MultipartTestUtils.IdKey idKey = MultipartTestUtils
.createPartUpload(fs, key, UPLOAD_LEN,
1);
keySet.add(idKey);
}
// 2. Verify all uploads are found listing by prefix
describe("Verifying upload list by prefix");
MultipartUtils.UploadIterator uploads = fs.listUploads(getPartPrefix(fs));
assertUploadsPresent(uploads, keySet);
// 3. Verify all uploads are found listing without prefix
describe("Verifying list all uploads");
uploads = fs.listUploads(null);
assertUploadsPresent(uploads, keySet);
} finally {
// 4. Delete all uploads we created
MultipartTestUtils.cleanupParts(fs, keySet);
}
}
/**
* Assert that all provided multipart uploads are contained in the upload
* iterator's results.
* @param list upload iterator
* @param ourUploads set up uploads that should be present
* @throws IOException on I/O error
*/
private void assertUploadsPresent(MultipartUtils.UploadIterator list,
Set<MultipartTestUtils.IdKey> ourUploads) throws IOException {
// Don't modify passed-in set, use copy.
Set<MultipartTestUtils.IdKey> uploads = new HashSet<>(ourUploads);
while (list.hasNext()) {
MultipartTestUtils.IdKey listing = toIdKey(list.next());
if (uploads.contains(listing)) {
LOG.debug("Matched: {},{}", listing.getKey(), listing.getUploadId());
uploads.remove(listing);
} else {
LOG.debug("Not our upload {},{}", listing.getKey(),
listing.getUploadId());
}
}
assertTrue("Not all our uploads were listed", uploads.isEmpty());
}
private MultipartTestUtils.IdKey toIdKey(MultipartUpload mu) {
return new MultipartTestUtils.IdKey(mu.getKey(), mu.getUploadId());
}
private Path getPartFilename(int index) throws IOException {
return path(String.format("%s-%d", PART_FILENAME_BASE, index));
}
private String getPartPrefix(S3AFileSystem fs) throws IOException {
return fs.pathToKey(path("blah").getParent());
}
}

View File

@ -78,6 +78,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
private final S3AInstrumentation instrumentation = private final S3AInstrumentation instrumentation =
new S3AInstrumentation(FS_URI); new S3AInstrumentation(FS_URI);
private Configuration conf; private Configuration conf;
private WriteOperationHelper writeHelper;
public MockS3AFileSystem(S3AFileSystem mock, public MockS3AFileSystem(S3AFileSystem mock,
Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome) { Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome) {
@ -125,6 +126,12 @@ public class MockS3AFileSystem extends S3AFileSystem {
public void initialize(URI name, Configuration originalConf) public void initialize(URI name, Configuration originalConf)
throws IOException { throws IOException {
conf = originalConf; conf = originalConf;
writeHelper = new WriteOperationHelper(this, conf);
}
@Override
public WriteOperationHelper getWriteOperationHelper() {
return writeHelper;
} }
@Override @Override

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
/**
* Utilities for S3A multipart upload tests.
*/
public final class MultipartTestUtils {
private static final Logger LOG = LoggerFactory.getLogger(
MultipartTestUtils.class);
/** Not instantiated. */
private MultipartTestUtils() { }
/**
* Clean up all provided uploads.
* @param keySet set of uploads to abort
*/
static void cleanupParts(S3AFileSystem fs, Set <IdKey> keySet) {
boolean anyFailure = false;
for (IdKey ik : keySet) {
try {
LOG.debug("aborting upload id {}", ik.getUploadId());
fs.abortMultipartUpload(ik.getKey(), ik.getUploadId());
} catch (Exception e) {
LOG.error(String.format("Failure aborting upload %s, continuing.",
ik.getKey()), e);
anyFailure = true;
}
}
Assert.assertFalse("Failure aborting multipart upload(s), see log.",
anyFailure);
}
public static IdKey createPartUpload(S3AFileSystem fs, String key, int len,
int partNo) throws IOException {
WriteOperationHelper writeHelper = fs.getWriteOperationHelper();
byte[] data = dataset(len, 'a', 'z');
InputStream in = new ByteArrayInputStream(data);
String uploadId = writeHelper.initiateMultiPartUpload(key);
UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId,
partNo, len, in, null, 0L);
PartETag partEtag = fs.uploadPart(req).getPartETag();
LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId);
return new IdKey(key, uploadId);
}
/** Delete any uploads under given path (recursive). Silent on failure. */
public static void clearAnyUploads(S3AFileSystem fs, Path path) {
try {
String key = fs.pathToKey(path);
MultipartUtils.UploadIterator uploads = fs.listUploads(key);
while (uploads.hasNext()) {
MultipartUpload upload = uploads.next();
fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(),
upload.getUploadId(), LOG_EVENT);
LOG.debug("Cleaning up upload: {} {}", upload.getKey(),
truncatedUploadId(upload.getUploadId()));
}
} catch (IOException ioe) {
LOG.info("Ignoring exception: ", ioe);
}
}
/** Assert that there are not any upload parts at given path. */
public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws
Exception {
String key = fs.pathToKey(path);
MultipartUtils.UploadIterator uploads = fs.listUploads(key);
while (uploads.hasNext()) {
MultipartUpload upload = uploads.next();
Assert.fail("Found unexpected upload " + upload.getKey() + " " +
truncatedUploadId(upload.getUploadId()));
}
}
/** Get number of part uploads under given path. */
public static int countUploadsAt(S3AFileSystem fs, Path path) throws
IOException {
String key = fs.pathToKey(path);
MultipartUtils.UploadIterator uploads = fs.listUploads(key);
int count = 0;
while (uploads.hasNext()) {
MultipartUpload upload = uploads.next();
count++;
}
return count;
}
/**
* Get a list of all pending uploads under a prefix, one which can be printed.
* @param prefix prefix to look under
* @return possibly empty list
* @throws IOException IO failure.
*/
public static List<String> listMultipartUploads(S3AFileSystem fs,
String prefix) throws IOException {
return fs
.listMultipartUploads(prefix).stream()
.map(upload -> String.format("Upload to %s with ID %s; initiated %s",
upload.getKey(),
upload.getUploadId(),
S3ATestUtils.LISTING_FORMAT.format(upload.getInitiated())))
.collect(Collectors.toList());
}
private static String truncatedUploadId(String fullId) {
return fullId.substring(0, 12) + " ...";
}
/** Struct of object key, upload ID. */
static class IdKey {
private String key;
private String uploadId;
IdKey(String key, String uploadId) {
this.key = key;
this.uploadId = uploadId;
}
public String getKey() {
return key;
}
public String getUploadId() {
return uploadId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IdKey key1 = (IdKey) o;
return Objects.equals(key, key1.key) &&
Objects.equals(uploadId, key1.uploadId);
}
@Override
public int hashCode() {
return Objects.hash(key, uploadId);
}
}
}

View File

@ -45,7 +45,6 @@ import java.net.URISyntaxException;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*; import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
@ -822,27 +821,9 @@ public final class S3ATestUtils {
/** /**
* Date format used for mapping upload initiation time to human string. * Date format used for mapping upload initiation time to human string.
*/ */
private static final DateFormat LISTING_FORMAT = new SimpleDateFormat( public static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss"); "yyyy-MM-dd HH:mm:ss");
/**
* Get a list of all pending uploads under a prefix, one which can be printed.
* @param prefix prefix to look under
* @return possibly empty list
* @throws IOException IO failure.
*/
public static List<String> listMultipartUploads(S3AFileSystem fs,
String prefix) throws IOException {
return fs
.listMultipartUploads(prefix).stream()
.map(upload -> String.format("Upload to %s with ID %s; initiated %s",
upload.getKey(),
upload.getUploadId(),
LISTING_FORMAT.format(upload.getInitiated())))
.collect(Collectors.toList());
}
/** /**
* Skip a test if the FS isn't marked as supporting magic commits. * Skip a test if the FS isn't marked as supporting magic commits.
* @param fs filesystem * @param fs filesystem

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@ -247,7 +248,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
if (fs != null && path != null) { if (fs != null && path != null) {
String key = fs.pathToKey(path); String key = fs.pathToKey(path);
WriteOperationHelper writeOps = fs.createWriteOperationHelper(); WriteOperationHelper writeOps = fs.getWriteOperationHelper();
int count = writeOps.abortMultipartUploadsUnderPath(key); int count = writeOps.abortMultipartUploadsUnderPath(key);
if (count > 0) { if (count > 0) {
log().info("Multipart uploads deleted: {}", count); log().info("Multipart uploads deleted: {}", count);

View File

@ -40,8 +40,8 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles; import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/** /**

View File

@ -24,10 +24,15 @@ import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -35,15 +40,20 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff; import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.*;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*; import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/** /**
* Test S3Guard related CLI commands against a LocalMetadataStore. * Test S3Guard related CLI commands against a LocalMetadataStore.
* Also responsible for testing the non s3guard-specific commands that, for
* now, live under the s3guard CLI command.
*/ */
public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase { public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
private static final String LOCAL_METADATA = "local://metadata"; private static final String LOCAL_METADATA = "local://metadata";
private static final String[] ABORT_FORCE_OPTIONS = new String[] {"-abort",
"-force", "-verbose"};
@Override @Override
protected MetadataStore newMetadataStore() { protected MetadataStore newMetadataStore() {
@ -261,5 +271,182 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
LOG.info("Exec output=\n{}", output); LOG.info("Exec output=\n{}", output);
} }
private final static String UPLOAD_PREFIX = "test-upload-prefix";
private final static String UPLOAD_NAME = "test-upload";
@Test
public void testUploads() throws Throwable {
S3AFileSystem fs = getFileSystem();
Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
describe("Cleaning up any leftover uploads from previous runs.");
// 1. Make sure key doesn't already exist
clearAnyUploads(fs, path);
// 2. Confirm no uploads are listed via API
assertNoUploadsAt(fs, path.getParent());
// 3. Confirm no uploads are listed via CLI
describe("Confirming CLI lists nothing.");
assertNumUploads(path, 0);
// 4. Create a upload part
describe("Uploading single part.");
createPartUpload(fs, fs.pathToKey(path), 128, 1);
try {
// 5. Confirm it exists via API..
LambdaTestUtils.eventually(5000, /* 5 seconds until failure */
1000, /* one second retry interval */
() -> {
assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
});
// 6. Confirm part exists via CLI, direct path and parent path
describe("Confirming CLI lists one part");
LambdaTestUtils.eventually(5000, 1000,
() -> assertNumUploads(path, 1));
LambdaTestUtils.eventually(5000, 1000,
() -> assertNumUploads(path.getParent(), 1));
// 7. Use CLI to delete part, assert it worked
describe("Deleting part via CLI");
assertNumDeleted(fs, path, 1);
// 8. Confirm deletion via API
describe("Confirming deletion via API");
assertEquals("Should be no uploads", 0, countUploadsAt(fs, path));
// 9. Confirm no uploads are listed via CLI
describe("Confirming CLI lists nothing.");
assertNumUploads(path, 0);
} catch (Throwable t) {
// Clean up on intermediate failure
clearAnyUploads(fs, path);
throw t;
}
}
@Test
public void testUploadListByAge() throws Throwable {
S3AFileSystem fs = getFileSystem();
Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
describe("Cleaning up any leftover uploads from previous runs.");
// 1. Make sure key doesn't already exist
clearAnyUploads(fs, path);
// 2. Create a upload part
describe("Uploading single part.");
createPartUpload(fs, fs.pathToKey(path), 128, 1);
try {
// 3. Confirm it exists via API.. may want to wrap with
// LambdaTestUtils.eventually() ?
LambdaTestUtils.eventually(5000, 1000,
() -> {
assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
});
// 4. Confirm part does appear in listing with long age filter
describe("Confirming CLI older age doesn't list");
assertNumUploadsAge(path, 0, 600);
// 5. Confirm part does not get deleted with long age filter
describe("Confirming CLI older age doesn't delete");
uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 0,
600);
// 6. Wait a second and then assert the part is in listing of things at
// least a second old
describe("Sleeping 1 second then confirming upload still there");
Thread.sleep(1000);
LambdaTestUtils.eventually(5000, 1000,
() -> assertNumUploadsAge(path, 1, 1));
// 7. Assert deletion works when age filter matches
describe("Doing aged deletion");
uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 1, 1);
describe("Confirming age deletion happened");
assertEquals("Should be no uploads", 0, countUploadsAt(fs, path));
} catch (Throwable t) {
// Clean up on intermediate failure
clearAnyUploads(fs, path);
throw t;
}
}
@Test
public void testUploadNegativeExpect() throws Throwable {
runToFailure(E_BAD_STATE, Uploads.NAME, "-expect", "1",
path("/we/are/almost/postive/this/doesnt/exist/fhfsadfoijew")
.toString());
}
private void assertNumUploads(Path path, int numUploads) throws Exception {
assertNumUploadsAge(path, numUploads, 0);
}
private void assertNumUploadsAge(Path path, int numUploads, int ageSeconds)
throws Exception {
if (ageSeconds > 0) {
run(Uploads.NAME, "-expect", String.valueOf(numUploads), "-seconds",
String.valueOf(ageSeconds), path.toString());
} else {
run(Uploads.NAME, "-expect", String.valueOf(numUploads), path.toString());
}
}
private void assertNumDeleted(S3AFileSystem fs, Path path, int numDeleted)
throws Exception {
uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path,
numDeleted, 0);
}
/**
* Run uploads cli command and assert the reported count (listed or
* deleted) matches.
* @param fs S3AFileSystem
* @param options main command options
* @param path path of part(s)
* @param numUploads expected number of listed/deleted parts
* @param ageSeconds optional seconds of age to specify to CLI, or zero to
* search all parts
* @throws Exception on failure
*/
private void uploadCommandAssertCount(S3AFileSystem fs, String[] options,
Path path, int numUploads, int ageSeconds)
throws Exception {
List<String> allOptions = new ArrayList<>();
List<String> output = new ArrayList<>();
S3GuardTool.Uploads cmd = new S3GuardTool.Uploads(fs.getConf());
ByteArrayOutputStream buf = new ByteArrayOutputStream();
allOptions.add(cmd.getName());
allOptions.addAll(Arrays.asList(options));
if (ageSeconds > 0) {
allOptions.add("-" + Uploads.SECONDS_FLAG);
allOptions.add(String.valueOf(ageSeconds));
}
allOptions.add(path.toString());
exec(cmd, buf, allOptions.toArray(new String[0]));
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(new ByteArrayInputStream(buf.toByteArray())))) {
String line;
while ((line = reader.readLine()) != null) {
String[] fields = line.split("\\s");
if (fields.length == 4 && fields[0].equals(Uploads.TOTAL)) {
int parsedUploads = Integer.valueOf(fields[1]);
LOG.debug("Matched CLI output: {} {} {} {}", fields);
assertEquals("Unexpected number of uploads", numUploads,
parsedUploads);
return;
}
LOG.debug("Not matched: {}", line);
output.add(line);
}
}
fail("Command output did not match: \n" + StringUtils.join("\n", output));
}
} }