HADOOP-17597. Optionally downgrade on S3A Syncable calls (#2801)

Followup to HADOOP-13327, which changed S3A output stream hsync/hflush calls
to raise an exception.

Adds a new option fs.s3a.downgrade.syncable.exceptions

When true, calls to Syncable hsync/hflush on S3A output streams will
log once at warn (for entire process life, not just the stream), then
increment IOStats with the relevant operation counter

With the downgrade option false (default)
* IOStats are incremented
* The UnsupportedOperationException current raised includes a link to the
  JIRA.

Contributed by Steve Loughran.

Change-Id: I967e077eda1d1a1a3795b4d22e003fe7997b6679
This commit is contained in:
Steve Loughran 2021-04-23 18:44:41 +01:00
parent 1b2bc77923
commit fb71e6c91e
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
13 changed files with 499 additions and 71 deletions

View File

@ -84,6 +84,12 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_IS_FILE = "op_is_file";
/** {@value}. */
public static final String OP_HFLUSH = "op_hflush";
/** {@value}. */
public static final String OP_HSYNC = "op_hsync";
/** {@value}. */
public static final String OP_IS_DIRECTORY = "op_is_directory";

View File

@ -329,7 +329,6 @@ public final class Constants {
* Default is {@link #FAST_UPLOAD_BUFFER_DISK}
* Value: {@value}
*/
@InterfaceStability.Unstable
public static final String FAST_UPLOAD_BUFFER =
"fs.s3a.fast.upload.buffer";
@ -338,26 +337,22 @@ public final class Constants {
* Capacity is limited to available disk space.
*/
@InterfaceStability.Unstable
public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
/**
* Use an in-memory array. Fast but will run of heap rapidly: {@value}.
*/
@InterfaceStability.Unstable
public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
/**
* Use a byte buffer. May be more memory efficient than the
* {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
*/
@InterfaceStability.Unstable
public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
/**
* Default buffer option: {@value}.
*/
@InterfaceStability.Unstable
public static final String DEFAULT_FAST_UPLOAD_BUFFER =
FAST_UPLOAD_BUFFER_DISK;
@ -370,7 +365,6 @@ public final class Constants {
* <p>
* Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS}
*/
@InterfaceStability.Unstable
public static final String FAST_UPLOAD_ACTIVE_BLOCKS =
"fs.s3a.fast.upload.active.blocks";
@ -378,9 +372,23 @@ public final class Constants {
* Limit of queued block upload operations before writes
* block. Value: {@value}
*/
@InterfaceStability.Unstable
public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
/**
* Rather than raise an exception when an attempt is
* made to call the Syncable APIs, warn and downgrade.
* Value: {@value}.
*/
public static final String DOWNGRADE_SYNCABLE_EXCEPTIONS =
"fs.s3a.downgrade.syncable.exceptions";
/**
* Default value for syncable invocation.
* Value: {@value}.
*/
public static final boolean DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT =
false;
/**
* The capacity of executor queues for operations other than block
* upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.impl.LogExactlyOnce;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
@ -62,10 +63,10 @@ import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.util.Progressable;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@ -89,10 +90,8 @@ class S3ABlockOutputStream extends OutputStream implements
private static final Logger LOG =
LoggerFactory.getLogger(S3ABlockOutputStream.class);
private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable";
/** Owner FileSystem. */
private final S3AFileSystem fs;
private static final String E_NOT_SYNCABLE =
"S3A streams are not Syncable. See HADOOP-17597.";
/** Object being uploaded. */
private final String key;
@ -136,62 +135,48 @@ class S3ABlockOutputStream extends OutputStream implements
/**
* Write operation helper; encapsulation of the filesystem operations.
*/
private final WriteOperationHelper writeOperationHelper;
private final WriteOperations writeOperationHelper;
/**
* Track multipart put operation.
*/
private final PutTracker putTracker;
/** Should Syncable calls be downgraded? */
private final boolean downgradeSyncableExceptions;
/**
* Downagraded syncable API calls are only logged at warn
* once across the entire process.
*/
private static final LogExactlyOnce WARN_ON_SYNCABLE =
new LogExactlyOnce(LOG);
/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
* instances can control where data is buffered.
*
* @param fs S3AFilesystem
* @param key S3 object to work on.
* @param executorService the executor service to use to schedule work
* @param progress report progress in order to prevent timeouts. If
* this object implements {@code ProgressListener} then it will be
* directly wired up to the AWS client, so receive detailed progress
* information.
* @param blockSize size of a single block.
* @param blockFactory factory for creating stream destinations
* @param statistics stats for this stream
* @param writeOperationHelper state of the write operation.
* @param putTracker put tracking for commit support
* @throws IOException on any problem
*/
S3ABlockOutputStream(S3AFileSystem fs,
String key,
ExecutorService executorService,
Progressable progress,
long blockSize,
S3ADataBlocks.BlockFactory blockFactory,
BlockOutputStreamStatistics statistics,
WriteOperationHelper writeOperationHelper,
PutTracker putTracker)
S3ABlockOutputStream(BlockOutputStreamBuilder builder)
throws IOException {
this.fs = fs;
this.key = key;
this.blockFactory = blockFactory;
this.blockSize = (int) blockSize;
this.statistics = statistics != null
? statistics
: EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
builder.validate();
this.key = builder.key;
this.blockFactory = builder.blockFactory;
this.blockSize = (int) builder.blockSize;
this.statistics = builder.statistics;
// test instantiations may not provide statistics;
this.iostatistics = statistics != null
? statistics.getIOStatistics()
: emptyStatistics();
this.writeOperationHelper = writeOperationHelper;
this.putTracker = putTracker;
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
"Block size is too small: %d", blockSize);
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.iostatistics = statistics.getIOStatistics();
this.writeOperationHelper = builder.writeOperations;
this.putTracker = builder.putTracker;
this.executorService = MoreExecutors.listeningDecorator(
builder.executorService);
this.multiPartUpload = null;
final Progressable progress = builder.progress;
this.progressListener = (progress instanceof ProgressListener) ?
(ProgressListener) progress
: new ProgressableListener(progress);
downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
@ -597,7 +582,7 @@ class S3ABlockOutputStream extends OutputStream implements
}
private void incrementWriteOperations() {
fs.incrementWriteOperations();
writeOperationHelper.incrementWriteOperations();
}
/**
@ -654,12 +639,31 @@ class S3ABlockOutputStream extends OutputStream implements
@Override
public void hflush() throws IOException {
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
statistics.hflushInvoked();
handleSyncableInvocation();
}
@Override
public void hsync() throws IOException {
throw new UnsupportedOperationException(E_NOT_SYNCABLE);
statistics.hsyncInvoked();
handleSyncableInvocation();
}
/**
* Shared processing of Syncable operation reporting/downgrade.
*/
private void handleSyncableInvocation() {
final UnsupportedOperationException ex
= new UnsupportedOperationException(E_NOT_SYNCABLE);
if (!downgradeSyncableExceptions) {
throw ex;
}
// downgrading.
WARN_ON_SYNCABLE.warn("Application invoked the Syncable API against"
+ " stream writing to {}. This is unsupported",
key);
// and log at debug
LOG.debug("Downgrading Syncable call", ex);
}
@Override
@ -982,4 +986,166 @@ class S3ABlockOutputStream extends OutputStream implements
}
}
/**
* Create a builder.
* @return
*/
public static BlockOutputStreamBuilder builder() {
return new BlockOutputStreamBuilder();
}
/**
* Builder class for constructing an output stream.
*/
public static final class BlockOutputStreamBuilder {
/** S3 object to work on. */
private String key;
/** The executor service to use to schedule work. */
private ExecutorService executorService;
/**
* Report progress in order to prevent timeouts.
* this object implements {@code ProgressListener} then it will be
* directly wired up to the AWS client, so receive detailed progress
* information.
*/
private Progressable progress;
/** The size of a single block. */
private long blockSize;
/** The factory for creating stream destinations. */
private S3ADataBlocks.BlockFactory blockFactory;
/** The output statistics for the stream. */
private BlockOutputStreamStatistics statistics =
EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
/** Operations to write data. */
private WriteOperations writeOperations;
/** put tracking for commit support. */
private PutTracker putTracker;
/** Should Syncable calls be downgraded? */
private boolean downgradeSyncableExceptions;
private BlockOutputStreamBuilder() {
}
/**
* Validate the arguments.
*/
public void validate() {
requireNonNull(key, "null key");
requireNonNull(executorService, "null executorService");
requireNonNull(blockFactory, "null blockFactory");
requireNonNull(statistics, "null statistics");
requireNonNull(writeOperations, "null writeOperationHelper");
requireNonNull(putTracker, "null putTracker");
Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
"Block size is too small: %s", blockSize);
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withKey(
final String value) {
key = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withExecutorService(
final ExecutorService value) {
executorService = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withProgress(
final Progressable value) {
progress = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withBlockSize(
final long value) {
blockSize = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withBlockFactory(
final S3ADataBlocks.BlockFactory value) {
blockFactory = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withStatistics(
final BlockOutputStreamStatistics value) {
statistics = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withWriteOperations(
final WriteOperationHelper value) {
writeOperations = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withPutTracker(
final PutTracker value) {
putTracker = value;
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
final boolean value) {
downgradeSyncableExceptions = value;
return this;
}
}
}

View File

@ -1348,19 +1348,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
String destKey = putTracker.getDestKey();
final BlockOutputStreamStatistics outputStreamStatistics
= statisticsContext.newOutputStreamStatistics();
return new FSDataOutputStream(
new S3ABlockOutputStream(this,
destKey,
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withKey(destKey)
.withBlockFactory(blockFactory)
.withBlockSize(partSize)
.withStatistics(outputStreamStatistics)
.withProgress(progress)
.withPutTracker(putTracker)
.withWriteOperations(getWriteOperationHelper())
.withExecutorService(
new SemaphoredDelegatingExecutor(
boundedThreadPool,
blockOutputActiveBlocks,
true),
progress,
partSize,
blockFactory,
outputStreamStatistics,
getWriteOperationHelper(),
putTracker),
true))
.withDowngradeSyncableExceptions(
getConf().getBoolean(
DOWNGRADE_SYNCABLE_EXCEPTIONS,
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT));
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
}

View File

@ -1346,7 +1346,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
STREAM_WRITE_QUEUE_DURATION.getSymbol(),
STREAM_WRITE_TOTAL_DATA.getSymbol(),
STREAM_WRITE_TOTAL_TIME.getSymbol())
STREAM_WRITE_TOTAL_TIME.getSymbol(),
INVOCATION_HFLUSH.getSymbol(),
INVOCATION_HSYNC.getSymbol())
.withGauges(
STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
@ -1489,6 +1491,16 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
incrementCounter(COMMITTER_BYTES_UPLOADED, size);
}
@Override
public void hflushInvoked() {
incCounter(INVOCATION_HFLUSH.getSymbol(), 1);
}
@Override
public void hsyncInvoked() {
incCounter(INVOCATION_HSYNC.getSymbol(), 1);
}
@Override
public void close() {
if (getBytesPendingUpload() > 0) {

View File

@ -137,6 +137,14 @@ public enum Statistic {
StoreStatisticNames.OP_IS_FILE,
"Calls of isFile()",
TYPE_COUNTER),
INVOCATION_HFLUSH(
StoreStatisticNames.OP_HFLUSH,
"Calls of hflush()",
TYPE_COUNTER),
INVOCATION_HSYNC(
StoreStatisticNames.OP_HSYNC,
"Calls of hsync()",
TYPE_COUNTER),
INVOCATION_LIST_FILES(
StoreStatisticNames.OP_LIST_FILES,
"Calls of listFiles()",

View File

@ -693,4 +693,9 @@ public class WriteOperationHelper implements WriteOperations {
}
});
}
@Override
public void incrementWriteOperations() {
owner.incrementWriteOperations();
}
}

View File

@ -338,4 +338,10 @@ public interface WriteOperations {
SelectObjectContentRequest request,
String action)
throws IOException;
/**
* Increment the write operation counter
* of the filesystem.
*/
void incrementWriteOperations();
}

View File

@ -134,4 +134,14 @@ public interface BlockOutputStreamStatistics extends Closeable,
* @return the value or null if no matching gauge was found.
*/
Long lookupGaugeValue(String name);
/**
* Syncable.hflush() has been invoked.
*/
void hflushInvoked();
/**
* Syncable.hsync() has been invoked.
*/
void hsyncInvoked();
}

View File

@ -482,6 +482,14 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
return 0L;
}
@Override
public void hflushInvoked() {
}
@Override
public void hsyncInvoked() {
}
@Override
public void close() throws IOException {
}

View File

@ -22,7 +22,7 @@ Common problems working with S3 are
1. Classpath setup
1. Authentication
1. S3 Inconsistency side-effects
1. Incorrect configuration
Troubleshooting IAM Assumed Roles is covered in its
@ -1027,7 +1027,7 @@ at the end of a write operation. If a process terminated unexpectedly, or failed
to call the `close()` method on an output stream, the pending data will have
been lost.
### File `flush()`, `hsync` and `hflush()` calls do not save data to S3
### File `flush()` calls do not save data to S3
Again, this is due to the fact that the data is cached locally until the
`close()` operation. The S3A filesystem cannot be used as a store of data
@ -1036,6 +1036,39 @@ if it is required that the data is persisted durably after every
This includes resilient logging, HBase-style journaling
and the like. The standard strategy here is to save to HDFS and then copy to S3.
### <a name="syncable"></a> `UnsupportedOperationException` "S3A streams are not Syncable. See HADOOP-17597."
The application has tried to call either the `Syncable.hsync()` or `Syncable.hflush()`
methods on an S3A output stream. This has been rejected because the
connector isn't saving any data at all. The `Syncable` API, especially the
`hsync()` call, are critical for applications such as HBase to safely
persist data.
The S3A connector throws an `UnsupportedOperationException` when these API calls
are made, because the guarantees absolutely cannot be met: nothing is being flushed
or saved.
* Applications which intend to invoke the Syncable APIs call `hasCapability("hsync")` on
the stream to see if they are supported.
* Or catch and downgrade `UnsupportedOperationException`.
These recommendations _apply to all filesystems_.
To downgrade the S3A connector to simply warning of the use of
`hsync()` or `hflush()` calls, set the option
`fs.s3a.downgrade.syncable.exceptions` to true.
```xml
<property>
<name>fs.s3a.downgrade.syncable.exceptions</name>
<value>true</value>
</property>
```
The count of invocations of the two APIs are collected
in the S3A filesystem Statistics/IOStatistics and so
their use can be monitored.
### `RemoteFileChangedException` and read-during-overwrite
```

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HFLUSH;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HSYNC;
public class ITestDowngradeSyncable extends AbstractS3ACostTest {
protected static final Logger LOG =
LoggerFactory.getLogger(ITestDowngradeSyncable.class);
public ITestDowngradeSyncable() {
super(false, true, false);
}
@Override
public Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
String bucketName = getTestBucketName(conf);
removeBucketOverrides(bucketName, conf,
DOWNGRADE_SYNCABLE_EXCEPTIONS);
conf.setBoolean(DOWNGRADE_SYNCABLE_EXCEPTIONS, true);
return conf;
}
@Test
public void testHFlushDowngrade() throws Throwable {
describe("Verify that hflush() calls can be downgraded from fail"
+ " to ignore; the relevant counter is updated");
Path path = methodPath();
S3AFileSystem fs = getFileSystem();
final IOStatistics fsIoStats = fs.getIOStatistics();
assertThatStatisticCounter(fsIoStats, OP_HFLUSH)
.isEqualTo(0);
try (FSDataOutputStream out = fs.create(path, true)) {
out.write('1');
// must succeed
out.hflush();
// stats counter records the downgrade
IOStatistics iostats = out.getIOStatistics();
LOG.info("IOStats {}", ioStatisticsToString(iostats));
assertThatStatisticCounter(iostats, OP_HFLUSH)
.isEqualTo(1);
assertThatStatisticCounter(iostats, OP_HSYNC)
.isEqualTo(0);
}
// once closed. the FS will have its stats merged.
assertThatStatisticCounter(fsIoStats, OP_HFLUSH)
.isEqualTo(1);
}
@Test
public void testHSyncDowngrade() throws Throwable {
describe("Verify that hsync() calls can be downgraded from fail"
+ " to ignore; the relevant counter is updated");
Path path = methodPath();
S3AFileSystem fs = getFileSystem();
final IOStatistics fsIoStats = fs.getIOStatistics();
assertThatStatisticCounter(fsIoStats, OP_HSYNC)
.isEqualTo(0);
try (FSDataOutputStream out = fs.create(path, true)) {
out.write('1');
// must succeed
out.hsync();
// stats counter records the downgrade
IOStatistics iostats = out.getIOStatistics();
LOG.info("IOStats {}", ioStatisticsToString(iostats));
assertThatStatisticCounter(iostats, OP_HFLUSH)
.isEqualTo(0);
assertThatStatisticCounter(iostats, OP_HSYNC)
.isEqualTo(1);
}
// once closed. the FS will have its stats merged.
assertThatStatisticCounter(fsIoStats, OP_HSYNC)
.isEqualTo(1);
}
}

View File

@ -43,8 +43,11 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
private S3ABlockOutputStream stream;
@Before
public void setUp() throws Exception {
/**
* Create an S3A Builder all mocked up from component pieces.
* @return stream builder.
*/
private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() {
ExecutorService executorService = mock(ExecutorService.class);
Progressable progressable = mock(Progressable.class);
S3ADataBlocks.BlockFactory blockFactory =
@ -52,11 +55,26 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
PutTracker putTracker = mock(PutTracker.class);
stream = spy(new S3ABlockOutputStream(fs, "", executorService,
progressable, blockSize, blockFactory, null, oHelper,
putTracker));
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withBlockFactory(blockFactory)
.withBlockSize(blockSize)
.withExecutorService(executorService)
.withKey("")
.withProgress(progressable)
.withPutTracker(putTracker)
.withWriteOperations(oHelper);
return builder;
}
@Before
public void setUp() throws Exception {
final S3ABlockOutputStream.BlockOutputStreamBuilder
builder = mockS3ABuilder();
stream = spy(new S3ABlockOutputStream(builder));
}
@Test
public void testFlushNoOpWhenStreamClosed() throws Exception {
doThrow(new IOException()).when(stream).checkOpen();
@ -108,4 +126,31 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
// This will ensure abort() can be called with try-with-resource.
stream.close();
}
/**
* Unless configured to downgrade, the stream will raise exceptions on
* Syncable API calls.
*/
@Test
public void testSyncableUnsupported() throws Exception {
intercept(UnsupportedOperationException.class, () -> stream.hflush());
intercept(UnsupportedOperationException.class, () -> stream.hsync());
}
/**
* When configured to downgrade, the stream downgrades on
* Syncable API calls.
*/
@Test
public void testSyncableDowngrade() throws Exception {
final S3ABlockOutputStream.BlockOutputStreamBuilder
builder = mockS3ABuilder();
builder.withDowngradeSyncableExceptions(true);
stream = spy(new S3ABlockOutputStream(builder));
stream.hflush();
stream.hsync();
}
}