HADOOP-16080. hadoop-aws does not work with hadoop-client-api (#2510). Contributed by Chao Sun

This commit is contained in:
Chao Sun 2020-12-03 16:06:33 -08:00 committed by GitHub
parent f547cd43d1
commit d82b0cc439
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 59 additions and 60 deletions

View File

@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
/**
@ -105,8 +103,7 @@ public final class BlockingThreadPoolExecutorService
private BlockingThreadPoolExecutorService(int permitCount,
ThreadPoolExecutor eventProcessingExecutor) {
super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
permitCount, false);
super(eventProcessingExecutor, permitCount, false);
this.eventProcessingExecutor = eventProcessingExecutor;
}

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.util;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ForwardingExecutorService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
@ -29,6 +27,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -49,10 +48,10 @@ import java.util.concurrent.TimeoutException;
@SuppressWarnings("NullableProblems")
@InterfaceAudience.Private
public class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService {
ForwardingExecutorService {
private final Semaphore queueingPermits;
private final ListeningExecutorService executorDelegatee;
private final ExecutorService executorDelegatee;
private final int permitCount;
/**
@ -62,7 +61,7 @@ public class SemaphoredDelegatingExecutor extends
* @param fair should the semaphore be "fair"
*/
public SemaphoredDelegatingExecutor(
ListeningExecutorService executorDelegatee,
ExecutorService executorDelegatee,
int permitCount,
boolean fair) {
this.permitCount = permitCount;
@ -71,7 +70,7 @@ public class SemaphoredDelegatingExecutor extends
}
@Override
protected ListeningExecutorService delegate() {
protected ExecutorService delegate() {
return executorDelegatee;
}
@ -102,7 +101,7 @@ public class SemaphoredDelegatingExecutor extends
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
public <T> Future<T> submit(Callable<T> task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
@ -113,7 +112,7 @@ public class SemaphoredDelegatingExecutor extends
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
public <T> Future<T> submit(Runnable task, T result) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {
@ -124,7 +123,7 @@ public class SemaphoredDelegatingExecutor extends
}
@Override
public ListenableFuture<?> submit(Runnable task) {
public Future<?> submit(Runnable task) {
try {
queueingPermits.acquire();
} catch (InterruptedException e) {

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem {
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private ExecutorService boundedThreadPool;
private ExecutorService boundedCopyThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override

View File

@ -156,7 +156,7 @@ public class FailureInjectionPolicy {
*/
private static float validProbability(float p) {
Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
"Probability out of range 0 to 1 %s", p);
String.format("Probability out of range 0 to 1 %s", p));
return p;
}

View File

@ -294,8 +294,8 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
String child) {
Path prefixCandidate = new Path(child).getParent();
Path ancestorPath = new Path(ancestor);
Preconditions.checkArgument(child.startsWith(ancestor), "%s does not " +
"start with %s", child, ancestor);
Preconditions.checkArgument(child.startsWith(ancestor),
String.format("%s does not start with %s", child, ancestor));
while (!prefixCandidate.isRoot()) {
Path nextParent = prefixCandidate.getParent();
if (nextParent.equals(ancestorPath)) {

View File

@ -157,7 +157,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.writeOperationHelper = writeOperationHelper;
this.putTracker = putTracker;
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
"Block size is too small: %d", blockSize);
String.format("Block size is too small: %d", blockSize));
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.multiPartUpload = null;
this.progressListener = (progress instanceof ProgressListener) ?

View File

@ -75,7 +75,6 @@ import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.amazonaws.event.ProgressListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -181,7 +180,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
private ListeningExecutorService boundedThreadPool;
private ExecutorService boundedThreadPool;
private ExecutorService unboundedThreadPool;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@ -2254,9 +2253,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
final boolean needEmptyDirectoryFlag) throws IOException {
LOG.debug("S3GetFileStatus {}", path);
Preconditions.checkArgument(!needEmptyDirectoryFlag
|| probes.contains(StatusProbeEnum.List),
"s3GetFileStatus(%s) wants to know if a directory is empty but"
+ " does not request a list probe", path);
|| probes.contains(StatusProbeEnum.List), String.format(
"s3GetFileStatus(%s) wants to know if a directory is empty but"
+ " does not request a list probe", path));
if (!key.isEmpty() && !key.endsWith("/")
&& probes.contains(StatusProbeEnum.Head)) {

View File

@ -71,7 +71,7 @@ public class S3AMultipartUploader extends MultipartUploader {
public S3AMultipartUploader(FileSystem fs, Configuration conf) {
Preconditions.checkArgument(fs instanceof S3AFileSystem,
"Wrong filesystem: expected S3A but got %s", fs);
String.format("Wrong filesystem: expected S3A but got %s", fs));
s3a = (S3AFileSystem) fs;
}

View File

@ -82,7 +82,7 @@ public class S3AReadOpContext extends S3AOpContext {
dstFileStatus);
this.path = checkNotNull(path);
Preconditions.checkArgument(readahead >= 0,
"invalid readahead %d", readahead);
String.format("invalid readahead %d", readahead));
this.inputPolicy = checkNotNull(inputPolicy);
this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
this.readahead = readahead;

View File

@ -814,7 +814,7 @@ public final class S3AUtils {
throws IOException {
String initialVal;
Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
"%s does not start with $%s", baseKey, FS_S3A_PREFIX);
String.format("%s does not start with $%s", baseKey, FS_S3A_PREFIX));
// if there's a bucket, work with it
if (StringUtils.isNotEmpty(bucket)) {
String subkey = baseKey.substring(FS_S3A_PREFIX.length());

View File

@ -373,10 +373,10 @@ public class WriteOperationHelper {
// exactly one source must be set; xor verifies this
checkArgument((uploadStream != null) ^ (sourceFile != null),
"Data source");
checkArgument(size >= 0, "Invalid partition size %s", size);
checkArgument(size >= 0, String.format("Invalid partition size %s", size));
checkArgument(partNumber > 0 && partNumber <= 10000,
"partNumber must be between 1 and 10000 inclusive, but is %s",
partNumber);
String.format("partNumber must be between 1 and 10000 inclusive,"
+ " but is %s", partNumber));
LOG.debug("Creating part upload request for {} #{} size {}",
uploadId, partNumber, size);
@ -391,11 +391,11 @@ public class WriteOperationHelper {
request.setInputStream(uploadStream);
} else {
checkArgument(sourceFile.exists(),
"Source file does not exist: %s", sourceFile);
checkArgument(offset >= 0, "Invalid offset %s", offset);
String.format("Source file does not exist: %s", sourceFile));
checkArgument(offset >= 0, String.format("Invalid offset %s", offset));
long length = sourceFile.length();
checkArgument(offset == 0 || offset < length,
"Offset %s beyond length of file %s", offset, length);
String.format("Offset %s beyond length of file %s", offset, length));
request.setFile(sourceFile);
request.setFileOffset(offset);
}

View File

@ -312,7 +312,8 @@ public class RoleModel {
@Override
public void validate() {
checkNotNull(statement, "Statement");
checkState(VERSION.equals(version), "Invalid Version: %s", version);
checkState(VERSION.equals(version),
String.format("Invalid Version: %s", version));
statement.stream().forEach((a) -> a.validate());
}

View File

@ -378,7 +378,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
// get files on the local FS in the attempt path
Path attemptPath = getTaskAttemptPath(context);
Preconditions.checkNotNull(attemptPath,
"No attemptPath path in {}", this);
"No attemptPath path in " + this);
LOG.debug("Scanning {} for files to commit", attemptPath);

View File

@ -266,19 +266,19 @@ public class DirListingMetadata {
URI parentUri = path.toUri();
if (parentUri.getHost() != null) {
URI childUri = childPath.toUri();
Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
"host: %s", childUri);
Preconditions.checkNotNull(childUri.getHost(),
String.format("Expected non-null URI host: %s", childUri));
Preconditions.checkArgument(
childUri.getHost().equals(parentUri.getHost()),
"childUri %s and parentUri %s must have the same host",
childUri, parentUri);
Preconditions.checkNotNull(childUri.getScheme(), "No scheme in path %s",
childUri);
String.format("childUri %s and parentUri %s must have the same host",
childUri, parentUri));
Preconditions.checkNotNull(childUri.getScheme(),
String.format("No scheme in path %s", childUri));
}
Preconditions.checkArgument(!childPath.isRoot(),
"childPath cannot be the root path: %s", childPath);
String.format("childPath cannot be the root path: %s", childPath));
Preconditions.checkArgument(childPath.getParent().equals(path),
"childPath %s must be a child of %s", childPath, path);
String.format("childPath %s must be a child of %s", childPath, path));
}
/**
@ -296,9 +296,9 @@ public class DirListingMetadata {
Path p = status.getPath();
Preconditions.checkNotNull(p, "Child status' path cannot be null");
Preconditions.checkArgument(!p.isRoot(),
"childPath cannot be the root path: %s", p);
String.format("childPath cannot be the root path: %s", p));
Preconditions.checkArgument(p.getParent().equals(path),
"childPath %s must be a child of %s", p, path);
String.format("childPath %s must be a child of %s", p, path));
URI uri = p.toUri();
URI parentUri = path.toUri();
// If FileStatus' path is missing host, but should have one, add it.
@ -317,6 +317,7 @@ public class DirListingMetadata {
private void checkPathAbsolute(Path p) {
Preconditions.checkNotNull(p, "path must be non-null");
Preconditions.checkArgument(p.isAbsolute(), "path must be absolute: %s", p);
Preconditions.checkArgument(p.isAbsolute(),
String.format("path must be absolute: %s", p));
}
}

View File

@ -1420,14 +1420,15 @@ public class DynamoDBMetadataStore implements MetadataStore {
*/
private Path checkPath(Path path) {
Preconditions.checkNotNull(path);
Preconditions.checkArgument(path.isAbsolute(), "Path %s is not absolute",
path);
Preconditions.checkArgument(path.isAbsolute(),
String.format("Path %s is not absolute", path));
URI uri = path.toUri();
Preconditions.checkNotNull(uri.getScheme(), "Path %s missing scheme", path);
Preconditions.checkNotNull(uri.getScheme(),
String.format("Path %s missing scheme", path));
Preconditions.checkArgument(uri.getScheme().equals(Constants.FS_S3A),
"Path %s scheme must be %s", path, Constants.FS_S3A);
Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()), "Path %s" +
" is missing bucket.", path);
String.format("Path %s scheme must be %s", path, Constants.FS_S3A));
Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()),
String.format("Path %s is missing bucket.", path));
return path;
}

