HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran.

Chris Nauroth 2016-06-03 08:56:36 -07:00
parent d0dc5aaa2d
commit b8216c10d8
18 changed files with 1985 additions and 439 deletions

View File

@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.internal.AssumptionViolatedException;
@ -34,8 +36,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
/**
@ -892,4 +900,416 @@ public class ContractTestUtils extends Assert {
fs.delete(objectPath, false);
}
}
/**
* Make times more readable, by adding a "," every three digits.
* @param nanos nanos or other large number
* @return a string for logging
*/
public static String toHuman(long nanos) {
return String.format(Locale.ENGLISH, "%,d", nanos);
}
/**
* Log the bandwidth of a timer as inferred from the number of
* bytes processed.
* @param timer timer
* @param bytes bytes processed in the time period
*/
public static void bandwidth(NanoTimer timer, long bytes) {
LOG.info("Bandwidth = {} MB/S",
timer.bandwidthDescription(bytes));
}
/**
* Work out the bandwidth in MB/s.
* @param bytes bytes
* @param durationNS duration in nanos
* @return the number of megabytes/second of the recorded operation
*/
public static double bandwidthMBs(long bytes, long durationNS) {
return (bytes * 1000.0) / durationNS;
}
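A quick worked example of the conversion (illustrative values only, not part of the patch): dividing bytes by 10^6 and nanoseconds by 10^9 reduces to multiplying by 1000.
long bytes = 128000000L;                       // 128 MB, decimal
long durationNS = 2000000000L;                 // 2 seconds in nanoseconds
double mb = bandwidthMBs(bytes, durationNS);   // (128e6 * 1000) / 2e9 = 64.0 MB/s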
/**
* Recursively create a directory tree.
* Return the details about the created tree. The files and directories
* are those created under the path, not the base directory created. That
* is retrievable via {@link TreeScanResults#getBasePath()}.
* @param fs filesystem
* @param current parent dir
* @param depth depth of directory tree
* @param width width: subdirs per entry
* @param files number of files per entry
* @param filesize size of files to create in bytes.
* @return the details about the created tree.
* @throws IOException IO Problems
*/
public static TreeScanResults createSubdirs(FileSystem fs,
Path current,
int depth,
int width,
int files,
int filesize) throws IOException {
return createSubdirs(fs, current, depth, width, files,
filesize, "dir-", "file-", "0");
}
/**
* Recursively create a directory tree.
* @param fs filesystem
* @param current the current dir in the walk
* @param depth depth of directory tree
* @param width width: subdirs per entry
* @param files number of files per entry
* @param filesize size of files to create in bytes.
* @param dirPrefix prefix for directory entries
* @param filePrefix prefix for file entries
* @param marker string which is slowly built up to uniquely name things
* @return the details about the created tree.
* @throws IOException IO Problems
*/
public static TreeScanResults createSubdirs(FileSystem fs,
Path current,
int depth,
int width,
int files,
int filesize,
String dirPrefix,
String filePrefix,
String marker) throws IOException {
fs.mkdirs(current);
TreeScanResults results = new TreeScanResults(current);
if (depth > 0) {
byte[] data = dataset(filesize, 'a', 'z');
for (int i = 0; i < files; i++) {
String name = String.format("%s-%s-%04d.txt", filePrefix, marker, i);
Path path = new Path(current, name);
createFile(fs, path, true, data);
results.add(fs, path);
}
for (int w = 0; w < width; w++) {
String marker2 = String.format("%s-%04d", marker, w);
Path child = new Path(current, dirPrefix + marker2);
results.add(createSubdirs(fs, child, depth - 1, width, files,
filesize, dirPrefix, filePrefix, marker2));
results.add(fs, child);
}
}
return results;
}
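A hedged usage sketch, assuming {@code fs} is an initialized FileSystem and {@code base} an empty test directory (both names are illustrative): create a small tree, rescan it, and verify the two views agree.
TreeScanResults created =
    createSubdirs(fs, base, 2, 3, 4, 1024);   // depth 2, 3 subdirs and 4 files of 1 KB per level
TreeScanResults walked = treeWalk(fs, base);  // depth-first rescan of the same tree
created.assertEquivalent(walked);             // same files/directories, no duplicates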
/**
* Predicate to determine if two lists are equivalent, that is, they
* contain the same entries.
* @param left first collection of paths
* @param right second collection of paths
* @return true if each collection contains all entries of the other.
*/
public static boolean collectionsEquivalent(Collection<Path> left,
Collection<Path> right) {
Set<Path> leftSet = new HashSet<>(left);
Set<Path> rightSet = new HashSet<>(right);
return leftSet.containsAll(right) && rightSet.containsAll(left);
}
/**
* Predicate to determine if two lists are equivalent, that is, they
* contain the same entries and neither collection contains duplicate
* entries.
* @param left first collection of paths
* @param right second collection of paths
* @return true if the collections are equivalent and duplicate-free.
*/
public static boolean collectionsEquivalentNoDuplicates(Collection<Path> left,
Collection<Path> right) {
return collectionsEquivalent(left, right) &&
!containsDuplicates(left) && !containsDuplicates(right);
}
/**
* Predicate to test for a collection of paths containing duplicate entries.
* @param paths collection of paths
* @return true if there are duplicates.
*/
public static boolean containsDuplicates(Collection<Path> paths) {
return new HashSet<>(paths).size() != paths.size();
}
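For clarity, a small illustrative check of these helpers with made-up paths:
List<Path> a = Arrays.asList(new Path("/x"), new Path("/y"));
List<Path> b = Arrays.asList(new Path("/y"), new Path("/x"), new Path("/x"));
collectionsEquivalent(a, b);                // true: same set of entries
containsDuplicates(b);                      // true: "/x" appears twice
collectionsEquivalentNoDuplicates(a, b);    // false, because b has a duplicate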
/**
* Recursively list all entries, with a depth first traversal of the
* directory tree.
* @param fs filesystem
* @param path path to scan
* @return all files and directories found under the path
* @throws IOException IO problems
*/
public static TreeScanResults treeWalk(FileSystem fs, Path path)
throws IOException {
TreeScanResults dirsAndFiles = new TreeScanResults();
FileStatus[] statuses = fs.listStatus(path);
for (FileStatus status : statuses) {
LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : "");
}
for (FileStatus status : statuses) {
dirsAndFiles.add(status);
if (status.isDirectory()) {
dirsAndFiles.add(treeWalk(fs, status.getPath()));
}
}
return dirsAndFiles;
}
/**
* Results of recursive directory creation/scan operations.
*/
public static final class TreeScanResults {
private Path basePath;
private final List<Path> files = new ArrayList<>();
private final List<Path> directories = new ArrayList<>();
private final List<Path> other = new ArrayList<>();
public TreeScanResults() {
}
public TreeScanResults(Path basePath) {
this.basePath = basePath;
}
/**
* Build from a located file status iterator.
* @param results results of the listFiles/listStatus call.
* @throws IOException IO problems during the iteration.
*/
public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
throws IOException {
while (results.hasNext()) {
add(results.next());
}
}
/**
* Construct results from an array of statistics.
* @param stats statistics array. Must not be null.
*/
public TreeScanResults(FileStatus[] stats) {
assertNotNull("Null file status array", stats);
for (FileStatus stat : stats) {
add(stat);
}
}
/**
* Add all paths in the other set of results to this instance.
* @param that the other instance
* @return this instance
*/
public TreeScanResults add(TreeScanResults that) {
files.addAll(that.files);
directories.addAll(that.directories);
other.addAll(that.other);
return this;
}
/**
* Increment the counters based on the file status.
* @param status path status to count.
*/
public void add(FileStatus status) {
if (status.isFile()) {
files.add(status.getPath());
} else if (status.isDirectory()) {
directories.add(status.getPath());
} else {
other.add(status.getPath());
}
}
public void add(FileSystem fs, Path path) throws IOException {
add(fs.getFileStatus(path));
}
@Override
public String toString() {
return String.format("%d director%s and %d file%s",
getDirCount(),
getDirCount() == 1 ? "y" : "ies",
getFileCount(),
getFileCount() == 1 ? "" : "s");
}
/**
* Assert that the state of a listing has the specific number of files,
* directories and other entries. The error text will include
* the {@code text} param, the field in question, and the entire object's
* string value.
* @param text text prefix for assertions.
* @param f expected file count
* @param d expected directory count
* @param o expected other entries.
*/
public void assertSizeEquals(String text, long f, long d, long o) {
String self = toString();
Assert.assertEquals(text + ": file count in " + self,
f, getFileCount());
Assert.assertEquals(text + ": directory count in " + self,
d, getDirCount());
Assert.assertEquals(text + ": 'other' count in " + self,
o, getOtherCount());
}
/**
* Assert that the trees are equivalent: that every list matches (and
* that neither has any duplicates).
* @param that the other entry
*/
public void assertEquivalent(TreeScanResults that) {
String details = "this= " + this + "; that=" + that;
assertFieldsEquivalent("files", that, files, that.files);
assertFieldsEquivalent("directories", that,
directories, that.directories);
assertFieldsEquivalent("other", that, other, that.other);
}
/**
* Assert that a field in two instances is equivalent.
* @param fieldname field name for error messages
* @param that the other instance to scan
* @param ours our field's contents
* @param theirs the other instance's field contents
*/
public void assertFieldsEquivalent(String fieldname,
TreeScanResults that,
List<Path> ours, List<Path> theirs) {
assertFalse("Duplicate " + fieldname + " in " + this,
containsDuplicates(ours));
assertFalse("Duplicate " + fieldname + " in other " + that,
containsDuplicates(theirs));
assertTrue(fieldname + " mismatch: between {" + this + "}" +
" and {" + that + "}",
collectionsEquivalent(ours, theirs));
}
public List<Path> getFiles() {
return files;
}
public List<Path> getDirectories() {
return directories;
}
public List<Path> getOther() {
return other;
}
public Path getBasePath() {
return basePath;
}
public long getFileCount() {
return files.size();
}
public long getDirCount() {
return directories.size();
}
public long getOtherCount() {
return other.size();
}
/**
* Total count of entries.
* @return the total number of entries
*/
public long totalCount() {
return getFileCount() + getDirCount() + getOtherCount();
}
}
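A brief illustrative assertion, assuming {@code fs} and {@code dir} are an initialized filesystem and an existing directory holding four files and two subdirectories (hypothetical names):
TreeScanResults listing = new TreeScanResults(fs.listStatus(dir));
listing.assertSizeEquals("listStatus(" + dir + ")", 4, 2, 0);   // files, dirs, other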
/**
* A simple class for timing operations in nanoseconds, and for
* printing some useful results in the process.
*/
public static final class NanoTimer {
private final long startTime;
private long endTime;
public NanoTimer() {
startTime = now();
}
/**
* End the operation.
* @return the duration of the operation
*/
public long end() {
endTime = now();
return duration();
}
/**
* End the operation; log the duration.
* @param format message
* @param args any arguments
* @return the duration of the operation
*/
public long end(String format, Object... args) {
long d = end();
LOG.info("Duration of {}: {} nS",
String.format(format, args), toHuman(d));
return d;
}
public long now() {
return System.nanoTime();
}
public long duration() {
return endTime - startTime;
}
public double bandwidth(long bytes) {
return bandwidthMBs(bytes, duration());
}
/**
* Bandwidth as bytes per second.
* @param bytes bytes in
* @return the bandwidth of the operation in bytes per second.
*/
public double bandwidthBytes(long bytes) {
return (bytes * 1.0) / duration();
}
/**
* How many nanoseconds per IOP, byte, etc.
* @param operations operations processed in this time period
* @return the average number of nanoseconds per operation
*/
public long nanosPerOperation(long operations) {
return duration() / operations;
}
/**
* Get a description of the bandwidth, even down to fractions of
* a MB.
* @param bytes bytes processed
* @return bandwidth
*/
public String bandwidthDescription(long bytes) {
return String.format("%,.6f", bandwidth(bytes));
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
}
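Typical use from a test, sketched with assumed names ({@code out} is an open output stream, {@code data} a byte array):
NanoTimer timer = new NanoTimer();
out.write(data);
out.close();
timer.end("write of %d bytes", data.length);   // logs the duration
bandwidth(timer, data.length);                 // logs the effective MB/s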
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
/**
* Listener to progress from AWS regarding transfers.
*/
public class ProgressableProgressListener implements ProgressListener {
private static final Logger LOG = S3AFileSystem.LOG;
private final S3AFileSystem fs;
private final String key;
private final Progressable progress;
private long lastBytesTransferred;
private final Upload upload;
/**
* Instantiate.
* @param fs filesystem: will be invoked with statistics updates
* @param key key for the upload
* @param upload source of events
* @param progress optional callback for progress.
*/
public ProgressableProgressListener(S3AFileSystem fs,
String key,
Upload upload,
Progressable progress) {
this.fs = fs;
this.key = key;
this.upload = upload;
this.progress = progress;
this.lastBytesTransferred = 0;
}
@Override
public void progressChanged(ProgressEvent progressEvent) {
if (progress != null) {
progress.progress();
}
// There are 3 http ops here, but this should be close enough for now
ProgressEventType pet = progressEvent.getEventType();
if (pet == TRANSFER_PART_STARTED_EVENT ||
pet == TRANSFER_COMPLETED_EVENT) {
fs.incrementWriteOperations();
}
long transferred = upload.getProgress().getBytesTransferred();
long delta = transferred - lastBytesTransferred;
fs.incrementPutProgressStatistics(key, delta);
lastBytesTransferred = transferred;
}
/**
* Method to invoke after upload has completed.
* This can handle race conditions in setup/teardown.
* @return the number of bytes which were transferred after the notification
*/
public long uploadCompleted() {
long delta = upload.getProgress().getBytesTransferred() -
lastBytesTransferred;
if (delta > 0) {
LOG.debug("S3A write delta changed after finished: {} bytes", delta);
fs.incrementPutProgressStatistics(key, delta);
}
return delta;
}
}
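The intended call sequence, sketched under assumptions ({@code fs} is the owning S3AFileSystem, {@code destKey} the destination key, {@code request} a prepared PutObjectRequest; waitForUploadResult can throw InterruptedException, which a real caller must handle):
Upload upload = fs.putObject(request);
ProgressableProgressListener listener =
    new ProgressableProgressListener(fs, destKey, upload, null);
upload.addProgressListener(listener);
upload.waitForUploadResult();
// pick up any bytes reported between the last progress event and completion
listener.uploadCompleted();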

View File

@ -35,10 +35,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@ -54,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
/**
* Upload files/parts asap directly from a memory buffer (instead of buffering
@ -77,8 +76,6 @@ public class S3AFastOutputStream extends OutputStream {
private final int multiPartThreshold;
private final S3AFileSystem fs;
private final CannedAccessControlList cannedACL;
private final FileSystem.Statistics statistics;
private final String serverSideEncryptionAlgorithm;
private final ProgressListener progressListener;
private final ListeningExecutorService executorService;
private MultiPartUpload multiPartUpload;
@ -98,28 +95,28 @@ public class S3AFastOutputStream extends OutputStream {
* @param bucket S3 bucket name
* @param key S3 key name
* @param progress report progress in order to prevent timeouts
* @param statistics track FileSystem.Statistics on the performed operations
* @param cannedACL used CannedAccessControlList
* @param serverSideEncryptionAlgorithm algorithm for server side encryption
* @param partSize size of a single part in a multi-part upload (except
* last part)
* @param multiPartThreshold files at least this size use multi-part upload
* @param threadPoolExecutor thread factory
* @throws IOException on any problem
*/
public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
String bucket, String key, Progressable progress,
FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
String serverSideEncryptionAlgorithm, long partSize,
long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
public S3AFastOutputStream(AmazonS3Client client,
S3AFileSystem fs,
String bucket,
String key,
Progressable progress,
CannedAccessControlList cannedACL,
long partSize,
long multiPartThreshold,
ThreadPoolExecutor threadPoolExecutor)
throws IOException {
this.bucket = bucket;
this.key = key;
this.client = client;
this.fs = fs;
this.cannedACL = cannedACL;
this.statistics = statistics;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
//Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
if (partSize > Integer.MAX_VALUE) {
this.partSize = Integer.MAX_VALUE;
@ -246,16 +243,17 @@ public class S3AFastOutputStream extends OutputStream {
if (multiPartUpload == null) {
putObject();
} else {
if (buffer.size() > 0) {
int size = buffer.size();
if (size > 0) {
fs.incrementPutStartStatistics(size);
//send last part
multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
.toByteArray()), buffer.size());
.toByteArray()), size);
}
final List<PartETag> partETags = multiPartUpload
.waitForAllPartUploads();
multiPartUpload.complete(partETags);
}
statistics.incrementWriteOps(1);
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
@ -265,18 +263,19 @@ public class S3AFastOutputStream extends OutputStream {
}
}
/**
* Create the default metadata for a multipart upload operation.
* @return the metadata to use/extend.
*/
private ObjectMetadata createDefaultMetadata() {
ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
return om;
return fs.newObjectMetadata();
}
private MultiPartUpload initiateMultiPartUpload() throws IOException {
final ObjectMetadata om = createDefaultMetadata();
final InitiateMultipartUploadRequest initiateMPURequest =
new InitiateMultipartUploadRequest(bucket, key, om);
new InitiateMultipartUploadRequest(bucket,
key,
createDefaultMetadata());
initiateMPURequest.setCannedACL(cannedACL);
try {
return new MultiPartUpload(
@ -290,15 +289,18 @@ public class S3AFastOutputStream extends OutputStream {
LOG.debug("Executing regular upload for bucket '{}' key '{}'",
bucket, key);
final ObjectMetadata om = createDefaultMetadata();
om.setContentLength(buffer.size());
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
new ByteArrayInputStream(buffer.toByteArray()), om);
putObjectRequest.setCannedAcl(cannedACL);
final int size = buffer.size();
om.setContentLength(size);
final PutObjectRequest putObjectRequest =
fs.newPutObjectRequest(key,
om,
new ByteArrayInputStream(buffer.toByteArray()));
putObjectRequest.setGeneralProgressListener(progressListener);
ListenableFuture<PutObjectResult> putObjectResult =
executorService.submit(new Callable<PutObjectResult>() {
@Override
public PutObjectResult call() throws Exception {
fs.incrementPutStartStatistics(size);
return client.putObject(putObjectRequest);
}
});
@ -306,7 +308,7 @@ public class S3AFastOutputStream extends OutputStream {
try {
putObjectResult.get();
} catch (InterruptedException ie) {
LOG.warn("Interrupted object upload:" + ie, ie);
LOG.warn("Interrupted object upload: {}", ie, ie);
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
throw extractException("regular upload", key, ee);
@ -339,7 +341,7 @@ public class S3AFastOutputStream extends OutputStream {
public PartETag call() throws Exception {
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId);
return client.uploadPart(request).getPartETag();
return fs.uploadPart(request).getPartETag();
}
});
partETagsFutures.add(partETagFuture);
@ -349,7 +351,7 @@ public class S3AFastOutputStream extends OutputStream {
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
LOG.warn("Interrupted partUpload:" + ie, ie);
LOG.warn("Interrupted partUpload: {}", ie, ie);
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException ee) {
@ -382,11 +384,12 @@ public class S3AFastOutputStream extends OutputStream {
public void abort() {
LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
try {
fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
key, uploadId));
} catch (Exception e2) {
LOG.warn("Unable to abort multipart upload, you may need to purge " +
"uploaded parts: " + e2, e2);
"uploaded parts: {}", e2, e2);
}
}
}

View File

@ -93,4 +93,11 @@ public class S3AFileStatus extends FileStatus {
return super.getModificationTime();
}
}
@Override
public String toString() {
return super.toString() +
String.format(" isEmptyDirectory=%s", isEmptyDirectory());
}
}

View File

@ -45,7 +45,6 @@ import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
@ -53,6 +52,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
@ -71,8 +72,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Progressable;
@ -80,6 +86,7 @@ import org.apache.hadoop.util.VersionInfo;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -118,6 +125,7 @@ public class S3AFileSystem extends FileSystem {
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics;
private long readAhead;
// The maximum number of entries that can be deleted in any call to s3
@ -237,6 +245,15 @@ public class S3AFileSystem extends FileSystem {
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
storageStatistics = (S3AStorageStatistics)
GlobalStorageStatistics.INSTANCE
.put(S3AStorageStatistics.NAME,
new GlobalStorageStatistics.StorageStatisticsProvider() {
@Override
public StorageStatistics provide() {
return new S3AStorageStatistics();
}
});
int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
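The registration above shares one S3AStorageStatistics instance across all S3A filesystems in the process. A sketch of how a caller might read it back through the global registry, assuming the filesystem has already been initialized:
StorageStatistics stats =
    GlobalStorageStatistics.INSTANCE.get(S3AStorageStatistics.NAME);
if (stats != null) {
  java.util.Iterator<StorageStatistics.LongStatistic> it =
      stats.getLongStatistics();
  while (it.hasNext()) {
    StorageStatistics.LongStatistic s = it.next();
    LOG.info("{} = {}", s.getName(), s.getValue());
  }
}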
@ -345,6 +362,14 @@ public class S3AFileSystem extends FileSystem {
}
}
/**
* Get S3A Instrumentation. For test purposes.
* @return this instance's instrumentation.
*/
public S3AInstrumentation getInstrumentation() {
return instrumentation;
}
/**
* Initializes the User-Agent header to send in HTTP requests to the S3
* back-end. We always include the Hadoop version number. The user also may
@ -621,23 +646,26 @@ public class S3AFileSystem extends FileSystem {
}
instrumentation.fileCreated();
if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
key, progress, statistics, cannedACL,
serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
threadPoolExecutor), statistics);
return new FSDataOutputStream(
new S3AFastOutputStream(s3,
this,
bucket,
key,
progress,
cannedACL,
partSize,
multiPartThreshold,
threadPoolExecutor),
statistics);
}
// We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file
return new FSDataOutputStream(
new S3AOutputStream(getConf(),
transfers,
this,
bucket,
key,
progress,
cannedACL,
statistics,
serverSideEncryptionAlgorithm),
progress
),
null);
}
@ -693,6 +721,7 @@ public class S3AFileSystem extends FileSystem {
private boolean innerRename(Path src, Path dst) throws IOException,
AmazonClientException {
LOG.debug("Rename path {} to {}", src, dst);
incrementStatistic(INVOCATION_RENAME);
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
@ -793,8 +822,7 @@ public class S3AFileSystem extends FileSystem {
request.setPrefix(srcKey);
request.setMaxKeys(maxKeys);
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
ObjectListing objects = listObjects(request);
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
@ -808,8 +836,7 @@ public class S3AFileSystem extends FileSystem {
}
if (objects.isTruncated()) {
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
objects = continueListObjects(objects);
} else {
if (!keysToDelete.isEmpty()) {
removeKeys(keysToDelete, false);
@ -837,17 +864,223 @@ public class S3AFileSystem extends FileSystem {
return getObjectMetadata(pathToKey(path));
}
/**
* Increment a statistic by 1.
* @param statistic The operation to increment
*/
protected void incrementStatistic(Statistic statistic) {
incrementStatistic(statistic, 1);
}
/**
* Increment a statistic by a specific value.
* @param statistic The operation to increment
* @param count the count to increment
*/
protected void incrementStatistic(Statistic statistic, long count) {
instrumentation.incrementCounter(statistic, count);
storageStatistics.incrementCounter(statistic, count);
}
/**
* Request object metadata; increments counters in the process.
* @param key key
* @return the metadata
*/
private ObjectMetadata getObjectMetadata(String key) {
protected ObjectMetadata getObjectMetadata(String key) {
incrementStatistic(OBJECT_METADATA_REQUESTS);
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
statistics.incrementReadOps(1);
incrementReadOperations();
return meta;
}
/**
* Initiate a {@code listObjects} operation, incrementing metrics
* in the process.
* @param request request to initiate
* @return the results
*/
protected ObjectListing listObjects(ListObjectsRequest request) {
incrementStatistic(OBJECT_LIST_REQUESTS);
incrementReadOperations();
return s3.listObjects(request);
}
/**
* List the next set of objects.
* @param objects paged result
* @return the next result object
*/
protected ObjectListing continueListObjects(ObjectListing objects) {
incrementStatistic(OBJECT_LIST_REQUESTS);
incrementReadOperations();
return s3.listNextBatchOfObjects(objects);
}
/**
* Increment read operations.
*/
public void incrementReadOperations() {
statistics.incrementReadOps(1);
}
/**
* Increment the write operation counter.
* This is somewhat inaccurate, as it appears to be invoked more
* often than needed in progress callbacks.
*/
public void incrementWriteOperations() {
statistics.incrementWriteOps(1);
}
/**
* Delete an object.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* @param key key to blob to delete.
*/
private void deleteObject(String key) {
incrementWriteOperations();
incrementStatistic(OBJECT_DELETE_REQUESTS);
s3.deleteObject(bucket, key);
}
/**
* Perform a bulk object delete operation.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* @param deleteRequest keys to delete on the s3-backend
*/
private void deleteObjects(DeleteObjectsRequest deleteRequest) {
incrementWriteOperations();
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
s3.deleteObjects(deleteRequest);
}
/**
* Create a putObject request.
* Adds the ACL and metadata.
* @param key key of object
* @param metadata metadata header
* @param srcfile source file
* @return the request
*/
public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(metadata);
return putObjectRequest;
}
/**
* Create a {@link PutObjectRequest} request.
* The metadata is assumed to have been configured with the size of the
* operation.
* @param key key of object
* @param metadata metadata header
* @param inputStream source data.
* @return the request
*/
PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
putObjectRequest.setCannedAcl(cannedACL);
return putObjectRequest;
}
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
* encryption.
* @return a new metadata instance
*/
public ObjectMetadata newObjectMetadata() {
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
return om;
}
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
* encryption.
*
* @param length length of data to set in header.
* @return a new metadata instance
*/
public ObjectMetadata newObjectMetadata(long length) {
final ObjectMetadata om = newObjectMetadata();
om.setContentLength(length);
return om;
}
/**
* PUT an object, incrementing the put requests and put bytes
* counters.
* It does not update the other counters,
* as existing code does that as progress callbacks come in.
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
* @param putObjectRequest the request
* @return the upload initiated
*/
public Upload putObject(PutObjectRequest putObjectRequest) {
long len;
if (putObjectRequest.getFile() != null) {
len = putObjectRequest.getFile().length();
} else {
len = putObjectRequest.getMetadata().getContentLength();
}
incrementPutStartStatistics(len);
return transfers.upload(putObjectRequest);
}
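From a caller's perspective, a single instrumented PUT of a local file might look like the following sketch ({@code localFile} and {@code destKey} are illustrative names; the upload wait can be interrupted):
ObjectMetadata md = fs.newObjectMetadata(localFile.length());
PutObjectRequest req = fs.newPutObjectRequest(destKey, md, localFile);
Upload upload = fs.putObject(req);   // counts OBJECT_PUT_REQUESTS and OBJECT_PUT_BYTES
upload.waitForUploadResult();
fs.finishedWrite(destKey);           // removes any now-redundant fake parent directories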
/**
* Upload part of a multi-partition file.
* Increments the write and put counters.
* @param request request
* @return the result of the operation.
*/
public UploadPartResult uploadPart(UploadPartRequest request) {
incrementPutStartStatistics(request.getPartSize());
return s3.uploadPart(request);
}
/**
* At the start of a put/multipart upload operation, update the
* relevant counters.
*
* @param bytes bytes in the request.
*/
public void incrementPutStartStatistics(long bytes) {
LOG.debug("PUT start {} bytes", bytes);
incrementWriteOperations();
incrementStatistic(OBJECT_PUT_REQUESTS);
if (bytes > 0) {
incrementStatistic(OBJECT_PUT_BYTES, bytes);
}
}
/**
* Callback for use in progress callbacks from put/multipart upload events.
* Increments those statistics which are expected to be updated during
* the ongoing upload operation.
* @param key key to file that is being written (for logging)
* @param bytes bytes successfully uploaded.
*/
public void incrementPutProgressStatistics(String key, long bytes) {
LOG.debug("PUT {}: {} bytes", key, bytes);
incrementWriteOperations();
if (bytes > 0) {
statistics.incrementBytesWritten(bytes);
}
}
/**
* A helper method to delete a list of keys on a s3-backend.
*
@ -858,21 +1091,13 @@ public class S3AFileSystem extends FileSystem {
private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean clearKeys) throws AmazonClientException {
if (enableMultiObjectsDelete) {
DeleteObjectsRequest deleteRequest
= new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
s3.deleteObjects(deleteRequest);
deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
instrumentation.fileDeleted(keysToDelete.size());
statistics.incrementWriteOps(1);
} else {
int writeops = 0;
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
s3.deleteObject(
new DeleteObjectRequest(bucket, keyVersion.getKey()));
writeops++;
deleteObject(keyVersion.getKey());
}
instrumentation.fileDeleted(keysToDelete.size());
statistics.incrementWriteOps(writeops);
}
if (clearKeys) {
keysToDelete.clear();
@ -942,9 +1167,8 @@ public class S3AFileSystem extends FileSystem {
if (status.isEmptyDirectory()) {
LOG.debug("Deleting fake empty directory {}", key);
s3.deleteObject(bucket, key);
deleteObject(key);
instrumentation.directoryDeleted();
statistics.incrementWriteOps(1);
} else {
LOG.debug("Getting objects for directory prefix {} to delete", key);
@ -955,9 +1179,9 @@ public class S3AFileSystem extends FileSystem {
//request.setDelimiter("/");
request.setMaxKeys(maxKeys);
List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
ObjectListing objects = listObjects(request);
List<DeleteObjectsRequest.KeyVersion> keys =
new ArrayList<>(objects.getObjectSummaries().size());
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
@ -969,8 +1193,7 @@ public class S3AFileSystem extends FileSystem {
}
if (objects.isTruncated()) {
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
objects = continueListObjects(objects);
} else {
if (!keys.isEmpty()) {
removeKeys(keys, false);
@ -981,13 +1204,11 @@ public class S3AFileSystem extends FileSystem {
}
} else {
LOG.debug("delete: Path is a file");
s3.deleteObject(bucket, key);
instrumentation.fileDeleted(1);
statistics.incrementWriteOps(1);
deleteObject(key);
}
createFakeDirectoryIfNecessary(f.getParent());
return true;
}
@ -996,7 +1217,7 @@ public class S3AFileSystem extends FileSystem {
String key = pathToKey(f);
if (!key.isEmpty() && !exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
createFakeDirectory(bucket, key);
createFakeDirectory(key);
}
}
@ -1032,6 +1253,7 @@ public class S3AFileSystem extends FileSystem {
IOException, AmazonClientException {
String key = pathToKey(f);
LOG.debug("List status for path: {}", f);
incrementStatistic(INVOCATION_LIST_STATUS);
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(f);
@ -1049,8 +1271,7 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("listStatus: doing listObjects for directory {}", key);
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
ObjectListing objects = listObjects(request);
Path fQualified = f.makeQualified(uri, workingDir);
@ -1061,33 +1282,25 @@ public class S3AFileSystem extends FileSystem {
if (keyPath.equals(fQualified) ||
summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
LOG.debug("Ignoring: {}", keyPath);
continue;
}
if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
result.add(new S3AFileStatus(true, true, keyPath));
LOG.debug("Adding: fd: {}", keyPath);
} else {
result.add(new S3AFileStatus(summary.getSize(),
dateToLong(summary.getLastModified()), keyPath,
getDefaultBlockSize(fQualified)));
LOG.debug("Adding: fi: {}", keyPath);
S3AFileStatus status = createFileStatus(keyPath, summary,
getDefaultBlockSize(keyPath));
result.add(status);
LOG.debug("Adding: {}", status);
}
}
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (keyPath.equals(f)) {
continue;
if (!keyPath.equals(f)) {
result.add(new S3AFileStatus(true, false, keyPath));
LOG.debug("Adding: rd: {}", keyPath);
}
result.add(new S3AFileStatus(true, false, keyPath));
LOG.debug("Adding: rd: {}", keyPath);
}
if (objects.isTruncated()) {
LOG.debug("listStatus: list truncated - getting next batch");
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
objects = continueListObjects(objects);
} else {
break;
}
@ -1100,8 +1313,6 @@ public class S3AFileSystem extends FileSystem {
return result.toArray(new FileStatus[result.size()]);
}
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
@ -1123,7 +1334,7 @@ public class S3AFileSystem extends FileSystem {
/**
*
* Make the given path and all non-existent parents into
* directories. Has the semantics of Unix @{code 'mkdir -p'}.
* directories. Has the semantics of Unix {@code 'mkdir -p'}.
* Existence of the directory hierarchy is not an error.
* @param path path to create
* @param permission to apply to f
@ -1158,7 +1369,7 @@ public class S3AFileSystem extends FileSystem {
private boolean innerMkdirs(Path f, FsPermission permission)
throws IOException, FileAlreadyExistsException, AmazonClientException {
LOG.debug("Making directory: {}", f);
incrementStatistic(INVOCATION_MKDIRS);
try {
FileStatus fileStatus = getFileStatus(f);
@ -1187,7 +1398,7 @@ public class S3AFileSystem extends FileSystem {
} while (fPart != null);
String key = pathToKey(f);
createFakeDirectory(bucket, key);
createFakeDirectory(key);
return true;
}
}
@ -1201,12 +1412,12 @@ public class S3AFileSystem extends FileSystem {
*/
public S3AFileStatus getFileStatus(Path f) throws IOException {
String key = pathToKey(f);
incrementStatistic(INVOCATION_GET_FILE_STATUS);
LOG.debug("Getting path status for {} ({})", f , key);
if (!key.isEmpty()) {
try {
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
statistics.incrementReadOps(1);
ObjectMetadata meta = getObjectMetadata(key);
if (objectRepresentsDirectory(key, meta.getContentLength())) {
LOG.debug("Found exact file: fake directory");
@ -1231,8 +1442,7 @@ public class S3AFileSystem extends FileSystem {
if (!key.endsWith("/")) {
String newKey = key + "/";
try {
ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
statistics.incrementReadOps(1);
ObjectMetadata meta = getObjectMetadata(newKey);
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
LOG.debug("Found file (with /): fake directory");
@ -1265,8 +1475,7 @@ public class S3AFileSystem extends FileSystem {
request.setDelimiter("/");
request.setMaxKeys(1);
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
ObjectListing objects = listObjects(request);
if (!objects.getCommonPrefixes().isEmpty()
|| !objects.getObjectSummaries().isEmpty()) {
@ -1349,7 +1558,8 @@ public class S3AFileSystem extends FileSystem {
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
String key = pathToKey(dst);
incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
final String key = pathToKey(dst);
if (!overwrite && exists(dst)) {
throw new FileAlreadyExistsException(dst + " already exists");
@ -1360,35 +1570,19 @@ public class S3AFileSystem extends FileSystem {
LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src);
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(om);
ProgressListener progressListener = new ProgressListener() {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventType()) {
case TRANSFER_PART_COMPLETED_EVENT:
statistics.incrementWriteOps(1);
break;
default:
break;
}
}
};
statistics.incrementWriteOps(1);
Upload up = transfers.upload(putObjectRequest);
up.addProgressListener(progressListener);
final ObjectMetadata om = newObjectMetadata();
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
Upload up = putObject(putObjectRequest);
ProgressableProgressListener listener = new ProgressableProgressListener(
this, key, up, null);
up.addProgressListener(listener);
try {
up.waitForUploadResult();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + src
+ " to " + dst + ", cancelling");
}
listener.uploadCompleted();
// This will delete unnecessary fake parent directories
finishedWrite(key);
@ -1437,7 +1631,7 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
try {
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
ObjectMetadata srcom = getObjectMetadata(srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
@ -1451,7 +1645,7 @@ public class S3AFileSystem extends FileSystem {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventType()) {
case TRANSFER_PART_COMPLETED_EVENT:
statistics.incrementWriteOps(1);
incrementWriteOperations();
break;
default:
break;
@ -1463,7 +1657,7 @@ public class S3AFileSystem extends FileSystem {
copy.addProgressListener(progressListener);
try {
copy.waitForCopyResult();
statistics.incrementWriteOps(1);
incrementWriteOperations();
instrumentation.filesCopied(1, size);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + srcKey
@ -1475,26 +1669,12 @@ public class S3AFileSystem extends FileSystem {
}
}
private boolean objectRepresentsDirectory(final String name, final long size) {
return !name.isEmpty()
&& name.charAt(name.length() - 1) == '/'
&& size == 0L;
}
// Handles null Dates that can be returned by AWS
private static long dateToLong(final Date date) {
if (date == null) {
return 0L;
}
return date.getTime();
}
/**
* Perform post-write actions.
* @param key key written to
*/
public void finishedWrite(String key) {
LOG.debug("Finished write to {}", key);
deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
}
@ -1516,8 +1696,7 @@ public class S3AFileSystem extends FileSystem {
if (status.isDirectory() && status.isEmptyDirectory()) {
LOG.debug("Deleting fake directory {}/", key);
s3.deleteObject(bucket, key + "/");
statistics.incrementWriteOps(1);
deleteObject(key + "/");
}
} catch (IOException | AmazonClientException e) {
LOG.debug("While deleting key {} ", key, e);
@ -1533,18 +1712,20 @@ public class S3AFileSystem extends FileSystem {
}
private void createFakeDirectory(final String bucketName, final String objectName)
throws AmazonClientException, AmazonServiceException {
private void createFakeDirectory(final String objectName)
throws AmazonClientException, AmazonServiceException,
InterruptedIOException {
if (!objectName.endsWith("/")) {
createEmptyObject(bucketName, objectName + "/");
createEmptyObject(objectName + "/");
} else {
createEmptyObject(bucketName, objectName);
createEmptyObject(objectName);
}
}
// Used to create an empty file that represents an empty directory
private void createEmptyObject(final String bucketName, final String objectName)
throws AmazonClientException, AmazonServiceException {
private void createEmptyObject(final String objectName)
throws AmazonClientException, AmazonServiceException,
InterruptedIOException {
final InputStream im = new InputStream() {
@Override
public int read() throws IOException {
@ -1552,16 +1733,16 @@ public class S3AFileSystem extends FileSystem {
}
};
final ObjectMetadata om = new ObjectMetadata();
om.setContentLength(0L);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
newObjectMetadata(0L),
im);
Upload upload = putObject(putObjectRequest);
try {
upload.waitForUploadResult();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted creating " + objectName);
}
PutObjectRequest putObjectRequest =
new PutObjectRequest(bucketName, objectName, im, om);
putObjectRequest.setCannedAcl(cannedACL);
s3.putObject(putObjectRequest);
statistics.incrementWriteOps(1);
incrementPutProgressStatistics(objectName, 0);
instrumentation.directoryCreated();
}
@ -1576,10 +1757,7 @@ public class S3AFileSystem extends FileSystem {
// This approach may be too brittle, especially if
// in future there are new attributes added to ObjectMetadata
// that we do not explicitly call to set here
ObjectMetadata ret = new ObjectMetadata();
// Non null attributes
ret.setContentLength(source.getContentLength());
ObjectMetadata ret = newObjectMetadata(source.getContentLength());
// Possibly null attributes
// Allowing nulls to pass breaks it during later use
@ -1688,6 +1866,75 @@ public class S3AFileSystem extends FileSystem {
return multiPartThreshold;
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
incrementStatistic(INVOCATION_GLOB_STATUS);
return super.globStatus(pathPattern);
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
incrementStatistic(INVOCATION_GLOB_STATUS);
return super.globStatus(pathPattern, filter);
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws FileNotFoundException, IOException {
incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
return super.listLocatedStatus(f);
}
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
boolean recursive) throws FileNotFoundException, IOException {
incrementStatistic(INVOCATION_LIST_FILES);
return super.listFiles(f, recursive);
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public boolean exists(Path f) throws IOException {
incrementStatistic(INVOCATION_EXISTS);
return super.exists(f);
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public boolean isDirectory(Path f) throws IOException {
incrementStatistic(INVOCATION_IS_DIRECTORY);
return super.isDirectory(f);
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public boolean isFile(Path f) throws IOException {
incrementStatistic(INVOCATION_IS_FILE);
return super.isFile(f);
}
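These wrappers make the per-invocation counters observable from tests; a hypothetical assertion, assuming {@code fs} is an S3AFileSystem and {@code path} an existing object:
long before = fs.getInstrumentation().getCounterValue(Statistic.INVOCATION_EXISTS);
fs.exists(path);
long after = fs.getInstrumentation().getCounterValue(Statistic.INVOCATION_EXISTS);
assertEquals("exists() was not counted", before + 1, after);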
/**
* Get an integer option >= the minimum allowed value.
* @param conf configuration

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricStringBuilder;
@ -26,49 +27,30 @@ import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static org.apache.hadoop.fs.s3a.Statistic.*;
/**
* Instrumentation of S3a.
* Derived from the {@code AzureFileSystemInstrumentation}
* Derived from the {@code AzureFileSystemInstrumentation}.
*
* Counters and metrics are generally addressed in code by their name or
* {@link Statistic} key. There <i>may</i> be some Statistics which do
* not have an entry here. To avoid attempts to access such counters failing,
* the operations to increment/query metric values are designed to handle
* lookup failures.
*/
@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation {
public static final String CONTEXT = "S3AFileSystem";
public static final String STREAM_OPENED = "streamOpened";
public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
public static final String STREAM_CLOSED = "streamClosed";
public static final String STREAM_ABORTED = "streamAborted";
public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
public static final String STREAM_FORWARD_SEEK_OPERATIONS
= "streamForwardSeekOperations";
public static final String STREAM_BACKWARD_SEEK_OPERATIONS
= "streamBackwardSeekOperations";
public static final String STREAM_SEEK_BYTES_SKIPPED =
"streamBytesSkippedOnSeek";
public static final String STREAM_SEEK_BYTES_BACKWARDS =
"streamBytesBackwardsOnSeek";
public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
public static final String STREAM_READ_FULLY_OPERATIONS
= "streamReadFullyOperations";
public static final String STREAM_READ_OPERATIONS_INCOMPLETE
= "streamReadOperationsIncomplete";
public static final String FILES_CREATED = "files_created";
public static final String FILES_COPIED = "files_copied";
public static final String FILES_COPIED_BYTES = "files_copied_bytes";
public static final String FILES_DELETED = "files_deleted";
public static final String DIRECTORIES_CREATED = "directories_created";
public static final String DIRECTORIES_DELETED = "directories_deleted";
public static final String IGNORED_ERRORS = "ignored_errors";
private final MetricsRegistry registry =
new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
private final MutableCounterLong streamOpenOperations;
@ -95,6 +77,27 @@ public class S3AInstrumentation {
private final MutableCounterLong numberOfDirectoriesDeleted;
private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
private static final Statistic[] COUNTERS_TO_CREATE = {
INVOCATION_COPY_FROM_LOCAL_FILE,
INVOCATION_EXISTS,
INVOCATION_GET_FILE_STATUS,
INVOCATION_GLOB_STATUS,
INVOCATION_IS_DIRECTORY,
INVOCATION_IS_FILE,
INVOCATION_LIST_FILES,
INVOCATION_LIST_LOCATED_STATUS,
INVOCATION_LIST_STATUS,
INVOCATION_MKDIRS,
INVOCATION_RENAME,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
OBJECT_MULTIPART_UPLOAD_ABORTED,
OBJECT_PUT_BYTES,
OBJECT_PUT_REQUESTS
};
public S3AInstrumentation(URI name) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag("FileSystemId",
@ -103,50 +106,35 @@ public class S3AInstrumentation {
registry.tag("fsURI",
"URI of this filesystem",
name.toString());
streamOpenOperations = streamCounter(STREAM_OPENED,
"Total count of times an input stream to object store was opened");
streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
"Total count of times an attempt to close a data stream was made");
streamClosed = streamCounter(STREAM_CLOSED,
"Count of times the TCP stream was closed");
streamAborted = streamCounter(STREAM_ABORTED,
"Count of times the TCP stream was aborted");
streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
"Number of seek operations invoked on input streams");
streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
"Number of read exceptions caught and attempted to recovered from");
streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
"Number of executed seek operations which went forward in a stream");
streamBackwardSeekOperations = streamCounter(
STREAM_BACKWARD_SEEK_OPERATIONS,
"Number of executed seek operations which went backwards in a stream");
streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
"Count of bytes skipped during forward seek operations");
streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
"Count of bytes moved backwards during seek operations");
streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
"Count of bytes read during seek() in stream operations");
streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
"Count of read() operations in streams");
streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
"Count of readFully() operations in streams");
streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
"Count of incomplete read() operations in streams");
numberOfFilesCreated = counter(FILES_CREATED,
"Total number of files created through the object store.");
numberOfFilesCopied = counter(FILES_COPIED,
"Total number of files copied within the object store.");
bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
"Total number of bytes copied within the object store.");
numberOfFilesDeleted = counter(FILES_DELETED,
"Total number of files deleted through from the object store.");
numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
"Total number of directories created through the object store.");
numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
"Total number of directories deleted through the object store.");
ignoredErrors = counter(IGNORED_ERRORS,
"Total number of errors caught and ingored.");
streamOpenOperations = streamCounter(STREAM_OPENED);
streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
streamClosed = streamCounter(STREAM_CLOSED);
streamAborted = streamCounter(STREAM_ABORTED);
streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
streamForwardSeekOperations =
streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
streamBackwardSeekOperations =
streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
streamBytesBackwardsOnSeek =
streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
streamReadFullyOperations =
streamCounter(STREAM_READ_FULLY_OPERATIONS);
streamReadsIncomplete =
streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
numberOfFilesCreated = counter(FILES_CREATED);
numberOfFilesCopied = counter(FILES_COPIED);
bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
numberOfFilesDeleted = counter(FILES_DELETED);
numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED);
numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED);
ignoredErrors = counter(IGNORED_ERRORS);
for (Statistic statistic : COUNTERS_TO_CREATE) {
counter(statistic);
}
}
/**
@ -173,6 +161,25 @@ public class S3AInstrumentation {
return counter;
}
/**
* Create a counter in the registry.
* @param op statistic to count
* @return a new counter
*/
protected final MutableCounterLong counter(Statistic op) {
return counter(op.getSymbol(), op.getDescription());
}
/**
* Create a counter in the stream map: these are not registered in the
* public metrics.
* @param op statistic to count
* @return a new counter
*/
protected final MutableCounterLong streamCounter(Statistic op) {
return streamCounter(op.getSymbol(), op.getDescription());
}
/**
* Create a gauge in the registry.
* @param name name gauge name
@ -215,6 +222,58 @@ public class S3AInstrumentation {
return metricBuilder.toString();
}
/**
* Get the value of a counter.
* @param statistic the operation
* @return its value, or 0 if not found.
*/
public long getCounterValue(Statistic statistic) {
return getCounterValue(statistic.getSymbol());
}
/**
* Get the value of a counter.
* If the counter is null, return 0.
* @param name the name of the counter
* @return its value.
*/
public long getCounterValue(String name) {
MutableCounterLong counter = lookupCounter(name);
return counter == null ? 0 : counter.value();
}
/**
* Lookup a counter by name. Return null if it is not known.
* @param name counter name
* @return the counter
*/
private MutableCounterLong lookupCounter(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
return null;
}
if (!(metric instanceof MutableCounterLong)) {
throw new IllegalStateException("Metric " + name
+ " is not a MutableCounterLong: " + metric);
}
return (MutableCounterLong) metric;
}
/**
* Look up a metric from both the registered set and the lighter weight
* stream entries.
* @param name metric name
* @return the metric or null
*/
public MutableMetric lookupMetric(String name) {
MutableMetric metric = getRegistry().get(name);
if (metric == null) {
metric = streamMetrics.get(name);
}
return metric;
}
/**
* Indicate that S3A created a file.
*/
@ -262,6 +321,19 @@ public class S3AInstrumentation {
ignoredErrors.incr();
}
/**
* Increment a specific counter.
* No-op if not defined.
* @param op operation
* @param count increment value
*/
public void incrementCounter(Statistic op, long count) {
MutableCounterLong counter = lookupCounter(op.getSymbol());
if (counter != null) {
counter.incr(count);
}
}
/**
* Create a stream input statistics instance.
* @return the new instance

View File

@ -19,19 +19,11 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;
@ -44,8 +36,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
@ -59,32 +49,20 @@ public class S3AOutputStream extends OutputStream {
private File backupFile;
private boolean closed;
private String key;
private String bucket;
private TransferManager transfers;
private Progressable progress;
private long partSize;
private long partSizeThreshold;
private S3AFileSystem fs;
private CannedAccessControlList cannedACL;
private FileSystem.Statistics statistics;
private LocalDirAllocator lDirAlloc;
private String serverSideEncryptionAlgorithm;
public static final Logger LOG = S3AFileSystem.LOG;
public S3AOutputStream(Configuration conf, TransferManager transfers,
S3AFileSystem fs, String bucket, String key, Progressable progress,
CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
String serverSideEncryptionAlgorithm)
public S3AOutputStream(Configuration conf,
S3AFileSystem fs, String key, Progressable progress)
throws IOException {
this.bucket = bucket;
this.key = key;
this.transfers = transfers;
this.progress = progress;
this.fs = fs;
this.cannedACL = cannedACL;
this.statistics = statistics;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
partSize = fs.getPartitionSize();
partSizeThreshold = fs.getMultiPartThreshold();
@ -124,30 +102,18 @@ public class S3AOutputStream extends OutputStream {
try {
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
PutObjectRequest putObjectRequest =
new PutObjectRequest(bucket, key, backupFile);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(om);
Upload upload = transfers.upload(putObjectRequest);
ProgressableProgressListener listener =
new ProgressableProgressListener(upload, progress, statistics);
final ObjectMetadata om = fs.newObjectMetadata();
Upload upload = fs.putObject(
fs.newPutObjectRequest(
key,
om,
backupFile));
ProgressableProgressListener listener =
new ProgressableProgressListener(fs, key, upload, progress);
upload.addProgressListener(listener);
upload.waitForUploadResult();
long delta = upload.getProgress().getBytesTransferred() -
listener.getLastBytesTransferred();
if (statistics != null && delta != 0) {
LOG.debug("S3A write delta changed after finished: {} bytes", delta);
statistics.incrementBytesWritten(delta);
}
listener.uploadCompleted();
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
} catch (InterruptedException e) {
@ -175,46 +141,4 @@ public class S3AOutputStream extends OutputStream {
backupStream.write(b, off, len);
}
/**
* Listener to progress from AWS regarding transfers.
*/
public static class ProgressableProgressListener implements ProgressListener {
private Progressable progress;
private FileSystem.Statistics statistics;
private long lastBytesTransferred;
private Upload upload;
public ProgressableProgressListener(Upload upload, Progressable progress,
FileSystem.Statistics statistics) {
this.upload = upload;
this.progress = progress;
this.statistics = statistics;
this.lastBytesTransferred = 0;
}
public void progressChanged(ProgressEvent progressEvent) {
if (progress != null) {
progress.progress();
}
// There are 3 http ops here, but this should be close enough for now
ProgressEventType pet = progressEvent.getEventType();
if (pet == TRANSFER_PART_STARTED_EVENT ||
pet == TRANSFER_COMPLETED_EVENT) {
statistics.incrementWriteOps(1);
}
long transferred = upload.getProgress().getBytesTransferred();
long delta = transferred - lastBytesTransferred;
if (statistics != null && delta != 0) {
statistics.incrementBytesWritten(delta);
}
lastBytesTransferred = transferred;
}
public long getLastBytesTransferred() {
return lastBytesTransferred;
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageStatistics;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
/**
* Storage statistics for S3A.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AStorageStatistics extends StorageStatistics {
private static final Logger LOG = S3AFileSystem.LOG;
public static final String NAME = "S3AStorageStatistics";
private final Map<Statistic, AtomicLong> opsCount =
new EnumMap<>(Statistic.class);
public S3AStorageStatistics() {
super(NAME);
for (Statistic opType : Statistic.values()) {
opsCount.put(opType, new AtomicLong(0));
}
}
/**
* Increment a specific counter.
* @param op operation
* @param count increment value
* @return the new value
*/
public long incrementCounter(Statistic op, long count) {
long updated = opsCount.get(op).addAndGet(count);
LOG.debug("{} += {} -> {}", op, count, updated);
return updated;
}
private class LongIterator implements Iterator<LongStatistic> {
private Iterator<Map.Entry<Statistic, AtomicLong>> iterator =
Collections.unmodifiableSet(opsCount.entrySet()).iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public LongStatistic next() {
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
final Map.Entry<Statistic, AtomicLong> entry = iterator.next();
return new LongStatistic(entry.getKey().name(), entry.getValue().get());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
@Override
public Iterator<LongStatistic> getLongStatistics() {
return new LongIterator();
}
@Override
public Long getLong(String key) {
final Statistic type = Statistic.fromSymbol(key);
return type == null ? null : opsCount.get(type).get();
}
@Override
public boolean isTracked(String key) {
return Statistic.fromSymbol(key) != null;
}
}
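A minimal sketch of how a caller might consume these statistics; the construction here is purely illustrative, since in practice the instance is created and owned by the filesystem:
// Illustrative only: bump a counter, then read the values back.
S3AStorageStatistics stats = new S3AStorageStatistics();
stats.incrementCounter(Statistic.OBJECT_LIST_REQUESTS, 1);
Iterator<StorageStatistics.LongStatistic> it = stats.getLongStatistics();
while (it.hasNext()) {
  StorageStatistics.LongStatistic entry = it.next();
  System.out.println(entry.getName() + " = " + entry.getValue());
}
// Single-value lookup is keyed by the statistic's symbol, not its enum name.
Long listCount = stats.getLong(Statistic.OBJECT_LIST_REQUESTS.getSymbol());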

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
@ -29,6 +30,7 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -186,4 +188,50 @@ public final class S3AUtils {
}
return builder.toString();
}
/**
* Create a file status instance from a listing.
* @param keyPath path to entry
* @param summary summary from AWS
* @param blockSize block size to declare.
* @return a status entry
*/
public static S3AFileStatus createFileStatus(Path keyPath,
S3ObjectSummary summary,
long blockSize) {
if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
return new S3AFileStatus(true, true, keyPath);
} else {
return new S3AFileStatus(summary.getSize(),
dateToLong(summary.getLastModified()), keyPath,
blockSize);
}
}
/**
* Predicate: does the object represent a directory?
* @param name object name
* @param size object size
* @return true if it meets the criteria for representing a directory
*/
public static boolean objectRepresentsDirectory(final String name,
final long size) {
return !name.isEmpty()
&& name.charAt(name.length() - 1) == '/'
&& size == 0L;
}
/**
* Date to long conversion.
* Handles null Dates that can be returned by AWS by returning 0.
* @param date date from AWS query
* @return timestamp of the object
*/
public static long dateToLong(final Date date) {
if (date == null) {
return 0L;
}
return date.getTime();
}
}
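As a quick, hedged illustration of the directory-marker convention these helpers encode (the object keys below are made up):
// A zero-byte object whose key ends in '/' is treated as a directory marker.
S3AUtils.objectRepresentsDirectory("data/logs/", 0L);         // true
S3AUtils.objectRepresentsDirectory("data/logs/app.log", 42L); // false: non-empty object
S3AUtils.objectRepresentsDirectory("data/logs", 0L);          // false: no trailing '/'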

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
/**
* Statistics which are collected in S3A.
* These statistics are available at a low level in {@link S3AStorageStatistics}
* and as metrics in {@link S3AInstrumentation}.
*/
public enum Statistic {
DIRECTORIES_CREATED("directories_created",
"Total number of directories created through the object store."),
DIRECTORIES_DELETED("directories_deleted",
"Total number of directories deleted through the object store."),
FILES_COPIED("files_copied",
"Total number of files copied within the object store."),
FILES_COPIED_BYTES("files_copied_bytes",
"Total number of bytes copied within the object store."),
FILES_CREATED("files_created",
"Total number of files created through the object store."),
FILES_DELETED("files_deleted",
"Total number of files deleted from the object store."),
IGNORED_ERRORS("ignored_errors", "Errors caught and ignored"),
INVOCATION_COPY_FROM_LOCAL_FILE("invocations_copyfromlocalfile",
"Calls of copyFromLocalFile()"),
INVOCATION_EXISTS("invocations_exists",
"Calls of exists()"),
INVOCATION_GET_FILE_STATUS("invocations_getfilestatus",
"Calls of getFileStatus()"),
INVOCATION_GLOB_STATUS("invocations_globstatus",
"Calls of globStatus()"),
INVOCATION_IS_DIRECTORY("invocations_is_directory",
"Calls of isDirectory()"),
INVOCATION_IS_FILE("invocations_is_file",
"Calls of isFile()"),
INVOCATION_LIST_FILES("invocations_listfiles",
"Calls of listFiles()"),
INVOCATION_LIST_LOCATED_STATUS("invocations_listlocatedstatus",
"Calls of listLocatedStatus()"),
INVOCATION_LIST_STATUS("invocations_liststatus",
"Calls of listStatus()"),
INVOCATION_MKDIRS("invocations_mdkirs",
"Calls of mkdirs()"),
INVOCATION_RENAME("invocations_rename",
"Calls of rename()"),
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
OBJECT_LIST_REQUESTS("object_list_requests",
"Number of object listings made"),
OBJECT_METADATA_REQUESTS("object_metadata_requests",
"Number of requests for object metadata"),
OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted",
"Object multipart upload aborted"),
OBJECT_PUT_REQUESTS("object_put_requests",
"Object put/multipart upload count"),
OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"),
STREAM_ABORTED("streamAborted",
"Count of times the TCP stream was aborted"),
STREAM_BACKWARD_SEEK_OPERATIONS("streamBackwardSeekOperations",
"Number of executed seek operations which went backwards in a stream"),
STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
STREAM_CLOSE_OPERATIONS("streamCloseOperations",
"Total count of times an attempt to close a data stream was made"),
STREAM_FORWARD_SEEK_OPERATIONS("streamForwardSeekOperations",
"Number of executed seek operations which went forward in a stream"),
STREAM_OPENED("streamOpened",
"Total count of times an input stream to object store was opened"),
STREAM_READ_EXCEPTIONS("streamReadExceptions",
"Number of read exceptions caught and attempted to be recovered from"),
STREAM_READ_FULLY_OPERATIONS("streamReadFullyOperations",
"Count of readFully() operations in streams"),
STREAM_READ_OPERATIONS("streamReadOperations",
"Count of read() operations in streams"),
STREAM_READ_OPERATIONS_INCOMPLETE("streamReadOperationsIncomplete",
"Count of incomplete read() operations in streams"),
STREAM_SEEK_BYTES_BACKWARDS("streamBytesBackwardsOnSeek",
"Count of bytes moved backwards during seek operations"),
STREAM_SEEK_BYTES_READ("streamBytesRead",
"Count of bytes read during seek() in stream operations"),
STREAM_SEEK_BYTES_SKIPPED("streamBytesSkippedOnSeek",
"Count of bytes skipped during forward seek operation"),
STREAM_SEEK_OPERATIONS("streamSeekOperations",
"Number of seek operations invoked on input streams");
Statistic(String symbol, String description) {
this.symbol = symbol;
this.description = description;
}
private final String symbol;
private final String description;
public String getSymbol() {
return symbol;
}
/**
* Get a statistic from a symbol.
* @param symbol statistic to look up
* @return the value or null.
*/
public static Statistic fromSymbol(String symbol) {
if (symbol != null) {
for (Statistic opType : values()) {
if (opType.getSymbol().equals(symbol)) {
return opType;
}
}
}
return null;
}
public String getDescription() {
return description;
}
/**
* The string value is simply the symbol.
* This makes this operation very low cost.
* @return the symbol of this statistic.
*/
@Override
public String toString() {
return symbol;
}
}
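A small sketch of the symbol round trip this enum supports; the key string is taken from the entries above, while the surrounding code is illustrative only:
// Map a metric key back to its Statistic, then read its description.
Statistic s = Statistic.fromSymbol("object_put_bytes");
if (s != null) {
  System.out.println(s + ": " + s.getDescription());
}
// Unknown symbols resolve to null rather than throwing.
assert Statistic.fromSymbol("no_such_metric") == null;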

View File

@ -738,9 +738,19 @@ The exact number of operations to perform is configurable in the option
Larger values generate more load, and are recommended when testing locally,
or in batch runs.
Smaller values should result in faster test runs, especially when the object
Smaller values result in faster test runs, especially when the object
store is a long way away.
Operations which work on directories have a separate option: this controls
the width and depth of tests creating recursive directories. Larger
values create exponentially more directories, with consequent performance
impact.
<property>
<name>scale.test.directory.count</name>
<value>2</value>
</property>
DistCp tests targeting S3A support a configurable file size. The default is
10 MB, but the configuration value is expressed in KB so that it can be tuned
smaller to achieve faster test runs.
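For symmetry with the directory count option above, the file size could be tuned with a property block of the same shape; the key name below is an assumption for illustration rather than a value quoted from this patch:
<property>
  <name>scale.test.distcp.file.size.kb</name>
  <value>10240</value>
</property>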

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.fs.s3a;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.junit.Assert;
import org.junit.internal.AssumptionViolatedException;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.URI;
@ -190,4 +192,155 @@ public class S3ATestUtils {
}
}
/**
* Reset all metrics in a list.
* @param metrics metrics to reset
*/
public static void reset(S3ATestUtils.MetricDiff... metrics) {
for (S3ATestUtils.MetricDiff metric : metrics) {
metric.reset();
}
}
/**
* Print all metrics in a list.
* @param log log to print the metrics to.
* @param metrics metrics to process
*/
public static void print(Logger log, S3ATestUtils.MetricDiff... metrics) {
for (S3ATestUtils.MetricDiff metric : metrics) {
log.info(metric.toString());
}
}
/**
* Print all metrics in a list, then reset them.
* @param log log to print the metrics to.
* @param metrics metrics to process
*/
public static void printThenReset(Logger log,
S3ATestUtils.MetricDiff... metrics) {
print(log, metrics);
reset(metrics);
}
/**
* Helper class to do diffs of metrics.
*/
public static final class MetricDiff {
private final S3AFileSystem fs;
private final Statistic statistic;
private long startingValue;
/**
* Constructor.
* Invokes {@link #reset()} so it is immediately capable of measuring the
* difference in metric values.
*
* @param fs the filesystem to monitor
* @param statistic the statistic to monitor.
*/
public MetricDiff(S3AFileSystem fs, Statistic statistic) {
this.fs = fs;
this.statistic = statistic;
reset();
}
/**
* Reset the starting value to the current value.
* Diffs will be against this new value.
*/
public void reset() {
startingValue = currentValue();
}
/**
* Get the current value of the metric.
* @return the latest value.
*/
public long currentValue() {
return fs.getInstrumentation().getCounterValue(statistic);
}
/**
* Get the difference between the current value and
* {@link #startingValue}.
* @return the difference.
*/
public long diff() {
return currentValue() - startingValue;
}
@Override
public String toString() {
long c = currentValue();
final StringBuilder sb = new StringBuilder(statistic.getSymbol());
sb.append(" starting=").append(startingValue);
sb.append(" current=").append(c);
sb.append(" diff=").append(c - startingValue);
return sb.toString();
}
/**
* Assert that the value of {@link #diff()} matches that expected.
* @param expected expected value.
*/
public void assertDiffEquals(long expected) {
Assert.assertEquals("Count of " + this,
expected, diff());
}
/**
* Assert that the value of {@link #diff()} matches that of another
* instance.
* @param that the other metric diff instance.
*/
public void assertDiffEquals(MetricDiff that) {
Assert.assertEquals(this.toString() + " != " + that,
this.diff(), that.diff());
}
/**
* Comparator for assertions.
* @param that other metric diff
* @return true if the value is {@code ==} the other's
*/
public boolean diffEquals(MetricDiff that) {
return this.currentValue() == that.currentValue();
}
/**
* Comparator for assertions.
* @param that other metric diff
* @return true if the value is {@code <} the other's
*/
public boolean diffLessThan(MetricDiff that) {
return this.currentValue() < that.currentValue();
}
/**
* Comparator for assertions.
* @param that other metric diff
* @return true if the value is {@code <=} the other's
*/
public boolean diffLessThanOrEquals(MetricDiff that) {
return this.currentValue() <= that.currentValue();
}
/**
* Get the statistic.
* @return the statistic
*/
public Statistic getStatistic() {
return statistic;
}
/**
* Get the starting value, as set in the last {@link #reset()}.
* @return the starting value for diffs.
*/
public long getStartingValue() {
return startingValue;
}
}
}
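A brief sketch of how a test can use MetricDiff, mirroring the pattern in the test classes that follow; the fs and LOG variables are assumed to be an initialized S3AFileSystem and a logger:
// Snapshot the counter, run the operation under test, then assert on the delta.
S3ATestUtils.MetricDiff listRequests =
    new S3ATestUtils.MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS);
fs.listStatus(new Path("/"));
LOG.info("{}", listRequests);
// The expected count depends on the path layout; 1 here is only illustrative.
listRequests.assertDiffEquals(1);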

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.net.URI;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff;
/**
* Use metrics to assert about the cost of file status queries,
* e.g. {@link S3AFileSystem#getFileStatus(Path)}.
*/
public class TestS3AFileOperationCost extends AbstractFSContractTestBase {
private MetricDiff metadataRequests;
private MetricDiff listRequests;
private static final Logger LOG =
LoggerFactory.getLogger(TestS3AFileOperationCost.class);
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Override
public S3AFileSystem getFileSystem() {
return (S3AFileSystem) super.getFileSystem();
}
@Override
public void setup() throws Exception {
super.setup();
S3AFileSystem fs = getFileSystem();
metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
}
@Test
public void testCostOfGetFileStatusOnFile() throws Throwable {
describe("performing getFileStatus on a file");
Path simpleFile = path("simple.txt");
S3AFileSystem fs = getFileSystem();
touch(fs, simpleFile);
resetMetricDiffs();
S3AFileStatus status = fs.getFileStatus(simpleFile);
assertTrue("not a file: " + status, status.isFile());
metadataRequests.assertDiffEquals(1);
listRequests.assertDiffEquals(0);
}
private void resetMetricDiffs() {
reset(metadataRequests, listRequests);
}
@Test
public void testCostOfGetFileStatusOnEmptyDir() throws Throwable {
describe("performing getFileStatus on an empty directory");
S3AFileSystem fs = getFileSystem();
Path dir = path("empty");
fs.mkdirs(dir);
resetMetricDiffs();
S3AFileStatus status = fs.getFileStatus(dir);
assertTrue("not empty: " + status, status.isEmptyDirectory());
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(0);
}
@Test
public void testCostOfGetFileStatusOnMissingFile() throws Throwable {
describe("performing getFileStatus on a missing file");
S3AFileSystem fs = getFileSystem();
Path path = path("missing");
resetMetricDiffs();
try {
S3AFileStatus status = fs.getFileStatus(path);
fail("Got a status back from a missing file path " + status);
} catch (FileNotFoundException expected) {
// expected
}
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(1);
}
@Test
public void testCostOfGetFileStatusOnMissingSubPath() throws Throwable {
describe("performing getFileStatus on a missing file");
S3AFileSystem fs = getFileSystem();
Path path = path("missingdir/missingpath");
resetMetricDiffs();
try {
S3AFileStatus status = fs.getFileStatus(path);
fail("Got a status back from a missing file path " + status);
} catch (FileNotFoundException expected) {
// expected
}
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(1);
}
@Test
public void testCostOfGetFileStatusOnNonEmptyDir() throws Throwable {
describe("performing getFileStatus on a non-empty directory");
S3AFileSystem fs = getFileSystem();
Path dir = path("empty");
fs.mkdirs(dir);
Path simpleFile = new Path(dir, "simple.txt");
touch(fs, simpleFile);
resetMetricDiffs();
S3AFileStatus status = fs.getFileStatus(dir);
if (status.isEmptyDirectory()) {
// erroneous state
String fsState = fs.toString();
fail("FileStatus says directory isempty: " + status
+ "\n" + ContractTestUtils.ls(fs, dir)
+ "\n" + fsState);
}
metadataRequests.assertDiffEquals(2);
listRequests.assertDiffEquals(1);
}
@Test
public void testCostOfCopyFromLocalFile() throws Throwable {
describe("testCostOfCopyFromLocalFile");
String testDirProp = System.getProperty("test.build.data",
"target" + File.separator + "test" + File.separator + "data");
File localTestDir = new File(testDirProp, "tmp").getAbsoluteFile();
localTestDir.mkdirs();
File tmpFile = File.createTempFile("tests3acost", ".txt",
localTestDir);
tmpFile.delete();
try {
URI localFileURI = tmpFile.toURI();
FileSystem localFS = FileSystem.get(localFileURI,
getFileSystem().getConf());
Path localPath = new Path(localFileURI);
int len = 10 * 1024;
byte[] data = dataset(len, 'A', 'Z');
writeDataset(localFS, localPath, data, len, 1024, true);
S3AFileSystem s3a = getFileSystem();
MetricDiff copyLocalOps = new MetricDiff(s3a,
INVOCATION_COPY_FROM_LOCAL_FILE);
MetricDiff putRequests = new MetricDiff(s3a,
OBJECT_PUT_REQUESTS);
MetricDiff putBytes = new MetricDiff(s3a,
OBJECT_PUT_BYTES);
Path remotePath = path("copied");
s3a.copyFromLocalFile(false, true, localPath, remotePath);
verifyFileContents(s3a, remotePath, data);
copyLocalOps.assertDiffEquals(1);
putRequests.assertDiffEquals(1);
putBytes.assertDiffEquals(len);
// print final stats
LOG.info("Filesystem {}", s3a);
} finally {
tmpFile.delete();
}
}
}

View File

@ -34,13 +34,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.Locale;
import static org.junit.Assume.assumeTrue;
/**
* Base class for scale tests; here is where the common scale configuration
@ -51,11 +49,57 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
@Rule
public TestName methodName = new TestName();
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@BeforeClass
public static void nameThread() {
Thread.currentThread().setName("JUnit");
}
/**
* The number of operations to perform: {@value}.
*/
public static final String KEY_OPERATION_COUNT =
SCALE_TEST + "operation.count";
/**
* The number of directory operations to perform: {@value}.
*/
public static final String KEY_DIRECTORY_COUNT =
SCALE_TEST + "directory.count";
/**
* The readahead buffer: {@value}.
*/
public static final String KEY_READ_BUFFER_SIZE =
S3A_SCALE_TEST + "read.buffer.size";
public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
* Key for a multi MB test file: {@value}.
*/
public static final String KEY_CSVTEST_FILE =
S3A_SCALE_TEST + "csvfile";
/**
* Default path for the multi MB test file: {@value}.
*/
public static final String DEFAULT_CSVTEST_FILE
= "s3a://landsat-pds/scene_list.gz";
/**
* The default number of operations to perform: {@value}.
*/
public static final long DEFAULT_OPERATION_COUNT = 2005;
/**
* Default number of directories to create when performing
* directory performance/scale tests.
*/
public static final int DEFAULT_DIRECTORY_COUNT = 2;
protected S3AFileSystem fs;
protected static final Logger LOG =
@ -132,108 +176,4 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
}
}
/**
* Make times more readable, by adding a "," every three digits.
* @param nanos nanos or other large number
* @return a string for logging
*/
protected static String toHuman(long nanos) {
return String.format(Locale.ENGLISH, "%,d", nanos);
}
/**
* Log the bandwidth of a timer as inferred from the number of
* bytes processed.
* @param timer timer
* @param bytes bytes processed in the time period
*/
protected void bandwidth(NanoTimer timer, long bytes) {
LOG.info("Bandwidth = {} MB/S",
timer.bandwidthDescription(bytes));
}
/**
* Work out the bandwidth in MB/s
* @param bytes bytes
* @param durationNS duration in nanos
* @return the number of megabytes/second of the recorded operation
*/
public static double bandwidthMBs(long bytes, long durationNS) {
return (bytes * 1000.0 ) / durationNS;
}
/**
* A simple class for timing operations in nanoseconds, and for
* printing some useful results in the process.
*/
protected static class NanoTimer {
final long startTime;
long endTime;
public NanoTimer() {
startTime = now();
}
/**
* End the operation
* @return the duration of the operation
*/
public long end() {
endTime = now();
return duration();
}
/**
* End the operation; log the duration
* @param format message
* @param args any arguments
* @return the duration of the operation
*/
public long end(String format, Object... args) {
long d = end();
LOG.info("Duration of {}: {} nS",
String.format(format, args), toHuman(d));
return d;
}
long now() {
return System.nanoTime();
}
long duration() {
return endTime - startTime;
}
double bandwidth(long bytes) {
return S3AScaleTestBase.bandwidthMBs(bytes, duration());
}
/**
* Bandwidth as bytes per second
* @param bytes bytes in
* @return the number of bytes per second this operation timed.
*/
double bandwidthBytes(long bytes) {
return (bytes * 1.0 ) / duration();
}
/**
* How many nanoseconds per byte
* @param bytes bytes processed in this time period
* @return the nanoseconds it took each byte to be processed
*/
long nanosPerByte(long bytes) {
return duration() / bytes;
}
/**
* Get a description of the bandwidth, even down to fractions of
* a MB
* @param bytes bytes processed
* @return bandwidth
*/
String bandwidthDescription(long bytes) {
return String.format("%,.6f", bandwidth(bytes));
}
}
}

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,15 +32,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
/**
* Test some scalable operations related to file renaming and deletion.
*/
public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Test
public void testBulkRenameAndDelete() throws Throwable {
final Path scaleTestDir = getTestPath();

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Statistic;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
/**
* Test the performance of listing files/directories.
*/
public class TestS3ADirectoryPerformance extends S3AScaleTestBase {
private static final Logger LOG = LoggerFactory.getLogger(
TestS3ADirectoryPerformance.class);
@Test
public void testListOperations() throws Throwable {
describe("Test recursive list operations");
final Path scaleTestDir = getTestPath();
final Path listDir = new Path(scaleTestDir, "lists");
// scale factor.
int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT);
int width = scale;
int depth = scale;
int files = scale;
MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES);
MetricDiff getFileStatusCalls =
new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
NanoTimer createTimer = new NanoTimer();
TreeScanResults created =
createSubdirs(fs, listDir, depth, width, files, 0);
// add some empty directories
int emptyDepth = 1 * scale;
int emptyWidth = 3 * scale;
created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0,
0, "empty", "f-", ""));
createTimer.end("Time to create %s", created);
LOG.info("Time per operation: {}",
toHuman(createTimer.nanosPerOperation(created.totalCount())));
printThenReset(LOG,
metadataRequests,
listRequests,
listStatusCalls,
getFileStatusCalls);
try {
// Scan the directory via an explicit tree walk.
// This is the baseline for any listing speedups.
MetricDiff treewalkMetadataRequests =
new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
MetricDiff treewalkListRequests = new MetricDiff(fs,
OBJECT_LIST_REQUESTS);
MetricDiff treewalkListStatusCalls = new MetricDiff(fs,
INVOCATION_LIST_FILES);
MetricDiff treewalkGetFileStatusCalls =
new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
NanoTimer treeWalkTimer = new NanoTimer();
TreeScanResults treewalkResults = treeWalk(fs, listDir);
treeWalkTimer.end("List status via treewalk");
print(LOG,
treewalkMetadataRequests,
treewalkListRequests,
treewalkListStatusCalls,
treewalkGetFileStatusCalls);
assertEquals("Files found in listFiles(recursive=true) " +
" created=" + created + " listed=" + treewalkResults,
created.getFileCount(), treewalkResults.getFileCount());
// listFiles() does the recursion internally
NanoTimer listFilesRecursiveTimer = new NanoTimer();
TreeScanResults listFilesResults = new TreeScanResults(
fs.listFiles(listDir, true));
listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created);
assertEquals("Files found in listFiles(recursive=true) " +
" created=" + created + " listed=" + listFilesResults,
created.getFileCount(), listFilesResults.getFileCount());
treewalkListRequests.assertDiffEquals(listRequests);
printThenReset(LOG,
metadataRequests, listRequests,
listStatusCalls, getFileStatusCalls);
NanoTimer globStatusTimer = new NanoTimer();
FileStatus[] globStatusFiles = fs.globStatus(listDir);
globStatusTimer.end("Time to globStatus() %s", globStatusTimer);
LOG.info("Time for glob status {} entries: {}",
globStatusFiles.length,
toHuman(globStatusTimer.duration()));
printThenReset(LOG,
metadataRequests,
listRequests,
listStatusCalls,
getFileStatusCalls);
} finally {
// deletion at the end of the run
NanoTimer deleteTimer = new NanoTimer();
fs.delete(listDir, true);
deleteTimer.end("Deleting directory tree");
printThenReset(LOG,
metadataRequests, listRequests,
listStatusCalls, getFileStatusCalls);
}
}
@Test
public void testTimeToStatEmptyDirectory() throws Throwable {
describe("Time to stat an empty directory");
Path path = new Path(getTestPath(), "empty");
fs.mkdirs(path);
timeToStatPath(path);
}
@Test
public void testTimeToStatNonEmptyDirectory() throws Throwable {
describe("Time to stat a non-empty directory");
Path path = new Path(getTestPath(), "dir");
fs.mkdirs(path);
touch(fs, new Path(path, "file"));
timeToStatPath(path);
}
@Test
public void testTimeToStatFile() throws Throwable {
describe("Time to stat a simple file");
Path path = new Path(getTestPath(), "file");
touch(fs, path);
timeToStatPath(path);
}
@Test
public void testTimeToStatRoot() throws Throwable {
describe("Time to stat the root path");
timeToStatPath(new Path("/"));
}
private void timeToStatPath(Path path) throws IOException {
describe("Timing getFileStatus(\"%s\")", path);
MetricDiff metadataRequests =
new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS);
MetricDiff listRequests =
new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS);
long attempts = getOperationCount();
NanoTimer timer = new NanoTimer();
for (long l = 0; l < attempts; l++) {
fs.getFileStatus(path);
}
timer.end("Time to execute %d getFileStatusCalls", attempts);
LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts)));
LOG.info("metadata: {}", metadataRequests);
LOG.info("metadata per operation {}", metadataRequests.diff() / attempts);
LOG.info("listObjects: {}", listRequests);
LOG.info("listObjects: per operation {}", listRequests.diff() / attempts);
}
}

View File

@ -36,8 +36,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
/**
* Look at the performance of S3a operations
* Look at the performance of S3a operations.
*/
public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
private static final Logger LOG = LoggerFactory.getLogger(
@ -151,7 +153,7 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
readTimer.end("Time to read %d bytes", len);
bandwidth(readTimer, count);
assertEquals("Not enough bytes were read)", len, count);
long nanosPerByte = readTimer.nanosPerByte(count);
long nanosPerByte = readTimer.nanosPerOperation(count);
LOG.info("An open() call has the equivalent duration of reading {} bytes",
toHuman( timeOpen.duration() / nanosPerByte));
}

View File

@ -15,7 +15,9 @@ log4j.rootLogger=info,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
# for debugging low level S3a operations, uncomment this line
# log4j.logger.org.apache.hadoop.fs.s3a=DEBUG