From 027c8fb257eb5144a4cee42341bf6b774c0fd8d1 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 23 Apr 2021 18:44:41 +0100 Subject: [PATCH] HADOOP-17597. Optionally downgrade on S3A Syncable calls (#2801) Followup to HADOOP-13327, which changed S3A output stream hsync/hflush calls to raise an exception. Adds a new option fs.s3a.downgrade.syncable.exceptions When true, calls to Syncable hsync/hflush on S3A output streams will log once at warn (for entire process life, not just the stream), then increment IOStats with the relevant operation counter With the downgrade option false (default) * IOStats are incremented * The UnsupportedOperationException current raised includes a link to the JIRA. Contributed by Steve Loughran. --- .../fs/statistics/StoreStatisticNames.java | 6 + .../org/apache/hadoop/fs/s3a/Constants.java | 22 +- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 258 ++++++++++++++---- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 27 +- .../hadoop/fs/s3a/S3AInstrumentation.java | 14 +- .../org/apache/hadoop/fs/s3a/Statistic.java | 8 + .../hadoop/fs/s3a/WriteOperationHelper.java | 5 + .../apache/hadoop/fs/s3a/WriteOperations.java | 6 + .../BlockOutputStreamStatistics.java | 10 + .../impl/EmptyS3AStatisticsContext.java | 8 + .../tools/hadoop-aws/troubleshooting_s3a.md | 37 ++- .../hadoop/fs/s3a/ITestDowngradeSyncable.java | 114 ++++++++ .../fs/s3a/TestS3ABlockOutputStream.java | 55 +++- 13 files changed, 499 insertions(+), 71 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index b6b08fe009e..95144393585 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -84,6 +84,12 @@ public final class StoreStatisticNames { /** {@value}. */ public static final String OP_IS_FILE = "op_is_file"; + /** {@value}. */ + public static final String OP_HFLUSH = "op_hflush"; + + /** {@value}. */ + public static final String OP_HSYNC = "op_hsync"; + /** {@value}. */ public static final String OP_IS_DIRECTORY = "op_is_directory"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index c4b8f6e3c46..f6900cb1c8e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -329,7 +329,6 @@ public final class Constants { * Default is {@link #FAST_UPLOAD_BUFFER_DISK} * Value: {@value} */ - @InterfaceStability.Unstable public static final String FAST_UPLOAD_BUFFER = "fs.s3a.fast.upload.buffer"; @@ -338,26 +337,22 @@ public final class Constants { * Capacity is limited to available disk space. */ - @InterfaceStability.Unstable public static final String FAST_UPLOAD_BUFFER_DISK = "disk"; /** * Use an in-memory array. Fast but will run of heap rapidly: {@value}. */ - @InterfaceStability.Unstable public static final String FAST_UPLOAD_BUFFER_ARRAY = "array"; /** * Use a byte buffer. May be more memory efficient than the * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}. */ - @InterfaceStability.Unstable public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer"; /** * Default buffer option: {@value}. */ - @InterfaceStability.Unstable public static final String DEFAULT_FAST_UPLOAD_BUFFER = FAST_UPLOAD_BUFFER_DISK; @@ -370,7 +365,6 @@ public final class Constants { *

* Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS} */ - @InterfaceStability.Unstable public static final String FAST_UPLOAD_ACTIVE_BLOCKS = "fs.s3a.fast.upload.active.blocks"; @@ -378,9 +372,23 @@ public final class Constants { * Limit of queued block upload operations before writes * block. Value: {@value} */ - @InterfaceStability.Unstable public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4; + /** + * Rather than raise an exception when an attempt is + * made to call the Syncable APIs, warn and downgrade. + * Value: {@value}. + */ + public static final String DOWNGRADE_SYNCABLE_EXCEPTIONS = + "fs.s3a.downgrade.syncable.exceptions"; + + /** + * Default value for syncable invocation. + * Value: {@value}. + */ + public static final boolean DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT = + false; + /** * The capacity of executor queues for operations other than block * upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 4f06981bc2d..65b9535ba65 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.fs.s3a.commit.PutTracker; +import org.apache.hadoop.fs.s3a.impl.LogExactlyOnce; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -62,10 +63,10 @@ import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import 
org.apache.hadoop.util.Progressable; +import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; @@ -89,10 +90,8 @@ class S3ABlockOutputStream extends OutputStream implements private static final Logger LOG = LoggerFactory.getLogger(S3ABlockOutputStream.class); - private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable"; - - /** Owner FileSystem. */ - private final S3AFileSystem fs; + private static final String E_NOT_SYNCABLE = + "S3A streams are not Syncable. See HADOOP-17597."; /** Object being uploaded. */ private final String key; @@ -136,62 +135,48 @@ class S3ABlockOutputStream extends OutputStream implements /** * Write operation helper; encapsulation of the filesystem operations. */ - private final WriteOperationHelper writeOperationHelper; + private final WriteOperations writeOperationHelper; /** * Track multipart put operation. */ private final PutTracker putTracker; + /** Should Syncable calls be downgraded? */ + private final boolean downgradeSyncableExceptions; + + /** + * Downgraded syncable API calls are only logged at warn + * once across the entire process. + */ + private static final LogExactlyOnce WARN_ON_SYNCABLE = + new LogExactlyOnce(LOG); + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} * instances can control where data is buffered. - * - * @param fs S3AFilesystem - * @param key S3 object to work on. 
- * @param executorService the executor service to use to schedule work - * @param progress report progress in order to prevent timeouts. If - * this object implements {@code ProgressListener} then it will be - * directly wired up to the AWS client, so receive detailed progress - * information. - * @param blockSize size of a single block. - * @param blockFactory factory for creating stream destinations - * @param statistics stats for this stream - * @param writeOperationHelper state of the write operation. - * @param putTracker put tracking for commit support * @throws IOException on any problem */ - S3ABlockOutputStream(S3AFileSystem fs, - String key, - ExecutorService executorService, - Progressable progress, - long blockSize, - S3ADataBlocks.BlockFactory blockFactory, - BlockOutputStreamStatistics statistics, - WriteOperationHelper writeOperationHelper, - PutTracker putTracker) + S3ABlockOutputStream(BlockOutputStreamBuilder builder) throws IOException { - this.fs = fs; - this.key = key; - this.blockFactory = blockFactory; - this.blockSize = (int) blockSize; - this.statistics = statistics != null - ? statistics - : EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; + builder.validate(); + this.key = builder.key; + this.blockFactory = builder.blockFactory; + this.blockSize = (int) builder.blockSize; + this.statistics = builder.statistics; // test instantiations may not provide statistics; - this.iostatistics = statistics != null - ? 
statistics.getIOStatistics() - : emptyStatistics(); - this.writeOperationHelper = writeOperationHelper; - this.putTracker = putTracker; - Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, - "Block size is too small: %d", blockSize); - this.executorService = MoreExecutors.listeningDecorator(executorService); + this.iostatistics = statistics.getIOStatistics(); + this.writeOperationHelper = builder.writeOperations; + this.putTracker = builder.putTracker; + this.executorService = MoreExecutors.listeningDecorator( + builder.executorService); this.multiPartUpload = null; + final Progressable progress = builder.progress; this.progressListener = (progress instanceof ProgressListener) ? (ProgressListener) progress : new ProgressableListener(progress); + downgradeSyncableExceptions = builder.downgradeSyncableExceptions; // create that first block. This guarantees that an open + close sequence // writes a 0-byte entry. createBlockIfNeeded(); @@ -597,7 +582,7 @@ class S3ABlockOutputStream extends OutputStream implements } private void incrementWriteOperations() { - fs.incrementWriteOperations(); + writeOperationHelper.incrementWriteOperations(); } /** @@ -654,12 +639,31 @@ class S3ABlockOutputStream extends OutputStream implements @Override public void hflush() throws IOException { - throw new UnsupportedOperationException(E_NOT_SYNCABLE); + statistics.hflushInvoked(); + handleSyncableInvocation(); } @Override public void hsync() throws IOException { - throw new UnsupportedOperationException(E_NOT_SYNCABLE); + statistics.hsyncInvoked(); + handleSyncableInvocation(); + } + + /** + * Shared processing of Syncable operation reporting/downgrade. + */ + private void handleSyncableInvocation() { + final UnsupportedOperationException ex + = new UnsupportedOperationException(E_NOT_SYNCABLE); + if (!downgradeSyncableExceptions) { + throw ex; + } + // downgrading. + WARN_ON_SYNCABLE.warn("Application invoked the Syncable API against" + + " stream writing to {}. 
This is unsupported", + key); + // and log at debug + LOG.debug("Downgrading Syncable call", ex); } @Override @@ -982,4 +986,166 @@ class S3ABlockOutputStream extends OutputStream implements } } + /** + * Create a builder. + * @return a new builder instance. + */ + public static BlockOutputStreamBuilder builder() { + return new BlockOutputStreamBuilder(); + } + + /** + * Builder class for constructing an output stream. + */ + public static final class BlockOutputStreamBuilder { + + /** S3 object to work on. */ + private String key; + + /** The executor service to use to schedule work. */ + private ExecutorService executorService; + + /** + * Report progress in order to prevent timeouts. + * If this object implements {@code ProgressListener} then it will be + * directly wired up to the AWS client, so receive detailed progress + * information. + */ + private Progressable progress; + + /** The size of a single block. */ + private long blockSize; + + /** The factory for creating stream destinations. */ + private S3ADataBlocks.BlockFactory blockFactory; + + /** The output statistics for the stream. */ + private BlockOutputStreamStatistics statistics = + EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; + + /** Operations to write data. */ + private WriteOperations writeOperations; + + /** put tracking for commit support. */ + private PutTracker putTracker; + + /** Should Syncable calls be downgraded? */ + private boolean downgradeSyncableExceptions; + + private BlockOutputStreamBuilder() { + } + + /** + * Validate the arguments. 
+ */ + public void validate() { + requireNonNull(key, "null key"); + requireNonNull(executorService, "null executorService"); + requireNonNull(blockFactory, "null blockFactory"); + requireNonNull(statistics, "null statistics"); + requireNonNull(writeOperations, "null writeOperationHelper"); + requireNonNull(putTracker, "null putTracker"); + Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, + "Block size is too small: %s", blockSize); + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withKey( + final String value) { + key = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withExecutorService( + final ExecutorService value) { + executorService = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withProgress( + final Progressable value) { + progress = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withBlockSize( + final long value) { + blockSize = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withBlockFactory( + final S3ADataBlocks.BlockFactory value) { + blockFactory = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withStatistics( + final BlockOutputStreamStatistics value) { + statistics = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withWriteOperations( + final WriteOperationHelper value) { + writeOperations = value; + return this; + } + + /** + * Set builder value. 
+ * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withPutTracker( + final PutTracker value) { + putTracker = value; + return this; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public BlockOutputStreamBuilder withDowngradeSyncableExceptions( + final boolean value) { + downgradeSyncableExceptions = value; + return this; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 8db5d51def8..7f0160a5070 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1348,20 +1348,27 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, String destKey = putTracker.getDestKey(); final BlockOutputStreamStatistics outputStreamStatistics = statisticsContext.newOutputStreamStatistics(); - return new FSDataOutputStream( - new S3ABlockOutputStream(this, - destKey, + final S3ABlockOutputStream.BlockOutputStreamBuilder builder = + S3ABlockOutputStream.builder() + .withKey(destKey) + .withBlockFactory(blockFactory) + .withBlockSize(partSize) + .withStatistics(outputStreamStatistics) + .withProgress(progress) + .withPutTracker(putTracker) + .withWriteOperations(getWriteOperationHelper()) + .withExecutorService( new SemaphoredDelegatingExecutor( boundedThreadPool, blockOutputActiveBlocks, true, - outputStreamStatistics), - progress, - partSize, - blockFactory, - outputStreamStatistics, - getWriteOperationHelper(), - putTracker), + outputStreamStatistics)) + .withDowngradeSyncableExceptions( + getConf().getBoolean( + DOWNGRADE_SYNCABLE_EXCEPTIONS, + DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT)); + return new FSDataOutputStream( + new S3ABlockOutputStream(builder), null); } diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index dd28f3e59e0..169a74abef2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1346,7 +1346,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource, STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(), STREAM_WRITE_QUEUE_DURATION.getSymbol(), STREAM_WRITE_TOTAL_DATA.getSymbol(), - STREAM_WRITE_TOTAL_TIME.getSymbol()) + STREAM_WRITE_TOTAL_TIME.getSymbol(), + INVOCATION_HFLUSH.getSymbol(), + INVOCATION_HSYNC.getSymbol()) .withGauges( STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(), STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol()) @@ -1489,6 +1491,16 @@ public class S3AInstrumentation implements Closeable, MetricsSource, incrementCounter(COMMITTER_BYTES_UPLOADED, size); } + @Override + public void hflushInvoked() { + incCounter(INVOCATION_HFLUSH.getSymbol(), 1); + } + + @Override + public void hsyncInvoked() { + incCounter(INVOCATION_HSYNC.getSymbol(), 1); + } + @Override public void close() { if (getBytesPendingUpload() > 0) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 1a53f0d1f87..c613c06c9bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -137,6 +137,14 @@ public enum Statistic { StoreStatisticNames.OP_IS_FILE, "Calls of isFile()", TYPE_COUNTER), + INVOCATION_HFLUSH( + StoreStatisticNames.OP_HFLUSH, + "Calls of hflush()", + TYPE_COUNTER), + INVOCATION_HSYNC( + StoreStatisticNames.OP_HSYNC, + "Calls of hsync()", + TYPE_COUNTER), INVOCATION_LIST_FILES( 
StoreStatisticNames.OP_LIST_FILES, "Calls of listFiles()", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 9bdf61c22a1..8b71fc32771 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -693,4 +693,9 @@ public class WriteOperationHelper implements WriteOperations { } }); } + + @Override + public void incrementWriteOperations() { + owner.incrementWriteOperations(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 09b9cc924c6..0a8150ce34d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -338,4 +338,10 @@ public interface WriteOperations { SelectObjectContentRequest request, String action) throws IOException; + + /** + * Increment the write operation counter + * of the filesystem. + */ + void incrementWriteOperations(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index b1cee718c20..772b965d4f4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -134,4 +134,14 @@ public interface BlockOutputStreamStatistics extends Closeable, * @return the value or null if no matching gauge was found. 
*/ Long lookupGaugeValue(String name); + + /** + * Syncable.hflush() has been invoked. + */ + void hflushInvoked(); + + /** + * Syncable.hsync() has been invoked. + */ + void hsyncInvoked(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index c8cd8059208..3a651026a0b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -482,6 +482,14 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext { return 0L; } + @Override + public void hflushInvoked() { + } + + @Override + public void hsyncInvoked() { + } + @Override public void close() throws IOException { } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index 416793b8ed9..661dd2f36ad 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -22,7 +22,7 @@ Common problems working with S3 are 1. Classpath setup 1. Authentication -1. S3 Inconsistency side-effects +1. Incorrect configuration Troubleshooting IAM Assumed Roles is covered in its @@ -1027,7 +1027,7 @@ at the end of a write operation. If a process terminated unexpectedly, or failed to call the `close()` method on an output stream, the pending data will have been lost. -### File `flush()`, `hsync` and `hflush()` calls do not save data to S3 +### File `flush()` calls do not save data to S3 Again, this is due to the fact that the data is cached locally until the `close()` operation. 
The S3A filesystem cannot be used as a store of data @@ -1036,6 +1036,39 @@ if it is required that the data is persisted durably after every This includes resilient logging, HBase-style journaling and the like. The standard strategy here is to save to HDFS and then copy to S3. +### `UnsupportedOperationException` "S3A streams are not Syncable. See HADOOP-17597." + +The application has tried to call either the `Syncable.hsync()` or `Syncable.hflush()` +methods on an S3A output stream. This has been rejected because the +connector isn't saving any data at all. The `Syncable` API, especially the +`hsync()` call, is critical for applications such as HBase to safely +persist data. + +The S3A connector throws an `UnsupportedOperationException` when these API calls +are made, because the guarantees absolutely cannot be met: nothing is being flushed +or saved. + +* Applications which intend to invoke the Syncable APIs should call `hasCapability("hsync")` on + the stream to see if they are supported. +* Or catch and downgrade `UnsupportedOperationException`. + +These recommendations _apply to all filesystems_. + +To downgrade the S3A connector to simply warning of the use of +`hsync()` or `hflush()` calls, set the option +`fs.s3a.downgrade.syncable.exceptions` to true. + +```xml +<property> + <name>fs.s3a.downgrade.syncable.exceptions</name> + <value>true</value> +</property> +``` + +The counts of invocations of the two APIs are collected +in the S3A filesystem Statistics/IOStatistics and so +their use can be monitored. 
+ ### `RemoteFileChangedException` and read-during-overwrite ``` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java new file mode 100644 index 00000000000..0bcb11a823d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.statistics.IOStatistics; + +import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HFLUSH; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HSYNC; + + +public class ITestDowngradeSyncable extends AbstractS3ACostTest { + + protected static final Logger LOG = + LoggerFactory.getLogger(ITestDowngradeSyncable.class); + + + public ITestDowngradeSyncable() { + super(false, true, false); + } + + @Override + public Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + String bucketName = getTestBucketName(conf); + removeBucketOverrides(bucketName, conf, + DOWNGRADE_SYNCABLE_EXCEPTIONS); + conf.setBoolean(DOWNGRADE_SYNCABLE_EXCEPTIONS, true); + return conf; + } + + @Test + public void testHFlushDowngrade() throws Throwable { + describe("Verify that hflush() calls can be downgraded from fail" + + " to ignore; the relevant counter is updated"); + Path path = methodPath(); + S3AFileSystem fs = getFileSystem(); + final IOStatistics fsIoStats = fs.getIOStatistics(); + assertThatStatisticCounter(fsIoStats, OP_HFLUSH) + .isEqualTo(0); + + try (FSDataOutputStream out = fs.create(path, true)) { + out.write('1'); + // must succeed + out.hflush(); + // 
stats counter records the downgrade + IOStatistics iostats = out.getIOStatistics(); + LOG.info("IOStats {}", ioStatisticsToString(iostats)); + assertThatStatisticCounter(iostats, OP_HFLUSH) + .isEqualTo(1); + assertThatStatisticCounter(iostats, OP_HSYNC) + .isEqualTo(0); + } + // once closed. the FS will have its stats merged. + assertThatStatisticCounter(fsIoStats, OP_HFLUSH) + .isEqualTo(1); + } + + @Test + public void testHSyncDowngrade() throws Throwable { + describe("Verify that hsync() calls can be downgraded from fail" + + " to ignore; the relevant counter is updated"); + Path path = methodPath(); + S3AFileSystem fs = getFileSystem(); + final IOStatistics fsIoStats = fs.getIOStatistics(); + assertThatStatisticCounter(fsIoStats, OP_HSYNC) + .isEqualTo(0); + + try (FSDataOutputStream out = fs.create(path, true)) { + out.write('1'); + // must succeed + out.hsync(); + // stats counter records the downgrade + IOStatistics iostats = out.getIOStatistics(); + LOG.info("IOStats {}", ioStatisticsToString(iostats)); + assertThatStatisticCounter(iostats, OP_HFLUSH) + .isEqualTo(0); + assertThatStatisticCounter(iostats, OP_HSYNC) + .isEqualTo(1); + } + // once closed. the FS will have its stats merged. + assertThatStatisticCounter(fsIoStats, OP_HSYNC) + .isEqualTo(1); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java index baa4a542c85..de27411a41a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java @@ -43,8 +43,11 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest { private S3ABlockOutputStream stream; - @Before - public void setUp() throws Exception { + /** + * Create an S3A Builder all mocked up from component pieces. 
+ * @return stream builder. + */ + private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() { ExecutorService executorService = mock(ExecutorService.class); Progressable progressable = mock(Progressable.class); S3ADataBlocks.BlockFactory blockFactory = @@ -52,11 +55,26 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest { long blockSize = Constants.DEFAULT_MULTIPART_SIZE; WriteOperationHelper oHelper = mock(WriteOperationHelper.class); PutTracker putTracker = mock(PutTracker.class); - stream = spy(new S3ABlockOutputStream(fs, "", executorService, - progressable, blockSize, blockFactory, null, oHelper, - putTracker)); + final S3ABlockOutputStream.BlockOutputStreamBuilder builder = + S3ABlockOutputStream.builder() + .withBlockFactory(blockFactory) + .withBlockSize(blockSize) + .withExecutorService(executorService) + .withKey("") + .withProgress(progressable) + .withPutTracker(putTracker) + .withWriteOperations(oHelper); + return builder; } + @Before + public void setUp() throws Exception { + final S3ABlockOutputStream.BlockOutputStreamBuilder + builder = mockS3ABuilder(); + stream = spy(new S3ABlockOutputStream(builder)); + } + + @Test public void testFlushNoOpWhenStreamClosed() throws Exception { doThrow(new IOException()).when(stream).checkOpen(); @@ -108,4 +126,31 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest { // This will ensure abort() can be called with try-with-resource. stream.close(); } + + + /** + * Unless configured to downgrade, the stream will raise exceptions on + * Syncable API calls. + */ + @Test + public void testSyncableUnsupported() throws Exception { + intercept(UnsupportedOperationException.class, () -> stream.hflush()); + intercept(UnsupportedOperationException.class, () -> stream.hsync()); + } + + /** + * When configured to downgrade, the stream downgrades on + * Syncable API calls. 
+ */ + @Test + public void testSyncableDowngrade() throws Exception { + final S3ABlockOutputStream.BlockOutputStreamBuilder + builder = mockS3ABuilder(); + builder.withDowngradeSyncableExceptions(true); + stream = spy(new S3ABlockOutputStream(builder)); + + stream.hflush(); + stream.hsync(); + } + }