Revert "HADOOP-13974. S3Guard CLI to support list/purge of pending multipart commits."
This reverts commit 35ad9b1dd2.

parent 08332e12d0
commit f274fe33ea
@@ -81,11 +81,6 @@ public class KDiag extends Configured implements Tool, Closeable {
    * variable. This is what kinit will use by default: {@value}
    */
   public static final String KRB5_CCNAME = "KRB5CCNAME";
-  /**
-   * Location of main kerberos configuration file as passed down via an
-   * environment variable.
-   */
-  public static final String KRB5_CONFIG = "KRB5_CONFIG";
   public static final String JAVA_SECURITY_KRB5_CONF
       = "java.security.krb5.conf";
   public static final String JAVA_SECURITY_KRB5_REALM
@@ -326,15 +321,14 @@ public class KDiag extends Configured implements Tool, Closeable {

     title("Environment Variables");
     for (String env : new String[]{
         HADOOP_JAAS_DEBUG,
         KRB5_CCNAME,
-        KRB5_CONFIG,
         HADOOP_USER_NAME,
         HADOOP_PROXY_USER,
         HADOOP_TOKEN_FILE_LOCATION,
         "HADOOP_SECURE_LOG",
         "HADOOP_OPTS",
         "HADOOP_CLIENT_OPTS",
     }) {
       printEnv(env);
     }
@@ -568,14 +562,14 @@ public class KDiag extends Configured implements Tool, Closeable {
       krbPath = jvmKrbPath;
     }

-    String krb5name = System.getenv(KRB5_CONFIG);
+    String krb5name = System.getenv(KRB5_CCNAME);
     if (krb5name != null) {
       println("Setting kerberos path from environment variable %s: \"%s\"",
-          KRB5_CONFIG, krb5name);
+          KRB5_CCNAME, krb5name);
       krbPath = krb5name;
       if (jvmKrbPath != null) {
         println("Warning - both %s and %s were set - %s takes priority",
-            JAVA_SECURITY_KRB5_CONF, KRB5_CONFIG, KRB5_CONFIG);
+            JAVA_SECURITY_KRB5_CONF, KRB5_CCNAME, KRB5_CCNAME);
       }
     }

@@ -925,7 +919,7 @@ public class KDiag extends Configured implements Tool, Closeable {
   private void dump(File file) throws IOException {
     try (FileInputStream in = new FileInputStream(file)) {
       for (String line : IOUtils.readLines(in)) {
-        println("%s", line);
+        println(line);
       }
     }
   }
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Optional;
-import javax.annotation.Nullable;

 import com.amazonaws.AmazonClientException;
 import com.amazonaws.SdkBaseException;
@@ -223,7 +222,7 @@ public class Invoker {
    */
   @Retries.RetryTranslated
   public <T> T retry(String action,
-      @Nullable String path,
+      String path,
       boolean idempotent,
       Operation<T> operation)
       throws IOException {
@@ -248,7 +247,7 @@ public class Invoker {
   @Retries.RetryTranslated
   public <T> T retry(
       String action,
-      @Nullable String path,
+      String path,
       boolean idempotent,
       Retried retrying,
       Operation<T> operation)
@@ -414,7 +413,7 @@ public class Invoker {
    * @param path path (may be null or empty)
    * @return string for logs
    */
-  private static String toDescription(String action, @Nullable String path) {
+  private static String toDescription(String action, String path) {
     return action +
         (StringUtils.isNotEmpty(path) ? (" on " + path) : "");
   }
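
For orientation: a minimal usage sketch of the `retry()` overload above. It mirrors the call made from `MultipartUtils.requestNextBatch()` in the file deleted below, and assumes `invoker`, `s3`, `prefix` and `req` are already in scope.

```java
// Sketch only: retry an idempotent S3 call under the S3A retry policy,
// translating SDK exceptions into IOExceptions.
MultipartUploadListing listing = invoker.retry(
    "listMultipartUploads",              // action name, used in diagnostics
    prefix,                              // path, may be null or empty
    true,                                // idempotent: safe to re-invoke
    () -> s3.listMultipartUploads(req)); // the wrapped AWS SDK operation
```
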
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import java.io.IOException;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
-import com.amazonaws.services.s3.model.MultipartUpload;
-import com.amazonaws.services.s3.model.MultipartUploadListing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.RemoteIterator;
-
-
-/**
- * MultipartUtils upload-specific functions for use by S3AFileSystem and Hadoop
- * CLI.
- */
-public final class MultipartUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MultipartUtils.class);
-
-  /** Not instantiated. */
-  private MultipartUtils() { }
-
-  /**
-   * List outstanding multipart uploads.
-   * Package private: S3AFileSystem and tests are the users of this.
-   * @param s3 AmazonS3 client to use.
-   * @param bucketName name of S3 bucket to use.
-   * @param maxKeys maximum batch size to request at a time from S3.
-   * @param prefix optional key prefix to narrow search. If null then whole
-   *               bucket will be searched.
-   * @return an iterator of matching uploads
-   */
-  static MultipartUtils.UploadIterator listMultipartUploads(AmazonS3 s3,
-      Invoker invoker, String bucketName, int maxKeys, @Nullable String prefix)
-      throws IOException {
-    return new MultipartUtils.UploadIterator(s3, invoker, bucketName, maxKeys,
-        prefix);
-  }
-
-  /**
-   * Simple RemoteIterator wrapper for AWS `listMultipartUpload` API.
-   * Iterates over batches of multipart upload metadata listings.
-   */
-  static class ListingIterator implements
-      RemoteIterator<MultipartUploadListing> {
-
-    private final String bucketName;
-    private final String prefix;
-    private final int maxKeys;
-    private final AmazonS3 s3;
-    private final Invoker invoker;
-
-    /**
-     * Most recent listing results.
-     */
-    private MultipartUploadListing listing;
-
-    /**
-     * Indicator that this is the first listing.
-     */
-    private boolean firstListing = true;
-
-    private int listCount = 1;
-
-    ListingIterator(AmazonS3 s3, Invoker invoker, String bucketName,
-        int maxKeys, @Nullable String prefix) throws IOException {
-      this.s3 = s3;
-      this.bucketName = bucketName;
-      this.maxKeys = maxKeys;
-      this.prefix = prefix;
-      this.invoker = invoker;
-
-      requestNextBatch();
-    }
-
-    /**
-     * Iterator has data if it is either is the initial iteration, or
-     * the last listing obtained was incomplete.
-     * @throws IOException not thrown by this implementation.
-     */
-    @Override
-    public boolean hasNext() throws IOException {
-      if (listing == null) {
-        // shouldn't happen, but don't trust AWS SDK
-        return false;
-      } else {
-        return firstListing || listing.isTruncated();
-      }
-    }
-
-    /**
-     * Get next listing. First call, this returns initial set (possibly
-     * empty) obtained from S3. Subsequent calls my block on I/O or fail.
-     * @return next upload listing.
-     * @throws IOException if S3 operation fails.
-     * @throws NoSuchElementException if there are no more uploads.
-     */
-    @Override
-    @Retries.RetryTranslated
-    public MultipartUploadListing next() throws IOException {
-      if (firstListing) {
-        firstListing = false;
-      } else {
-        if (listing == null || !listing.isTruncated()) {
-          // nothing more to request: fail.
-          throw new NoSuchElementException("No more uploads under " + prefix);
-        }
-        // need to request a new set of objects.
-        requestNextBatch();
-      }
-      return listing;
-    }
-
-    @Override
-    public String toString() {
-      return "Upload iterator: prefix " + prefix + "; list count " +
-          listCount + "; isTruncated=" + listing.isTruncated();
-    }
-
-    @Retries.RetryTranslated
-    private void requestNextBatch() throws IOException {
-      ListMultipartUploadsRequest req =
-          new ListMultipartUploadsRequest(bucketName);
-      if (prefix != null) {
-        req.setPrefix(prefix);
-      }
-      if (!firstListing) {
-        req.setKeyMarker(listing.getNextKeyMarker());
-        req.setUploadIdMarker(listing.getNextUploadIdMarker());
-      }
-      req.setMaxUploads(listCount);
-
-      LOG.debug("[{}], Requesting next {} uploads prefix {}, " +
-          "next key {}, next upload id {}", listCount, maxKeys, prefix,
-          req.getKeyMarker(), req.getUploadIdMarker());
-      listCount++;
-
-      listing = invoker.retry("listMultipartUploads", prefix, true,
-          () -> s3.listMultipartUploads(req));
-      LOG.debug("New listing state: {}", this);
-    }
-  }
-
-  /**
-   * Iterator over multipart uploads. Similar to
-   * {@link org.apache.hadoop.fs.s3a.Listing.FileStatusListingIterator}, but
-   * iterates over pending uploads instead of existing objects.
-   */
-  public static class UploadIterator
-      implements RemoteIterator<MultipartUpload> {
-
-    private ListingIterator lister;
-    /** Current listing: the last upload listing we fetched. */
-    private MultipartUploadListing listing;
-    /** Iterator over the current listing. */
-    private ListIterator<MultipartUpload> batchIterator;
-
-    @Retries.RetryTranslated
-    public UploadIterator(AmazonS3 s3, Invoker invoker, String bucketName,
-        int maxKeys, @Nullable String prefix)
-        throws IOException {
-
-      lister = new ListingIterator(s3, invoker, bucketName, maxKeys, prefix);
-      requestNextBatch();
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-      return (batchIterator.hasNext() || requestNextBatch());
-    }
-
-    @Override
-    public MultipartUpload next() throws IOException {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      return batchIterator.next();
-    }
-
-    private boolean requestNextBatch() throws IOException {
-      if (lister.hasNext()) {
-        listing = lister.next();
-        batchIterator = listing.getMultipartUploads().listIterator();
-        return batchIterator.hasNext();
-      }
-      return false;
-    }
-  }
-}
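
For context, a minimal sketch of how callers drove the `UploadIterator` being deleted above; `fs` (an initialized `S3AFileSystem`) and the prefix string are assumptions, and `listUploads()` is the accessor removed from `S3AFileSystem` later in this commit.

```java
// Sketch, based on the deleted class above: page through all pending
// multipart uploads whose keys start with a prefix.
MultipartUtils.UploadIterator uploads = fs.listUploads("path/to/prefix");
while (uploads.hasNext()) {
  MultipartUpload upload = uploads.next();
  System.out.printf("%s %s%n", upload.getKey(), upload.getUploadId());
}
```
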
@@ -43,7 +43,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;

 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
@@ -196,7 +195,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   private String blockOutputBuffer;
   private S3ADataBlocks.BlockFactory blockFactory;
   private int blockOutputActiveBlocks;
-  private WriteOperationHelper writeHelper;
   private boolean useListV1;
   private MagicCommitIntegration committerIntegration;

@@ -250,7 +248,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
         .createS3Client(name);
     invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
-    writeHelper = new WriteOperationHelper(this, getConf());

     maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
     listing = new Listing(this);
@@ -766,13 +763,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
             partSize,
             blockFactory,
             instrumentation.newOutputStreamStatistics(statistics),
-            getWriteOperationHelper(),
+            createWriteOperationHelper(),
             putTracker),
         null);
   }

   /**
-   * Get a {@code WriteOperationHelper} instance.
+   * Create a new {@code WriteOperationHelper} instance.
    *
    * This class permits other low-level operations against the store.
    * It is unstable and
@@ -781,8 +778,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
    * @return a new helper.
    */
   @InterfaceAudience.Private
-  public WriteOperationHelper getWriteOperationHelper() {
-    return writeHelper;
+  public WriteOperationHelper createWriteOperationHelper() {
+    return new WriteOperationHelper(this);
   }

   /**
@@ -3156,28 +3153,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
         : null);
   }

-  /**
-   * List any pending multipart uploads whose keys begin with prefix, using
-   * an iterator that can handle an unlimited number of entries.
-   * See {@link #listMultipartUploads(String)} for a non-iterator version of
-   * this.
-   *
-   * @param prefix optional key prefix to search
-   * @return Iterator over multipart uploads.
-   * @throws IOException on failure
-   */
-  @InterfaceAudience.Private
-  @Retries.RetryTranslated
-  public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
-      throws IOException {
-    return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys,
-        prefix);
-  }
-
   /**
    * Listing all multipart uploads; limited to the first few hundred.
-   * See {@link #listUploads(String)} for an iterator-based version that does
-   * not limit the number of entries returned.
    * Retry policy: retry, translated.
    * @return a listing of multipart uploads.
    * @param prefix prefix to scan for, "" for none
@@ -3264,4 +3241,5 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     return false;
   }
   }
+
 }
@@ -51,7 +51,6 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -154,7 +153,7 @@ public final class S3AUtils {
    * @return an IOE which wraps the caught exception.
    */
   @SuppressWarnings("ThrowableInstanceNeverThrown")
-  public static IOException translateException(@Nullable String operation,
+  public static IOException translateException(String operation,
       String path,
       SdkBaseException exception) {
     String message = String.format("%s%s: %s",
@@ -38,7 +38,6 @@ import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.model.UploadResult;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -85,9 +84,9 @@ public class WriteOperationHelper {
    * @param conf Configuration object
    *
    */
-  protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
+  protected WriteOperationHelper(S3AFileSystem owner) {
     this.owner = owner;
-    this.invoker = new Invoker(new S3ARetryPolicy(conf),
+    this.invoker = new Invoker(new S3ARetryPolicy(owner.getConf()),
         this::operationRetried);
   }

@@ -101,7 +101,7 @@ public class CommitOperations {
     Preconditions.checkArgument(fs != null, "null fs");
     this.fs = fs;
     statistics = fs.newCommitterStatistics();
-    writeOperations = fs.getWriteOperationHelper();
+    writeOperations = fs.createWriteOperationHelper();
   }

   /**
@@ -101,7 +101,7 @@ public class MagicCommitIntegration {
             key,
             destKey,
             pendingsetPath,
-            owner.getWriteOperationHelper());
+            owner.createWriteOperationHelper());
         LOG.debug("Created {}", tracker);
       } else {
         LOG.warn("File being created has a \"magic\" path, but the filesystem"
@@ -23,17 +23,14 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

-import com.amazonaws.services.s3.model.MultipartUpload;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -47,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.s3a.MultipartUtils;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
@@ -59,7 +55,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
 import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;

 /**
@@ -84,7 +79,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
       "\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" +
       "\t" + Import.NAME + " - " + Import.PURPOSE + "\n" +
       "\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
-      "\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n" +
       "\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
       "\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
       "\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n";
@@ -106,14 +100,10 @@ public abstract class S3GuardTool extends Configured implements Tool {
   private final CommandFormat commandFormat;

   public static final String META_FLAG = "meta";

-  // These are common to prune, upload subcommands.
   public static final String DAYS_FLAG = "days";
   public static final String HOURS_FLAG = "hours";
   public static final String MINUTES_FLAG = "minutes";
   public static final String SECONDS_FLAG = "seconds";
-  public static final String AGE_OPTIONS_USAGE = "[-days <days>] "
-      + "[-hours <hours>] [-minutes <minutes>] [-seconds <seconds>]";
-
   public static final String REGION_FLAG = "region";
   public static final String READ_FLAG = "read";
@@ -187,36 +177,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
           "config, or S3 bucket");
     }

-  private long getDeltaComponent(TimeUnit unit, String arg) {
-    String raw = getCommandFormat().getOptValue(arg);
-    if (raw == null || raw.isEmpty()) {
-      return 0;
-    }
-    Long parsed = Long.parseLong(raw);
-    return unit.toMillis(parsed);
-  }
-
-  /**
-   * Convert all age options supplied to total milliseconds of time.
-   * @return Sum of all age options, or zero if none were given.
-   */
-  long ageOptionsToMsec() {
-    long cliDelta = 0;
-    cliDelta += getDeltaComponent(TimeUnit.DAYS, DAYS_FLAG);
-    cliDelta += getDeltaComponent(TimeUnit.HOURS, HOURS_FLAG);
-    cliDelta += getDeltaComponent(TimeUnit.MINUTES, MINUTES_FLAG);
-    cliDelta += getDeltaComponent(TimeUnit.SECONDS, SECONDS_FLAG);
-    return cliDelta;
-  }
-
-  protected void addAgeOptions() {
-    CommandFormat format = getCommandFormat();
-    format.addOptionWithValue(DAYS_FLAG);
-    format.addOptionWithValue(HOURS_FLAG);
-    format.addOptionWithValue(MINUTES_FLAG);
-    format.addOptionWithValue(SECONDS_FLAG);
-  }
-
   /**
    * Parse metadata store from command line option or HDFS configuration.
    *
@@ -907,8 +867,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
       "Common options:\n" +
       "  -" + META_FLAG + " URL - Metadata repository details " +
       "(implementation-specific)\n" +
-      "Age options. Any combination of these integer-valued options:\n" +
-      AGE_OPTIONS_USAGE + "\n" +
+      "\n" +
       "Amazon DynamoDB-specific options:\n" +
       "  -" + REGION_FLAG + " REGION - Service region for connections\n" +
       "\n" +
@@ -918,7 +877,12 @@ public abstract class S3GuardTool extends Configured implements Tool {

     Prune(Configuration conf) {
       super(conf);
-      addAgeOptions();
+      CommandFormat format = getCommandFormat();
+      format.addOptionWithValue(DAYS_FLAG);
+      format.addOptionWithValue(HOURS_FLAG);
+      format.addOptionWithValue(MINUTES_FLAG);
+      format.addOptionWithValue(SECONDS_FLAG);
     }

     @VisibleForTesting
@@ -937,6 +901,15 @@ public abstract class S3GuardTool extends Configured implements Tool {
       return USAGE;
     }

+    private long getDeltaComponent(TimeUnit unit, String arg) {
+      String raw = getCommandFormat().getOptValue(arg);
+      if (raw == null || raw.isEmpty()) {
+        return 0;
+      }
+      Long parsed = Long.parseLong(raw);
+      return unit.toMillis(parsed);
+    }
+
     public int run(String[] args, PrintStream out) throws
         InterruptedException, IOException {
       List<String> paths = parseArgs(args);
@@ -951,7 +924,11 @@ public abstract class S3GuardTool extends Configured implements Tool {
       Configuration conf = getConf();
       long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0);

-      long cliDelta = ageOptionsToMsec();
+      long cliDelta = 0;
+      cliDelta += getDeltaComponent(TimeUnit.DAYS, "days");
+      cliDelta += getDeltaComponent(TimeUnit.HOURS, "hours");
+      cliDelta += getDeltaComponent(TimeUnit.MINUTES, "minutes");
+      cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds");

       if (confDelta <= 0 && cliDelta <= 0) {
         errorln("You must specify a positive age for metadata to prune.");
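
As a worked example of the inlined age computation above (values illustrative): a `-days 2 -hours 3` command line resolves to:

```java
// TimeUnit converts each flag's count into milliseconds before summing.
long cliDelta = TimeUnit.DAYS.toMillis(2)   // 172,800,000 ms
    + TimeUnit.HOURS.toMillis(3);           // + 10,800,000 ms
// cliDelta == 183,600,000 ms; any positive value passes the check below.
```
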
@@ -1103,214 +1080,6 @@ public abstract class S3GuardTool extends Configured implements Tool {

   }

-  /**
-   * Command to list / abort pending multipart uploads.
-   */
-  static class Uploads extends S3GuardTool {
-    public static final String NAME = "uploads";
-    public static final String ABORT = "abort";
-    public static final String LIST = "list";
-    public static final String EXPECT = "expect";
-    public static final String VERBOSE = "verbose";
-    public static final String FORCE = "force";
-
-    public static final String PURPOSE = "list or abort pending " +
-        "multipart uploads";
-    private static final String USAGE = NAME + " [OPTIONS] " +
-        "s3a://BUCKET[/path]\n"
-        + "\t" + PURPOSE + "\n\n"
-        + "Common options:\n"
-        + "  (-" + LIST + " | -" + EXPECT +" <num-uploads> | -" + ABORT
-        + ") [-" + VERBOSE +"] "
-        + "[<age-options>] [-force]\n"
-        + "\t - Under given path, list or delete all uploads," +
-        " or only those \n"
-        + "older than specified by <age-options>\n"
-        + "<age-options> are any combination of the integer-valued options:\n"
-        + "\t" + AGE_OPTIONS_USAGE + "\n"
-        + "-" + EXPECT + " is similar to list, except no output is printed,\n"
-        + "\tbut the exit code will be an error if the provided number\n"
-        + "\tis different that the number of uploads found by the command.\n"
-        + "-" + FORCE + " option prevents the \"Are you sure\" prompt when\n"
-        + "\tusing -" + ABORT;
-
-    /** Constant used for output and parsed by tests. */
-    public static final String TOTAL = "Total";
-
-    /** Runs in one of three modes. */
-    private enum Mode { LIST, EXPECT, ABORT };
-    private Mode mode = null;
-
-    /** For Mode == EXPECT, expected listing size. */
-    private int expectedCount;
-
-    /** List/abort uploads older than this many milliseconds. */
-    private long ageMsec = 0;
-
-    /** Verbose output flag. */
-    private boolean verbose = false;
-
-    /** Whether to delete with out "are you sure" prompt. */
-    private boolean force = false;
-
-    /** Path prefix to use when searching multipart uploads. */
-    private String prefix;
-
-    Uploads(Configuration conf) {
-      super(conf, ABORT, LIST, VERBOSE, FORCE);
-      addAgeOptions();
-      getCommandFormat().addOptionWithValue(EXPECT);
-    }
-
-    @Override
-    String getName() {
-      return NAME;
-    }
-
-    @Override
-    public String getUsage() {
-      return USAGE;
-    }
-
-    public int run(String[] args, PrintStream out)
-        throws InterruptedException, IOException {
-      List<String> paths = parseArgs(args);
-      if (paths.isEmpty()) {
-        errorln(getUsage());
-        throw invalidArgs("No options specified");
-      }
-      processArgs(paths, out);
-      promptBeforeAbort(out);
-      processUploads(out);
-
-      out.flush();
-      return SUCCESS;
-    }
-
-    private void promptBeforeAbort(PrintStream out) throws IOException {
-      if (mode != Mode.ABORT || force) {
-        return;
-      }
-      Scanner scanner = new Scanner(System.in, "UTF-8");
-      out.println("Are you sure you want to delete any pending " +
-          "uploads? (yes/no) >");
-      String response = scanner.nextLine();
-      if (!"yes".equalsIgnoreCase(response)) {
-        throw S3GuardTool.userAborted("User did not answer yes, quitting.");
-      }
-    }
-
-    private void processUploads(PrintStream out) throws IOException {
-      MultipartUtils.UploadIterator uploads;
-      uploads = getFilesystem().listUploads(prefix);
-
-      int count = 0;
-      while (uploads.hasNext()) {
-        MultipartUpload upload = uploads.next();
-        if (!olderThan(upload, ageMsec)) {
-          continue;
-        }
-        count++;
-        if (mode == Mode.ABORT || mode == Mode.LIST || verbose) {
-          println(out, "%s%s %s", mode == Mode.ABORT ? "Deleting: " : "",
-              upload.getKey(), upload.getUploadId());
-        }
-        if (mode == Mode.ABORT) {
-          getFilesystem().getWriteOperationHelper()
-              .abortMultipartUpload(upload.getKey(), upload.getUploadId(),
-                  LOG_EVENT);
-        }
-      }
-      if (mode != Mode.EXPECT || verbose) {
-        println(out, "%s %d uploads %s.", TOTAL, count,
-            mode == Mode.ABORT ? "deleted" : "found");
-      }
-      if (mode == Mode.EXPECT) {
-        if (count != expectedCount) {
-          throw badState("Expected %d uploads, found %d", expectedCount, count);
-        }
-      }
-    }
-
-    /**
-     * Check if upload is at least as old as given age.
-     * @param u upload to check
-     * @param msec age in milliseconds
-     * @return true iff u was created at least age milliseconds ago.
-     */
-    private boolean olderThan(MultipartUpload u, long msec) {
-      Date ageDate = new Date(System.currentTimeMillis() - msec);
-      return ageDate.compareTo(u.getInitiated()) >= 0;
-    }
-
-    private void processArgs(List<String> args, PrintStream out)
-        throws IOException {
-      CommandFormat commands = getCommandFormat();
-      String err = "Can only specify one of -" + LIST + ", " +
-          " -" + ABORT + ", and " + EXPECT;
-
-      // Three mutually-exclusive options
-      if (commands.getOpt(LIST)) {
-        mode = Mode.LIST;
-      }
-      if (commands.getOpt(ABORT)) {
-        if (mode != null) {
-          throw invalidArgs(err);
-        }
-        mode = Mode.ABORT;
-      }
-
-      String expectVal = commands.getOptValue(EXPECT);
-      if (expectVal != null) {
-        if (mode != null) {
-          throw invalidArgs(err);
-        }
-        mode = Mode.EXPECT;
-        expectedCount = Integer.parseInt(expectVal);
-      }
-
-      // Default to list
-      if (mode == null) {
-        vprintln(out, "No mode specified, defaulting to -" + LIST);
-        mode = Mode.LIST;
-      }
-
-      // Other flags
-      if (commands.getOpt(VERBOSE)) {
-        verbose = true;
-      }
-      if (commands.getOpt(FORCE)) {
-        force = true;
-      }
-      ageMsec = ageOptionsToMsec();
-
-      String s3Path = args.get(0);
-      URI uri = S3GuardTool.toUri(s3Path);
-      prefix = uri.getPath();
-      if (prefix.length() > 0) {
-        prefix = prefix.substring(1);
-      }
-      vprintln(out, "Command: %s, age %d msec, path %s (prefix \"%s\")",
-          mode.name(), ageMsec, s3Path, prefix);
-
-      initS3AFileSystem(s3Path);
-    }
-
-    /**
-     * If verbose flag is set, print a formatted string followed by a newline
-     * to the output stream.
-     * @param out destination
-     * @param format format string
-     * @param args optional arguments
-     */
-    private void vprintln(PrintStream out, String format, Object...
-        args) {
-      if (verbose) {
-        out.println(String.format(format, args));
-      }
-    }
-  }
-
   private static S3GuardTool command;

   /**
@@ -1413,17 +1182,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
         String.format(format, args));
   }

-  /**
-   * Build the exception to raise on user-aborted action.
-   * @param format string format
-   * @param args optional arguments for the string
-   * @return a new exception to throw
-   */
-  protected static ExitUtil.ExitException userAborted(
-      String format, Object...args) {
-    return new ExitUtil.ExitException(ERROR, String.format(format, args));
-  }
-
   /**
    * Execute the command with the given arguments.
    *
@@ -1466,9 +1224,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
     case SetCapacity.NAME:
       command = new SetCapacity(conf);
       break;
-    case Uploads.NAME:
-      command = new Uploads(conf);
-      break;
     default:
       printHelp();
       throw new ExitUtil.ExitException(E_USAGE,
@@ -1531,13 +1531,8 @@ from VMs running on EC2.
 </property>
 ```

-### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures
+### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures: `fs.s3a.multipart.purge`

-There are two mechanisms for cleaning up after leftover multipart
-uploads:
-- Hadoop s3guard CLI commands for listing and deleting uploads by their
-  age. Doumented in the [S3Guard](./s3guard.html) section.
-- The configuration parameter `fs.s3a.multipart.purge`, covered below.
-
 If an large stream writeoperation is interrupted, there may be
 intermediate partitions uploaded to S3 —data which will be billed for.
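
For reference, a sketch of enabling the purge mechanism named in the retitled heading above; the two property names are the standard S3A ones, while the age value and its unit are illustrative assumptions.

```java
// Sketch: purge pending multipart uploads older than one day when the
// S3A filesystem is initialized.
Configuration conf = new Configuration();
conf.setBoolean("fs.s3a.multipart.purge", true);
conf.setLong("fs.s3a.multipart.purge.age", 86400); // age in seconds (assumed)
```
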
@@ -515,43 +515,10 @@ hadoop s3guard bucket-info -guarded -auth s3a://landsat-pds
 Require the bucket to be using S3Guard in authoritative mode. This will normally
 fail against this specific bucket.

-### List or Delete Leftover Multipart Uploads: `s3guard uploads`
-
-Lists or deletes all pending (uncompleted) multipart uploads older than
-given age.
-
-```bash
-hadoop s3guard uploads (-list | -abort | -expect <num-uploads>) [-verbose] \
-    [-days <days>] [-hours <hours>] [-minutes <minutes>] [-seconds <seconds>] \
-    [-force] s3a://bucket/prefix
-```
-
-The command lists or deletes all multipart uploads which are older than
-the given age, and that match the prefix supplied, if any.
-
-For example, to delete all uncompleted multipart uploads older than two
-days in the folder at `s3a://my-bucket/path/to/stuff`, use the following
-command:
-
-```bash
-hadoop s3guard uploads -abort -days 2 s3a://my-bucket/path/to/stuff
-```
-
-We recommend running with `-list` first to confirm the parts shown
-are those that you wish to delete. Note that the command will prompt
-you with a "Are you sure?" prompt unless you specify the `-force`
-option. This is to safeguard against accidental deletion of data, which
-is especially risky without a long age parameter as it can affect
-in-fight uploads.
-
-The `-expect` option is similar to `-list`, except it is silent by
-default, and terminates with a success or failure exit code depending
-on whether or not the supplied number matches the number of uploads
-found that match the given options (path, age).
-
-
 ### Delete a table: `s3guard destroy`

 Deletes a metadata store. With DynamoDB as the store, this means
 the specific DynamoDB table use to store the metadata.
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.amazonaws.services.s3.model.MultipartUpload;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Tests for {@link MultipartUtils}.
- */
-public class ITestS3AMultipartUtils extends AbstractS3ATestBase {
-
-  private static final int UPLOAD_LEN = 1024;
-  private static final String PART_FILENAME_BASE = "pending-part";
-  private static final int LIST_BATCH_SIZE = 2;
-  private static final int NUM_KEYS = 5;
-
-
-  @Override
-  protected Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    S3ATestUtils.disableFilesystemCaching(conf);
-    // Forces listings to come back in multiple batches to test that part of
-    // the iterators.
-    conf.setInt(Constants.MAX_PAGING_KEYS, LIST_BATCH_SIZE);
-    return conf;
-  }
-
-  /**
-   * Main test case for upload part listing and iterator paging.
-   * @throws Exception on failure.
-   */
-  @Test
-  public void testListMultipartUploads() throws Exception {
-    S3AFileSystem fs = getFileSystem();
-    Set<MultipartTestUtils.IdKey> keySet = new HashSet<>();
-    try {
-      // 1. Create NUM_KEYS pending upload parts
-      for (int i = 0; i < NUM_KEYS; i++) {
-        Path filePath = getPartFilename(i);
-        String key = fs.pathToKey(filePath);
-        describe("creating upload part with key %s", key);
-        // create a multipart upload
-        MultipartTestUtils.IdKey idKey = MultipartTestUtils
-            .createPartUpload(fs, key, UPLOAD_LEN,
-            1);
-        keySet.add(idKey);
-      }
-
-      // 2. Verify all uploads are found listing by prefix
-      describe("Verifying upload list by prefix");
-      MultipartUtils.UploadIterator uploads = fs.listUploads(getPartPrefix(fs));
-      assertUploadsPresent(uploads, keySet);
-
-      // 3. Verify all uploads are found listing without prefix
-      describe("Verifying list all uploads");
-      uploads = fs.listUploads(null);
-      assertUploadsPresent(uploads, keySet);
-
-    } finally {
-      // 4. Delete all uploads we created
-      MultipartTestUtils.cleanupParts(fs, keySet);
-    }
-  }
-
-  /**
-   * Assert that all provided multipart uploads are contained in the upload
-   * iterator's results.
-   * @param list upload iterator
-   * @param ourUploads set up uploads that should be present
-   * @throws IOException on I/O error
-   */
-  private void assertUploadsPresent(MultipartUtils.UploadIterator list,
-      Set<MultipartTestUtils.IdKey> ourUploads) throws IOException {
-
-    // Don't modify passed-in set, use copy.
-    Set<MultipartTestUtils.IdKey> uploads = new HashSet<>(ourUploads);
-    while (list.hasNext()) {
-      MultipartTestUtils.IdKey listing = toIdKey(list.next());
-      if (uploads.contains(listing)) {
-        LOG.debug("Matched: {},{}", listing.getKey(), listing.getUploadId());
-        uploads.remove(listing);
-      } else {
-        LOG.debug("Not our upload {},{}", listing.getKey(),
-            listing.getUploadId());
-      }
-    }
-    assertTrue("Not all our uploads were listed", uploads.isEmpty());
-  }
-
-  private MultipartTestUtils.IdKey toIdKey(MultipartUpload mu) {
-    return new MultipartTestUtils.IdKey(mu.getKey(), mu.getUploadId());
-  }
-
-  private Path getPartFilename(int index) throws IOException {
-    return path(String.format("%s-%d", PART_FILENAME_BASE, index));
-  }
-
-  private String getPartPrefix(S3AFileSystem fs) throws IOException {
-    return fs.pathToKey(path("blah").getParent());
-  }
-
-}
@@ -78,7 +78,6 @@ public class MockS3AFileSystem extends S3AFileSystem {
   private final S3AInstrumentation instrumentation =
       new S3AInstrumentation(FS_URI);
   private Configuration conf;
-  private WriteOperationHelper writeHelper;

   public MockS3AFileSystem(S3AFileSystem mock,
       Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome) {
@@ -126,12 +125,6 @@ public class MockS3AFileSystem extends S3AFileSystem {
   public void initialize(URI name, Configuration originalConf)
       throws IOException {
     conf = originalConf;
-    writeHelper = new WriteOperationHelper(this, conf);
-  }
-
-  @Override
-  public WriteOperationHelper getWriteOperationHelper() {
-    return writeHelper;
   }

   @Override
@ -1,184 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.MultipartUpload;
|
|
||||||
import com.amazonaws.services.s3.model.PartETag;
|
|
||||||
import com.amazonaws.services.s3.model.UploadPartRequest;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
|
||||||
import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Utilities for S3A multipart upload tests.
|
|
||||||
*/
|
|
||||||
public final class MultipartTestUtils {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
|
||||||
MultipartTestUtils.class);
|
|
||||||
|
|
||||||
/** Not instantiated. */
|
|
||||||
private MultipartTestUtils() { }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up all provided uploads.
|
|
||||||
* @param keySet set of uploads to abort
|
|
||||||
*/
|
|
||||||
static void cleanupParts(S3AFileSystem fs, Set <IdKey> keySet) {
|
|
||||||
boolean anyFailure = false;
|
|
||||||
for (IdKey ik : keySet) {
|
|
||||||
try {
|
|
||||||
LOG.debug("aborting upload id {}", ik.getUploadId());
|
|
||||||
fs.abortMultipartUpload(ik.getKey(), ik.getUploadId());
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error(String.format("Failure aborting upload %s, continuing.",
|
|
||||||
-            ik.getKey()), e);
-        anyFailure = true;
-      }
-    }
-    Assert.assertFalse("Failure aborting multipart upload(s), see log.",
-        anyFailure);
-  }
-
-  public static IdKey createPartUpload(S3AFileSystem fs, String key, int len,
-      int partNo) throws IOException {
-    WriteOperationHelper writeHelper = fs.getWriteOperationHelper();
-    byte[] data = dataset(len, 'a', 'z');
-    InputStream in = new ByteArrayInputStream(data);
-    String uploadId = writeHelper.initiateMultiPartUpload(key);
-    UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId,
-        partNo, len, in, null, 0L);
-    PartETag partEtag = fs.uploadPart(req).getPartETag();
-    LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId);
-    return new IdKey(key, uploadId);
-  }
-
-  /** Delete any uploads under given path (recursive). Silent on failure. */
-  public static void clearAnyUploads(S3AFileSystem fs, Path path) {
-    try {
-      String key = fs.pathToKey(path);
-      MultipartUtils.UploadIterator uploads = fs.listUploads(key);
-      while (uploads.hasNext()) {
-        MultipartUpload upload = uploads.next();
-        fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(),
-            upload.getUploadId(), LOG_EVENT);
-        LOG.debug("Cleaning up upload: {} {}", upload.getKey(),
-            truncatedUploadId(upload.getUploadId()));
-      }
-    } catch (IOException ioe) {
-      LOG.info("Ignoring exception: ", ioe);
-    }
-  }
-
-  /** Assert that there are not any upload parts at given path. */
-  public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws
-      Exception {
-    String key = fs.pathToKey(path);
-    MultipartUtils.UploadIterator uploads = fs.listUploads(key);
-    while (uploads.hasNext()) {
-      MultipartUpload upload = uploads.next();
-      Assert.fail("Found unexpected upload " + upload.getKey() + " " +
-          truncatedUploadId(upload.getUploadId()));
-    }
-  }
-
-  /** Get number of part uploads under given path. */
-  public static int countUploadsAt(S3AFileSystem fs, Path path) throws
-      IOException {
-    String key = fs.pathToKey(path);
-    MultipartUtils.UploadIterator uploads = fs.listUploads(key);
-    int count = 0;
-    while (uploads.hasNext()) {
-      MultipartUpload upload = uploads.next();
-      count++;
-    }
-    return count;
-  }
-
-  /**
-   * Get a list of all pending uploads under a prefix, one which can be printed.
-   * @param prefix prefix to look under
-   * @return possibly empty list
-   * @throws IOException IO failure.
-   */
-  public static List<String> listMultipartUploads(S3AFileSystem fs,
-      String prefix) throws IOException {
-
-    return fs
-        .listMultipartUploads(prefix).stream()
-        .map(upload -> String.format("Upload to %s with ID %s; initiated %s",
-            upload.getKey(),
-            upload.getUploadId(),
-            S3ATestUtils.LISTING_FORMAT.format(upload.getInitiated())))
-        .collect(Collectors.toList());
-  }
-
-  private static String truncatedUploadId(String fullId) {
-    return fullId.substring(0, 12) + " ...";
-  }
-
-  /** Struct of object key, upload ID. */
-  static class IdKey {
-    private String key;
-    private String uploadId;
-
-    IdKey(String key, String uploadId) {
-      this.key = key;
-      this.uploadId = uploadId;
-    }
-
-    public String getKey() {
-      return key;
-    }
-
-    public String getUploadId() {
-      return uploadId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      IdKey key1 = (IdKey) o;
-      return Objects.equals(key, key1.key) &&
-          Objects.equals(uploadId, key1.uploadId);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(key, uploadId);
-    }
-  }
-}
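Taken together, the deleted helpers cover the full lifecycle a CLI test needs: start a part upload and leave it pending, count or list what is outstanding, and abort everything afterwards so later runs start clean. A minimal usage sketch under assumed JUnit scaffolding (getFileSystem(), path(), the prefix string and the 128-byte size are stand-ins, not part of this diff; asserts are assumed statically imported):

    // Create one pending part, check it is visible, then scrub it.
    S3AFileSystem fs = getFileSystem();               // assumed test fixture
    Path path = path("test-multipart/file");          // hypothetical path helper
    MultipartTestUtils.IdKey id =                     // identifies upload for later abort
        MultipartTestUtils.createPartUpload(fs, fs.pathToKey(path), 128, 1);
    assertEquals("expected one pending upload", 1,
        MultipartTestUtils.countUploadsAt(fs, path));
    MultipartTestUtils.clearAnyUploads(fs, path);     // best-effort abort
    MultipartTestUtils.assertNoUploadsAt(fs, path);   // verify bucket is clean

Because IdKey overrides equals() and hashCode(), instances deduplicate correctly when collected in a HashSet before a bulk abort, which is what the cleanup loop above this hunk iterates over.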
@@ -45,6 +45,7 @@ import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
@@ -821,9 +822,27 @@ public final class S3ATestUtils {
   /**
    * Date format used for mapping upload initiation time to human string.
    */
-  public static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
+  private static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
       "yyyy-MM-dd HH:mm:ss");
 
+  /**
+   * Get a list of all pending uploads under a prefix, one which can be printed.
+   * @param prefix prefix to look under
+   * @return possibly empty list
+   * @throws IOException IO failure.
+   */
+  public static List<String> listMultipartUploads(S3AFileSystem fs,
+      String prefix) throws IOException {
+
+    return fs
+        .listMultipartUploads(prefix).stream()
+        .map(upload -> String.format("Upload to %s with ID %s; initiated %s",
+            upload.getKey(),
+            upload.getUploadId(),
+            LISTING_FORMAT.format(upload.getInitiated())))
+        .collect(Collectors.toList());
+  }
+
   /**
    * Skip a test if the FS isn't marked as supporting magic commits.
    * @param fs filesystem
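The restored helper renders each pending upload as one printable line, which keeps assertion failures self-describing. A hedged sketch of how a test might use it (the fs fixture and the prefix are assumptions, not taken from this diff):

    // Fail if any multipart uploads are still pending under the prefix,
    // embedding the human-readable listing in the failure message.
    List<String> uploads = S3ATestUtils.listMultipartUploads(fs, "tests/");
    assertTrue("Outstanding multipart uploads: " + uploads, uploads.isEmpty());

Note that LISTING_FORMAT also returns to private visibility here, so the formatter is once again used only inside S3ATestUtils.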
@@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 
@@ -248,7 +247,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
     S3AFileSystem fs = getFileSystem();
     if (fs != null && path != null) {
       String key = fs.pathToKey(path);
-      WriteOperationHelper writeOps = fs.getWriteOperationHelper();
+      WriteOperationHelper writeOps = fs.createWriteOperationHelper();
       int count = writeOps.abortMultipartUploadsUnderPath(key);
       if (count > 0) {
         log().info("Multipart uploads deleted: {}", count);
@@ -40,8 +40,8 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
 
-import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 
 
 /**
@@ -24,15 +24,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
-import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -40,20 +35,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
 
-import static org.apache.hadoop.fs.s3a.MultipartTestUtils.*;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Test S3Guard related CLI commands against a LocalMetadataStore.
- * Also responsible for testing the non s3guard-specific commands that, for
- * now, live under the s3guard CLI command.
  */
 public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
 
   private static final String LOCAL_METADATA = "local://metadata";
-  private static final String[] ABORT_FORCE_OPTIONS = new String[] {"-abort",
-      "-force", "-verbose"};
 
   @Override
   protected MetadataStore newMetadataStore() {
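The hunk below removes the upload-related tests. They drove the s3guard `uploads` subcommand in-process through run() and runToFailure(), presumably inherited from the test base class, rather than forking a shell. A rough sketch of that invocation pattern, using only names visible in this diff (the path fixture is assumed):

    // List uploads under the path, requiring an exact count via -expect.
    run(Uploads.NAME, "-expect", "1", path.toString());
    // Abort without prompting; the flags mirror the removed
    // ABORT_FORCE_OPTIONS set, with -seconds restricting the purge to
    // uploads at least 60 seconds old.
    run(Uploads.NAME, "-abort", "-force", "-verbose",
        "-seconds", "60", path.toString());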
@@ -271,182 +261,5 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
     LOG.info("Exec output=\n{}", output);
   }
 
-  private final static String UPLOAD_PREFIX = "test-upload-prefix";
-  private final static String UPLOAD_NAME = "test-upload";
-
-  @Test
-  public void testUploads() throws Throwable {
-    S3AFileSystem fs = getFileSystem();
-    Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
-
-    describe("Cleaning up any leftover uploads from previous runs.");
-    // 1. Make sure key doesn't already exist
-    clearAnyUploads(fs, path);
-
-    // 2. Confirm no uploads are listed via API
-    assertNoUploadsAt(fs, path.getParent());
-
-    // 3. Confirm no uploads are listed via CLI
-    describe("Confirming CLI lists nothing.");
-    assertNumUploads(path, 0);
-
-    // 4. Create an upload part
-    describe("Uploading single part.");
-    createPartUpload(fs, fs.pathToKey(path), 128, 1);
-
-    try {
-      // 5. Confirm it exists via API
-      LambdaTestUtils.eventually(5000, /* 5 seconds until failure */
-          1000, /* one second retry interval */
-          () -> {
-            assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
-          });
-
-      // 6. Confirm part exists via CLI, direct path and parent path
-      describe("Confirming CLI lists one part");
-      LambdaTestUtils.eventually(5000, 1000,
-          () -> assertNumUploads(path, 1));
-      LambdaTestUtils.eventually(5000, 1000,
-          () -> assertNumUploads(path.getParent(), 1));
-
-      // 7. Use CLI to delete part, assert it worked
-      describe("Deleting part via CLI");
-      assertNumDeleted(fs, path, 1);
-
-      // 8. Confirm deletion via API
-      describe("Confirming deletion via API");
-      assertEquals("Should be no uploads", 0, countUploadsAt(fs, path));
-
-      // 9. Confirm no uploads are listed via CLI
-      describe("Confirming CLI lists nothing.");
-      assertNumUploads(path, 0);
-
-    } catch (Throwable t) {
-      // Clean up on intermediate failure
-      clearAnyUploads(fs, path);
-      throw t;
-    }
-  }
-
-  @Test
-  public void testUploadListByAge() throws Throwable {
-    S3AFileSystem fs = getFileSystem();
-    Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
-
-    describe("Cleaning up any leftover uploads from previous runs.");
-    // 1. Make sure key doesn't already exist
-    clearAnyUploads(fs, path);
-
-    // 2. Create an upload part
-    describe("Uploading single part.");
-    createPartUpload(fs, fs.pathToKey(path), 128, 1);
-
-    try {
-      // 3. Confirm it exists via API
-      LambdaTestUtils.eventually(5000, 1000,
-          () -> {
-            assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
-          });
-
-      // 4. Confirm part does not appear in listing with long age filter
-      describe("Confirming CLI older age doesn't list");
-      assertNumUploadsAge(path, 0, 600);
-
-      // 5. Confirm part does not get deleted with long age filter
-      describe("Confirming CLI older age doesn't delete");
-      uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 0,
-          600);
-
-      // 6. Wait a second and then assert the part is in listing of things at
-      // least a second old
-      describe("Sleeping 1 second then confirming upload still there");
-      Thread.sleep(1000);
-      LambdaTestUtils.eventually(5000, 1000,
-          () -> assertNumUploadsAge(path, 1, 1));
-
-      // 7. Assert deletion works when age filter matches
-      describe("Doing aged deletion");
-      uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 1, 1);
-      describe("Confirming age deletion happened");
-      assertEquals("Should be no uploads", 0, countUploadsAt(fs, path));
-    } catch (Throwable t) {
-      // Clean up on intermediate failure
-      clearAnyUploads(fs, path);
-      throw t;
-    }
-  }
-
-  @Test
-  public void testUploadNegativeExpect() throws Throwable {
-    runToFailure(E_BAD_STATE, Uploads.NAME, "-expect", "1",
-        path("/we/are/almost/postive/this/doesnt/exist/fhfsadfoijew")
-            .toString());
-  }
-
-  private void assertNumUploads(Path path, int numUploads) throws Exception {
-    assertNumUploadsAge(path, numUploads, 0);
-  }
-
-  private void assertNumUploadsAge(Path path, int numUploads, int ageSeconds)
-      throws Exception {
-    if (ageSeconds > 0) {
-      run(Uploads.NAME, "-expect", String.valueOf(numUploads), "-seconds",
-          String.valueOf(ageSeconds), path.toString());
-    } else {
-      run(Uploads.NAME, "-expect", String.valueOf(numUploads), path.toString());
-    }
-  }
-
-  private void assertNumDeleted(S3AFileSystem fs, Path path, int numDeleted)
-      throws Exception {
-    uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path,
-        numDeleted, 0);
-  }
-
-  /**
-   * Run uploads cli command and assert the reported count (listed or
-   * deleted) matches.
-   * @param fs S3AFileSystem
-   * @param options main command options
-   * @param path path of part(s)
-   * @param numUploads expected number of listed/deleted parts
-   * @param ageSeconds optional seconds of age to specify to CLI, or zero to
-   *                   search all parts
-   * @throws Exception on failure
-   */
-  private void uploadCommandAssertCount(S3AFileSystem fs, String[] options,
-      Path path, int numUploads, int ageSeconds)
-      throws Exception {
-    List<String> allOptions = new ArrayList<>();
-    List<String> output = new ArrayList<>();
-    S3GuardTool.Uploads cmd = new S3GuardTool.Uploads(fs.getConf());
-    ByteArrayOutputStream buf = new ByteArrayOutputStream();
-    allOptions.add(cmd.getName());
-    allOptions.addAll(Arrays.asList(options));
-    if (ageSeconds > 0) {
-      allOptions.add("-" + Uploads.SECONDS_FLAG);
-      allOptions.add(String.valueOf(ageSeconds));
-    }
-    allOptions.add(path.toString());
-    exec(cmd, buf, allOptions.toArray(new String[0]));
-
-    try (BufferedReader reader = new BufferedReader(
-        new InputStreamReader(new ByteArrayInputStream(buf.toByteArray())))) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-        String[] fields = line.split("\\s");
-        if (fields.length == 4 && fields[0].equals(Uploads.TOTAL)) {
-          int parsedUploads = Integer.valueOf(fields[1]);
-          LOG.debug("Matched CLI output: {} {} {} {}", fields);
-          assertEquals("Unexpected number of uploads", numUploads,
-              parsedUploads);
-          return;
-        }
-        LOG.debug("Not matched: {}", line);
-        output.add(line);
-      }
-    }
-    fail("Command output did not match: \n" + StringUtils.join("\n", output));
-  }
-}
 }
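A worked example of the summary-line match in the removed uploadCommandAssertCount: it keys off a line of exactly four whitespace-separated fields whose first token is Uploads.TOTAL. The concrete wording below is an assumption inferred from that fields.length == 4 check, not shown in this diff:

    // Hypothetical summary line printed by the uploads command.
    String line = "Total 1 uploads found.";
    String[] fields = line.split("\\s");
    // fields == ["Total", "1", "uploads", "found."]
    int parsedUploads = Integer.parseInt(fields[1]);   // -> 1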