HADOOP-14220 Enhance S3GuardTool with bucket-info and set-capacity commands, tests.
Contributed by Steve Loughran
This commit is contained in:
parent
42e873d4ea
commit
28e05dc1da
|
@ -292,6 +292,10 @@ public class S3AFileSystem extends FileSystem {
|
||||||
metadataStore = S3Guard.getMetadataStore(this);
|
metadataStore = S3Guard.getMetadataStore(this);
|
||||||
allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
|
allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
|
||||||
DEFAULT_METADATASTORE_AUTHORITATIVE);
|
DEFAULT_METADATASTORE_AUTHORITATIVE);
|
||||||
|
if (hasMetadataStore()) {
|
||||||
|
LOG.debug("Using metadata store {}, authoritative={}",
|
||||||
|
getMetadataStore(), allowAuthoritative);
|
||||||
|
}
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
throw translateException("initializing ", new Path(name), e);
|
throw translateException("initializing ", new Path(name), e);
|
||||||
}
|
}
|
||||||
|
@ -975,7 +979,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
* @return the metadata store of this FS instance
|
* @return the metadata store of this FS instance
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
MetadataStore getMetadataStore() {
|
public MetadataStore getMetadataStore() {
|
||||||
return metadataStore;
|
return metadataStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2430,9 +2434,11 @@ public class S3AFileSystem extends FileSystem {
|
||||||
sb.append(", statistics {")
|
sb.append(", statistics {")
|
||||||
.append(statistics)
|
.append(statistics)
|
||||||
.append("}");
|
.append("}");
|
||||||
sb.append(", metrics {")
|
if (instrumentation != null) {
|
||||||
.append(instrumentation.dump("{", "=", "} ", true))
|
sb.append(", metrics {")
|
||||||
.append("}");
|
.append(instrumentation.dump("{", "=", "} ", true))
|
||||||
|
.append("}");
|
||||||
|
}
|
||||||
sb.append('}');
|
sb.append('}');
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
|
@ -743,9 +743,11 @@ public final class S3AUtils {
|
||||||
// propagate the value, building a new origin field.
|
// propagate the value, building a new origin field.
|
||||||
// to track overwrites, the generic key is overwritten even if
|
// to track overwrites, the generic key is overwritten even if
|
||||||
// already matches the new one.
|
// already matches the new one.
|
||||||
|
String origin = "[" + StringUtils.join(
|
||||||
|
source.getPropertySources(key), ", ") +"]";
|
||||||
final String generic = FS_S3A_PREFIX + stripped;
|
final String generic = FS_S3A_PREFIX + stripped;
|
||||||
LOG.debug("Updating {}", generic);
|
LOG.debug("Updating {} from {}", generic, origin);
|
||||||
dest.set(generic, value, key);
|
dest.set(generic, value, key + " via " + origin);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return dest;
|
return dest;
|
||||||
|
@ -888,4 +890,58 @@ public final class S3AUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a bucket-specific property to a particular value.
|
||||||
|
* If the generic key passed in has an {@code fs.s3a. prefix},
|
||||||
|
* that's stripped off, so that when the the bucket properties are propagated
|
||||||
|
* down to the generic values, that value gets copied down.
|
||||||
|
* @param conf configuration to set
|
||||||
|
* @param bucket bucket name
|
||||||
|
* @param genericKey key; can start with "fs.s3a."
|
||||||
|
* @param value value to set
|
||||||
|
*/
|
||||||
|
public static void setBucketOption(Configuration conf, String bucket,
|
||||||
|
String genericKey, String value) {
|
||||||
|
final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
|
||||||
|
genericKey.substring(FS_S3A_PREFIX.length())
|
||||||
|
: genericKey;
|
||||||
|
conf.set(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey, value, "S3AUtils");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear a bucket-specific property.
|
||||||
|
* If the generic key passed in has an {@code fs.s3a. prefix},
|
||||||
|
* that's stripped off, so that when the the bucket properties are propagated
|
||||||
|
* down to the generic values, that value gets copied down.
|
||||||
|
* @param conf configuration to set
|
||||||
|
* @param bucket bucket name
|
||||||
|
* @param genericKey key; can start with "fs.s3a."
|
||||||
|
*/
|
||||||
|
public static void clearBucketOption(Configuration conf, String bucket,
|
||||||
|
String genericKey) {
|
||||||
|
final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
|
||||||
|
genericKey.substring(FS_S3A_PREFIX.length())
|
||||||
|
: genericKey;
|
||||||
|
String k = FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey;
|
||||||
|
LOG.debug("Unset {}", k);
|
||||||
|
conf.unset(k);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a bucket-specific property.
|
||||||
|
* If the generic key passed in has an {@code fs.s3a. prefix},
|
||||||
|
* that's stripped off.
|
||||||
|
* @param conf configuration to set
|
||||||
|
* @param bucket bucket name
|
||||||
|
* @param genericKey key; can start with "fs.s3a."
|
||||||
|
* @return the bucket option, null if there is none
|
||||||
|
*/
|
||||||
|
public static String getBucketOption(Configuration conf, String bucket,
|
||||||
|
String genericKey) {
|
||||||
|
final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
|
||||||
|
genericKey.substring(FS_S3A_PREFIX.length())
|
||||||
|
: genericKey;
|
||||||
|
return conf.get(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
|
@ -51,6 +52,7 @@ import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
|
||||||
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
|
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
|
||||||
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
|
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
|
||||||
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.TableDescription;
|
||||||
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
|
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -184,6 +186,18 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
* DynamoDB. Value is {@value} msec. */
|
* DynamoDB. Value is {@value} msec. */
|
||||||
public static final long MIN_RETRY_SLEEP_MSEC = 100;
|
public static final long MIN_RETRY_SLEEP_MSEC = 100;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String DESCRIPTION
|
||||||
|
= "S3Guard metadata store in DynamoDB";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String READ_CAPACITY = "read-capacity";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String WRITE_CAPACITY = "write-capacity";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String STATUS = "status";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final String TABLE = "table";
|
||||||
|
|
||||||
private static ValueMap deleteTrackingValueMap =
|
private static ValueMap deleteTrackingValueMap =
|
||||||
new ValueMap().withBoolean(":false", false);
|
new ValueMap().withBoolean(":false", false);
|
||||||
|
|
||||||
|
@ -788,7 +802,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
LOG.debug("Binding to table {}", tableName);
|
LOG.debug("Binding to table {}", tableName);
|
||||||
final String status = table.describe().getTableStatus();
|
TableDescription description = table.describe();
|
||||||
|
LOG.debug("Table state: {}", description);
|
||||||
|
final String status = description.getTableStatus();
|
||||||
switch (status) {
|
switch (status) {
|
||||||
case "CREATING":
|
case "CREATING":
|
||||||
case "UPDATING":
|
case "UPDATING":
|
||||||
|
@ -824,9 +840,10 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
|
|
||||||
createTable(capacity);
|
createTable(capacity);
|
||||||
} else {
|
} else {
|
||||||
throw new FileNotFoundException("DynamoDB table "
|
throw (FileNotFoundException)new FileNotFoundException(
|
||||||
+ "'" + tableName + "' does not "
|
"DynamoDB table '" + tableName + "' does not "
|
||||||
+ "exist in region " + region + "; auto-creation is turned off");
|
+ "exist in region " + region + "; auto-creation is turned off")
|
||||||
|
.initCause(rnfe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1007,4 +1024,83 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
Preconditions.checkNotNull(meta.getFileStatus().getPath());
|
Preconditions.checkNotNull(meta.getFileStatus().getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getDiagnostics() throws IOException {
|
||||||
|
Map<String, String> map = new TreeMap<>();
|
||||||
|
if (table != null) {
|
||||||
|
TableDescription desc = getTableDescription(true);
|
||||||
|
map.put("name", desc.getTableName());
|
||||||
|
map.put(STATUS, desc.getTableStatus());
|
||||||
|
map.put("ARN", desc.getTableArn());
|
||||||
|
map.put("size", desc.getTableSizeBytes().toString());
|
||||||
|
map.put(TABLE, desc.toString());
|
||||||
|
ProvisionedThroughputDescription throughput
|
||||||
|
= desc.getProvisionedThroughput();
|
||||||
|
map.put(READ_CAPACITY, throughput.getReadCapacityUnits().toString());
|
||||||
|
map.put(WRITE_CAPACITY, throughput.getWriteCapacityUnits().toString());
|
||||||
|
map.put(TABLE, desc.toString());
|
||||||
|
} else {
|
||||||
|
map.put("name", "DynamoDB Metadata Store");
|
||||||
|
map.put(TABLE, "none");
|
||||||
|
map.put(STATUS, "undefined");
|
||||||
|
}
|
||||||
|
map.put("description", DESCRIPTION);
|
||||||
|
map.put("region", region);
|
||||||
|
if (dataAccessRetryPolicy != null) {
|
||||||
|
map.put("retryPolicy", dataAccessRetryPolicy.toString());
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TableDescription getTableDescription(boolean forceUpdate) {
|
||||||
|
TableDescription desc = table.getDescription();
|
||||||
|
if (desc == null || forceUpdate) {
|
||||||
|
desc = table.describe();
|
||||||
|
}
|
||||||
|
return desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateParameters(Map<String, String> parameters)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(table, "Not initialized");
|
||||||
|
TableDescription desc = getTableDescription(true);
|
||||||
|
ProvisionedThroughputDescription current
|
||||||
|
= desc.getProvisionedThroughput();
|
||||||
|
|
||||||
|
long currentRead = current.getReadCapacityUnits();
|
||||||
|
long newRead = getLongParam(parameters,
|
||||||
|
S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
|
||||||
|
currentRead);
|
||||||
|
long currentWrite = current.getWriteCapacityUnits();
|
||||||
|
long newWrite = getLongParam(parameters,
|
||||||
|
S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
|
||||||
|
currentWrite);
|
||||||
|
|
||||||
|
ProvisionedThroughput throughput = new ProvisionedThroughput()
|
||||||
|
.withReadCapacityUnits(newRead)
|
||||||
|
.withWriteCapacityUnits(newWrite);
|
||||||
|
if (newRead != currentRead || newWrite != currentWrite) {
|
||||||
|
LOG.info("Current table capacity is read: {}, write: {}",
|
||||||
|
currentRead, currentWrite);
|
||||||
|
LOG.info("Changing capacity of table to read: {}, write: {}",
|
||||||
|
newRead, newWrite);
|
||||||
|
table.updateTable(throughput);
|
||||||
|
} else {
|
||||||
|
LOG.info("Table capacity unchanged at read: {}, write: {}",
|
||||||
|
newRead, newWrite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getLongParam(Map<String, String> parameters,
|
||||||
|
String key,
|
||||||
|
long defVal) {
|
||||||
|
String k = parameters.get(key);
|
||||||
|
if (k != null) {
|
||||||
|
return Long.parseLong(k);
|
||||||
|
} else {
|
||||||
|
return defVal;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -100,7 +103,7 @@ public class LocalMetadataStore implements MetadataStore {
|
||||||
public String toString() {
|
public String toString() {
|
||||||
final StringBuilder sb = new StringBuilder(
|
final StringBuilder sb = new StringBuilder(
|
||||||
"LocalMetadataStore{");
|
"LocalMetadataStore{");
|
||||||
sb.append(", uriHost='").append(uriHost).append('\'');
|
sb.append("uriHost='").append(uriHost).append('\'');
|
||||||
sb.append('}');
|
sb.append('}');
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
@ -153,7 +156,9 @@ public class LocalMetadataStore implements MetadataStore {
|
||||||
m.setIsEmptyDirectory(isEmptyDirectory(p));
|
m.setIsEmptyDirectory(isEmptyDirectory(p));
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint());
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint());
|
||||||
|
}
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -424,12 +429,22 @@ public class LocalMetadataStore implements MetadataStore {
|
||||||
Preconditions.checkArgument(p.isAbsolute(), "Path must be absolute");
|
Preconditions.checkArgument(p.isAbsolute(), "Path must be absolute");
|
||||||
URI uri = p.toUri();
|
URI uri = p.toUri();
|
||||||
if (uriHost != null) {
|
if (uriHost != null) {
|
||||||
Preconditions.checkArgument(!isEmpty(uri.getHost()));
|
Preconditions.checkArgument(StringUtils.isNotEmpty(uri.getHost()));
|
||||||
}
|
}
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isEmpty(String s) {
|
@Override
|
||||||
return (s == null || s.isEmpty());
|
public Map<String, String> getDiagnostics() throws IOException {
|
||||||
|
Map<String, String> map = new HashMap<>();
|
||||||
|
map.put("name", "local://metadata");
|
||||||
|
map.put("uriHost", uriHost);
|
||||||
|
map.put("description", "Local in-VM metadata store for testing");
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateParameters(Map<String, String> parameters)
|
||||||
|
throws IOException {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -218,4 +219,20 @@ public interface MetadataStore extends Closeable {
|
||||||
* @throws UnsupportedOperationException if not implemented
|
* @throws UnsupportedOperationException if not implemented
|
||||||
*/
|
*/
|
||||||
void prune(long modTime) throws IOException, UnsupportedOperationException;
|
void prune(long modTime) throws IOException, UnsupportedOperationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get any diagnostics information from a store, as a list of (key, value)
|
||||||
|
* tuples for display. Arbitrary values; no guarantee of stability.
|
||||||
|
* These are for debugging only.
|
||||||
|
* @return a map of strings.
|
||||||
|
* @throws IOException if there is an error
|
||||||
|
*/
|
||||||
|
Map<String, String> getDiagnostics() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tune/update parameters for an existing table.
|
||||||
|
* @param parameters map of params to change.
|
||||||
|
* @throws IOException if there is an error
|
||||||
|
*/
|
||||||
|
void updateParameters(Map<String, String> parameters) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A no-op implementation of MetadataStore. Clients that use this
|
* A no-op implementation of MetadataStore. Clients that use this
|
||||||
|
@ -101,4 +103,17 @@ public class NullMetadataStore implements MetadataStore {
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "NullMetadataStore";
|
return "NullMetadataStore";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getDiagnostics() throws IOException {
|
||||||
|
Map<String, String> map = new HashMap<>();
|
||||||
|
map.put("name", "Null Metadata Store");
|
||||||
|
map.put("description", "This is not a real metadata store");
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateParameters(Map<String, String> parameters)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a.s3guard;
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -95,6 +96,10 @@ public final class S3Guard {
|
||||||
msClass.getSimpleName(), fs.getScheme());
|
msClass.getSimpleName(), fs.getScheme());
|
||||||
msInstance.initialize(fs);
|
msInstance.initialize(fs);
|
||||||
return msInstance;
|
return msInstance;
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
// Don't log this exception as it means the table doesn't exist yet;
|
||||||
|
// rely on callers to catch and treat specially
|
||||||
|
throw e;
|
||||||
} catch (RuntimeException | IOException e) {
|
} catch (RuntimeException | IOException e) {
|
||||||
String message = "Failed to instantiate metadata store " +
|
String message = "Failed to instantiate metadata store " +
|
||||||
conf.get(S3_METADATA_STORE_IMPL)
|
conf.get(S3_METADATA_STORE_IMPL)
|
||||||
|
@ -109,14 +114,20 @@ public final class S3Guard {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Class<? extends MetadataStore> getMetadataStoreClass(
|
static Class<? extends MetadataStore> getMetadataStoreClass(
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
if (conf == null) {
|
if (conf == null) {
|
||||||
return NullMetadataStore.class;
|
return NullMetadataStore.class;
|
||||||
}
|
}
|
||||||
|
if (conf.get(S3_METADATA_STORE_IMPL) != null && LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Metastore option source {}",
|
||||||
|
conf.getPropertySources(S3_METADATA_STORE_IMPL));
|
||||||
|
}
|
||||||
|
|
||||||
return conf.getClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
|
Class<? extends MetadataStore> aClass = conf.getClass(
|
||||||
MetadataStore.class);
|
S3_METADATA_STORE_IMPL, NullMetadataStore.class,
|
||||||
|
MetadataStore.class);
|
||||||
|
return aClass;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ import com.google.common.base.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -43,15 +44,17 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.s3a.Constants;
|
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AUtils;
|
||||||
import org.apache.hadoop.fs.shell.CommandFormat;
|
import org.apache.hadoop.fs.shell.CommandFormat;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.GenericOptionsParser;
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CLI to manage S3Guard Metadata Store.
|
* CLI to manage S3Guard Metadata Store.
|
||||||
|
@ -74,40 +77,46 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
"\t" + Init.NAME + " - " + Init.PURPOSE + "\n" +
|
"\t" + Init.NAME + " - " + Init.PURPOSE + "\n" +
|
||||||
"\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" +
|
"\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" +
|
||||||
"\t" + Import.NAME + " - " + Import.PURPOSE + "\n" +
|
"\t" + Import.NAME + " - " + Import.PURPOSE + "\n" +
|
||||||
|
"\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
|
||||||
"\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
|
"\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
|
||||||
"\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n";
|
"\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
|
||||||
|
"\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n";
|
||||||
private static final String DATA_IN_S3_IS_PRESERVED
|
private static final String DATA_IN_S3_IS_PRESERVED
|
||||||
= "(all data in S3 is preserved";
|
= "(all data in S3 is preserved)";
|
||||||
|
|
||||||
abstract public String getUsage();
|
abstract public String getUsage();
|
||||||
|
|
||||||
// Exit codes
|
// Exit codes
|
||||||
static final int SUCCESS = 0;
|
static final int SUCCESS = EXIT_SUCCESS;
|
||||||
static final int INVALID_ARGUMENT = 1;
|
static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR;
|
||||||
static final int ERROR = 99;
|
static final int E_USAGE = EXIT_USAGE;
|
||||||
|
static final int ERROR = EXIT_FAIL;
|
||||||
|
static final int E_BAD_STATE = EXIT_NOT_ACCEPTABLE;
|
||||||
|
static final int E_NOT_FOUND = EXIT_NOT_FOUND;
|
||||||
|
|
||||||
private S3AFileSystem filesystem;
|
private S3AFileSystem filesystem;
|
||||||
private MetadataStore store;
|
private MetadataStore store;
|
||||||
private final CommandFormat commandFormat;
|
private final CommandFormat commandFormat;
|
||||||
|
|
||||||
private static final String META_FLAG = "meta";
|
public static final String META_FLAG = "meta";
|
||||||
private static final String DAYS_FLAG = "days";
|
public static final String DAYS_FLAG = "days";
|
||||||
private static final String HOURS_FLAG = "hours";
|
public static final String HOURS_FLAG = "hours";
|
||||||
private static final String MINUTES_FLAG = "minutes";
|
public static final String MINUTES_FLAG = "minutes";
|
||||||
private static final String SECONDS_FLAG = "seconds";
|
public static final String SECONDS_FLAG = "seconds";
|
||||||
|
|
||||||
private static final String REGION_FLAG = "region";
|
public static final String REGION_FLAG = "region";
|
||||||
private static final String READ_FLAG = "read";
|
public static final String READ_FLAG = "read";
|
||||||
private static final String WRITE_FLAG = "write";
|
public static final String WRITE_FLAG = "write";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor a S3Guard tool with HDFS configuration.
|
* Constructor a S3Guard tool with HDFS configuration.
|
||||||
* @param conf Configuration.
|
* @param conf Configuration.
|
||||||
|
* @param opts any boolean options to support
|
||||||
*/
|
*/
|
||||||
protected S3GuardTool(Configuration conf) {
|
protected S3GuardTool(Configuration conf, String...opts) {
|
||||||
super(conf);
|
super(conf);
|
||||||
|
|
||||||
commandFormat = new CommandFormat(0, Integer.MAX_VALUE);
|
commandFormat = new CommandFormat(0, Integer.MAX_VALUE, opts);
|
||||||
// For metadata store URI
|
// For metadata store URI
|
||||||
commandFormat.addOptionWithValue(META_FLAG);
|
commandFormat.addOptionWithValue(META_FLAG);
|
||||||
// DDB region.
|
// DDB region.
|
||||||
|
@ -126,10 +135,10 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
* {@link Destroy}.
|
* {@link Destroy}.
|
||||||
*
|
*
|
||||||
* @param paths remaining parameters from CLI.
|
* @param paths remaining parameters from CLI.
|
||||||
* @return false for invalid parameters.
|
|
||||||
* @throws IOException on I/O errors.
|
* @throws IOException on I/O errors.
|
||||||
|
* @throws ExitUtil.ExitException on validation errors
|
||||||
*/
|
*/
|
||||||
boolean parseDynamoDBRegion(List<String> paths) throws IOException {
|
void parseDynamoDBRegion(List<String> paths) throws IOException {
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
String fromCli = getCommandFormat().getOptValue(REGION_FLAG);
|
String fromCli = getCommandFormat().getOptValue(REGION_FLAG);
|
||||||
String fromConf = conf.get(S3GUARD_DDB_REGION_KEY);
|
String fromConf = conf.get(S3GUARD_DDB_REGION_KEY);
|
||||||
|
@ -137,37 +146,34 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
|
|
||||||
if (fromCli != null) {
|
if (fromCli != null) {
|
||||||
if (fromCli.isEmpty()) {
|
if (fromCli.isEmpty()) {
|
||||||
System.err.println("No region provided with -" + REGION_FLAG + " flag");
|
throw invalidArgs("No region provided with -" + REGION_FLAG + " flag");
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
if (hasS3Path) {
|
if (hasS3Path) {
|
||||||
System.err.println("Providing both an S3 path and the -" + REGION_FLAG
|
throw invalidArgs("Providing both an S3 path and the"
|
||||||
|
+ " -" + REGION_FLAG
|
||||||
+ " flag is not supported. If you need to specify a different "
|
+ " flag is not supported. If you need to specify a different "
|
||||||
+ "region than the S3 bucket, configure " + S3GUARD_DDB_REGION_KEY);
|
+ "region than the S3 bucket, configure " + S3GUARD_DDB_REGION_KEY);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
conf.set(S3GUARD_DDB_REGION_KEY, fromCli);
|
conf.set(S3GUARD_DDB_REGION_KEY, fromCli);
|
||||||
return true;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fromConf != null) {
|
if (fromConf != null) {
|
||||||
if (fromConf.isEmpty()) {
|
if (fromConf.isEmpty()) {
|
||||||
System.err.printf("No region provided with config %s, %n",
|
throw invalidArgs("No region provided with config %s",
|
||||||
S3GUARD_DDB_REGION_KEY);
|
S3GUARD_DDB_REGION_KEY);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasS3Path) {
|
if (hasS3Path) {
|
||||||
String s3Path = paths.get(0);
|
String s3Path = paths.get(0);
|
||||||
initS3AFileSystem(s3Path);
|
initS3AFileSystem(s3Path);
|
||||||
return true;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
System.err.println("No region found from -" + REGION_FLAG + " flag, " +
|
throw invalidArgs("No region found from -" + REGION_FLAG + " flag, " +
|
||||||
"config, or S3 bucket");
|
"config, or S3 bucket");
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -189,7 +195,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
String metaURI = getCommandFormat().getOptValue(META_FLAG);
|
String metaURI = getCommandFormat().getOptValue(META_FLAG);
|
||||||
if (metaURI != null && !metaURI.isEmpty()) {
|
if (metaURI != null && !metaURI.isEmpty()) {
|
||||||
URI uri = URI.create(metaURI);
|
URI uri = URI.create(metaURI);
|
||||||
LOG.info("create metadata store: {}", uri + " scheme: "
|
LOG.info("Create metadata store: {}", uri + " scheme: "
|
||||||
+ uri.getScheme());
|
+ uri.getScheme());
|
||||||
switch (uri.getScheme().toLowerCase(Locale.ENGLISH)) {
|
switch (uri.getScheme().toLowerCase(Locale.ENGLISH)) {
|
||||||
case "local":
|
case "local":
|
||||||
|
@ -225,35 +231,48 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize S3A FileSystem instance.
|
* Create and initialize a new S3A FileSystem instance.
|
||||||
|
* This instance is always created without S3Guard, so allowing
|
||||||
|
* a previously created metastore to be patched in.
|
||||||
|
*
|
||||||
|
* Note: this is a bit convoluted as it needs to also handle the situation
|
||||||
|
* of a per-bucket option in core-site.xml, which isn't easily overridden.
|
||||||
|
* The new config and the setting of the values before any
|
||||||
|
* {@code Configuration.get()} calls are critical.
|
||||||
*
|
*
|
||||||
* @param path s3a URI
|
* @param path s3a URI
|
||||||
* @throws IOException
|
* @throws IOException failure to init filesystem
|
||||||
|
* @throws ExitUtil.ExitException if the FS is not an S3A FS
|
||||||
*/
|
*/
|
||||||
void initS3AFileSystem(String path) throws IOException {
|
void initS3AFileSystem(String path) throws IOException {
|
||||||
URI uri;
|
URI uri = toUri(path);
|
||||||
try {
|
|
||||||
uri = new URI(path);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
// Make sure that S3AFileSystem does not hold an actual MetadataStore
|
// Make sure that S3AFileSystem does not hold an actual MetadataStore
|
||||||
// implementation.
|
// implementation.
|
||||||
Configuration conf = getConf();
|
Configuration conf = new Configuration(getConf());
|
||||||
conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
|
String nullStore = NullMetadataStore.class.getName();
|
||||||
MetadataStore.class);
|
conf.set(S3_METADATA_STORE_IMPL, nullStore);
|
||||||
FileSystem fs = FileSystem.get(uri, getConf());
|
String bucket = uri.getHost();
|
||||||
|
S3AUtils.setBucketOption(conf,
|
||||||
|
bucket,
|
||||||
|
S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL);
|
||||||
|
String updatedBucketOption = S3AUtils.getBucketOption(conf, bucket,
|
||||||
|
S3_METADATA_STORE_IMPL);
|
||||||
|
LOG.debug("updated bucket store option {}", updatedBucketOption);
|
||||||
|
Preconditions.checkState(S3GUARD_METASTORE_NULL.equals(updatedBucketOption),
|
||||||
|
"Expected bucket option to be %s but was %s",
|
||||||
|
S3GUARD_METASTORE_NULL, updatedBucketOption);
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.newInstance(uri, conf);
|
||||||
if (!(fs instanceof S3AFileSystem)) {
|
if (!(fs instanceof S3AFileSystem)) {
|
||||||
throw new IOException(
|
throw invalidArgs("URI %s is not a S3A file system: %s",
|
||||||
String.format("URI %s is not a S3A file system: %s", uri,
|
uri, fs.getClass().getName());
|
||||||
fs.getClass().getName()));
|
|
||||||
}
|
}
|
||||||
filesystem = (S3AFileSystem) fs;
|
filesystem = (S3AFileSystem) fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse CLI arguments and returns the position arguments.
|
* Parse CLI arguments and returns the position arguments.
|
||||||
* The options are stored in {@link #commandFormat}
|
* The options are stored in {@link #commandFormat}.
|
||||||
*
|
*
|
||||||
* @param args command line arguments.
|
* @param args command line arguments.
|
||||||
* @return the position arguments from CLI.
|
* @return the position arguments from CLI.
|
||||||
|
@ -285,11 +304,32 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
return commandFormat;
|
return commandFormat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final int run(String[] args) throws Exception {
|
||||||
|
return run(args, System.out);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run the tool, capturing the output (if the tool supports that).
|
||||||
|
*
|
||||||
|
* As well as returning an exit code, the implementations can choose to
|
||||||
|
* throw an instance of {@link ExitUtil.ExitException} with their exit
|
||||||
|
* code set to the desired exit value. The exit code of auch an exception
|
||||||
|
* is used for the tool's exit code, and the stack trace only logged at
|
||||||
|
* debug.
|
||||||
|
* @param args argument list
|
||||||
|
* @param out output stream
|
||||||
|
* @return the exit code to return.
|
||||||
|
* @throws Exception on any failure
|
||||||
|
* @throws ExitUtil.ExitException for an alternative clean exit
|
||||||
|
*/
|
||||||
|
public abstract int run(String[] args, PrintStream out) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the metadata store.
|
* Create the metadata store.
|
||||||
*/
|
*/
|
||||||
static class Init extends S3GuardTool {
|
static class Init extends S3GuardTool {
|
||||||
private static final String NAME = "init";
|
public static final String NAME = "init";
|
||||||
public static final String PURPOSE = "initialize metadata repository";
|
public static final String PURPOSE = "initialize metadata repository";
|
||||||
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
||||||
"\t" + PURPOSE + "\n\n" +
|
"\t" + PURPOSE + "\n\n" +
|
||||||
|
@ -325,7 +365,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws IOException {
|
public int run(String[] args, PrintStream out) throws Exception {
|
||||||
List<String> paths = parseArgs(args);
|
List<String> paths = parseArgs(args);
|
||||||
|
|
||||||
String readCap = getCommandFormat().getOptValue(READ_FLAG);
|
String readCap = getCommandFormat().getOptValue(READ_FLAG);
|
||||||
|
@ -340,20 +380,92 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate parameters.
|
// Validate parameters.
|
||||||
if (!parseDynamoDBRegion(paths)) {
|
try {
|
||||||
System.err.println(USAGE);
|
parseDynamoDBRegion(paths);
|
||||||
return INVALID_ARGUMENT;
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
errorln(USAGE);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
initMetadataStore(true);
|
MetadataStore store = initMetadataStore(true);
|
||||||
|
printStoreDiagnostics(out, store);
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Change the capacity of the metadata store.
|
||||||
|
*/
|
||||||
|
static class SetCapacity extends S3GuardTool {
|
||||||
|
public static final String NAME = "set-capacity";
|
||||||
|
public static final String PURPOSE = "Alter metadata store IO capacity";
|
||||||
|
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
||||||
|
"\t" + PURPOSE + "\n\n" +
|
||||||
|
"Common options:\n" +
|
||||||
|
" -" + META_FLAG + " URL - Metadata repository details " +
|
||||||
|
"(implementation-specific)\n" +
|
||||||
|
"\n" +
|
||||||
|
"Amazon DynamoDB-specific options:\n" +
|
||||||
|
" -" + READ_FLAG + " UNIT - Provisioned read throughput units\n" +
|
||||||
|
" -" + WRITE_FLAG + " UNIT - Provisioned write through put units\n" +
|
||||||
|
"\n" +
|
||||||
|
" URLs for Amazon DynamoDB are of the form dynamodb://TABLE_NAME.\n" +
|
||||||
|
" Specifying both the -" + REGION_FLAG + " option and an S3A path\n" +
|
||||||
|
" is not supported.";
|
||||||
|
|
||||||
|
SetCapacity(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
// read capacity.
|
||||||
|
getCommandFormat().addOptionWithValue(READ_FLAG);
|
||||||
|
// write capacity.
|
||||||
|
getCommandFormat().addOptionWithValue(WRITE_FLAG);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String getName() {
|
||||||
|
return NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getUsage() {
|
||||||
|
return USAGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int run(String[] args, PrintStream out) throws Exception {
|
||||||
|
List<String> paths = parseArgs(args);
|
||||||
|
Map<String, String> options = new HashMap<>();
|
||||||
|
|
||||||
|
String readCap = getCommandFormat().getOptValue(READ_FLAG);
|
||||||
|
if (StringUtils.isNotEmpty(readCap)) {
|
||||||
|
S3GuardTool.println(out, "Read capacity set to %s", readCap);
|
||||||
|
options.put(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, readCap);
|
||||||
|
}
|
||||||
|
String writeCap = getCommandFormat().getOptValue(WRITE_FLAG);
|
||||||
|
if (StringUtils.isNotEmpty(writeCap)) {
|
||||||
|
S3GuardTool.println(out, "Write capacity set to %s", writeCap);
|
||||||
|
options.put(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, writeCap);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate parameters.
|
||||||
|
try {
|
||||||
|
parseDynamoDBRegion(paths);
|
||||||
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
errorln(USAGE);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
MetadataStore store = initMetadataStore(false);
|
||||||
|
store.updateParameters(options);
|
||||||
|
printStoreDiagnostics(out, store);
|
||||||
|
return SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy a metadata store.
|
* Destroy a metadata store.
|
||||||
*/
|
*/
|
||||||
static class Destroy extends S3GuardTool {
|
static class Destroy extends S3GuardTool {
|
||||||
private static final String NAME = "destroy";
|
public static final String NAME = "destroy";
|
||||||
public static final String PURPOSE = "destroy Metadata Store data "
|
public static final String PURPOSE = "destroy Metadata Store data "
|
||||||
+ DATA_IN_S3_IS_PRESERVED;
|
+ DATA_IN_S3_IS_PRESERVED;
|
||||||
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
||||||
|
@ -383,19 +495,21 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
return USAGE;
|
return USAGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int run(String[] args) throws IOException {
|
public int run(String[] args, PrintStream out) throws Exception {
|
||||||
List<String> paths = parseArgs(args);
|
List<String> paths = parseArgs(args);
|
||||||
if (!parseDynamoDBRegion(paths)) {
|
try {
|
||||||
System.err.println(USAGE);
|
parseDynamoDBRegion(paths);
|
||||||
return INVALID_ARGUMENT;
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
errorln(USAGE);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
initMetadataStore(false);
|
initMetadataStore(false);
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
// indication that the table was not found
|
// indication that the table was not found
|
||||||
|
println(out, "Metadata Store does not exist.");
|
||||||
LOG.debug("Failed to bind to store to be destroyed", e);
|
LOG.debug("Failed to bind to store to be destroyed", e);
|
||||||
LOG.info("Metadata Store does not exist.");
|
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -403,7 +517,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
"Metadata Store is not initialized");
|
"Metadata Store is not initialized");
|
||||||
|
|
||||||
getStore().destroy();
|
getStore().destroy();
|
||||||
LOG.info("Metadata store is deleted.");
|
println(out, "Metadata store is deleted.");
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -412,7 +526,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
* Import s3 metadata to the metadata store.
|
* Import s3 metadata to the metadata store.
|
||||||
*/
|
*/
|
||||||
static class Import extends S3GuardTool {
|
static class Import extends S3GuardTool {
|
||||||
private static final String NAME = "import";
|
public static final String NAME = "import";
|
||||||
public static final String PURPOSE = "import metadata from existing S3 " +
|
public static final String PURPOSE = "import metadata from existing S3 " +
|
||||||
"data";
|
"data";
|
||||||
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" +
|
||||||
|
@ -498,21 +612,16 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws IOException {
|
public int run(String[] args, PrintStream out) throws Exception {
|
||||||
List<String> paths = parseArgs(args);
|
List<String> paths = parseArgs(args);
|
||||||
if (paths.isEmpty()) {
|
if (paths.isEmpty()) {
|
||||||
System.err.println(getUsage());
|
errorln(getUsage());
|
||||||
return INVALID_ARGUMENT;
|
throw invalidArgs("no arguments");
|
||||||
}
|
}
|
||||||
String s3Path = paths.get(0);
|
String s3Path = paths.get(0);
|
||||||
initS3AFileSystem(s3Path);
|
initS3AFileSystem(s3Path);
|
||||||
|
|
||||||
URI uri;
|
URI uri = toUri(s3Path);
|
||||||
try {
|
|
||||||
uri = new URI(s3Path);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
String filePath = uri.getPath();
|
String filePath = uri.getPath();
|
||||||
if (filePath.isEmpty()) {
|
if (filePath.isEmpty()) {
|
||||||
// If they specify a naked S3 URI (e.g. s3a://bucket), we'll consider
|
// If they specify a naked S3 URI (e.g. s3a://bucket), we'll consider
|
||||||
|
@ -522,7 +631,11 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
Path path = new Path(filePath);
|
Path path = new Path(filePath);
|
||||||
FileStatus status = getFilesystem().getFileStatus(path);
|
FileStatus status = getFilesystem().getFileStatus(path);
|
||||||
|
|
||||||
initMetadataStore(false);
|
try {
|
||||||
|
initMetadataStore(false);
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
throw storeNotFound(e);
|
||||||
|
}
|
||||||
|
|
||||||
long items = 1;
|
long items = 1;
|
||||||
if (status.isFile()) {
|
if (status.isFile()) {
|
||||||
|
@ -532,17 +645,18 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
items = importDir(status);
|
items = importDir(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.printf("Inserted %d items into Metadata Store%n", items);
|
println(out, "Inserted %d items into Metadata Store", items);
|
||||||
|
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Show diffs between the s3 and metadata store.
|
* Show diffs between the s3 and metadata store.
|
||||||
*/
|
*/
|
||||||
static class Diff extends S3GuardTool {
|
static class Diff extends S3GuardTool {
|
||||||
private static final String NAME = "diff";
|
public static final String NAME = "diff";
|
||||||
public static final String PURPOSE = "report on delta between S3 and " +
|
public static final String PURPOSE = "report on delta between S3 and " +
|
||||||
"repository";
|
"repository";
|
||||||
private static final String USAGE = NAME + " [OPTIONS] s3a://BUCKET\n" +
|
private static final String USAGE = NAME + " [OPTIONS] s3a://BUCKET\n" +
|
||||||
|
@ -625,10 +739,10 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
|
|
||||||
if (differ(msStatus, s3Status)) {
|
if (differ(msStatus, s3Status)) {
|
||||||
if (s3Status != null) {
|
if (s3Status != null) {
|
||||||
out.printf("%s%s%s%n", S3_PREFIX, SEP, formatFileStatus(s3Status));
|
println(out, "%s%s%s", S3_PREFIX, SEP, formatFileStatus(s3Status));
|
||||||
}
|
}
|
||||||
if (msStatus != null) {
|
if (msStatus != null) {
|
||||||
out.printf("%s%s%s%n", MS_PREFIX, SEP, formatFileStatus(msStatus));
|
println(out, "%s%s%s", MS_PREFIX, SEP, formatFileStatus(msStatus));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -705,6 +819,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
try {
|
try {
|
||||||
s3Status = getFilesystem().getFileStatus(qualified);
|
s3Status = getFilesystem().getFileStatus(qualified);
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
|
/* ignored */
|
||||||
}
|
}
|
||||||
PathMetadata meta = getStore().get(qualified);
|
PathMetadata meta = getStore().get(qualified);
|
||||||
FileStatus msStatus = (meta != null && !meta.isDeleted()) ?
|
FileStatus msStatus = (meta != null && !meta.isDeleted()) ?
|
||||||
|
@ -717,18 +832,13 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
List<String> paths = parseArgs(args);
|
List<String> paths = parseArgs(args);
|
||||||
if (paths.isEmpty()) {
|
if (paths.isEmpty()) {
|
||||||
out.println(USAGE);
|
out.println(USAGE);
|
||||||
return INVALID_ARGUMENT;
|
throw invalidArgs("no arguments");
|
||||||
}
|
}
|
||||||
String s3Path = paths.get(0);
|
String s3Path = paths.get(0);
|
||||||
initS3AFileSystem(s3Path);
|
initS3AFileSystem(s3Path);
|
||||||
initMetadataStore(true);
|
initMetadataStore(false);
|
||||||
|
|
||||||
URI uri;
|
URI uri = toUri(s3Path);
|
||||||
try {
|
|
||||||
uri = new URI(s3Path);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
Path root;
|
Path root;
|
||||||
if (uri.getPath().isEmpty()) {
|
if (uri.getPath().isEmpty()) {
|
||||||
root = new Path("/");
|
root = new Path("/");
|
||||||
|
@ -741,17 +851,13 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int run(String[] args) throws IOException {
|
|
||||||
return run(args, System.out);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prune metadata that has not been modified recently.
|
* Prune metadata that has not been modified recently.
|
||||||
*/
|
*/
|
||||||
static class Prune extends S3GuardTool {
|
static class Prune extends S3GuardTool {
|
||||||
private static final String NAME = "prune";
|
public static final String NAME = "prune";
|
||||||
public static final String PURPOSE = "truncate older metadata from " +
|
public static final String PURPOSE = "truncate older metadata from " +
|
||||||
"repository "
|
"repository "
|
||||||
+ DATA_IN_S3_IS_PRESERVED;;
|
+ DATA_IN_S3_IS_PRESERVED;;
|
||||||
|
@ -803,18 +909,19 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
return unit.toMillis(parsed);
|
return unit.toMillis(parsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public int run(String[] args, PrintStream out) throws
|
public int run(String[] args, PrintStream out) throws
|
||||||
InterruptedException, IOException {
|
InterruptedException, IOException {
|
||||||
List<String> paths = parseArgs(args);
|
List<String> paths = parseArgs(args);
|
||||||
if (!parseDynamoDBRegion(paths)) {
|
try {
|
||||||
System.err.println(USAGE);
|
parseDynamoDBRegion(paths);
|
||||||
return INVALID_ARGUMENT;
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
errorln(USAGE);
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
initMetadataStore(false);
|
initMetadataStore(false);
|
||||||
|
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
long confDelta = conf.getLong(Constants.S3GUARD_CLI_PRUNE_AGE, 0);
|
long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0);
|
||||||
|
|
||||||
long cliDelta = 0;
|
long cliDelta = 0;
|
||||||
cliDelta += getDeltaComponent(TimeUnit.DAYS, "days");
|
cliDelta += getDeltaComponent(TimeUnit.DAYS, "days");
|
||||||
|
@ -823,8 +930,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds");
|
cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds");
|
||||||
|
|
||||||
if (confDelta <= 0 && cliDelta <= 0) {
|
if (confDelta <= 0 && cliDelta <= 0) {
|
||||||
System.err.println(
|
errorln("You must specify a positive age for metadata to prune.");
|
||||||
"You must specify a positive age for metadata to prune.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// A delta provided on the CLI overrides if one is configured
|
// A delta provided on the CLI overrides if one is configured
|
||||||
|
@ -842,35 +948,235 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
return SUCCESS;
|
return SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
}
|
||||||
public int run(String[] args) throws InterruptedException, IOException {
|
|
||||||
return run(args, System.out);
|
/**
|
||||||
|
* Get info about a bucket and its S3Guard integration status.
|
||||||
|
*/
|
||||||
|
static class BucketInfo extends S3GuardTool {
|
||||||
|
public static final String NAME = "bucket-info";
|
||||||
|
public static final String GUARDED_FLAG = "guarded";
|
||||||
|
public static final String UNGUARDED_FLAG = "unguarded";
|
||||||
|
public static final String AUTH_FLAG = "auth";
|
||||||
|
public static final String NONAUTH_FLAG = "nonauth";
|
||||||
|
public static final String ENCRYPTION_FLAG = "encryption";
|
||||||
|
|
||||||
|
public static final String PURPOSE = "provide/check S3Guard information"
|
||||||
|
+ " about a specific bucket";
|
||||||
|
private static final String USAGE = NAME + " [OPTIONS] s3a://BUCKET\n"
|
||||||
|
+ "\t" + PURPOSE + "\n\n"
|
||||||
|
+ "Common options:\n"
|
||||||
|
+ " -" + GUARDED_FLAG + " - Require S3Guard\n"
|
||||||
|
+ " -" + ENCRYPTION_FLAG
|
||||||
|
+ " -require {none, sse-s3, sse-kms} - Require encryption policy";
|
||||||
|
|
||||||
|
BucketInfo(Configuration conf) {
|
||||||
|
super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG);
|
||||||
|
CommandFormat format = getCommandFormat();
|
||||||
|
format.addOptionWithValue(ENCRYPTION_FLAG);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String getName() {
|
||||||
|
return NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getUsage() {
|
||||||
|
return USAGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int run(String[] args, PrintStream out)
|
||||||
|
throws InterruptedException, IOException {
|
||||||
|
List<String> paths = parseArgs(args);
|
||||||
|
if (paths.isEmpty()) {
|
||||||
|
errorln(getUsage());
|
||||||
|
throw invalidArgs("No bucket specified");
|
||||||
|
}
|
||||||
|
String s3Path = paths.get(0);
|
||||||
|
S3AFileSystem fs = (S3AFileSystem) FileSystem.newInstance(
|
||||||
|
toUri(s3Path), getConf());
|
||||||
|
setFilesystem(fs);
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
URI fsUri = fs.getUri();
|
||||||
|
MetadataStore store = fs.getMetadataStore();
|
||||||
|
println(out, "Filesystem %s", fsUri);
|
||||||
|
println(out, "Location: %s", fs.getBucketLocation());
|
||||||
|
boolean usingS3Guard = !(store instanceof NullMetadataStore);
|
||||||
|
boolean authMode = false;
|
||||||
|
if (usingS3Guard) {
|
||||||
|
out.printf("Filesystem %s is using S3Guard with store %s%n",
|
||||||
|
fsUri, store.toString());
|
||||||
|
printOption(out, "Authoritative S3Guard",
|
||||||
|
METADATASTORE_AUTHORITATIVE, "false");
|
||||||
|
authMode = conf.getBoolean(METADATASTORE_AUTHORITATIVE, false);
|
||||||
|
printStoreDiagnostics(out, store);
|
||||||
|
} else {
|
||||||
|
println(out, "Filesystem %s is not using S3Guard", fsUri);
|
||||||
|
}
|
||||||
|
println(out, "%nS3A Client");
|
||||||
|
|
||||||
|
String endpoint = conf.getTrimmed(ENDPOINT, "");
|
||||||
|
println(out, "\tEndpoint: %s=%s",
|
||||||
|
ENDPOINT,
|
||||||
|
StringUtils.isNotEmpty(endpoint) ? endpoint : "(unset)");
|
||||||
|
String encryption =
|
||||||
|
printOption(out, "\tEncryption", SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
|
"none");
|
||||||
|
printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_NORMAL);
|
||||||
|
|
||||||
|
CommandFormat commands = getCommandFormat();
|
||||||
|
if (usingS3Guard) {
|
||||||
|
if (commands.getOpt(UNGUARDED_FLAG)) {
|
||||||
|
throw badState("S3Guard is enabled for %s", fsUri);
|
||||||
|
}
|
||||||
|
if (commands.getOpt(AUTH_FLAG) && !authMode) {
|
||||||
|
throw badState("S3Guard is enabled for %s,"
|
||||||
|
+ " but not in authoritative mode", fsUri);
|
||||||
|
}
|
||||||
|
if (commands.getOpt(NONAUTH_FLAG) && authMode) {
|
||||||
|
throw badState("S3Guard is enabled in authoritative mode for %s",
|
||||||
|
fsUri);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (commands.getOpt(GUARDED_FLAG)) {
|
||||||
|
throw badState("S3Guard is not enabled for %s", fsUri);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String desiredEncryption = getCommandFormat()
|
||||||
|
.getOptValue(ENCRYPTION_FLAG);
|
||||||
|
if (StringUtils.isNotEmpty(desiredEncryption)
|
||||||
|
&& !desiredEncryption.equalsIgnoreCase(encryption)) {
|
||||||
|
throw badState("Bucket %s: required encryption is %s"
|
||||||
|
+ " but actual encryption is %s",
|
||||||
|
fsUri, desiredEncryption, encryption);
|
||||||
|
}
|
||||||
|
|
||||||
|
out.flush();
|
||||||
|
return SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String printOption(PrintStream out,
|
||||||
|
String description, String key, String defVal) {
|
||||||
|
String t = getFilesystem().getConf().getTrimmed(key, defVal);
|
||||||
|
println(out, "%s: %s=%s", description, key, t);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static S3GuardTool command;
|
private static S3GuardTool command;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a path to a URI, catching any {@code URISyntaxException}
|
||||||
|
* and converting to an invalid args exception.
|
||||||
|
* @param s3Path path to convert to a URI
|
||||||
|
* @return a URI of the path
|
||||||
|
* @throws ExitUtil.ExitException INVALID_ARGUMENT if the URI is invalid
|
||||||
|
*/
|
||||||
|
protected static URI toUri(String s3Path) {
|
||||||
|
URI uri;
|
||||||
|
try {
|
||||||
|
uri = new URI(s3Path);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw invalidArgs("Not a valid fileystem path: %s", s3Path);
|
||||||
|
}
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
private static void printHelp() {
|
private static void printHelp() {
|
||||||
if (command == null) {
|
if (command == null) {
|
||||||
System.err.println("Usage: hadoop " + USAGE);
|
errorln("Usage: hadoop " + USAGE);
|
||||||
System.err.println("\tperform S3Guard metadata store " +
|
errorln("\tperform S3Guard metadata store " +
|
||||||
"administrative commands.");
|
"administrative commands.");
|
||||||
} else {
|
} else {
|
||||||
System.err.println("Usage: hadoop " + command.getUsage());
|
errorln("Usage: hadoop " + command.getUsage());
|
||||||
}
|
}
|
||||||
|
errorln();
|
||||||
|
errorln(COMMON_USAGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void errorln() {
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println(COMMON_USAGE);
|
}
|
||||||
|
|
||||||
|
private static void errorln(String x) {
|
||||||
|
System.err.println(x);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Print a formatted string followed by a newline to the output stream.
|
||||||
|
* @param out destination
|
||||||
|
* @param format format string
|
||||||
|
* @param args optional arguments
|
||||||
|
*/
|
||||||
|
private static void println(PrintStream out, String format, Object... args) {
|
||||||
|
out.println(String.format(format, args));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve and Print store diagnostics.
|
||||||
|
* @param out output stream
|
||||||
|
* @param store store
|
||||||
|
* @throws IOException Failure to retrieve the data.
|
||||||
|
*/
|
||||||
|
protected static void printStoreDiagnostics(PrintStream out,
|
||||||
|
MetadataStore store)
|
||||||
|
throws IOException {
|
||||||
|
Map<String, String> diagnostics = store.getDiagnostics();
|
||||||
|
out.println("Metadata Store Diagnostics:");
|
||||||
|
for (Map.Entry<String, String> entry : diagnostics.entrySet()) {
|
||||||
|
println(out, "\t%s=%s", entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle store not found by converting to an exit exception
|
||||||
|
* with specific error code.
|
||||||
|
* @param e exception
|
||||||
|
* @return a new exception to throw
|
||||||
|
*/
|
||||||
|
protected static ExitUtil.ExitException storeNotFound(
|
||||||
|
FileNotFoundException e) {
|
||||||
|
return new ExitUtil.ExitException(
|
||||||
|
E_NOT_FOUND, e.toString(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the exception to raise on invalid arguments.
|
||||||
|
* @param format string format
|
||||||
|
* @param args optional arguments for the string
|
||||||
|
* @return a new exception to throw
|
||||||
|
*/
|
||||||
|
protected static ExitUtil.ExitException invalidArgs(
|
||||||
|
String format, Object...args) {
|
||||||
|
return new ExitUtil.ExitException(INVALID_ARGUMENT,
|
||||||
|
String.format(format, args));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the exception to raise on a bad store/bucket state.
|
||||||
|
* @param format string format
|
||||||
|
* @param args optional arguments for the string
|
||||||
|
* @return a new exception to throw
|
||||||
|
*/
|
||||||
|
protected static ExitUtil.ExitException badState(
|
||||||
|
String format, Object...args) {
|
||||||
|
return new ExitUtil.ExitException(E_BAD_STATE,
|
||||||
|
String.format(format, args));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the command with the given arguments.
|
* Execute the command with the given arguments.
|
||||||
*
|
*
|
||||||
* @param args command specific arguments.
|
|
||||||
* @param conf Hadoop configuration.
|
* @param conf Hadoop configuration.
|
||||||
|
* @param args command specific arguments.
|
||||||
* @return exit code.
|
* @return exit code.
|
||||||
* @throws Exception on I/O errors.
|
* @throws Exception on I/O errors.
|
||||||
*/
|
*/
|
||||||
public static int run(String[] args, Configuration conf) throws
|
public static int run(Configuration conf, String...args) throws
|
||||||
Exception {
|
Exception {
|
||||||
/* ToolRunner.run does this too, but we must do it before looking at
|
/* ToolRunner.run does this too, but we must do it before looking at
|
||||||
subCommand or instantiating the cmd object below */
|
subCommand or instantiating the cmd object below */
|
||||||
|
@ -878,9 +1184,10 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
.getRemainingArgs();
|
.getRemainingArgs();
|
||||||
if (otherArgs.length == 0) {
|
if (otherArgs.length == 0) {
|
||||||
printHelp();
|
printHelp();
|
||||||
return INVALID_ARGUMENT;
|
throw new ExitUtil.ExitException(E_USAGE, "No arguments provided");
|
||||||
}
|
}
|
||||||
final String subCommand = otherArgs[0];
|
final String subCommand = otherArgs[0];
|
||||||
|
LOG.debug("Executing command {}", subCommand);
|
||||||
switch (subCommand) {
|
switch (subCommand) {
|
||||||
case Init.NAME:
|
case Init.NAME:
|
||||||
command = new Init(conf);
|
command = new Init(conf);
|
||||||
|
@ -891,15 +1198,22 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
case Import.NAME:
|
case Import.NAME:
|
||||||
command = new Import(conf);
|
command = new Import(conf);
|
||||||
break;
|
break;
|
||||||
|
case BucketInfo.NAME:
|
||||||
|
command = new BucketInfo(conf);
|
||||||
|
break;
|
||||||
case Diff.NAME:
|
case Diff.NAME:
|
||||||
command = new Diff(conf);
|
command = new Diff(conf);
|
||||||
break;
|
break;
|
||||||
case Prune.NAME:
|
case Prune.NAME:
|
||||||
command = new Prune(conf);
|
command = new Prune(conf);
|
||||||
break;
|
break;
|
||||||
|
case SetCapacity.NAME:
|
||||||
|
command = new SetCapacity(conf);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
printHelp();
|
printHelp();
|
||||||
return INVALID_ARGUMENT;
|
throw new ExitUtil.ExitException(E_USAGE,
|
||||||
|
"Unknown command " + subCommand);
|
||||||
}
|
}
|
||||||
return ToolRunner.run(conf, command, otherArgs);
|
return ToolRunner.run(conf, command, otherArgs);
|
||||||
}
|
}
|
||||||
|
@ -910,15 +1224,22 @@ public abstract class S3GuardTool extends Configured implements Tool {
|
||||||
*/
|
*/
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
try {
|
try {
|
||||||
int ret = run(args, new Configuration());
|
int ret = run(new Configuration(), args);
|
||||||
System.exit(ret);
|
exit(ret, "");
|
||||||
} catch (CommandFormat.UnknownOptionException e) {
|
} catch (CommandFormat.UnknownOptionException e) {
|
||||||
System.err.println(e.getMessage());
|
errorln(e.getMessage());
|
||||||
printHelp();
|
printHelp();
|
||||||
System.exit(INVALID_ARGUMENT);
|
exit(E_USAGE, e.getMessage());
|
||||||
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
// explicitly raised exit code
|
||||||
|
exit(e.getExitCode(), e.toString());
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
e.printStackTrace(System.err);
|
e.printStackTrace(System.err);
|
||||||
System.exit(ERROR);
|
exit(ERROR, e.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static void exit(int status, String text) {
|
||||||
|
ExitUtil.terminate(status, text);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -388,7 +388,7 @@ Example 2
|
||||||
hadoop s3guard init -meta dynamodb://ireland-team -region eu-west-1
|
hadoop s3guard init -meta dynamodb://ireland-team -region eu-west-1
|
||||||
```
|
```
|
||||||
|
|
||||||
Creates a table "ireland-team" in the same region "s3-eu-west-1.amazonaws.com"
|
Creates a table "ireland-team" in the region "eu-west-1.amazonaws.com"
|
||||||
|
|
||||||
|
|
||||||
### Import a bucket: `s3guard import`
|
### Import a bucket: `s3guard import`
|
||||||
|
@ -422,6 +422,98 @@ Example
|
||||||
hadoop s3guard diff s3a://ireland-1
|
hadoop s3guard diff s3a://ireland-1
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Display information about a bucket, `s3guard bucket-info`
|
||||||
|
|
||||||
|
Prints and optionally checks the s3guard and encryption status of a bucket.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hadoop s3guard bucket-info [ -guarded ] [-unguarded] [-auth] [-nonauth] [-encryption ENCRYPTION] s3a://BUCKET
|
||||||
|
```
|
||||||
|
|
||||||
|
Options
|
||||||
|
|
||||||
|
| argument | meaning |
|
||||||
|
|-----------|-------------|
|
||||||
|
| `-guarded` | Require S3Guard to be enabled |
|
||||||
|
| `-unguarded` | Require S3Guard to be disabled |
|
||||||
|
| `-auth` | Require the S3Guard mode to be "authoritative" |
|
||||||
|
| `-nonauth` | Require the S3Guard mode to be "non-authoritative" |
|
||||||
|
| `-encryption <type>` | Require a specific server-side encryption algorithm |
|
||||||
|
|
||||||
|
The server side encryption options are not directly related to S3Guard, but
|
||||||
|
it is often convenient to check them at the same time.
|
||||||
|
|
||||||
|
Example
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hadoop s3guard bucket-info -guarded s3a://ireland-1
|
||||||
|
```
|
||||||
|
|
||||||
|
List the details of bucket `s3a://ireland-1`, mandating that it must have S3Guard enabled
|
||||||
|
|
||||||
|
```
|
||||||
|
Filesystem s3a://ireland-1
|
||||||
|
Location: eu-west-1
|
||||||
|
Filesystem s3a://ireland-1 is using S3Guard with store DynamoDBMetadataStore{region=eu-west-1, tableName=ireland-1}
|
||||||
|
Authoritative S3Guard: fs.s3a.metadatastore.authoritative=false
|
||||||
|
Metadata Store Diagnostics:
|
||||||
|
ARN=arn:aws:dynamodb:eu-west-1:00000000:table/ireland-1
|
||||||
|
description=S3Guard metadata store in DynamoDB
|
||||||
|
name=ireland-1
|
||||||
|
read-capacity=20
|
||||||
|
region=eu-west-1
|
||||||
|
retryPolicy=ExponentialBackoffRetry(maxRetries=9, sleepTime=100 MILLISECONDS)
|
||||||
|
size=12812
|
||||||
|
status=ACTIVE
|
||||||
|
table={AttributeDefinitions: [{AttributeName: child,AttributeType: S},
|
||||||
|
{AttributeName: parent,AttributeType: S}],TableName: ireland-1,
|
||||||
|
KeySchema: [{AttributeName: parent,KeyType: HASH}, {AttributeName: child,KeyType: RANGE}],
|
||||||
|
TableStatus: ACTIVE,
|
||||||
|
CreationDateTime: Fri Aug 25 19:07:25 BST 2017,
|
||||||
|
ProvisionedThroughput: {LastIncreaseDateTime: Tue Aug 29 11:45:18 BST 2017,
|
||||||
|
LastDecreaseDateTime: Wed Aug 30 15:37:51 BST 2017,
|
||||||
|
NumberOfDecreasesToday: 1,
|
||||||
|
ReadCapacityUnits: 20,WriteCapacityUnits: 20},
|
||||||
|
TableSizeBytes: 12812,ItemCount: 91,
|
||||||
|
TableArn: arn:aws:dynamodb:eu-west-1:00000000:table/ireland-1,}
|
||||||
|
write-capacity=20
|
||||||
|
|
||||||
|
S3A Client
|
||||||
|
Endpoint: fs.s3a.endpoint=s3-eu-west-1.amazonaws.com
|
||||||
|
Encryption: fs.s3a.server-side-encryption-algorithm=none
|
||||||
|
Input seek policy: fs.s3a.experimental.input.fadvise=normal
|
||||||
|
```
|
||||||
|
|
||||||
|
This listing includes all the information about the table supplied from
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hadoop s3guard bucket-info -unguarded -encryption none s3a://landsat-pds
|
||||||
|
```
|
||||||
|
|
||||||
|
List the S3Guard status of clients of the public `landsat-pds` bucket,
|
||||||
|
and verifies that the data is neither tracked with S3Guard nor encrypted.
|
||||||
|
|
||||||
|
|
||||||
|
```
|
||||||
|
Filesystem s3a://landsat-pdsLocation: us-west-2
|
||||||
|
Filesystem s3a://landsat-pds is not using S3Guard
|
||||||
|
Endpoint: fs.s3a.endpoints3.amazonaws.com
|
||||||
|
Encryption: fs.s3a.server-side-encryption-algorithm=none
|
||||||
|
Input seek policy: fs.s3a.experimental.input.fadvise=normal
|
||||||
|
```
|
||||||
|
|
||||||
|
Note that other clients may have a S3Guard table set up to store metadata
|
||||||
|
on this bucket; the checks are all done from the perspective of the configuration
|
||||||
|
setttings of the current client.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hadoop s3guard bucket-info -guarded -auth s3a://landsat-pds
|
||||||
|
```
|
||||||
|
|
||||||
|
Require the bucket to be using S3Guard in authoritative mode. This will normally
|
||||||
|
fail against this specific bucket.
|
||||||
|
|
||||||
|
|
||||||
### Delete a table: `s3guard destroy`
|
### Delete a table: `s3guard destroy`
|
||||||
|
|
||||||
|
|
||||||
|
@ -450,7 +542,6 @@ hadoop s3guard destroy -meta dynamodb://ireland-team -region eu-west-1
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### Clean up a table, `s3guard prune`
|
### Clean up a table, `s3guard prune`
|
||||||
|
|
||||||
Delete all file entries in the MetadataStore table whose object "modification
|
Delete all file entries in the MetadataStore table whose object "modification
|
||||||
|
@ -461,7 +552,7 @@ hadoop s3guard prune [-days DAYS] [-hours HOURS] [-minutes MINUTES]
|
||||||
[-seconds SECONDS] [-m URI] ( -region REGION | s3a://BUCKET )
|
[-seconds SECONDS] [-m URI] ( -region REGION | s3a://BUCKET )
|
||||||
```
|
```
|
||||||
|
|
||||||
A time value must be supplied.
|
A time value of hours, minutes and/or seconds must be supplied.
|
||||||
|
|
||||||
1. This does not delete the entries in the bucket itself.
|
1. This does not delete the entries in the bucket itself.
|
||||||
1. The modification time is effectively the creation time of the objects
|
1. The modification time is effectively the creation time of the objects
|
||||||
|
@ -486,6 +577,63 @@ Delete all entries more than 90 minutes old from the table "ireland-team" in
|
||||||
the region "eu-west-1".
|
the region "eu-west-1".
|
||||||
|
|
||||||
|
|
||||||
|
### Tune the IO capacity of the DynamoDB Table, `s3guard set-capacity`
|
||||||
|
|
||||||
|
Alter the read and/or write capacity of a s3guard table.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hadoop s3guard set-capacity [--read UNIT] [--write UNIT] ( -region REGION | s3a://BUCKET )
|
||||||
|
```
|
||||||
|
|
||||||
|
The `--read` and `--write` units are those of `s3guard init`.
|
||||||
|
|
||||||
|
|
||||||
|
Example
|
||||||
|
|
||||||
|
```
|
||||||
|
hadoop s3guard set-capacity -read 20 -write 20 s3a://ireland-1
|
||||||
|
```
|
||||||
|
|
||||||
|
Set the capacity of the table used by bucket `s3a://ireland-1` to 20 read
|
||||||
|
and 20 write. (This is a low number, incidentally)
|
||||||
|
|
||||||
|
```
|
||||||
|
2017-08-30 16:21:26,343 [main] INFO s3guard.S3GuardTool (S3GuardTool.java:initMetadataStore(229)) - Metadata store DynamoDBMetadataStore{region=eu-west-1, tableName=ireland-1} is initialized.
|
||||||
|
2017-08-30 16:21:26,344 [main] INFO s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1084)) - Current table capacity is read: 25, write: 25
|
||||||
|
2017-08-30 16:21:26,344 [main] INFO s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1086)) - Changing capacity of table to read: 20, write: 20
|
||||||
|
Metadata Store Diagnostics:
|
||||||
|
ARN=arn:aws:dynamodb:eu-west-1:00000000000:table/ireland-1
|
||||||
|
description=S3Guard metadata store in DynamoDB
|
||||||
|
name=ireland-1
|
||||||
|
read-capacity=25
|
||||||
|
region=eu-west-1
|
||||||
|
retryPolicy=ExponentialBackoffRetry(maxRetries=9, sleepTime=100 MILLISECONDS)
|
||||||
|
size=12812
|
||||||
|
status=UPDATING
|
||||||
|
table={ ... }
|
||||||
|
write-capacity=25
|
||||||
|
```
|
||||||
|
|
||||||
|
After the update, the table status changes to `UPDATING`; this is a sign that
|
||||||
|
the capacity has been changed
|
||||||
|
|
||||||
|
Repeating the same command will not change the capacity, as both read and
|
||||||
|
write values match that already in use
|
||||||
|
|
||||||
|
```
|
||||||
|
2017-08-30 16:24:35,337 [main] INFO s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1090)) - Table capacity unchanged at read: 20, write: 20
|
||||||
|
Metadata Store Diagnostics:
|
||||||
|
ARN=arn:aws:dynamodb:eu-west-1:00000000000:table/ireland-1
|
||||||
|
description=S3Guard metadata store in DynamoDB
|
||||||
|
name=ireland-1
|
||||||
|
read-capacity=20
|
||||||
|
region=eu-west-1
|
||||||
|
retryPolicy=ExponentialBackoffRetry(maxRetries=9, sleepTime=100 MILLISECONDS)
|
||||||
|
size=12812
|
||||||
|
status=ACTIVE
|
||||||
|
table={ ... }
|
||||||
|
write-capacity=20
|
||||||
|
```
|
||||||
|
|
||||||
## Debugging and Error Handling
|
## Debugging and Error Handling
|
||||||
|
|
||||||
|
@ -608,3 +756,14 @@ or the configuration is preventing S3Guard from finding the table.
|
||||||
1. If the region is not set, verify that the table exists in the same
|
1. If the region is not set, verify that the table exists in the same
|
||||||
region as the bucket being used.
|
region as the bucket being used.
|
||||||
1. Create the table if necessary.
|
1. Create the table if necessary.
|
||||||
|
|
||||||
|
### Error `"The level of configured provisioned throughput for the table was exceeded"`
|
||||||
|
|
||||||
|
The IO load of clients of the (shared) DynamoDB table was exceeded.
|
||||||
|
|
||||||
|
Currently S3Guard doesn't do any throttling and retries here; the way to address
|
||||||
|
this is to increase capacity via the AWS console or the `set-capacity` command.
|
||||||
|
|
||||||
|
## Other Topis
|
||||||
|
|
||||||
|
For details on how to test S3Guard, see [Testing S3Guard](./testing.html#s3guard)
|
||||||
|
|
|
@ -591,6 +591,16 @@ public class ITestS3AConfiguration {
|
||||||
assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
|
assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearBucketOption() throws Throwable {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.set(USER_AGENT_PREFIX, "base");
|
||||||
|
setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
|
||||||
|
clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
|
||||||
|
Configuration updated = propagateBucketOptions(config, "c");
|
||||||
|
assertOptionEquals(updated, USER_AGENT_PREFIX, "base");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
|
public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
|
||||||
Configuration config = new Configuration(false);
|
Configuration config = new Configuration(false);
|
||||||
|
|
|
@ -435,16 +435,16 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
|
||||||
// files to list are delaying visibility
|
// files to list are delaying visibility
|
||||||
if (!recursive) {
|
if (!recursive) {
|
||||||
// in this case only the top level files are listed
|
// in this case only the top level files are listed
|
||||||
|
verifyFileIsListed(listedFiles, baseTestDir, fileNames);
|
||||||
assertEquals("Unexpected number of files returned by listFiles() call",
|
assertEquals("Unexpected number of files returned by listFiles() call",
|
||||||
normalFileNum + delayedFileNum, listedFiles.size());
|
normalFileNum + delayedFileNum, listedFiles.size());
|
||||||
verifyFileIsListed(listedFiles, baseTestDir, fileNames);
|
|
||||||
} else {
|
} else {
|
||||||
assertEquals("Unexpected number of files returned by listFiles() call",
|
|
||||||
filesAndEmptyDirectories,
|
|
||||||
listedFiles.size());
|
|
||||||
for (Path dir : testDirs) {
|
for (Path dir : testDirs) {
|
||||||
verifyFileIsListed(listedFiles, dir, fileNames);
|
verifyFileIsListed(listedFiles, dir, fileNames);
|
||||||
}
|
}
|
||||||
|
assertEquals("Unexpected number of files returned by listFiles() call",
|
||||||
|
filesAndEmptyDirectories,
|
||||||
|
listedFiles.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,15 +22,17 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test cases that validate S3Guard's behavior for writing things like
|
* Test cases that validate S3Guard's behavior for writing things like
|
||||||
* directory listings back to the MetadataStore.
|
* directory listings back to the MetadataStore.
|
||||||
|
@ -66,7 +68,7 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
noS3Guard.mkdirs(new Path(directory, "OnS3"));
|
noS3Guard.mkdirs(new Path(directory, "OnS3"));
|
||||||
// Create a directory on both S3 and metadata store
|
// Create a directory on both S3 and metadata store
|
||||||
Path p = new Path(directory, "OnS3AndMS");
|
Path p = new Path(directory, "OnS3AndMS");
|
||||||
assertPathDoesntExist(noWriteBack, p);
|
ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", p);
|
||||||
noWriteBack.mkdirs(p);
|
noWriteBack.mkdirs(p);
|
||||||
|
|
||||||
FileStatus[] fsResults;
|
FileStatus[] fsResults;
|
||||||
|
@ -87,7 +89,7 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
|
|
||||||
// FS should return both (and will write it back)
|
// FS should return both (and will write it back)
|
||||||
fsResults = yesWriteBack.listStatus(directory);
|
fsResults = yesWriteBack.listStatus(directory);
|
||||||
assertEquals("Filesystem enabled S3Guard with write back should have "
|
assertEquals("Filesystem enabled S3Guard with write back should have"
|
||||||
+ " both /OnS3 and /OnS3AndMS: " + Arrays.toString(fsResults),
|
+ " both /OnS3 and /OnS3AndMS: " + Arrays.toString(fsResults),
|
||||||
2, fsResults.length);
|
2, fsResults.length);
|
||||||
|
|
||||||
|
@ -104,7 +106,12 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
new Path(directory, "OnS3"));
|
new Path(directory, "OnS3"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a separate S3AFileSystem instance for testing. */
|
/**
|
||||||
|
* Create a separate S3AFileSystem instance for testing.
|
||||||
|
* There's a bit of complexity as it forces pushes up s3guard options from
|
||||||
|
* the base values to the per-bucket options. This stops explicit bucket
|
||||||
|
* settings in test XML configs from unintentionally breaking tests.
|
||||||
|
*/
|
||||||
private S3AFileSystem createTestFS(URI fsURI, boolean disableS3Guard,
|
private S3AFileSystem createTestFS(URI fsURI, boolean disableS3Guard,
|
||||||
boolean authoritativeMeta) throws IOException {
|
boolean authoritativeMeta) throws IOException {
|
||||||
Configuration conf;
|
Configuration conf;
|
||||||
|
@ -112,12 +119,22 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
// Create a FileSystem that is S3-backed only
|
// Create a FileSystem that is S3-backed only
|
||||||
conf = createConfiguration();
|
conf = createConfiguration();
|
||||||
S3ATestUtils.disableFilesystemCaching(conf);
|
S3ATestUtils.disableFilesystemCaching(conf);
|
||||||
|
String host = fsURI.getHost();
|
||||||
if (disableS3Guard) {
|
if (disableS3Guard) {
|
||||||
conf.set(Constants.S3_METADATA_STORE_IMPL,
|
conf.set(Constants.S3_METADATA_STORE_IMPL,
|
||||||
Constants.S3GUARD_METASTORE_NULL);
|
Constants.S3GUARD_METASTORE_NULL);
|
||||||
|
S3AUtils.setBucketOption(conf, host,
|
||||||
|
S3_METADATA_STORE_IMPL,
|
||||||
|
S3GUARD_METASTORE_NULL);
|
||||||
} else {
|
} else {
|
||||||
S3ATestUtils.maybeEnableS3Guard(conf);
|
S3ATestUtils.maybeEnableS3Guard(conf);
|
||||||
conf.setBoolean(Constants.METADATASTORE_AUTHORITATIVE, authoritativeMeta);
|
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
|
||||||
|
S3AUtils.setBucketOption(conf, host,
|
||||||
|
METADATASTORE_AUTHORITATIVE,
|
||||||
|
Boolean.toString(authoritativeMeta));
|
||||||
|
S3AUtils.setBucketOption(conf, host,
|
||||||
|
S3_METADATA_STORE_IMPL,
|
||||||
|
conf.get(S3_METADATA_STORE_IMPL));
|
||||||
}
|
}
|
||||||
FileSystem fs = FileSystem.get(fsURI, conf);
|
FileSystem fs = FileSystem.get(fsURI, conf);
|
||||||
return asS3AFS(fs);
|
return asS3AFS(fs);
|
||||||
|
@ -128,14 +145,4 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
||||||
return (S3AFileSystem)fs;
|
return (S3AFileSystem)fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertPathDoesntExist(FileSystem fs, Path p)
|
|
||||||
throws IOException {
|
|
||||||
try {
|
|
||||||
FileStatus s = fs.getFileStatus(p);
|
|
||||||
} catch (FileNotFoundException e) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
fail("Path should not exist: " + p);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -696,24 +696,6 @@ public final class S3ATestUtils {
|
||||||
assertEquals("Permission: " + details, permission, status.getPermission());
|
assertEquals("Permission: " + details, permission, status.getPermission());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set a bucket specific property to a particular value.
|
|
||||||
* If the generic key passed in has an {@code fs.s3a. prefix},
|
|
||||||
* that's stripped off, so that when the the bucket properties are propagated
|
|
||||||
* down to the generic values, that value gets copied down.
|
|
||||||
* @param conf configuration to set
|
|
||||||
* @param bucket bucket name
|
|
||||||
* @param genericKey key; can start with "fs.s3a."
|
|
||||||
* @param value value to set
|
|
||||||
*/
|
|
||||||
public static void setBucketOption(Configuration conf, String bucket,
|
|
||||||
String genericKey, String value) {
|
|
||||||
final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
|
|
||||||
genericKey.substring(FS_S3A_PREFIX.length())
|
|
||||||
: genericKey;
|
|
||||||
conf.set(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert that a configuration option matches the expected value.
|
* Assert that a configuration option matches the expected value.
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
|
|
|
@ -18,9 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a.s3guard;
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.junit.Assume;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -31,8 +37,11 @@ import org.apache.hadoop.fs.s3a.Constants;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common functionality for S3GuardTool test cases.
|
* Common functionality for S3GuardTool test cases.
|
||||||
|
@ -40,6 +49,9 @@ import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
|
||||||
public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
|
public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
|
||||||
|
|
||||||
protected static final String OWNER = "hdfs";
|
protected static final String OWNER = "hdfs";
|
||||||
|
protected static final String DYNAMODB_TABLE = "dynamodb://ireland-team";
|
||||||
|
protected static final String S3A_THIS_BUCKET_DOES_NOT_EXIST
|
||||||
|
= "s3a://this-bucket-does-not-exist-00000000000";
|
||||||
|
|
||||||
private MetadataStore ms;
|
private MetadataStore ms;
|
||||||
|
|
||||||
|
@ -57,6 +69,51 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
|
||||||
assertEquals(message, SUCCESS, tool.run(args));
|
assertEquals(message, SUCCESS, tool.run(args));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a S3GuardTool command from a varags list.
|
||||||
|
* @param conf configuration
|
||||||
|
* @param args argument list
|
||||||
|
* @return the return code
|
||||||
|
* @throws Exception any exception
|
||||||
|
*/
|
||||||
|
protected int run(Configuration conf, String... args)
|
||||||
|
throws Exception {
|
||||||
|
return S3GuardTool.run(conf, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a S3GuardTool command from a varags list and the
|
||||||
|
* configuration returned by {@code getConfiguration()}.
|
||||||
|
* @param args argument list
|
||||||
|
* @return the return code
|
||||||
|
* @throws Exception any exception
|
||||||
|
*/
|
||||||
|
protected int run(String... args) throws Exception {
|
||||||
|
return S3GuardTool.run(getConfiguration(), args);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a S3GuardTool command from a varags list, catch any raised
|
||||||
|
* ExitException and verify the status code matches that expected.
|
||||||
|
* @param status expected status code of the exception
|
||||||
|
* @param args argument list
|
||||||
|
* @throws Exception any exception
|
||||||
|
*/
|
||||||
|
protected void runToFailure(int status, final String... args)
|
||||||
|
throws Exception {
|
||||||
|
ExitUtil.ExitException ex =
|
||||||
|
intercept(ExitUtil.ExitException.class,
|
||||||
|
new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return run(args);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (ex.status != status) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected MetadataStore getMetadataStore() {
|
protected MetadataStore getMetadataStore() {
|
||||||
return ms;
|
return ms;
|
||||||
}
|
}
|
||||||
|
@ -134,16 +191,23 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
|
||||||
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
|
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
|
||||||
createFile(new Path(parent, "fresh"), true, true);
|
createFile(new Path(parent, "fresh"), true, true);
|
||||||
|
|
||||||
assertEquals(2, ms.listChildren(parent).getListing().size());
|
assertMetastoreListingCount(parent, "Children count before pruning", 2);
|
||||||
expectSuccess("Prune command did not exit successfully - see output", cmd,
|
exec(cmd, args);
|
||||||
args);
|
assertMetastoreListingCount(parent, "Pruned children count", 1);
|
||||||
assertEquals(1, ms.listChildren(parent).getListing().size());
|
|
||||||
} finally {
|
} finally {
|
||||||
getFileSystem().delete(parent, true);
|
getFileSystem().delete(parent, true);
|
||||||
ms.prune(Long.MAX_VALUE);
|
ms.prune(Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertMetastoreListingCount(Path parent,
|
||||||
|
String message,
|
||||||
|
int expected) throws IOException {
|
||||||
|
Collection<PathMetadata> listing = ms.listChildren(parent).getListing();
|
||||||
|
assertEquals(message +" [" + StringUtils.join(", ", listing) + "]",
|
||||||
|
expected, listing.size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPruneCommandCLI() throws Exception {
|
public void testPruneCommandCLI() throws Exception {
|
||||||
String testPath = path("testPruneCommandCLI").toString();
|
String testPath = path("testPruneCommandCLI").toString();
|
||||||
|
@ -158,4 +222,70 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
|
||||||
String testPath = path("testPruneCommandConf").toString();
|
String testPath = path("testPruneCommandConf").toString();
|
||||||
testPruneCommand(getConfiguration(), "prune", testPath);
|
testPruneCommand(getConfiguration(), "prune", testPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDestroyNoBucket() throws Throwable {
|
||||||
|
intercept(FileNotFoundException.class,
|
||||||
|
new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return run(S3GuardTool.Destroy.NAME,
|
||||||
|
S3A_THIS_BUCKET_DOES_NOT_EXIST);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the test CSV file; assume() that it is not modified (i.e. we haven't
|
||||||
|
* switched to a new storage infrastructure where the bucket is no longer
|
||||||
|
* read only).
|
||||||
|
* @return test file.
|
||||||
|
*/
|
||||||
|
protected String getLandsatCSVFile() {
|
||||||
|
String csvFile = getConfiguration()
|
||||||
|
.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
|
||||||
|
Assume.assumeTrue("CSV test file is not the default",
|
||||||
|
DEFAULT_CSVTEST_FILE.equals(csvFile));
|
||||||
|
return csvFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a command, returning the buffer if the command actually completes.
|
||||||
|
* If an exception is raised the output is logged instead.
|
||||||
|
* @param cmd command
|
||||||
|
* @param buf buffer to use for tool output (not SLF4J output)
|
||||||
|
* @param args argument list
|
||||||
|
* @throws Exception on any failure
|
||||||
|
*/
|
||||||
|
public String exec(S3GuardTool cmd, String...args) throws Exception {
|
||||||
|
ByteArrayOutputStream buf = new ByteArrayOutputStream();
|
||||||
|
try {
|
||||||
|
exec(cmd, buf, args);
|
||||||
|
return buf.toString();
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
throw e;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Command {} failed: \n{}", cmd, buf);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a command, saving the output into the buffer.
|
||||||
|
* @param cmd command
|
||||||
|
* @param buf buffer to use for tool output (not SLF4J output)
|
||||||
|
* @param args argument list
|
||||||
|
* @throws Exception on any failure
|
||||||
|
*/
|
||||||
|
protected void exec(S3GuardTool cmd, ByteArrayOutputStream buf, String...args)
|
||||||
|
throws Exception {
|
||||||
|
LOG.info("exec {}", (Object) args);
|
||||||
|
int r = 0;
|
||||||
|
try(PrintStream out =new PrintStream(buf)) {
|
||||||
|
r = cmd.run(args, out);
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
assertEquals("Command " + cmd + " failed\n"+ buf, 0, r);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,19 +19,28 @@
|
||||||
package org.apache.hadoop.fs.s3a.s3guard;
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.s3a.Constants;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AUtils;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
|
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
|
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test S3Guard related CLI commands against DynamoDB.
|
* Test S3Guard related CLI commands against DynamoDB.
|
||||||
*/
|
*/
|
||||||
|
@ -78,8 +87,76 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class Capacities {
|
||||||
|
private final long read, write;
|
||||||
|
|
||||||
|
Capacities(long read, long write) {
|
||||||
|
this.read = read;
|
||||||
|
this.write = write;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getRead() {
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getWrite() {
|
||||||
|
return write;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getReadStr() {
|
||||||
|
return Long.toString(read);
|
||||||
|
}
|
||||||
|
|
||||||
|
String getWriteStr() {
|
||||||
|
return Long.toString(write);
|
||||||
|
}
|
||||||
|
|
||||||
|
void checkEquals(String text, Capacities that) throws Exception {
|
||||||
|
if (!this.equals(that)) {
|
||||||
|
throw new Exception(text + " expected = " + this +"; actual = "+ that);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Capacities that = (Capacities) o;
|
||||||
|
return read == that.read && write == that.write;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(read, write);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
final StringBuilder sb = new StringBuilder("Capacities{");
|
||||||
|
sb.append("read=").append(read);
|
||||||
|
sb.append(", write=").append(write);
|
||||||
|
sb.append('}');
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Capacities getCapacities() throws IOException {
|
||||||
|
Map<String, String> diagnostics = getMetadataStore().getDiagnostics();
|
||||||
|
return getCapacities(diagnostics);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Capacities getCapacities(Map<String, String> diagnostics) {
|
||||||
|
return new Capacities(
|
||||||
|
Long.parseLong(diagnostics.get(DynamoDBMetadataStore.READ_CAPACITY)),
|
||||||
|
Long.parseLong(diagnostics.get(DynamoDBMetadataStore.WRITE_CAPACITY)));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDynamoDBInitDestroyCycle() throws Exception {
|
public void testDynamoDBInitDestroyCycle() throws Throwable {
|
||||||
String testTableName = "testDynamoDBInitDestroy" + new Random().nextInt();
|
String testTableName = "testDynamoDBInitDestroy" + new Random().nextInt();
|
||||||
String testS3Url = path(testTableName).toString();
|
String testS3Url = path(testTableName).toString();
|
||||||
S3AFileSystem fs = getFileSystem();
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
@ -99,11 +176,80 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
|
||||||
assertTrue(String.format("%s does not exist", testTableName),
|
assertTrue(String.format("%s does not exist", testTableName),
|
||||||
exist(db, testTableName));
|
exist(db, testTableName));
|
||||||
|
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String bucket = fs.getBucket();
|
||||||
|
// force in a new bucket
|
||||||
|
S3AUtils.setBucketOption(conf, bucket, Constants.S3_METADATA_STORE_IMPL,
|
||||||
|
Constants.S3GUARD_METASTORE_DYNAMO);
|
||||||
|
initCmd = new Init(conf);
|
||||||
|
String initOutput = exec(initCmd,
|
||||||
|
"init", "-meta", "dynamodb://" + testTableName, testS3Url);
|
||||||
|
assertTrue("No Dynamo diagnostics in output " + initOutput,
|
||||||
|
initOutput.contains(DESCRIPTION));
|
||||||
|
|
||||||
|
// run a bucket info command and look for
|
||||||
|
// confirmation that it got the output from DDB diags
|
||||||
|
S3GuardTool.BucketInfo infocmd = new S3GuardTool.BucketInfo(conf);
|
||||||
|
String info = exec(infocmd, S3GuardTool.BucketInfo.NAME,
|
||||||
|
"-" + S3GuardTool.BucketInfo.GUARDED_FLAG,
|
||||||
|
testS3Url);
|
||||||
|
assertTrue("No Dynamo diagnostics in output " + info,
|
||||||
|
info.contains(DESCRIPTION));
|
||||||
|
|
||||||
|
// get the current values to set again
|
||||||
|
|
||||||
|
// play with the set-capacity option
|
||||||
|
Capacities original = getCapacities();
|
||||||
|
String fsURI = getFileSystem().getUri().toString();
|
||||||
|
String capacityOut = exec(newSetCapacity(),
|
||||||
|
S3GuardTool.SetCapacity.NAME,
|
||||||
|
fsURI);
|
||||||
|
LOG.info("Set Capacity output=\n{}", capacityOut);
|
||||||
|
capacityOut = exec(newSetCapacity(),
|
||||||
|
S3GuardTool.SetCapacity.NAME,
|
||||||
|
"-" + READ_FLAG, original.getReadStr(),
|
||||||
|
"-" + WRITE_FLAG, original.getWriteStr(),
|
||||||
|
fsURI);
|
||||||
|
LOG.info("Set Capacity output=\n{}", capacityOut);
|
||||||
|
|
||||||
|
// that call does not change the values
|
||||||
|
original.checkEquals("unchanged", getCapacities());
|
||||||
|
|
||||||
|
// now update the value
|
||||||
|
long readCap = original.getRead();
|
||||||
|
long writeCap = original.getWrite();
|
||||||
|
long rc2 = readCap + 1;
|
||||||
|
long wc2 = writeCap + 1;
|
||||||
|
final Capacities desired = new Capacities(rc2, wc2);
|
||||||
|
capacityOut = exec(newSetCapacity(),
|
||||||
|
S3GuardTool.SetCapacity.NAME,
|
||||||
|
"-" + READ_FLAG, Long.toString(rc2),
|
||||||
|
"-" + WRITE_FLAG, Long.toString(wc2),
|
||||||
|
fsURI);
|
||||||
|
LOG.info("Set Capacity output=\n{}", capacityOut);
|
||||||
|
|
||||||
|
// to avoid race conditions, spin for the state change
|
||||||
|
final AtomicInteger c = new AtomicInteger(0);
|
||||||
|
LambdaTestUtils.eventually(60000,
|
||||||
|
new LambdaTestUtils.VoidCallable() {
|
||||||
|
@Override
|
||||||
|
public void call() throws Exception {
|
||||||
|
c.incrementAndGet();
|
||||||
|
Map<String, String> diags = getMetadataStore().getDiagnostics();
|
||||||
|
Capacities updated = getCapacities(diags);
|
||||||
|
String tableInfo = String.format("[%02d] table state: %s",
|
||||||
|
c.intValue(), diags.get(STATUS));
|
||||||
|
LOG.info("{}; capacities {}",
|
||||||
|
tableInfo, updated);
|
||||||
|
desired.checkEquals(tableInfo, updated);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new LambdaTestUtils.ProportionalRetryInterval(500, 5000));
|
||||||
|
|
||||||
// Destroy MetadataStore
|
// Destroy MetadataStore
|
||||||
Destroy destroyCmd = new Destroy(fs.getConf());
|
Destroy destroyCmd = new Destroy(fs.getConf());
|
||||||
|
|
||||||
expectSuccess("Destroy command did not exit successfully - see output",
|
String destroyed = exec(destroyCmd,
|
||||||
destroyCmd,
|
|
||||||
"destroy", "-meta", "dynamodb://" + testTableName, testS3Url);
|
"destroy", "-meta", "dynamodb://" + testTableName, testS3Url);
|
||||||
// Verify it does not exist
|
// Verify it does not exist
|
||||||
assertFalse(String.format("%s still exists", testTableName),
|
assertFalse(String.format("%s still exists", testTableName),
|
||||||
|
@ -131,4 +277,19 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private S3GuardTool newSetCapacity() {
|
||||||
|
S3GuardTool setCapacity = new S3GuardTool.SetCapacity(
|
||||||
|
getFileSystem().getConf());
|
||||||
|
setCapacity.setStore(getMetadataStore());
|
||||||
|
return setCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDestroyUnknownTable() throws Throwable {
|
||||||
|
run(S3GuardTool.Destroy.NAME,
|
||||||
|
"-region", "us-west-2",
|
||||||
|
"-meta", DYNAMODB_TABLE);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,11 +21,12 @@ package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -34,13 +35,16 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
|
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test S3Guard related CLI commands against a LocalMetadataStore.
|
* Test S3Guard related CLI commands against a LocalMetadataStore.
|
||||||
*/
|
*/
|
||||||
public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
|
public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
|
||||||
|
|
||||||
|
private static final String LOCAL_METADATA = "local://metadata";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MetadataStore newMetadataStore() {
|
protected MetadataStore newMetadataStore() {
|
||||||
return new LocalMetadataStore();
|
return new LocalMetadataStore();
|
||||||
|
@ -65,10 +69,7 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
|
||||||
|
|
||||||
S3GuardTool.Import cmd = new S3GuardTool.Import(fs.getConf());
|
S3GuardTool.Import cmd = new S3GuardTool.Import(fs.getConf());
|
||||||
cmd.setStore(ms);
|
cmd.setStore(ms);
|
||||||
|
exec(cmd, "import", parent.toString());
|
||||||
expectSuccess("Import command did not exit successfully - see output",
|
|
||||||
cmd,
|
|
||||||
"import", parent.toString());
|
|
||||||
|
|
||||||
DirListingMetadata children =
|
DirListingMetadata children =
|
||||||
ms.listChildren(dir);
|
ms.listChildren(dir);
|
||||||
|
@ -80,7 +81,7 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDiffCommand() throws IOException {
|
public void testDiffCommand() throws Exception {
|
||||||
S3AFileSystem fs = getFileSystem();
|
S3AFileSystem fs = getFileSystem();
|
||||||
MetadataStore ms = getMetadataStore();
|
MetadataStore ms = getMetadataStore();
|
||||||
Set<Path> filesOnS3 = new HashSet<>(); // files on S3.
|
Set<Path> filesOnS3 = new HashSet<>(); // files on S3.
|
||||||
|
@ -108,13 +109,10 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteArrayOutputStream buf = new ByteArrayOutputStream();
|
ByteArrayOutputStream buf = new ByteArrayOutputStream();
|
||||||
PrintStream out = new PrintStream(buf);
|
|
||||||
Diff cmd = new Diff(fs.getConf());
|
Diff cmd = new Diff(fs.getConf());
|
||||||
cmd.setStore(ms);
|
cmd.setStore(ms);
|
||||||
assertEquals("Diff command did not exit successfully - see output", SUCCESS,
|
exec(cmd, buf, "diff", "-meta", LOCAL_METADATA,
|
||||||
cmd.run(new String[]{"diff", "-meta", "local://metadata",
|
testPath.toString());
|
||||||
testPath.toString()}, out));
|
|
||||||
out.close();
|
|
||||||
|
|
||||||
Set<Path> actualOnS3 = new HashSet<>();
|
Set<Path> actualOnS3 = new HashSet<>();
|
||||||
Set<Path> actualOnMS = new HashSet<>();
|
Set<Path> actualOnMS = new HashSet<>();
|
||||||
|
@ -140,10 +138,128 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String actualOut = out.toString();
|
String actualOut = buf.toString();
|
||||||
assertEquals("Mismatched metadata store outputs: " + actualOut,
|
assertEquals("Mismatched metadata store outputs: " + actualOut,
|
||||||
filesOnMS, actualOnMS);
|
filesOnMS, actualOnMS);
|
||||||
assertEquals("Mismatched s3 outputs: " + actualOut, filesOnS3, actualOnS3);
|
assertEquals("Mismatched s3 outputs: " + actualOut, filesOnS3, actualOnS3);
|
||||||
assertFalse("Diff contained duplicates", duplicates);
|
assertFalse("Diff contained duplicates", duplicates);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDestroyBucketExistsButNoTable() throws Throwable {
|
||||||
|
run(Destroy.NAME,
|
||||||
|
"-meta", LOCAL_METADATA,
|
||||||
|
getLandsatCSVFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testImportNoFilesystem() throws Throwable {
|
||||||
|
final Import importer =
|
||||||
|
new S3GuardTool.Import(getConfiguration());
|
||||||
|
importer.setStore(getMetadataStore());
|
||||||
|
intercept(IOException.class,
|
||||||
|
new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return importer.run(
|
||||||
|
new String[]{
|
||||||
|
"import",
|
||||||
|
"-meta", LOCAL_METADATA,
|
||||||
|
S3A_THIS_BUCKET_DOES_NOT_EXIST
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfoBucketAndRegionNoFS() throws Throwable {
|
||||||
|
intercept(FileNotFoundException.class,
|
||||||
|
new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return run(BucketInfo.NAME, "-meta",
|
||||||
|
LOCAL_METADATA, "-region",
|
||||||
|
"any-region", S3A_THIS_BUCKET_DOES_NOT_EXIST);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitNegativeRead() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT,
|
||||||
|
Init.NAME, "-meta", LOCAL_METADATA, "-region",
|
||||||
|
"eu-west-1",
|
||||||
|
READ_FLAG, "-10");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInit() throws Throwable {
|
||||||
|
run(Init.NAME,
|
||||||
|
"-meta", LOCAL_METADATA,
|
||||||
|
"-region", "us-west-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitTwice() throws Throwable {
|
||||||
|
run(Init.NAME,
|
||||||
|
"-meta", LOCAL_METADATA,
|
||||||
|
"-region", "us-west-1");
|
||||||
|
run(Init.NAME,
|
||||||
|
"-meta", LOCAL_METADATA,
|
||||||
|
"-region", "us-west-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLandsatBucketUnguarded() throws Throwable {
|
||||||
|
run(BucketInfo.NAME,
|
||||||
|
"-" + BucketInfo.UNGUARDED_FLAG,
|
||||||
|
getLandsatCSVFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLandsatBucketRequireGuarded() throws Throwable {
|
||||||
|
runToFailure(E_BAD_STATE,
|
||||||
|
BucketInfo.NAME,
|
||||||
|
"-" + BucketInfo.GUARDED_FLAG,
|
||||||
|
ITestS3GuardToolLocal.this.getLandsatCSVFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLandsatBucketRequireUnencrypted() throws Throwable {
|
||||||
|
run(BucketInfo.NAME,
|
||||||
|
"-" + BucketInfo.ENCRYPTION_FLAG, "none",
|
||||||
|
getLandsatCSVFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLandsatBucketRequireEncrypted() throws Throwable {
|
||||||
|
runToFailure(E_BAD_STATE,
|
||||||
|
BucketInfo.NAME,
|
||||||
|
"-" + BucketInfo.ENCRYPTION_FLAG,
|
||||||
|
"AES256", ITestS3GuardToolLocal.this.getLandsatCSVFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStoreInfo() throws Throwable {
|
||||||
|
S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
|
||||||
|
getFileSystem().getConf());
|
||||||
|
cmd.setStore(getMetadataStore());
|
||||||
|
String output = exec(cmd, cmd.getName(),
|
||||||
|
"-" + S3GuardTool.BucketInfo.GUARDED_FLAG,
|
||||||
|
getFileSystem().getUri().toString());
|
||||||
|
LOG.info("Exec output=\n{}", output);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetCapacity() throws Throwable {
|
||||||
|
S3GuardTool cmd = new S3GuardTool.SetCapacity(getFileSystem().getConf());
|
||||||
|
cmd.setStore(getMetadataStore());
|
||||||
|
String output = exec(cmd, cmd.getName(),
|
||||||
|
"-" + READ_FLAG, "100",
|
||||||
|
"-" + WRITE_FLAG, "100",
|
||||||
|
getFileSystem().getUri().toString());
|
||||||
|
LOG.info("Exec output=\n{}", output);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3ATestConstants;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the S3Guard CLI entry point.
|
||||||
|
*/
|
||||||
|
public class TestS3GuardCLI extends Assert {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a S3GuardTool command from a varags list.
|
||||||
|
* @param args argument list
|
||||||
|
* @return the return code
|
||||||
|
* @throws Exception any exception
|
||||||
|
*/
|
||||||
|
protected int run(String... args)
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new Configuration(false);
|
||||||
|
return S3GuardTool.run(conf, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a S3GuardTool command from a varags list, catch any raised
|
||||||
|
* ExitException and verify the status code matches that expected.
|
||||||
|
* @param status expected status code of an exception
|
||||||
|
* @param args argument list
|
||||||
|
* @throws Exception any exception
|
||||||
|
*/
|
||||||
|
protected void runToFailure(int status, final String... args)
|
||||||
|
throws Exception {
|
||||||
|
ExitUtil.ExitException ex =
|
||||||
|
LambdaTestUtils.intercept(ExitUtil.ExitException.class,
|
||||||
|
new Callable<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
return run(args);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (ex.status != status) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfoNoArgs() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, BucketInfo.NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInfoWrongFilesystem() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT,
|
||||||
|
BucketInfo.NAME, "file://");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoCommand() throws Throwable {
|
||||||
|
runToFailure(E_USAGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnknownCommand() throws Throwable {
|
||||||
|
runToFailure(E_USAGE, "unknown");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPruneNoArgs() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, Prune.NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiffNoArgs() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, Diff.NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testImportNoArgs() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, Import.NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDestroyNoArgs() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, Destroy.NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDestroyUnknownTableNoRegion() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, Destroy.NAME,
|
||||||
|
"-meta", "dynamodb://ireland-team");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitBucketAndRegion() throws Throwable {
|
||||||
|
runToFailure(INVALID_ARGUMENT, Init.NAME,
|
||||||
|
"-meta", "dynamodb://ireland-team",
|
||||||
|
"-region", "eu-west-1",
|
||||||
|
S3ATestConstants.DEFAULT_CSVTEST_FILE
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue