From 043a0c2e6cb1d3708bee254a67c48407c6a06095 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Fri, 3 Jun 2016 08:56:07 -0700 Subject: [PATCH] HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran. --- .../hadoop/fs/contract/ContractTestUtils.java | 420 +++++++++++++++ .../fs/s3a/ProgressableProgressListener.java | 94 ++++ .../hadoop/fs/s3a/S3AFastOutputStream.java | 65 +-- .../apache/hadoop/fs/s3a/S3AFileStatus.java | 7 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 507 +++++++++++++----- .../hadoop/fs/s3a/S3AInstrumentation.java | 218 +++++--- .../apache/hadoop/fs/s3a/S3AOutputStream.java | 98 +--- .../hadoop/fs/s3a/S3AStorageStatistics.java | 104 ++++ .../org/apache/hadoop/fs/s3a/S3AUtils.java | 48 ++ .../org/apache/hadoop/fs/s3a/Statistic.java | 143 +++++ .../site/markdown/tools/hadoop-aws/index.md | 12 +- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 153 ++++++ .../fs/s3a/TestS3AFileOperationCost.java | 191 +++++++ .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 154 ++---- .../fs/s3a/scale/TestS3ADeleteManyFiles.java | 10 +- .../scale/TestS3ADirectoryPerformance.java | 189 +++++++ .../scale/TestS3AInputStreamPerformance.java | 6 +- .../src/test/resources/log4j.properties | 4 +- 18 files changed, 1984 insertions(+), 439 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFileOperationCost.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 6343d40ee8f..20ba0752aaa 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.junit.internal.AssumptionViolatedException; @@ -34,8 +36,14 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Properties; +import java.util.Set; import java.util.UUID; /** @@ -892,4 +900,416 @@ public static void createAndVerifyFile(FileSystem fs, Path parent, final long fi fs.delete(objectPath, false); } } + + /** + * Make times more readable, by adding a "," every three digits. 
+ * @param nanos nanos or other large number + * @return a string for logging + */ + public static String toHuman(long nanos) { + return String.format(Locale.ENGLISH, "%,d", nanos); + } + + /** + * Log the bandwidth of a timer as inferred from the number of + * bytes processed. + * @param timer timer + * @param bytes bytes processed in the time period + */ + public static void bandwidth(NanoTimer timer, long bytes) { + LOG.info("Bandwidth = {} MB/S", + timer.bandwidthDescription(bytes)); + } + + /** + * Work out the bandwidth in MB/s. + * @param bytes bytes + * @param durationNS duration in nanos + * @return the number of megabytes/second of the recorded operation + */ + public static double bandwidthMBs(long bytes, long durationNS) { + return (bytes * 1000.0) / durationNS; + } + + /** + * Recursively create a directory tree. + * Return the details about the created tree. The files and directories + * are those created under the path, not the base directory created. That + * is retrievable via {@link TreeScanResults#getBasePath()}. + * @param fs filesystem + * @param current parent dir + * @param depth depth of directory tree + * @param width width: subdirs per entry + * @param files number of files per entry + * @param filesize size of files to create in bytes. + * @return the details about the created tree. + * @throws IOException IO Problems + */ + public static TreeScanResults createSubdirs(FileSystem fs, + Path current, + int depth, + int width, + int files, + int filesize) throws IOException { + return createSubdirs(fs, current, depth, width, files, + filesize, "dir-", "file-", "0"); + } + + /** + * Recursively create a directory tree. + * @param fs filesystem + * @param current the current dir in the walk + * @param depth depth of directory tree + * @param width width: subdirs per entry + * @param files number of files per entry + * @param filesize size of files to create in bytes. + * @param dirPrefix prefix for directory entries + * @param filePrefix prefix for file entries + * @param marker string which is slowly built up to uniquely name things + * @return the details about the created tree. + * @throws IOException IO Problems + */ + public static TreeScanResults createSubdirs(FileSystem fs, + Path current, + int depth, + int width, + int files, + int filesize, + String dirPrefix, + String filePrefix, + String marker) throws IOException { + fs.mkdirs(current); + TreeScanResults results = new TreeScanResults(current); + if (depth > 0) { + byte[] data = dataset(filesize, 'a', 'z'); + for (int i = 0; i < files; i++) { + String name = String.format("%s-%s-%04d.txt", filePrefix, marker, i); + Path path = new Path(current, name); + createFile(fs, path, true, data); + results.add(fs, path); + } + for (int w = 0; w < width; w++) { + String marker2 = String.format("%s-%04d", marker, w); + Path child = new Path(current, dirPrefix + marker2); + results.add(createSubdirs(fs, child, depth - 1, width, files, + filesize, dirPrefix, filePrefix, marker2)); + results.add(fs, child); + } + } + return results; + } + + /** + * Predicate to determine if two lists are equivalent, that is, they + * contain the same entries. + * @param left first collection of paths + * @param right second collection of paths + * @return true if all entries are in each collection of path. 
+ */ + public static boolean collectionsEquivalent(Collection left, + Collection right) { + Set leftSet = new HashSet<>(left); + Set rightSet = new HashSet<>(right); + return leftSet.containsAll(right) && rightSet.containsAll(left); + } + + /** + * Predicate to determine if two lists are equivalent, that is, they + * contain the same entries. + * @param left first collection of paths + * @param right second collection of paths + * @return true if all entries are in each collection of path. + */ + public static boolean collectionsEquivalentNoDuplicates(Collection left, + Collection right) { + return collectionsEquivalent(left, right) && + !containsDuplicates(left) && !containsDuplicates(right); + } + + + /** + * Predicate to test for a collection of paths containing duplicate entries. + * @param paths collection of paths + * @return true if there are duplicates. + */ + public static boolean containsDuplicates(Collection paths) { + return new HashSet<>(paths).size() != paths.size(); + } + + /** + * Recursively list all entries, with a depth first traversal of the + * directory tree. + * @param path path + * @return the number of entries listed + * @throws IOException IO problems + */ + public static TreeScanResults treeWalk(FileSystem fs, Path path) + throws IOException { + TreeScanResults dirsAndFiles = new TreeScanResults(); + + FileStatus[] statuses = fs.listStatus(path); + for (FileStatus status : statuses) { + LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : ""); + } + for (FileStatus status : statuses) { + dirsAndFiles.add(status); + if (status.isDirectory()) { + dirsAndFiles.add(treeWalk(fs, status.getPath())); + } + } + return dirsAndFiles; + } + + /** + * Results of recursive directory creation/scan operations. + */ + public static final class TreeScanResults { + + private Path basePath; + private final List files = new ArrayList<>(); + private final List directories = new ArrayList<>(); + private final List other = new ArrayList<>(); + + + public TreeScanResults() { + } + + public TreeScanResults(Path basePath) { + this.basePath = basePath; + } + + /** + * Build from a located file status iterator. + * @param results results of the listFiles/listStatus call. + * @throws IOException IO problems during the iteration. + */ + public TreeScanResults(RemoteIterator results) + throws IOException { + while (results.hasNext()) { + add(results.next()); + } + } + + /** + * Construct results from an array of statistics. + * @param stats statistics array. Must not be null. + */ + public TreeScanResults(FileStatus[] stats) { + assertNotNull("Null file status array", stats); + for (FileStatus stat : stats) { + add(stat); + } + } + + /** + * Add all paths in the other set of results to this instance. + * @param that the other instance + * @return this instance + */ + public TreeScanResults add(TreeScanResults that) { + files.addAll(that.files); + directories.addAll(that.directories); + other.addAll(that.other); + return this; + } + + /** + * Increment the counters based on the file status. + * @param status path status to count. + */ + public void add(FileStatus status) { + if (status.isFile()) { + files.add(status.getPath()); + } else if (status.isDirectory()) { + directories.add(status.getPath()); + } else { + other.add(status.getPath()); + } + } + + public void add(FileSystem fs, Path path) throws IOException { + add(fs.getFileStatus(path)); + } + + @Override + public String toString() { + return String.format("%d director%s and %d file%s", + getDirCount(), + getDirCount() == 1 ? 
"y" : "ies", + getFileCount(), + getFileCount() == 1 ? "" : "s"); + } + + /** + * Assert that the state of a listing has the specific number of files, + * directories and other entries. The error text will include + * the {@code text} param, the field in question, and the entire object's + * string value. + * @param text text prefix for assertions. + * @param f file count + * @param d expected directory count + * @param o expected other entries. + */ + public void assertSizeEquals(String text, long f, long d, long o) { + String self = toString(); + Assert.assertEquals(text + ": file count in " + self, + f, getFileCount()); + Assert.assertEquals(text + ": directory count in " + self, + d, getDirCount()); + Assert.assertEquals(text + ": 'other' count in " + self, + o, getOtherCount()); + } + + /** + * Assert that the trees are equivalent: that every list matches (and + * that neither has any duplicates). + * @param that the other entry + */ + public void assertEquivalent(TreeScanResults that) { + String details = "this= " + this + "; that=" + that; + assertFieldsEquivalent("files", that, files, that.files); + assertFieldsEquivalent("directories", that, + directories, that.directories); + assertFieldsEquivalent("other", that, other, that.other); + } + + /** + * Assert that a field in two instances are equivalent. + * @param fieldname field name for error messages + * @param that the other instance to scan + * @param ours our field's contents + * @param theirs the other instance's field constants + */ + public void assertFieldsEquivalent(String fieldname, + TreeScanResults that, + List ours, List theirs) { + assertFalse("Duplicate " + files + " in " + this, + containsDuplicates(ours)); + assertFalse("Duplicate " + files + " in other " + that, + containsDuplicates(theirs)); + assertTrue(fieldname + " mismatch: between {" + this + "}" + + " and {" + that + "}", + collectionsEquivalent(files, that.files)); + } + + public List getFiles() { + return files; + } + + public List getDirectories() { + return directories; + } + + public List getOther() { + return other; + } + + public Path getBasePath() { + return basePath; + } + + public long getFileCount() { + return files.size(); + } + + public long getDirCount() { + return directories.size(); + } + + public long getOtherCount() { + return other.size(); + } + + /** + * Total count of entries. + * @return the total number of entries + */ + public long totalCount() { + return getFileCount() + getDirCount() + getOtherCount(); + } + + } + + /** + * A simple class for timing operations in nanoseconds, and for + * printing some useful results in the process. + */ + public static final class NanoTimer { + private final long startTime; + private long endTime; + + public NanoTimer() { + startTime = now(); + } + + /** + * End the operation. + * @return the duration of the operation + */ + public long end() { + endTime = now(); + return duration(); + } + + /** + * End the operation; log the duration. + * @param format message + * @param args any arguments + * @return the duration of the operation + */ + public long end(String format, Object... args) { + long d = end(); + LOG.info("Duration of {}: {} nS", + String.format(format, args), toHuman(d)); + return d; + } + + public long now() { + return System.nanoTime(); + } + + public long duration() { + return endTime - startTime; + } + + public double bandwidth(long bytes) { + return bandwidthMBs(bytes, duration()); + } + + /** + * Bandwidth as bytes per second. 
+ * @param bytes bytes in + * @return the number of bytes per second this operation timed. + */ + public double bandwidthBytes(long bytes) { + return (bytes * 1.0) / duration(); + } + + /** + * How many nanoseconds per IOP, byte, etc. + * @param operations operations processed in this time period + * @return the nanoseconds it took each byte to be processed + */ + public long nanosPerOperation(long operations) { + return duration() / operations; + } + + /** + * Get a description of the bandwidth, even down to fractions of + * a MB. + * @param bytes bytes processed + * @return bandwidth + */ + public String bandwidthDescription(long bytes) { + return String.format("%,.6f", bandwidth(bytes)); + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java new file mode 100644 index 00000000000..0ce022aa885 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressEventType; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; + +import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; +import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; + +/** + * Listener to progress from AWS regarding transfers. + */ +public class ProgressableProgressListener implements ProgressListener { + private static final Logger LOG = S3AFileSystem.LOG; + private final S3AFileSystem fs; + private final String key; + private final Progressable progress; + private long lastBytesTransferred; + private final Upload upload; + + /** + * Instantiate. + * @param fs filesystem: will be invoked with statistics updates + * @param key key for the upload + * @param upload source of events + * @param progress optional callback for progress. 
+ */ + public ProgressableProgressListener(S3AFileSystem fs, + String key, + Upload upload, + Progressable progress) { + this.fs = fs; + this.key = key; + this.upload = upload; + this.progress = progress; + this.lastBytesTransferred = 0; + } + + @Override + public void progressChanged(ProgressEvent progressEvent) { + if (progress != null) { + progress.progress(); + } + + // There are 3 http ops here, but this should be close enough for now + ProgressEventType pet = progressEvent.getEventType(); + if (pet == TRANSFER_PART_STARTED_EVENT || + pet == TRANSFER_COMPLETED_EVENT) { + fs.incrementWriteOperations(); + } + + long transferred = upload.getProgress().getBytesTransferred(); + long delta = transferred - lastBytesTransferred; + fs.incrementPutProgressStatistics(key, delta); + lastBytesTransferred = transferred; + } + + /** + * Method to invoke after upload has completed. + * This can handle race conditions in setup/teardown. + * @return the number of bytes which were transferred after the notification + */ + public long uploadCompleted() { + long delta = upload.getProgress().getBytesTransferred() - + lastBytesTransferred; + if (delta > 0) { + LOG.debug("S3A write delta changed after finished: {} bytes", delta); + fs.incrementPutProgressStatistics(key, delta); + } + return delta; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 61a83d4b062..7a985c6f95c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -35,10 +35,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @@ -54,6 +52,7 @@ import java.util.concurrent.ThreadPoolExecutor; import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; /** * Upload files/parts asap directly from a memory buffer (instead of buffering @@ -77,8 +76,6 @@ public class S3AFastOutputStream extends OutputStream { private final int multiPartThreshold; private final S3AFileSystem fs; private final CannedAccessControlList cannedACL; - private final FileSystem.Statistics statistics; - private final String serverSideEncryptionAlgorithm; private final ProgressListener progressListener; private final ListeningExecutorService executorService; private MultiPartUpload multiPartUpload; @@ -98,28 +95,28 @@ public class S3AFastOutputStream extends OutputStream { * @param bucket S3 bucket name * @param key S3 key name * @param progress report progress in order to prevent timeouts - * @param statistics track FileSystem.Statistics on the performed operations * @param cannedACL used CannedAccessControlList - * @param serverSideEncryptionAlgorithm algorithm for server side encryption * @param partSize size of a single part in a multi-part upload (except * last part) * @param multiPartThreshold files at least this size use multi-part upload * @param threadPoolExecutor thread factory * @throws IOException on any problem */ - public 
S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs, - String bucket, String key, Progressable progress, - FileSystem.Statistics statistics, CannedAccessControlList cannedACL, - String serverSideEncryptionAlgorithm, long partSize, - long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor) + public S3AFastOutputStream(AmazonS3Client client, + S3AFileSystem fs, + String bucket, + String key, + Progressable progress, + CannedAccessControlList cannedACL, + long partSize, + long multiPartThreshold, + ThreadPoolExecutor threadPoolExecutor) throws IOException { this.bucket = bucket; this.key = key; this.client = client; this.fs = fs; this.cannedACL = cannedACL; - this.statistics = statistics; - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE if (partSize > Integer.MAX_VALUE) { this.partSize = Integer.MAX_VALUE; @@ -246,16 +243,17 @@ public synchronized void close() throws IOException { if (multiPartUpload == null) { putObject(); } else { - if (buffer.size() > 0) { + int size = buffer.size(); + if (size > 0) { + fs.incrementPutStartStatistics(size); //send last part multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer - .toByteArray()), buffer.size()); + .toByteArray()), size); } final List partETags = multiPartUpload .waitForAllPartUploads(); multiPartUpload.complete(partETags); } - statistics.incrementWriteOps(1); // This will delete unnecessary fake parent directories fs.finishedWrite(key); LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); @@ -265,18 +263,19 @@ public synchronized void close() throws IOException { } } + /** + * Create the default metadata for a multipart upload operation. + * @return the metadata to use/extend. 
+ */ private ObjectMetadata createDefaultMetadata() { - ObjectMetadata om = new ObjectMetadata(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - return om; + return fs.newObjectMetadata(); } private MultiPartUpload initiateMultiPartUpload() throws IOException { - final ObjectMetadata om = createDefaultMetadata(); final InitiateMultipartUploadRequest initiateMPURequest = - new InitiateMultipartUploadRequest(bucket, key, om); + new InitiateMultipartUploadRequest(bucket, + key, + createDefaultMetadata()); initiateMPURequest.setCannedACL(cannedACL); try { return new MultiPartUpload( @@ -290,15 +289,18 @@ private void putObject() throws IOException { LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket, key); final ObjectMetadata om = createDefaultMetadata(); - om.setContentLength(buffer.size()); - final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, - new ByteArrayInputStream(buffer.toByteArray()), om); - putObjectRequest.setCannedAcl(cannedACL); + final int size = buffer.size(); + om.setContentLength(size); + final PutObjectRequest putObjectRequest = + fs.newPutObjectRequest(key, + om, + new ByteArrayInputStream(buffer.toByteArray())); putObjectRequest.setGeneralProgressListener(progressListener); ListenableFuture putObjectResult = executorService.submit(new Callable() { @Override public PutObjectResult call() throws Exception { + fs.incrementPutStartStatistics(size); return client.putObject(putObjectRequest); } }); @@ -306,7 +308,7 @@ public PutObjectResult call() throws Exception { try { putObjectResult.get(); } catch (InterruptedException ie) { - LOG.warn("Interrupted object upload:" + ie, ie); + LOG.warn("Interrupted object upload: {}", ie, ie); Thread.currentThread().interrupt(); } catch (ExecutionException ee) { throw extractException("regular upload", key, ee); @@ -339,7 +341,7 @@ private void uploadPartAsync(ByteArrayInputStream inputStream, public PartETag call() throws Exception { LOG.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId); - return client.uploadPart(request).getPartETag(); + return fs.uploadPart(request).getPartETag(); } }); partETagsFutures.add(partETagFuture); @@ -349,7 +351,7 @@ private List waitForAllPartUploads() throws IOException { try { return Futures.allAsList(partETagsFutures).get(); } catch (InterruptedException ie) { - LOG.warn("Interrupted partUpload:" + ie, ie); + LOG.warn("Interrupted partUpload: {}", ie, ie); Thread.currentThread().interrupt(); return null; } catch (ExecutionException ee) { @@ -382,11 +384,12 @@ private void complete(List partETags) throws IOException { public void abort() { LOG.warn("Aborting multi-part upload with id '{}'", uploadId); try { + fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED); client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId)); } catch (Exception e2) { LOG.warn("Unable to abort multipart upload, you may need to purge " + - "uploaded parts: " + e2, e2); + "uploaded parts: {}", e2, e2); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index 9ecca332663..75a650073b3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -93,4 +93,11 @@ public long getModificationTime(){ return 
super.getModificationTime(); } } + + @Override + public String toString() { + return super.toString() + + String.format(" isEmptyDirectory=%s", isEmptyDirectory()); + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a4c0c25c9a9..d392d8efe34 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -45,7 +45,6 @@ import com.amazonaws.services.s3.S3ClientOptions; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; @@ -53,6 +52,8 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Copy; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; @@ -71,8 +72,13 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.util.Progressable; @@ -80,6 +86,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +125,7 @@ public class S3AFileSystem extends FileSystem { private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; private S3AInstrumentation instrumentation; + private S3AStorageStatistics storageStatistics; private long readAhead; // The maximum number of entries that can be deleted in any call to s3 @@ -237,6 +245,15 @@ public void initialize(URI name, Configuration conf) throws IOException { enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); + storageStatistics = (S3AStorageStatistics) + GlobalStorageStatistics.INSTANCE + .put(S3AStorageStatistics.NAME, + new GlobalStorageStatistics.StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new S3AStorageStatistics(); + } + }); int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0); int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0); @@ -345,6 +362,14 @@ void initProxySupport(Configuration conf, ClientConfiguration awsConf, } } + /** + * Get S3A Instrumentation. For test purposes. + * @return this instance's instrumentation. 
+ */ + public S3AInstrumentation getInstrumentation() { + return instrumentation; + } + /** * Initializes the User-Agent header to send in HTTP requests to the S3 * back-end. We always include the Hadoop version number. The user also may @@ -621,23 +646,26 @@ public FSDataOutputStream create(Path f, FsPermission permission, } instrumentation.fileCreated(); if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { - return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, - key, progress, statistics, cannedACL, - serverSideEncryptionAlgorithm, partSize, multiPartThreshold, - threadPoolExecutor), statistics); + return new FSDataOutputStream( + new S3AFastOutputStream(s3, + this, + bucket, + key, + progress, + cannedACL, + partSize, + multiPartThreshold, + threadPoolExecutor), + statistics); } // We pass null to FSDataOutputStream so it won't count writes that // are being buffered to a file return new FSDataOutputStream( new S3AOutputStream(getConf(), - transfers, this, - bucket, key, - progress, - cannedACL, - statistics, - serverSideEncryptionAlgorithm), + progress + ), null); } @@ -693,6 +721,7 @@ public boolean rename(Path src, Path dst) throws IOException { private boolean innerRename(Path src, Path dst) throws IOException, AmazonClientException { LOG.debug("Rename path {} to {}", src, dst); + incrementStatistic(INVOCATION_RENAME); String srcKey = pathToKey(src); String dstKey = pathToKey(dst); @@ -793,8 +822,7 @@ private boolean innerRename(Path src, Path dst) throws IOException, request.setPrefix(srcKey); request.setMaxKeys(maxKeys); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { @@ -808,8 +836,7 @@ private boolean innerRename(Path src, Path dst) throws IOException, } if (objects.isTruncated()) { - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + objects = continueListObjects(objects); } else { if (!keysToDelete.isEmpty()) { removeKeys(keysToDelete, false); @@ -837,17 +864,223 @@ public ObjectMetadata getObjectMetadata(Path path) throws IOException { return getObjectMetadata(pathToKey(path)); } + /** + * Increment a statistic by 1. + * @param statistic The operation to increment + */ + protected void incrementStatistic(Statistic statistic) { + incrementStatistic(statistic, 1); + } + + /** + * Increment a statistic by a specific value. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementStatistic(Statistic statistic, long count) { + instrumentation.incrementCounter(statistic, count); + storageStatistics.incrementCounter(statistic, count); + } + /** * Request object metadata; increments counters in the process. * @param key key * @return the metadata */ - private ObjectMetadata getObjectMetadata(String key) { + protected ObjectMetadata getObjectMetadata(String key) { + incrementStatistic(OBJECT_METADATA_REQUESTS); ObjectMetadata meta = s3.getObjectMetadata(bucket, key); - statistics.incrementReadOps(1); + incrementReadOperations(); return meta; } + /** + * Initiate a {@code listObjects} operation, incrementing metrics + * in the process. 
+ * @param request request to initiate + * @return the results + */ + protected ObjectListing listObjects(ListObjectsRequest request) { + incrementStatistic(OBJECT_LIST_REQUESTS); + incrementReadOperations(); + return s3.listObjects(request); + } + + /** + * List the next set of objects. + * @param objects paged result + * @return the next result object + */ + protected ObjectListing continueListObjects(ObjectListing objects) { + incrementStatistic(OBJECT_LIST_REQUESTS); + incrementReadOperations(); + return s3.listNextBatchOfObjects(objects); + } + + /** + * Increment read operations. + */ + public void incrementReadOperations() { + statistics.incrementReadOps(1); + } + + /** + * Increment the write operation counter. + * This is somewhat inaccurate, as it appears to be invoked more + * often than needed in progress callbacks. + */ + public void incrementWriteOperations() { + statistics.incrementWriteOps(1); + } + + /** + * Delete an object. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics. + * @param key key to blob to delete. + */ + private void deleteObject(String key) { + incrementWriteOperations(); + incrementStatistic(OBJECT_DELETE_REQUESTS); + s3.deleteObject(bucket, key); + } + + /** + * Perform a bulk object delete operation. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics. + * @param deleteRequest keys to delete on the s3-backend + */ + private void deleteObjects(DeleteObjectsRequest deleteRequest) { + incrementWriteOperations(); + incrementStatistic(OBJECT_DELETE_REQUESTS, 1); + s3.deleteObjects(deleteRequest); + } + + /** + * Create a putObject request. + * Adds the ACL and metadata + * @param key key of object + * @param metadata metadata header + * @param srcfile source file + * @return the request + */ + public PutObjectRequest newPutObjectRequest(String key, + ObjectMetadata metadata, File srcfile) { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, + srcfile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(metadata); + return putObjectRequest; + } + + /** + * Create a {@link PutObjectRequest} request. + * The metadata is assumed to have been configured with the size of the + * operation. + * @param key key of object + * @param metadata metadata header + * @param inputStream source data. + * @return the request + */ + PutObjectRequest newPutObjectRequest(String key, + ObjectMetadata metadata, InputStream inputStream) { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, + inputStream, metadata); + putObjectRequest.setCannedAcl(cannedACL); + return putObjectRequest; + } + + /** + * Create a new object metadata instance. + * Any standard metadata headers are added here, for example: + * encryption. + * @return a new metadata instance + */ + public ObjectMetadata newObjectMetadata() { + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setSSEAlgorithm(serverSideEncryptionAlgorithm); + } + return om; + } + + /** + * Create a new object metadata instance. + * Any standard metadata headers are added here, for example: + * encryption. + * + * @param length length of data to set in header. + * @return a new metadata instance + */ + public ObjectMetadata newObjectMetadata(long length) { + final ObjectMetadata om = newObjectMetadata(); + om.setContentLength(length); + return om; + } + + /** + * PUT an object, incrementing the put requests and put bytes + * counters. 
+ * It does not update the other counters, + * as existing code does that as progress callbacks come in. + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + * @param putObjectRequest the request + * @return the upload initiated + */ + public Upload putObject(PutObjectRequest putObjectRequest) { + long len; + if (putObjectRequest.getFile() != null) { + len = putObjectRequest.getFile().length(); + } else { + len = putObjectRequest.getMetadata().getContentLength(); + } + incrementPutStartStatistics(len); + return transfers.upload(putObjectRequest); + } + + /** + * Upload part of a multi-partition file. + * Increments the write and put counters + * @param request request + * @return the result of the operation. + */ + public UploadPartResult uploadPart(UploadPartRequest request) { + incrementPutStartStatistics(request.getPartSize()); + return s3.uploadPart(request); + } + + /** + * At the start of a put/multipart upload operation, update the + * relevant counters. + * + * @param bytes bytes in the request. + */ + public void incrementPutStartStatistics(long bytes) { + LOG.debug("PUT start {} bytes", bytes); + incrementWriteOperations(); + incrementStatistic(OBJECT_PUT_REQUESTS); + if (bytes > 0) { + incrementStatistic(OBJECT_PUT_BYTES, bytes); + } + } + + /** + * Callback for use in progress callbacks from put/multipart upload events. + * Increments those statistics which are expected to be updated during + * the ongoing upload operation. + * @param key key to file that is being written (for logging) + * @param bytes bytes successfully uploaded. + */ + public void incrementPutProgressStatistics(String key, long bytes) { + LOG.debug("PUT {}: {} bytes", key, bytes); + incrementWriteOperations(); + if (bytes > 0) { + statistics.incrementBytesWritten(bytes); + } + } + /** * A helper method to delete a list of keys on a s3-backend. 
* @@ -858,21 +1091,13 @@ private ObjectMetadata getObjectMetadata(String key) { private void removeKeys(List keysToDelete, boolean clearKeys) throws AmazonClientException { if (enableMultiObjectsDelete) { - DeleteObjectsRequest deleteRequest - = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); - s3.deleteObjects(deleteRequest); + deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete)); instrumentation.fileDeleted(keysToDelete.size()); - statistics.incrementWriteOps(1); } else { - int writeops = 0; - for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) { - s3.deleteObject( - new DeleteObjectRequest(bucket, keyVersion.getKey())); - writeops++; + deleteObject(keyVersion.getKey()); } instrumentation.fileDeleted(keysToDelete.size()); - statistics.incrementWriteOps(writeops); } if (clearKeys) { keysToDelete.clear(); @@ -942,9 +1167,8 @@ private boolean innerDelete(Path f, boolean recursive) throws IOException, if (status.isEmptyDirectory()) { LOG.debug("Deleting fake empty directory {}", key); - s3.deleteObject(bucket, key); + deleteObject(key); instrumentation.directoryDeleted(); - statistics.incrementWriteOps(1); } else { LOG.debug("Getting objects for directory prefix {} to delete", key); @@ -955,9 +1179,9 @@ private boolean innerDelete(Path f, boolean recursive) throws IOException, //request.setDelimiter("/"); request.setMaxKeys(maxKeys); - List keys = new ArrayList<>(); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); + List keys = + new ArrayList<>(objects.getObjectSummaries().size()); while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); @@ -969,8 +1193,7 @@ private boolean innerDelete(Path f, boolean recursive) throws IOException, } if (objects.isTruncated()) { - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + objects = continueListObjects(objects); } else { if (!keys.isEmpty()) { removeKeys(keys, false); @@ -981,13 +1204,11 @@ private boolean innerDelete(Path f, boolean recursive) throws IOException, } } else { LOG.debug("delete: Path is a file"); - s3.deleteObject(bucket, key); instrumentation.fileDeleted(1); - statistics.incrementWriteOps(1); + deleteObject(key); } createFakeDirectoryIfNecessary(f.getParent()); - return true; } @@ -996,7 +1217,7 @@ private void createFakeDirectoryIfNecessary(Path f) String key = pathToKey(f); if (!key.isEmpty() && !exists(f)) { LOG.debug("Creating new fake directory at {}", f); - createFakeDirectory(bucket, key); + createFakeDirectory(key); } } @@ -1032,6 +1253,7 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, IOException, AmazonClientException { String key = pathToKey(f); LOG.debug("List status for path: {}", f); + incrementStatistic(INVOCATION_LIST_STATUS); final List result = new ArrayList(); final FileStatus fileStatus = getFileStatus(f); @@ -1049,8 +1271,7 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, LOG.debug("listStatus: doing listObjects for directory {}", key); - ObjectListing objects = s3.listObjects(request); - statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); Path fQualified = f.makeQualified(uri, workingDir); @@ -1061,33 +1282,25 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, if (keyPath.equals(fQualified) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { LOG.debug("Ignoring: {}", 
keyPath); - continue; - } - - if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { - result.add(new S3AFileStatus(true, true, keyPath)); - LOG.debug("Adding: fd: {}", keyPath); } else { - result.add(new S3AFileStatus(summary.getSize(), - dateToLong(summary.getLastModified()), keyPath, - getDefaultBlockSize(fQualified))); - LOG.debug("Adding: fi: {}", keyPath); + S3AFileStatus status = createFileStatus(keyPath, summary, + getDefaultBlockSize(keyPath)); + result.add(status); + LOG.debug("Adding: {}", status); } } for (String prefix : objects.getCommonPrefixes()) { Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); - if (keyPath.equals(f)) { - continue; + if (!keyPath.equals(f)) { + result.add(new S3AFileStatus(true, false, keyPath)); + LOG.debug("Adding: rd: {}", keyPath); } - result.add(new S3AFileStatus(true, false, keyPath)); - LOG.debug("Adding: rd: {}", keyPath); } if (objects.isTruncated()) { LOG.debug("listStatus: list truncated - getting next batch"); - objects = s3.listNextBatchOfObjects(objects); - statistics.incrementReadOps(1); + objects = continueListObjects(objects); } else { break; } @@ -1100,8 +1313,6 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, return result.toArray(new FileStatus[result.size()]); } - - /** * Set the current working directory for the given file system. All relative * paths will be resolved relative to it. @@ -1123,7 +1334,7 @@ public Path getWorkingDirectory() { /** * * Make the given path and all non-existent parents into - * directories. Has the semantics of Unix @{code 'mkdir -p'}. + * directories. Has the semantics of Unix {@code 'mkdir -p'}. * Existence of the directory hierarchy is not an error. * @param path path to create * @param permission to apply to f @@ -1158,7 +1369,7 @@ public boolean mkdirs(Path path, FsPermission permission) throws IOException, private boolean innerMkdirs(Path f, FsPermission permission) throws IOException, FileAlreadyExistsException, AmazonClientException { LOG.debug("Making directory: {}", f); - + incrementStatistic(INVOCATION_MKDIRS); try { FileStatus fileStatus = getFileStatus(f); @@ -1187,7 +1398,7 @@ private boolean innerMkdirs(Path f, FsPermission permission) } while (fPart != null); String key = pathToKey(f); - createFakeDirectory(bucket, key); + createFakeDirectory(key); return true; } } @@ -1201,12 +1412,12 @@ private boolean innerMkdirs(Path f, FsPermission permission) */ public S3AFileStatus getFileStatus(Path f) throws IOException { String key = pathToKey(f); + incrementStatistic(INVOCATION_GET_FILE_STATUS); LOG.debug("Getting path status for {} ({})", f , key); if (!key.isEmpty()) { try { - ObjectMetadata meta = s3.getObjectMetadata(bucket, key); - statistics.incrementReadOps(1); + ObjectMetadata meta = getObjectMetadata(key); if (objectRepresentsDirectory(key, meta.getContentLength())) { LOG.debug("Found exact file: fake directory"); @@ -1231,8 +1442,7 @@ public S3AFileStatus getFileStatus(Path f) throws IOException { if (!key.endsWith("/")) { String newKey = key + "/"; try { - ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); - statistics.incrementReadOps(1); + ObjectMetadata meta = getObjectMetadata(newKey); if (objectRepresentsDirectory(newKey, meta.getContentLength())) { LOG.debug("Found file (with /): fake directory"); @@ -1265,8 +1475,7 @@ public S3AFileStatus getFileStatus(Path f) throws IOException { request.setDelimiter("/"); request.setMaxKeys(1); - ObjectListing objects = s3.listObjects(request); - 
statistics.incrementReadOps(1); + ObjectListing objects = listObjects(request); if (!objects.getCommonPrefixes().isEmpty() || !objects.getObjectSummaries().isEmpty()) { @@ -1349,7 +1558,8 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException, FileAlreadyExistsException, AmazonClientException { - String key = pathToKey(dst); + incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE); + final String key = pathToKey(dst); if (!overwrite && exists(dst)) { throw new FileAlreadyExistsException(dst + " already exists"); @@ -1360,35 +1570,19 @@ private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, LocalFileSystem local = getLocal(getConf()); File srcfile = local.pathToFile(src); - final ObjectMetadata om = new ObjectMetadata(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); - putObjectRequest.setCannedAcl(cannedACL); - putObjectRequest.setMetadata(om); - - ProgressListener progressListener = new ProgressListener() { - public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventType()) { - case TRANSFER_PART_COMPLETED_EVENT: - statistics.incrementWriteOps(1); - break; - default: - break; - } - } - }; - - statistics.incrementWriteOps(1); - Upload up = transfers.upload(putObjectRequest); - up.addProgressListener(progressListener); + final ObjectMetadata om = newObjectMetadata(); + PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); + Upload up = putObject(putObjectRequest); + ProgressableProgressListener listener = new ProgressableProgressListener( + this, key, up, null); + up.addProgressListener(listener); try { up.waitForUploadResult(); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted copying " + src + " to " + dst + ", cancelling"); } + listener.uploadCompleted(); // This will delete unnecessary fake parent directories finishedWrite(key); @@ -1437,7 +1631,7 @@ private void copyFile(String srcKey, String dstKey, long size) LOG.debug("copyFile {} -> {} ", srcKey, dstKey); try { - ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); + ObjectMetadata srcom = getObjectMetadata(srcKey); ObjectMetadata dstom = cloneObjectMetadata(srcom); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); @@ -1451,7 +1645,7 @@ private void copyFile(String srcKey, String dstKey, long size) public void progressChanged(ProgressEvent progressEvent) { switch (progressEvent.getEventType()) { case TRANSFER_PART_COMPLETED_EVENT: - statistics.incrementWriteOps(1); + incrementWriteOperations(); break; default: break; @@ -1463,7 +1657,7 @@ public void progressChanged(ProgressEvent progressEvent) { copy.addProgressListener(progressListener); try { copy.waitForCopyResult(); - statistics.incrementWriteOps(1); + incrementWriteOperations(); instrumentation.filesCopied(1, size); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted copying " + srcKey @@ -1475,26 +1669,12 @@ public void progressChanged(ProgressEvent progressEvent) { } } - private boolean objectRepresentsDirectory(final String name, final long size) { - return !name.isEmpty() - && name.charAt(name.length() - 1) == '/' - && size == 0L; - } - - // Handles null Dates that can be returned by AWS - 
private static long dateToLong(final Date date) { - if (date == null) { - return 0L; - } - - return date.getTime(); - } - /** * Perform post-write actions. * @param key key written to */ public void finishedWrite(String key) { + LOG.debug("Finished write to {}", key); deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); } @@ -1516,8 +1696,7 @@ private void deleteUnnecessaryFakeDirectories(Path f) { if (status.isDirectory() && status.isEmptyDirectory()) { LOG.debug("Deleting fake directory {}/", key); - s3.deleteObject(bucket, key + "/"); - statistics.incrementWriteOps(1); + deleteObject(key + "/"); } } catch (IOException | AmazonClientException e) { LOG.debug("While deleting key {} ", key, e); @@ -1533,18 +1712,20 @@ private void deleteUnnecessaryFakeDirectories(Path f) { } - private void createFakeDirectory(final String bucketName, final String objectName) - throws AmazonClientException, AmazonServiceException { + private void createFakeDirectory(final String objectName) + throws AmazonClientException, AmazonServiceException, + InterruptedIOException { if (!objectName.endsWith("/")) { - createEmptyObject(bucketName, objectName + "/"); + createEmptyObject(objectName + "/"); } else { - createEmptyObject(bucketName, objectName); + createEmptyObject(objectName); } } // Used to create an empty file that represents an empty directory - private void createEmptyObject(final String bucketName, final String objectName) - throws AmazonClientException, AmazonServiceException { + private void createEmptyObject(final String objectName) + throws AmazonClientException, AmazonServiceException, + InterruptedIOException { final InputStream im = new InputStream() { @Override public int read() throws IOException { @@ -1552,16 +1733,16 @@ public int read() throws IOException { } }; - final ObjectMetadata om = new ObjectMetadata(); - om.setContentLength(0L); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); + PutObjectRequest putObjectRequest = newPutObjectRequest(objectName, + newObjectMetadata(0L), + im); + Upload upload = putObject(putObjectRequest); + try { + upload.waitForUploadResult(); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted creating " + objectName); } - PutObjectRequest putObjectRequest = - new PutObjectRequest(bucketName, objectName, im, om); - putObjectRequest.setCannedAcl(cannedACL); - s3.putObject(putObjectRequest); - statistics.incrementWriteOps(1); + incrementPutProgressStatistics(objectName, 0); instrumentation.directoryCreated(); } @@ -1576,10 +1757,7 @@ private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) { // This approach may be too brittle, especially if // in future there are new attributes added to ObjectMetadata // that we do not explicitly call to set here - ObjectMetadata ret = new ObjectMetadata(); - - // Non null attributes - ret.setContentLength(source.getContentLength()); + ObjectMetadata ret = newObjectMetadata(source.getContentLength()); // Possibly null attributes // Allowing nulls to pass breaks it during later use @@ -1688,6 +1866,75 @@ public long getMultiPartThreshold() { return multiPartThreshold; } + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public FileStatus[] globStatus(Path pathPattern) throws IOException { + incrementStatistic(INVOCATION_GLOB_STATUS); + return super.globStatus(pathPattern); + } + + /** + * Override superclass so as to add statistic collection. 
+ * {@inheritDoc} + */ + @Override + public FileStatus[] globStatus(Path pathPattern, PathFilter filter) + throws IOException { + incrementStatistic(INVOCATION_GLOB_STATUS); + return super.globStatus(pathPattern, filter); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public RemoteIterator listLocatedStatus(Path f) + throws FileNotFoundException, IOException { + incrementStatistic(INVOCATION_LIST_LOCATED_STATUS); + return super.listLocatedStatus(f); + } + + @Override + public RemoteIterator listFiles(Path f, + boolean recursive) throws FileNotFoundException, IOException { + incrementStatistic(INVOCATION_LIST_FILES); + return super.listFiles(f, recursive); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public boolean exists(Path f) throws IOException { + incrementStatistic(INVOCATION_EXISTS); + return super.exists(f); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public boolean isDirectory(Path f) throws IOException { + incrementStatistic(INVOCATION_IS_DIRECTORY); + return super.isDirectory(f); + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public boolean isFile(Path f) throws IOException { + incrementStatistic(INVOCATION_IS_FILE); + return super.isFile(f); + } + /** * Get a integer option >= the minimum allowed value. * @param conf configuration diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 285f2284b6c..8892f0e5cfa 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.MetricStringBuilder; @@ -26,49 +27,30 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableMetric; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import static org.apache.hadoop.fs.s3a.Statistic.*; + /** * Instrumentation of S3a. - * Derived from the {@code AzureFileSystemInstrumentation} + * Derived from the {@code AzureFileSystemInstrumentation}. + * + * Counters and metrics are generally addressed in code by their name or + * {@link Statistic} key. There may be some Statistics which do + * not have an entry here. To avoid attempts to access such counters failing, + * the operations to increment/query metric values are designed to handle + * lookup failures. 
*/ @Metrics(about = "Metrics for S3a", context = "S3AFileSystem") @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AInstrumentation { public static final String CONTEXT = "S3AFileSystem"; - - public static final String STREAM_OPENED = "streamOpened"; - public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations"; - public static final String STREAM_CLOSED = "streamClosed"; - public static final String STREAM_ABORTED = "streamAborted"; - public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions"; - public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations"; - public static final String STREAM_FORWARD_SEEK_OPERATIONS - = "streamForwardSeekOperations"; - public static final String STREAM_BACKWARD_SEEK_OPERATIONS - = "streamBackwardSeekOperations"; - public static final String STREAM_SEEK_BYTES_SKIPPED = - "streamBytesSkippedOnSeek"; - public static final String STREAM_SEEK_BYTES_BACKWARDS = - "streamBytesBackwardsOnSeek"; - public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead"; - public static final String STREAM_READ_OPERATIONS = "streamReadOperations"; - public static final String STREAM_READ_FULLY_OPERATIONS - = "streamReadFullyOperations"; - public static final String STREAM_READ_OPERATIONS_INCOMPLETE - = "streamReadOperationsIncomplete"; - public static final String FILES_CREATED = "files_created"; - public static final String FILES_COPIED = "files_copied"; - public static final String FILES_COPIED_BYTES = "files_copied_bytes"; - public static final String FILES_DELETED = "files_deleted"; - public static final String DIRECTORIES_CREATED = "directories_created"; - public static final String DIRECTORIES_DELETED = "directories_deleted"; - public static final String IGNORED_ERRORS = "ignored_errors"; private final MetricsRegistry registry = new MetricsRegistry("S3AFileSystem").setContext(CONTEXT); private final MutableCounterLong streamOpenOperations; @@ -95,6 +77,27 @@ public class S3AInstrumentation { private final MutableCounterLong numberOfDirectoriesDeleted; private final Map streamMetrics = new HashMap<>(); + private static final Statistic[] COUNTERS_TO_CREATE = { + INVOCATION_COPY_FROM_LOCAL_FILE, + INVOCATION_EXISTS, + INVOCATION_GET_FILE_STATUS, + INVOCATION_GLOB_STATUS, + INVOCATION_IS_DIRECTORY, + INVOCATION_IS_FILE, + INVOCATION_LIST_FILES, + INVOCATION_LIST_LOCATED_STATUS, + INVOCATION_LIST_STATUS, + INVOCATION_MKDIRS, + INVOCATION_RENAME, + OBJECT_COPY_REQUESTS, + OBJECT_DELETE_REQUESTS, + OBJECT_LIST_REQUESTS, + OBJECT_METADATA_REQUESTS, + OBJECT_MULTIPART_UPLOAD_ABORTED, + OBJECT_PUT_BYTES, + OBJECT_PUT_REQUESTS + }; + public S3AInstrumentation(URI name) { UUID fileSystemInstanceId = UUID.randomUUID(); registry.tag("FileSystemId", @@ -103,50 +106,35 @@ public S3AInstrumentation(URI name) { registry.tag("fsURI", "URI of this filesystem", name.toString()); - streamOpenOperations = streamCounter(STREAM_OPENED, - "Total count of times an input stream to object store was opened"); - streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS, - "Total count of times an attempt to close a data stream was made"); - streamClosed = streamCounter(STREAM_CLOSED, - "Count of times the TCP stream was closed"); - streamAborted = streamCounter(STREAM_ABORTED, - "Count of times the TCP stream was aborted"); - streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS, - "Number of seek operations invoked on input streams"); - streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS, - "Number of read 
exceptions caught and attempted to recovered from"); - streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS, - "Number of executed seek operations which went forward in a stream"); - streamBackwardSeekOperations = streamCounter( - STREAM_BACKWARD_SEEK_OPERATIONS, - "Number of executed seek operations which went backwards in a stream"); - streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED, - "Count of bytes skipped during forward seek operations"); - streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS, - "Count of bytes moved backwards during seek operations"); - streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ, - "Count of bytes read during seek() in stream operations"); - streamReadOperations = streamCounter(STREAM_READ_OPERATIONS, - "Count of read() operations in streams"); - streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS, - "Count of readFully() operations in streams"); - streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE, - "Count of incomplete read() operations in streams"); - - numberOfFilesCreated = counter(FILES_CREATED, - "Total number of files created through the object store."); - numberOfFilesCopied = counter(FILES_COPIED, - "Total number of files copied within the object store."); - bytesOfFilesCopied = counter(FILES_COPIED_BYTES, - "Total number of bytes copied within the object store."); - numberOfFilesDeleted = counter(FILES_DELETED, - "Total number of files deleted through from the object store."); - numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED, - "Total number of directories created through the object store."); - numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED, - "Total number of directories deleted through the object store."); - ignoredErrors = counter(IGNORED_ERRORS, - "Total number of errors caught and ingored."); + streamOpenOperations = streamCounter(STREAM_OPENED); + streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS); + streamClosed = streamCounter(STREAM_CLOSED); + streamAborted = streamCounter(STREAM_ABORTED); + streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS); + streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS); + streamForwardSeekOperations = + streamCounter(STREAM_FORWARD_SEEK_OPERATIONS); + streamBackwardSeekOperations = + streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS); + streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED); + streamBytesBackwardsOnSeek = + streamCounter(STREAM_SEEK_BYTES_BACKWARDS); + streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ); + streamReadOperations = streamCounter(STREAM_READ_OPERATIONS); + streamReadFullyOperations = + streamCounter(STREAM_READ_FULLY_OPERATIONS); + streamReadsIncomplete = + streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE); + numberOfFilesCreated = counter(FILES_CREATED); + numberOfFilesCopied = counter(FILES_COPIED); + bytesOfFilesCopied = counter(FILES_COPIED_BYTES); + numberOfFilesDeleted = counter(FILES_DELETED); + numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED); + numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED); + ignoredErrors = counter(IGNORED_ERRORS); + for (Statistic statistic : COUNTERS_TO_CREATE) { + counter(statistic); + } } /** @@ -173,6 +161,25 @@ protected final MutableCounterLong streamCounter(String name, String desc) { return counter; } + /** + * Create a counter in the registry. 
+ * @param op statistic to count + * @return a new counter + */ + protected final MutableCounterLong counter(Statistic op) { + return counter(op.getSymbol(), op.getDescription()); + } + + /** + * Create a counter in the stream map: these are unregistered in the public + * metrics. + * @param op statistic to count + * @return a new counter + */ + protected final MutableCounterLong streamCounter(Statistic op) { + return streamCounter(op.getSymbol(), op.getDescription()); + } + /** * Create a gauge in the registry. * @param name name gauge name @@ -215,6 +222,58 @@ public String dump(String prefix, return metricBuilder.toString(); } + /** + * Get the value of a counter. + * @param statistic the operation + * @return its value, or 0 if not found. + */ + public long getCounterValue(Statistic statistic) { + return getCounterValue(statistic.getSymbol()); + } + + /** + * Get the value of a counter. + * If the counter is null, return 0. + * @param name the name of the counter + * @return its value. + */ + public long getCounterValue(String name) { + MutableCounterLong counter = lookupCounter(name); + return counter == null ? 0 : counter.value(); + } + + /** + * Lookup a counter by name. Return null if it is not known. + * @param name counter name + * @return the counter + */ + private MutableCounterLong lookupCounter(String name) { + MutableMetric metric = lookupMetric(name); + if (metric == null) { + return null; + } + Preconditions.checkNotNull(metric, "not found: " + name); + if (!(metric instanceof MutableCounterLong)) { + throw new IllegalStateException("Metric " + name + + " is not a MutableCounterLong: " + metric); + } + return (MutableCounterLong) metric; + } + + /** + * Look up a metric from both the registered set and the lighter weight + * stream entries. + * @param name metric name + * @return the metric or null + */ + public MutableMetric lookupMetric(String name) { + MutableMetric metric = getRegistry().get(name); + if (metric == null) { + metric = streamMetrics.get(name); + } + return metric; + } + /** * Indicate that S3A created a file. */ @@ -262,6 +321,19 @@ public void errorIgnored() { ignoredErrors.incr(); } + /** + * Increment a specific counter. + * No-op if not defined. + * @param op operation + * @param count increment value + */ + public void incrementCounter(Statistic op, long count) { + MutableCounterLong counter = lookupCounter(op.getSymbol()); + if (counter != null) { + counter.incr(count); + } + } + /** * Create a stream input statistics instance. 
* @return the new instance diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 593e9e8da49..23ba6828e77 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -19,19 +19,11 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; -import com.amazonaws.event.ProgressEvent; -import com.amazonaws.event.ProgressEventType; -import com.amazonaws.event.ProgressListener; -import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.Upload; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.util.Progressable; @@ -44,8 +36,6 @@ import java.io.InterruptedIOException; import java.io.OutputStream; -import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; -import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; @@ -59,32 +49,20 @@ public class S3AOutputStream extends OutputStream { private File backupFile; private boolean closed; private String key; - private String bucket; - private TransferManager transfers; private Progressable progress; private long partSize; private long partSizeThreshold; private S3AFileSystem fs; - private CannedAccessControlList cannedACL; - private FileSystem.Statistics statistics; private LocalDirAllocator lDirAlloc; - private String serverSideEncryptionAlgorithm; public static final Logger LOG = S3AFileSystem.LOG; - public S3AOutputStream(Configuration conf, TransferManager transfers, - S3AFileSystem fs, String bucket, String key, Progressable progress, - CannedAccessControlList cannedACL, FileSystem.Statistics statistics, - String serverSideEncryptionAlgorithm) + public S3AOutputStream(Configuration conf, + S3AFileSystem fs, String key, Progressable progress) throws IOException { - this.bucket = bucket; this.key = key; - this.transfers = transfers; this.progress = progress; this.fs = fs; - this.cannedACL = cannedACL; - this.statistics = statistics; - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; partSize = fs.getPartitionSize(); partSizeThreshold = fs.getMultiPartThreshold(); @@ -124,30 +102,18 @@ public synchronized void close() throws IOException { try { - final ObjectMetadata om = new ObjectMetadata(); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - om.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - PutObjectRequest putObjectRequest = - new PutObjectRequest(bucket, key, backupFile); - putObjectRequest.setCannedAcl(cannedACL); - putObjectRequest.setMetadata(om); - - Upload upload = transfers.upload(putObjectRequest); - - ProgressableProgressListener listener = - new ProgressableProgressListener(upload, progress, statistics); + final ObjectMetadata om = fs.newObjectMetadata(); + Upload upload = fs.putObject( + 
fs.newPutObjectRequest( + key, + om, + backupFile)); + ProgressableProgressListener listener = + new ProgressableProgressListener(fs, key, upload, progress); upload.addProgressListener(listener); upload.waitForUploadResult(); - - long delta = upload.getProgress().getBytesTransferred() - - listener.getLastBytesTransferred(); - if (statistics != null && delta != 0) { - LOG.debug("S3A write delta changed after finished: {} bytes", delta); - statistics.incrementBytesWritten(delta); - } - + listener.uploadCompleted(); // This will delete unnecessary fake parent directories fs.finishedWrite(key); } catch (InterruptedException e) { @@ -175,46 +141,4 @@ public void write(byte[] b, int off, int len) throws IOException { backupStream.write(b, off, len); } - /** - * Listener to progress from AWS regarding transfers. - */ - public static class ProgressableProgressListener implements ProgressListener { - private Progressable progress; - private FileSystem.Statistics statistics; - private long lastBytesTransferred; - private Upload upload; - - public ProgressableProgressListener(Upload upload, Progressable progress, - FileSystem.Statistics statistics) { - this.upload = upload; - this.progress = progress; - this.statistics = statistics; - this.lastBytesTransferred = 0; - } - - public void progressChanged(ProgressEvent progressEvent) { - if (progress != null) { - progress.progress(); - } - - // There are 3 http ops here, but this should be close enough for now - ProgressEventType pet = progressEvent.getEventType(); - if (pet == TRANSFER_PART_STARTED_EVENT || - pet == TRANSFER_COMPLETED_EVENT) { - statistics.incrementWriteOps(1); - } - - long transferred = upload.getProgress().getBytesTransferred(); - long delta = transferred - lastBytesTransferred; - if (statistics != null && delta != 0) { - statistics.incrementBytesWritten(delta); - } - - lastBytesTransferred = transferred; - } - - public long getLastBytesTransferred() { - return lastBytesTransferred; - } - } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java new file mode 100644 index 00000000000..f69159a0f54 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageStatistics; +import org.slf4j.Logger; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Storage statistics for S3A. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class S3AStorageStatistics extends StorageStatistics { + private static final Logger LOG = S3AFileSystem.LOG; + + public static final String NAME = "S3AStorageStatistics"; + private final Map opsCount = + new EnumMap<>(Statistic.class); + + public S3AStorageStatistics() { + super(NAME); + for (Statistic opType : Statistic.values()) { + opsCount.put(opType, new AtomicLong(0)); + } + } + + /** + * Increment a specific counter. + * @param op operation + * @param count increment value + * @return the new value + */ + public long incrementCounter(Statistic op, long count) { + long updated = opsCount.get(op).addAndGet(count); + LOG.debug("{} += {} -> {}", op, count, updated); + return updated; + } + + private class LongIterator implements Iterator { + private Iterator> iterator = + Collections.unmodifiableSet(opsCount.entrySet()).iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public LongStatistic next() { + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + final Map.Entry entry = iterator.next(); + return new LongStatistic(entry.getKey().name(), entry.getValue().get()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @Override + public Iterator getLongStatistics() { + return new LongIterator(); + } + + @Override + public Long getLong(String key) { + final Statistic type = Statistic.fromSymbol(key); + return type == null ? null : opsCount.get(type).get(); + } + + @Override + public boolean isTracked(String key) { + return Statistic.fromSymbol(key) == null; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 12d14e27f54..062fca48a82 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -21,6 +21,7 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.S3ObjectSummary; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; @@ -29,6 +30,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.AccessDeniedException; +import java.util.Date; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -186,4 +188,50 @@ public static String stringify(AmazonS3Exception e) { } return builder.toString(); } + + /** + * Create a files status instance from a listing. + * @param keyPath path to entry + * @param summary summary from AWS + * @param blockSize block size to declare. 
+ * @return a status entry + */ + public static S3AFileStatus createFileStatus(Path keyPath, + S3ObjectSummary summary, + long blockSize) { + if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { + return new S3AFileStatus(true, true, keyPath); + } else { + return new S3AFileStatus(summary.getSize(), + dateToLong(summary.getLastModified()), keyPath, + blockSize); + } + } + + /** + * Predicate: does the object represent a directory?. + * @param name object name + * @param size object size + * @return true if it meets the criteria for being an object + */ + public static boolean objectRepresentsDirectory(final String name, + final long size) { + return !name.isEmpty() + && name.charAt(name.length() - 1) == '/' + && size == 0L; + } + + /** + * Date to long conversion. + * Handles null Dates that can be returned by AWS by returning 0 + * @param date date from AWS query + * @return timestamp of the object + */ + public static long dateToLong(final Date date) { + if (date == null) { + return 0L; + } + + return date.getTime(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java new file mode 100644 index 00000000000..d29cb2f1ca2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +/** + * Statistic which are collected in S3A. 
+ * These statistics are available at a low level in {@link S3AStorageStatistics} + * and as metrics in {@link S3AInstrumentation} + */ +public enum Statistic { + + DIRECTORIES_CREATED("directories_created", + "Total number of directories created through the object store."), + DIRECTORIES_DELETED("directories_deleted", + "Total number of directories deleted through the object store."), + FILES_COPIED("files_copied", + "Total number of files copied within the object store."), + FILES_COPIED_BYTES("files_copied_bytes", + "Total number of bytes copied within the object store."), + FILES_CREATED("files_created", + "Total number of files created through the object store."), + FILES_DELETED("files_deleted", + "Total number of files deleted from the object store."), + IGNORED_ERRORS("ignored_errors", "Errors caught and ignored"), + INVOCATION_COPY_FROM_LOCAL_FILE("invocations_copyfromlocalfile", + "Calls of copyFromLocalFile()"), + INVOCATION_EXISTS("invocations_exists", + "Calls of exists()"), + INVOCATION_GET_FILE_STATUS("invocations_getfilestatus", + "Calls of getFileStatus()"), + INVOCATION_GLOB_STATUS("invocations_globstatus", + "Calls of globStatus()"), + INVOCATION_IS_DIRECTORY("invocations_is_directory", + "Calls of isDirectory()"), + INVOCATION_IS_FILE("invocations_is_file", + "Calls of isFile()"), + INVOCATION_LIST_FILES("invocations_listfiles", + "Calls of listFiles()"), + INVOCATION_LIST_LOCATED_STATUS("invocations_listlocatedstatus", + "Calls of listLocatedStatus()"), + INVOCATION_LIST_STATUS("invocations_liststatus", + "Calls of listStatus()"), + INVOCATION_MKDIRS("invocations_mdkirs", + "Calls of mkdirs()"), + INVOCATION_RENAME("invocations_rename", + "Calls of rename()"), + OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"), + OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"), + OBJECT_LIST_REQUESTS("object_list_requests", + "Number of object listings made"), + OBJECT_METADATA_REQUESTS("object_metadata_requests", + "Number of requests for object metadata"), + OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted", + "Object multipart upload aborted"), + OBJECT_PUT_REQUESTS("object_put_requests", + "Object put/multipart upload count"), + OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"), + STREAM_ABORTED("streamAborted", + "Count of times the TCP stream was aborted"), + STREAM_BACKWARD_SEEK_OPERATIONS("streamBackwardSeekOperations", + "Number of executed seek operations which went backwards in a stream"), + STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"), + STREAM_CLOSE_OPERATIONS("streamCloseOperations", + "Total count of times an attempt to close a data stream was made"), + STREAM_FORWARD_SEEK_OPERATIONS("streamForwardSeekOperations", + "Number of executed seek operations which went forward in a stream"), + STREAM_OPENED("streamOpened", + "Total count of times an input stream to object store was opened"), + STREAM_READ_EXCEPTIONS("streamReadExceptions", + "Number of seek operations invoked on input streams"), + STREAM_READ_FULLY_OPERATIONS("streamReadFullyOperations", + "count of readFully() operations in streams"), + STREAM_READ_OPERATIONS("streamReadOperations", + "Count of read() operations in streams"), + STREAM_READ_OPERATIONS_INCOMPLETE("streamReadOperationsIncomplete", + "Count of incomplete read() operations in streams"), + STREAM_SEEK_BYTES_BACKWARDS("streamBytesBackwardsOnSeek", + "Count of bytes moved backwards during seek operations"), + STREAM_SEEK_BYTES_READ("streamBytesRead", + 
"Count of bytes read during seek() in stream operations"), + STREAM_SEEK_BYTES_SKIPPED("streamBytesSkippedOnSeek", + "Count of bytes skipped during forward seek operation"), + STREAM_SEEK_OPERATIONS("streamSeekOperations", + "Number of read exceptions caught and attempted to recovered from"); + + Statistic(String symbol, String description) { + this.symbol = symbol; + this.description = description; + } + + private final String symbol; + private final String description; + + public String getSymbol() { + return symbol; + } + + /** + * Get a statistic from a symbol. + * @param symbol statistic to look up + * @return the value or null. + */ + public static Statistic fromSymbol(String symbol) { + if (symbol != null) { + for (Statistic opType : values()) { + if (opType.getSymbol().equals(symbol)) { + return opType; + } + } + } + return null; + } + + public String getDescription() { + return description; + } + + /** + * The string value is simply the symbol. + * This makes this operation very low cost. + * @return the symbol of this statistic. + */ + @Override + public String toString() { + return symbol; + } +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 568ffff091f..8cd2155d1f8 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -738,9 +738,19 @@ The exact number of operations to perform is configurable in the option Larger values generate more load, and are recommended when testing locally, or in batch runs. -Smaller values should result in faster test runs, especially when the object +Smaller values results in faster test runs, especially when the object store is a long way away. +Operations which work on directories have a separate option: this controls +the width and depth of tests creating recursive directories. Larger +values create exponentially more directories, with consequent performance +impact. + + + scale.test.directory.count + 2 + + DistCp tests targeting S3A support a configurable file size. The default is 10 MB, but the configuration value is expressed in KB so that it can be tuned smaller to achieve faster test runs. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index a4f9b99d99d..04010d635bf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -21,7 +21,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.junit.Assert; import org.junit.internal.AssumptionViolatedException; +import org.slf4j.Logger; import java.io.IOException; import java.net.URI; @@ -190,4 +192,155 @@ public static void skipIfEncryptionTestsDisabled( } } + /** + * Reset all metrics in a list. + * @param metrics metrics to reset + */ + public static void reset(S3ATestUtils.MetricDiff... metrics) { + for (S3ATestUtils.MetricDiff metric : metrics) { + metric.reset(); + } + } + + /** + * Print all metrics in a list. + * @param log log to print the metrics to. + * @param metrics metrics to process + */ + public static void print(Logger log, S3ATestUtils.MetricDiff... 
metrics) { + for (S3ATestUtils.MetricDiff metric : metrics) { + log.info(metric.toString()); + } + } + + /** + * Print all metrics in a list, then reset them. + * @param log log to print the metrics to. + * @param metrics metrics to process + */ + public static void printThenReset(Logger log, + S3ATestUtils.MetricDiff... metrics) { + print(log, metrics); + reset(metrics); + } + + /** + * Helper class to do diffs of metrics. + */ + public static final class MetricDiff { + private final S3AFileSystem fs; + private final Statistic statistic; + private long startingValue; + + /** + * Constructor. + * Invokes {@link #reset()} so it is immediately capable of measuring the + * difference in metric values. + * + * @param fs the filesystem to monitor + * @param statistic the statistic to monitor. + */ + public MetricDiff(S3AFileSystem fs, Statistic statistic) { + this.fs = fs; + this.statistic = statistic; + reset(); + } + + /** + * Reset the starting value to the current value. + * Diffs will be against this new value. + */ + public void reset() { + startingValue = currentValue(); + } + + /** + * Get the current value of the metric. + * @return the latest value. + */ + public long currentValue() { + return fs.getInstrumentation().getCounterValue(statistic); + } + + /** + * Get the difference between the the current value and + * {@link #startingValue}. + * @return the difference. + */ + public long diff() { + return currentValue() - startingValue; + } + + @Override + public String toString() { + long c = currentValue(); + final StringBuilder sb = new StringBuilder(statistic.getSymbol()); + sb.append(" starting=").append(startingValue); + sb.append(" current=").append(c); + sb.append(" diff=").append(c - startingValue); + return sb.toString(); + } + + /** + * Assert that the value of {@link #diff()} matches that expected. + * @param expected expected value. + */ + public void assertDiffEquals(long expected) { + Assert.assertEquals("Count of " + this, + expected, diff()); + } + + /** + * Assert that the value of {@link #diff()} matches that of another + * instance. + * @param that the other metric diff instance. + */ + public void assertDiffEquals(MetricDiff that) { + Assert.assertEquals(this.toString() + " != " + that, + this.diff(), that.diff()); + } + + /** + * Comparator for assertions. + * @param that other metric diff + * @return true if the value is {@code ==} the other's + */ + public boolean diffEquals(MetricDiff that) { + return this.currentValue() == that.currentValue(); + } + + /** + * Comparator for assertions. + * @param that other metric diff + * @return true if the value is {@code <} the other's + */ + public boolean diffLessThan(MetricDiff that) { + return this.currentValue() < that.currentValue(); + } + + /** + * Comparator for assertions. + * @param that other metric diff + * @return true if the value is {@code <=} the other's + */ + public boolean diffLessThanOrEquals(MetricDiff that) { + return this.currentValue() <= that.currentValue(); + } + + /** + * Get the statistic + * @return the statistic + */ + public Statistic getStatistic() { + return statistic; + } + + /** + * Get the starting value; that set in the last {@link #reset()}. + * @return the starting value for diffs. 
+ */ + public long getStartingValue() { + return startingValue; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFileOperationCost.java new file mode 100644 index 00000000000..0a8dd2d6082 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFileOperationCost.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.net.URI; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff; +import static org.apache.hadoop.test.GenericTestUtils.getTestDir; + +/** + * Use metrics to assert about the cost of file status queries. + * {@link S3AFileSystem#getFileStatus(Path)}. 
+ */ +public class TestS3AFileOperationCost extends AbstractFSContractTestBase { + + private MetricDiff metadataRequests; + private MetricDiff listRequests; + + private static final Logger LOG = + LoggerFactory.getLogger(TestS3AFileOperationCost.class); + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public S3AFileSystem getFileSystem() { + return (S3AFileSystem) super.getFileSystem(); + } + + @Override + public void setup() throws Exception { + super.setup(); + S3AFileSystem fs = getFileSystem(); + metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); + listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); + } + + @Test + public void testCostOfGetFileStatusOnFile() throws Throwable { + describe("performing getFileStatus on a file"); + Path simpleFile = path("simple.txt"); + S3AFileSystem fs = getFileSystem(); + touch(fs, simpleFile); + resetMetricDiffs(); + S3AFileStatus status = fs.getFileStatus(simpleFile); + assertTrue("not a file: " + status, status.isFile()); + metadataRequests.assertDiffEquals(1); + listRequests.assertDiffEquals(0); + } + + private void resetMetricDiffs() { + reset(metadataRequests, listRequests); + } + + @Test + public void testCostOfGetFileStatusOnEmptyDir() throws Throwable { + describe("performing getFileStatus on an empty directory"); + S3AFileSystem fs = getFileSystem(); + Path dir = path("empty"); + fs.mkdirs(dir); + resetMetricDiffs(); + S3AFileStatus status = fs.getFileStatus(dir); + assertTrue("not empty: " + status, status.isEmptyDirectory()); + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(0); + } + + @Test + public void testCostOfGetFileStatusOnMissingFile() throws Throwable { + describe("performing getFileStatus on a missing file"); + S3AFileSystem fs = getFileSystem(); + Path path = path("missing"); + resetMetricDiffs(); + try { + S3AFileStatus status = fs.getFileStatus(path); + fail("Got a status back from a missing file path " + status); + } catch (FileNotFoundException expected) { + // expected + } + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfGetFileStatusOnMissingSubPath() throws Throwable { + describe("performing getFileStatus on a missing file"); + S3AFileSystem fs = getFileSystem(); + Path path = path("missingdir/missingpath"); + resetMetricDiffs(); + try { + S3AFileStatus status = fs.getFileStatus(path); + fail("Got a status back from a missing file path " + status); + } catch (FileNotFoundException expected) { + // expected + } + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfGetFileStatusOnNonEmptyDir() throws Throwable { + describe("performing getFileStatus on a non-empty directory"); + S3AFileSystem fs = getFileSystem(); + Path dir = path("empty"); + fs.mkdirs(dir); + Path simpleFile = new Path(dir, "simple.txt"); + touch(fs, simpleFile); + resetMetricDiffs(); + S3AFileStatus status = fs.getFileStatus(dir); + if (status.isEmptyDirectory()) { + // erroneous state + String fsState = fs.toString(); + fail("FileStatus says directory isempty: " + status + + "\n" + ContractTestUtils.ls(fs, dir) + + "\n" + fsState); + } + metadataRequests.assertDiffEquals(2); + listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfCopyFromLocalFile() throws Throwable { + describe("testCostOfCopyFromLocalFile"); + File localTestDir = getTestDir("tmp"); + localTestDir.mkdirs(); + File tmpFile = 
File.createTempFile("tests3acost", ".txt", + localTestDir); + tmpFile.delete(); + try { + URI localFileURI = tmpFile.toURI(); + FileSystem localFS = FileSystem.get(localFileURI, + getFileSystem().getConf()); + Path localPath = new Path(localFileURI); + int len = 10 * 1024; + byte[] data = dataset(len, 'A', 'Z'); + writeDataset(localFS, localPath, data, len, 1024, true); + S3AFileSystem s3a = getFileSystem(); + MetricDiff copyLocalOps = new MetricDiff(s3a, + INVOCATION_COPY_FROM_LOCAL_FILE); + MetricDiff putRequests = new MetricDiff(s3a, + OBJECT_PUT_REQUESTS); + MetricDiff putBytes = new MetricDiff(s3a, + OBJECT_PUT_BYTES); + + Path remotePath = path("copied"); + s3a.copyFromLocalFile(false, true, localPath, remotePath); + verifyFileContents(s3a, remotePath, data); + copyLocalOps.assertDiffEquals(1); + putRequests.assertDiffEquals(1); + putBytes.assertDiffEquals(len); + // print final stats + LOG.info("Filesystem {}", s3a); + } finally { + tmpFile.delete(); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index d65f693fc48..21639b151e9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -34,13 +34,11 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.rules.TestName; +import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; -import java.util.Locale; - -import static org.junit.Assume.assumeTrue; /** * Base class for scale tests; here is where the common scale configuration @@ -51,11 +49,57 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants { @Rule public TestName methodName = new TestName(); + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + @BeforeClass public static void nameThread() { Thread.currentThread().setName("JUnit"); } + /** + * The number of operations to perform: {@value}. + */ + public static final String KEY_OPERATION_COUNT = + SCALE_TEST + "operation.count"; + + /** + * The number of directory operations to perform: {@value}. + */ + public static final String KEY_DIRECTORY_COUNT = + SCALE_TEST + "directory.count"; + + /** + * The readahead buffer: {@value}. + */ + public static final String KEY_READ_BUFFER_SIZE = + S3A_SCALE_TEST + "read.buffer.size"; + + public static final int DEFAULT_READ_BUFFER_SIZE = 16384; + + /** + * Key for a multi MB test file: {@value}. + */ + public static final String KEY_CSVTEST_FILE = + S3A_SCALE_TEST + "csvfile"; + + /** + * Default path for the multi MB test file: {@value}. + */ + public static final String DEFAULT_CSVTEST_FILE + = "s3a://landsat-pds/scene_list.gz"; + + /** + * The default number of operations to perform: {@value}. + */ + public static final long DEFAULT_OPERATION_COUNT = 2005; + + /** + * Default number of directories to create when performing + * directory performance/scale tests. + */ + public static final int DEFAULT_DIRECTORY_COUNT = 2; + protected S3AFileSystem fs; protected static final Logger LOG = @@ -132,108 +176,4 @@ protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics( } } - /** - * Make times more readable, by adding a "," every three digits. 
- * @param nanos nanos or other large number - * @return a string for logging - */ - protected static String toHuman(long nanos) { - return String.format(Locale.ENGLISH, "%,d", nanos); - } - - /** - * Log the bandwidth of a timer as inferred from the number of - * bytes processed. - * @param timer timer - * @param bytes bytes processed in the time period - */ - protected void bandwidth(NanoTimer timer, long bytes) { - LOG.info("Bandwidth = {} MB/S", - timer.bandwidthDescription(bytes)); - } - - /** - * Work out the bandwidth in MB/s - * @param bytes bytes - * @param durationNS duration in nanos - * @return the number of megabytes/second of the recorded operation - */ - public static double bandwidthMBs(long bytes, long durationNS) { - return (bytes * 1000.0 ) / durationNS; - } - - /** - * A simple class for timing operations in nanoseconds, and for - * printing some useful results in the process. - */ - protected static class NanoTimer { - final long startTime; - long endTime; - - public NanoTimer() { - startTime = now(); - } - - /** - * End the operation - * @return the duration of the operation - */ - public long end() { - endTime = now(); - return duration(); - } - - /** - * End the operation; log the duration - * @param format message - * @param args any arguments - * @return the duration of the operation - */ - public long end(String format, Object... args) { - long d = end(); - LOG.info("Duration of {}: {} nS", - String.format(format, args), toHuman(d)); - return d; - } - - long now() { - return System.nanoTime(); - } - - long duration() { - return endTime - startTime; - } - - double bandwidth(long bytes) { - return S3AScaleTestBase.bandwidthMBs(bytes, duration()); - } - - /** - * Bandwidth as bytes per second - * @param bytes bytes in - * @return the number of bytes per second this operation timed. - */ - double bandwidthBytes(long bytes) { - return (bytes * 1.0 ) / duration(); - } - - /** - * How many nanoseconds per byte - * @param bytes bytes processed in this time period - * @return the nanoseconds it took each byte to be processed - */ - long nanosPerByte(long bytes) { - return duration() / bytes; - } - - /** - * Get a description of the bandwidth, even down to fractions of - * a MB - * @param bytes bytes processed - * @return bandwidth - */ - String bandwidthDescription(long bytes) { - return String.format("%,.6f", bandwidth(bytes)); - } - } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java index af1883e7670..5e07dcb8707 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java @@ -20,9 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +32,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.junit.Assert.assertEquals; - +/** + * Test some scalable operations related to file renaming and deletion. 
+ */ public class TestS3ADeleteManyFiles extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger(TestS3ADeleteManyFiles.class); - @Rule - public Timeout testTimeout = new Timeout(30 * 60 * 1000); - /** * CAUTION: If this test starts failing, please make sure that the * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java new file mode 100644 index 00000000000..7ece394ea76 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Statistic; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; + +/** + * Test the performance of listing files/directories. + */ +public class TestS3ADirectoryPerformance extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger( + TestS3ADirectoryPerformance.class); + + @Test + public void testListOperations() throws Throwable { + describe("Test recursive list operations"); + final Path scaleTestDir = getTestPath(); + final Path listDir = new Path(scaleTestDir, "lists"); + + // scale factor. 
+ int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT); + int width = scale; + int depth = scale; + int files = scale; + MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); + MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); + MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES); + MetricDiff getFileStatusCalls = + new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); + NanoTimer createTimer = new NanoTimer(); + TreeScanResults created = + createSubdirs(fs, listDir, depth, width, files, 0); + // add some empty directories + int emptyDepth = 1 * scale; + int emptyWidth = 3 * scale; + + created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0, + 0, "empty", "f-", "")); + createTimer.end("Time to create %s", created); + LOG.info("Time per operation: {}", + toHuman(createTimer.nanosPerOperation(created.totalCount()))); + printThenReset(LOG, + metadataRequests, + listRequests, + listStatusCalls, + getFileStatusCalls); + + try { + // Scan the directory via an explicit tree walk. + // This is the baseline for any listing speedups. + MetricDiff treewalkMetadataRequests = + new MetricDiff(fs, OBJECT_METADATA_REQUESTS); + MetricDiff treewalkListRequests = new MetricDiff(fs, + OBJECT_LIST_REQUESTS); + MetricDiff treewalkListStatusCalls = new MetricDiff(fs, + INVOCATION_LIST_FILES); + MetricDiff treewalkGetFileStatusCalls = + new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); + NanoTimer treeWalkTimer = new NanoTimer(); + TreeScanResults treewalkResults = treeWalk(fs, listDir); + treeWalkTimer.end("List status via treewalk"); + + print(LOG, + treewalkMetadataRequests, + treewalkListRequests, + treewalkListStatusCalls, + treewalkGetFileStatusCalls); + assertEquals("Files found in listFiles(recursive=true) " + + " created=" + created + " listed=" + treewalkResults, + created.getFileCount(), treewalkResults.getFileCount()); + + + // listFiles() does the recursion internally + NanoTimer listFilesRecursiveTimer = new NanoTimer(); + + TreeScanResults listFilesResults = new TreeScanResults( + fs.listFiles(listDir, true)); + + listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created); + assertEquals("Files found in listFiles(recursive=true) " + + " created=" + created + " listed=" + listFilesResults, + created.getFileCount(), listFilesResults.getFileCount()); + + treewalkListRequests.assertDiffEquals(listRequests); + printThenReset(LOG, + metadataRequests, listRequests, + listStatusCalls, getFileStatusCalls); + + NanoTimer globStatusTimer = new NanoTimer(); + FileStatus[] globStatusFiles = fs.globStatus(listDir); + globStatusTimer.end("Time to globStatus() %s", globStatusTimer); + LOG.info("Time for glob status {} entries: {}", + globStatusFiles.length, + toHuman(createTimer.duration())); + printThenReset(LOG, + metadataRequests, + listRequests, + listStatusCalls, + getFileStatusCalls); + + } finally { + // deletion at the end of the run + NanoTimer deleteTimer = new NanoTimer(); + fs.delete(listDir, true); + deleteTimer.end("Deleting directory tree"); + printThenReset(LOG, + metadataRequests, listRequests, + listStatusCalls, getFileStatusCalls); + } + } + + @Test + public void testTimeToStatEmptyDirectory() throws Throwable { + describe("Time to stat an empty directory"); + Path path = new Path(getTestPath(), "empty"); + fs.mkdirs(path); + timeToStatPath(path); + } + + @Test + public void testTimeToStatNonEmptyDirectory() throws Throwable { + describe("Time to stat a non-empty directory"); + Path path = new 
Path(getTestPath(), "dir"); + fs.mkdirs(path); + touch(fs, new Path(path, "file")); + timeToStatPath(path); + } + + @Test + public void testTimeToStatFile() throws Throwable { + describe("Time to stat a simple file"); + Path path = new Path(getTestPath(), "file"); + touch(fs, path); + timeToStatPath(path); + } + + @Test + public void testTimeToStatRoot() throws Throwable { + describe("Time to stat the root path"); + timeToStatPath(new Path("/")); + } + + private void timeToStatPath(Path path) throws IOException { + describe("Timing getFileStatus(\"%s\")", path); + MetricDiff metadataRequests = + new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS); + MetricDiff listRequests = + new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS); + long attempts = getOperationCount(); + NanoTimer timer = new NanoTimer(); + for (long l = 0; l < attempts; l++) { + fs.getFileStatus(path); + } + timer.end("Time to execute %d getFileStatusCalls", attempts); + LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts))); + LOG.info("metadata: {}", metadataRequests); + LOG.info("metadata per operation {}", metadataRequests.diff() / attempts); + LOG.info("listObjects: {}", listRequests); + LOG.info("listObjects: per operation {}", listRequests.diff() / attempts); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java index 0c8b2732c56..5222a4eb99e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java @@ -36,8 +36,10 @@ import java.io.IOException; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; + /** - * Look at the performance of S3a operations + * Look at the performance of S3a operations. */ public class TestS3AInputStreamPerformance extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger( @@ -151,7 +153,7 @@ public void testTimeToOpenAndReadWholeFileByByte() throws Throwable { readTimer.end("Time to read %d bytes", len); bandwidth(readTimer, count); assertEquals("Not enough bytes were read)", len, count); - long nanosPerByte = readTimer.nanosPerByte(count); + long nanosPerByte = readTimer.nanosPerOperation(count); LOG.info("An open() call has the equivalent duration of reading {} bytes", toHuman( timeOpen.duration() / nanosPerByte)); } diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index bc85425aed8..1330ed1aef3 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -15,7 +15,9 @@ log4j.rootLogger=info,stdout log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n + +log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR # for debugging low level S3a operations, uncomment this line # log4j.logger.org.apache.hadoop.fs.s3a=DEBUG