View File

@ -129,9 +129,11 @@ final class PathMetadataDynamoDBTranslation {
}
String parentStr = item.getString(PARENT);
Preconditions.checkNotNull(parentStr, "No parent entry in item %s", item);
Preconditions.checkNotNull(parentStr,
String.format("No parent entry in item %s", item));
String childStr = item.getString(CHILD);
Preconditions.checkNotNull(childStr, "No child entry in item %s", item);
Preconditions.checkNotNull(childStr,
String.format("No child entry in item %s", item));
// Skip table version markers, which are only non-absolute paths stored.
Path rawPath = new Path(parentStr, childStr);

View File

@ -309,8 +309,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
S3_METADATA_STORE_IMPL);
LOG.debug("updated bucket store option {}", updatedBucketOption);
Preconditions.checkState(S3GUARD_METASTORE_NULL.equals(updatedBucketOption),
"Expected bucket option to be %s but was %s",
S3GUARD_METASTORE_NULL, updatedBucketOption);
String.format("Expected bucket option to be %s but was %s",
S3GUARD_METASTORE_NULL, updatedBucketOption));
FileSystem fs = FileSystem.newInstance(uri, conf);
if (!(fs instanceof S3AFileSystem)) {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.fs.s3a;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;
@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -70,7 +70,7 @@ public class ITestBlockingThreadPoolExecutorService {
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
}