diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1f1ab55164f..5cb37f0e88a 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1874,6 +1874,48 @@
+<property>
+  <name>fs.s3a.change.detection.source</name>
+  <value>etag</value>
+  <description>
+    Select which S3 object attribute to use for change detection.
+    Currently supported values are 'etag' for S3 object eTags and
+    'versionid' for S3 object version IDs. Use of version IDs requires
+    object versioning to be enabled on each S3 bucket used. Object
+    versioning is disabled on buckets by default. When version IDs are
+    used, the buckets should have versioning enabled before any data
+    is written.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>server</value>
+  <description>
+    Determines how change detection is applied to alert on S3 objects
+    rewritten while being read. Value 'server' indicates to apply the
+    attribute constraint directly on GetObject requests to S3. Value
+    'client' means to do a client-side comparison of the attribute value
+    returned in the response. Value 'server' would not work with third-party
+    S3 implementations that do not support these constraints on GetObject.
+    Values 'server' and 'client' generate RemoteFileChangedException when
+    a mismatch is detected. Value 'warn' works like 'client' but generates
+    only a warning. Value 'none' will ignore change detection completely.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.version.required</name>
+  <value>true</value>
+  <description>
+    Determines if the S3 object attribute defined by
+    fs.s3a.change.detection.source should be treated as required. If true
+    and the referred attribute is unavailable in an S3 GetObject response,
+    NoVersionAttributeException is thrown. Setting to 'true' is encouraged
+    to avoid the potential for inconsistent reads with third-party S3
+    implementations or against S3 buckets that have object versioning
+    disabled.
+  </description>
+</property>
+
   <name>fs.AbstractFileSystem.wasb.impl</name>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index bdd3add3aae..e0b1629b40a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -641,4 +641,84 @@ public final class Constants {
*/
public static final boolean ETAG_CHECKSUM_ENABLED_DEFAULT = false;
+ /**
+ * Where to get the value to use in change detection: eTag or
+ * versionId?
+ */
+ public static final String CHANGE_DETECT_SOURCE
+ = "fs.s3a.change.detection.source";
+
+ /**
+ * eTag as the change detection mechanism.
+ */
+ public static final String CHANGE_DETECT_SOURCE_ETAG = "etag";
+
+ /**
+ * Object versionId as the change detection mechanism.
+ */
+ public static final String CHANGE_DETECT_SOURCE_VERSION_ID = "versionid";
+
+ /**
+ * Default change detection mechanism: eTag.
+ */
+ public static final String CHANGE_DETECT_SOURCE_DEFAULT =
+ CHANGE_DETECT_SOURCE_ETAG;
+
+ /**
+ * Mode to run change detection in. Server side comparison? Client side
+ * comparison? Client side compare and warn rather than exception? Don't
+ * bother at all?
+ */
+ public static final String CHANGE_DETECT_MODE =
+ "fs.s3a.change.detection.mode";
+
+ /**
+ * Change is detected on the client side by comparing the returned id with the
+ * expected id. A difference results in {@link RemoteFileChangedException}.
+ */
+ public static final String CHANGE_DETECT_MODE_CLIENT = "client";
+
+ /**
+ * Change is detected by passing the expected value in the GetObject request.
+ * If the expected value is unavailable, {@link RemoteFileChangedException} is
+ * thrown.
+ */
+ public static final String CHANGE_DETECT_MODE_SERVER = "server";
+
+ /**
+ * Change is detected on the client side by comparing the returned id with the
+ * expected id. A difference results in a WARN level message being logged.
+ */
+ public static final String CHANGE_DETECT_MODE_WARN = "warn";
+
+ /**
+ * Change detection is turned off. Readers may see inconsistent results due
+ * to concurrent writes without any exception or warning messages. May be
+ * useful with third-party S3 API implementations that don't support one of
+ * the change detection modes.
+ */
+ public static final String CHANGE_DETECT_MODE_NONE = "none";
+
+ /**
+ * Default change detection mode: server.
+ */
+ public static final String CHANGE_DETECT_MODE_DEFAULT =
+ CHANGE_DETECT_MODE_SERVER;
+
+ /**
+ * If true, raises a {@link NoVersionAttributeException} when S3
+ * doesn't provide the attribute defined by fs.s3a.change.detection.source.
+ * For example, if the source is versionId but object versioning is not
+ * enabled on the bucket, or alternatively if the source is eTag and a
+ * third-party S3 implementation that doesn't return eTags is used.
+ *
+ * When false, only a warning message will be logged for this condition.
+ */
+ public static final String CHANGE_DETECT_REQUIRE_VERSION =
+ "fs.s3a.change.detection.version.required";
+
+ /**
+ * Default change detection require version: true.
+ */
+ public static final boolean CHANGE_DETECT_REQUIRE_VERSION_DEFAULT = true;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/NoVersionAttributeException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/NoVersionAttributeException.java
new file mode 100644
index 00000000000..8cad74a154d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/NoVersionAttributeException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Indicates the S3 object does not provide the versioning attribute required
+ * by the configured change detection policy.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class NoVersionAttributeException extends PathIOException {
+
+ /**
+ * Constructs a NoVersionAttributeException.
+ *
+ * @param path the path accessed when the condition was detected
+ * @param message a message providing more details about the condition
+ */
+ public NoVersionAttributeException(String path,
+ String message) {
+ super(path, message);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
new file mode 100644
index 00000000000..cfa5935bbf3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Indicates the S3 object is out of sync with the expected version. Thrown in
+ * cases such as when the object is updated while an {@link S3AInputStream} is
+ * open.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class RemoteFileChangedException extends PathIOException {
+
+ /**
+ * Constructs a RemoteFileChangedException.
+ *
+ * @param path the path accessed when the change was detected
+ * @param operation the operation (e.g. open, re-open) performed when the
+ * change was detected
+ * @param message a message providing more details about the condition
+ */
+ public RemoteFileChangedException(String path,
+ String operation,
+ String message) {
+ super(path, message);
+ setOperation(operation);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 031a80be1d7..1f560d064a9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -214,6 +215,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
createStorageStatistics();
private long readAhead;
private S3AInputPolicy inputPolicy;
+ private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile boolean isClosed = false;
private MetadataStore metadataStore;
@@ -361,6 +363,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
LOG.debug("Input fadvise policy = {}", inputPolicy);
+ changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
+ LOG.debug("Change detection policy = {}", changeDetectionPolicy);
boolean magicCommitterEnabled = conf.getBoolean(
CommitConstants.MAGIC_COMMITTER_ENABLED,
CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
@@ -687,6 +691,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return inputPolicy;
}
+ /**
+ * Get the change detection policy for this FS instance.
+ * @return the change detection policy
+ */
+ @VisibleForTesting
+ ChangeDetectionPolicy getChangeDetectionPolicy() {
+ return changeDetectionPolicy;
+ }
+
/**
* Get the encryption algorithm of this endpoint.
* @return the encryption algorithm.
@@ -875,9 +888,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
S3AInputPolicy policy = S3AInputPolicy.getPolicy(
o.get(INPUT_FADVISE, inputPolicy.toString()));
long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
- readContext = createReadContext(fileStatus, policy, readAheadRange2);
+ // TODO support change detection policy from options?
+ readContext = createReadContext(
+ fileStatus,
+ policy,
+ changeDetectionPolicy,
+ readAheadRange2);
} else {
- readContext = createReadContext(fileStatus, inputPolicy, readAhead);
+ readContext = createReadContext(
+ fileStatus,
+ inputPolicy,
+ changeDetectionPolicy,
+ readAhead);
}
LOG.debug("Opening '{}'", readContext);
@@ -900,6 +922,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private S3AReadOpContext createReadContext(
final FileStatus fileStatus,
final S3AInputPolicy seekPolicy,
+ final ChangeDetectionPolicy changePolicy,
final long readAheadRange) {
return new S3AReadOpContext(fileStatus.getPath(),
hasMetadataStore(),
@@ -909,6 +932,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
instrumentation,
fileStatus,
seekPolicy,
+ changePolicy,
readAheadRange);
}
@@ -3676,7 +3700,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
long ra = options.getLong(READAHEAD_RANGE, readAhead);
// build and execute the request
return selectBinding.select(
- createReadContext(fileStatus, inputPolicy, ra),
+ createReadContext(fileStatus, inputPolicy, changeDetectionPolicy, ra),
expression,
options,
generateSSECustomerKey(),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 35dd834bf19..d0966013074 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +68,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
public static final String E_NEGATIVE_READAHEAD_VALUE
= "Negative readahead value";
+ public static final String OPERATION_OPEN = "open";
+ public static final String OPERATION_REOPEN = "re-open";
+
/**
* This is the public position; the one set in {@link #seek(long)}
* and returned in {@link #getPos()}.
@@ -110,6 +115,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/
private long contentRangeStart;
+ /** change tracker. */
+ private final ChangeTracker changeTracker;
+
/**
* Create the stream.
* This does not attempt to open it; that is only done on the first
@@ -138,6 +146,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
this.serverSideEncryptionAlgorithm =
s3Attributes.getServerSideEncryptionAlgorithm();
this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
+ this.changeTracker = new ChangeTracker(uri,
+ ctx.getChangeDetectionPolicy(),
+ streamStatistics.getVersionMismatchCounter());
setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead());
}
@@ -182,15 +193,20 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
StringUtils.isNotBlank(serverSideEncryptionKey)){
request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
}
- String text = String.format("Failed to %s %s at %d",
- (opencount == 0 ? "open" : "re-open"), uri, targetPos);
+ String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN;
+ String text = String.format("%s %s at %d",
+ operation, uri, targetPos);
+ changeTracker.maybeApplyConstraint(request);
S3Object object = Invoker.once(text, uri,
() -> client.getObject(request));
+
+ changeTracker.processResponse(object, operation,
+ targetPos);
wrappedStream = object.getObjectContent();
contentRangeStart = targetPos;
if (wrappedStream == null) {
- throw new IOException("Null IO stream from reopen of (" + reason + ") "
- + uri);
+ throw new PathIOException(uri,
+ "Null IO stream from " + operation + " of (" + reason + ") ");
}
this.pos = targetPos;
@@ -670,6 +686,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
sb.append(" contentRangeFinish=").append(contentRangeFinish);
sb.append(" remainingInCurrentRequest=")
.append(remainingInCurrentRequest());
+ sb.append(changeTracker);
sb.append('\n').append(s);
sb.append('}');
return sb.toString();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 17c5aff9af7..812b9c53684 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -161,6 +161,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
OBJECT_PUT_REQUESTS,
OBJECT_PUT_REQUESTS_COMPLETED,
OBJECT_SELECT_REQUESTS,
+ STREAM_READ_VERSION_MISMATCHES,
STREAM_WRITE_FAILURES,
STREAM_WRITE_BLOCK_UPLOADS,
STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
@@ -594,6 +595,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
streamReadsIncomplete.incr(statistics.readsIncomplete);
streamBytesReadInClose.incr(statistics.bytesReadInClose);
streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
+ incrementCounter(STREAM_READ_VERSION_MISMATCHES,
+ statistics.versionMismatches.get());
}
@Override
@@ -639,6 +642,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
public long bytesDiscardedInAbort;
public long policySetCount;
public long inputPolicy;
+ /** This is atomic so that it can be passed as a reference. */
+ private final AtomicLong versionMismatches = new AtomicLong(0);
private InputStreamStatistics() {
}
@@ -763,6 +768,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
inputPolicy = updatedPolicy;
}
+ /**
+ * Get a reference to the version mismatch counter.
+ * @return a counter which can be incremented.
+ */
+ public AtomicLong getVersionMismatchCounter() {
+ return versionMismatches;
+ }
+
/**
* String operator describes all the current statistics.
* Important: there are no guarantees as to the stability
@@ -796,6 +809,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
sb.append(", InputPolicy=").append(inputPolicy);
sb.append(", InputPolicySetCount=").append(policySetCount);
+ sb.append(", versionMismatches=").append(versionMismatches.get());
sb.append('}');
return sb.toString();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index e49a7e9c947..a7317c94512 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import javax.annotation.Nullable;
@@ -43,6 +44,11 @@ public class S3AReadOpContext extends S3AOpContext {
*/
private final S3AInputPolicy inputPolicy;
+ /**
+ * How to detect and deal with the object being updated during read.
+ */
+ private final ChangeDetectionPolicy changeDetectionPolicy;
+
/**
* Readahead for GET operations/skip, etc.
*/
@@ -59,6 +65,7 @@ public class S3AReadOpContext extends S3AOpContext {
* @param dstFileStatus target file status
* @param inputPolicy the input policy
-  * @param readahead readahead for GET operations/skip, etc.
+  * @param changeDetectionPolicy change detection policy.
+  * @param readahead readahead for GET operations/skip, etc.
*/
public S3AReadOpContext(
final Path path,
@@ -69,6 +76,7 @@ public class S3AReadOpContext extends S3AOpContext {
S3AInstrumentation instrumentation,
FileStatus dstFileStatus,
S3AInputPolicy inputPolicy,
+ ChangeDetectionPolicy changeDetectionPolicy,
final long readahead) {
super(isS3GuardEnabled, invoker, s3guardInvoker, stats, instrumentation,
dstFileStatus);
@@ -76,6 +84,7 @@ public class S3AReadOpContext extends S3AOpContext {
Preconditions.checkArgument(readahead >= 0,
"invalid readahead %d", readahead);
this.inputPolicy = checkNotNull(inputPolicy);
+ this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
this.readahead = readahead;
}
@@ -110,6 +119,10 @@ public class S3AReadOpContext extends S3AOpContext {
return inputPolicy;
}
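+ /**
+ * Get the change detection policy for this read operation.
+ * @return the change detection policy
+ */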
+ public ChangeDetectionPolicy getChangeDetectionPolicy() {
+ return changeDetectionPolicy;
+ }
+
/**
* Get the readahead for this operation.
* @return a value {@literal >=} 0
@@ -125,6 +138,7 @@ public class S3AReadOpContext extends S3AOpContext {
sb.append("path=").append(path);
sb.append(", inputPolicy=").append(inputPolicy);
sb.append(", readahead=").append(readahead);
+ sb.append(", changeDetectionPolicy=").append(changeDetectionPolicy);
sb.append('}');
return sb.toString();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 1e475e1570c..c7b80c9d94c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -172,6 +172,13 @@ public class S3ARetryPolicy implements RetryPolicy {
policyMap.put(FileNotFoundException.class, fail);
policyMap.put(InvalidRequestException.class, fail);
+ // once the file has changed, trying again is not going to help
+ policyMap.put(RemoteFileChangedException.class, fail);
+
+ // likely only recoverable by changing the policy configuration or S3
+ // implementation
+ policyMap.put(NoVersionAttributeException.class, fail);
+
// should really be handled by resubmitting to new location;
// that's beyond the scope of this retry policy
policyMap.put(AWSRedirectException.class, fail);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6f792860d64..54a2c602541 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -116,13 +116,15 @@ public enum Statistic {
STREAM_OPENED("stream_opened",
"Total count of times an input stream to object store was opened"),
STREAM_READ_EXCEPTIONS("stream_read_exceptions",
- "Number of seek operations invoked on input streams"),
+ "Number of exceptions invoked on input streams"),
STREAM_READ_FULLY_OPERATIONS("stream_read_fully_operations",
"Count of readFully() operations in streams"),
STREAM_READ_OPERATIONS("stream_read_operations",
"Count of read() operations in streams"),
STREAM_READ_OPERATIONS_INCOMPLETE("stream_read_operations_incomplete",
"Count of incomplete read() operations in streams"),
+ STREAM_READ_VERSION_MISMATCHES("stream_read_version_mismatches",
+ "Count of version mismatches encountered while reading streams"),
STREAM_SEEK_BYTES_BACKWARDS("stream_bytes_backwards_on_seek",
"Count of bytes moved backwards during seek operations"),
STREAM_SEEK_BYTES_READ("stream_bytes_read",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
new file mode 100644
index 00000000000..f3d8bc20c82
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Locale;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Object change detection policy.
+ * Determines which attribute is used to detect change and what to do when
+ * change is detected.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class ChangeDetectionPolicy {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ChangeDetectionPolicy.class);
+
+ @VisibleForTesting
+ public static final String CHANGE_DETECTED = "change detected";
+
+ private final Mode mode;
+ private final boolean requireVersion;
+
+ /**
+ * Version support is only warned about once per S3A instance.
+ * This still means that on a long-lived application which destroys
+ * filesystems it'll appear once-per-query in the logs, but at least
+ * it will not appear once per file read.
+ */
+ private final LogExactlyOnce logNoVersionSupport = new LogExactlyOnce(LOG);
+
+ /**
+ * The S3 object attribute used to detect change.
+ */
+ public enum Source {
+ ETag(CHANGE_DETECT_SOURCE_ETAG),
+ VersionId(CHANGE_DETECT_SOURCE_VERSION_ID),
+ /** You can't ask for this explicitly outside of tests. */
+ None("none");
+
+ private final String source;
+
+ Source(String source) {
+ this.source = source;
+ }
+
+ private static Source fromString(String trimmed) {
+ for (Source value : values()) {
+ if (value.source.equals(trimmed)) {
+ return value;
+ }
+ }
+ LOG.warn("Unrecognized " + CHANGE_DETECT_SOURCE + " value: \"{}\"",
+ trimmed);
+ return fromString(CHANGE_DETECT_SOURCE_DEFAULT);
+ }
+
+ static Source fromConfiguration(Configuration configuration) {
+ String trimmed = configuration.get(CHANGE_DETECT_SOURCE,
+ CHANGE_DETECT_SOURCE_DEFAULT).trim()
+ .toLowerCase(Locale.ENGLISH);
+ return fromString(trimmed);
+ }
+ }
+
+ /**
+ * What to do when change is detected.
+ */
+ public enum Mode {
+ /** Client side validation. */
+ Client(CHANGE_DETECT_MODE_CLIENT),
+ /** Server side validation. */
+ Server(CHANGE_DETECT_MODE_SERVER),
+ /** Warn but continue. */
+ Warn(CHANGE_DETECT_MODE_WARN),
+ /** No checks. */
+ None(CHANGE_DETECT_MODE_NONE);
+
+ private final String mode;
+
+ Mode(String mode) {
+ this.mode = mode;
+ }
+
+ private static Mode fromString(String trimmed) {
+ for (Mode value : values()) {
+ if (value.mode.equals(trimmed)) {
+ return value;
+ }
+ }
+ LOG.warn("Unrecognized " + CHANGE_DETECT_MODE + " value: \"{}\"",
+ trimmed);
+ return fromString(CHANGE_DETECT_MODE_DEFAULT);
+ }
+
+ static Mode fromConfiguration(Configuration configuration) {
+ String trimmed = configuration.get(CHANGE_DETECT_MODE,
+ CHANGE_DETECT_MODE_DEFAULT)
+ .trim()
+ .toLowerCase(Locale.ENGLISH);
+ return fromString(trimmed);
+ }
+ }
+
+ protected ChangeDetectionPolicy(Mode mode, boolean requireVersion) {
+ this.mode = mode;
+ this.requireVersion = requireVersion;
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public abstract Source getSource();
+
+ public boolean isRequireVersion() {
+ return requireVersion;
+ }
+
+ public LogExactlyOnce getLogNoVersionSupport() {
+ return logNoVersionSupport;
+ }
+
+ /**
+ * Reads the change detection policy from Configuration.
+ *
+ * @param configuration the configuration
+ * @return the policy
+ */
+ public static ChangeDetectionPolicy getPolicy(Configuration configuration) {
+ Mode mode = Mode.fromConfiguration(configuration);
+ Source source = Source.fromConfiguration(configuration);
+ boolean requireVersion = configuration.getBoolean(
+ CHANGE_DETECT_REQUIRE_VERSION, CHANGE_DETECT_REQUIRE_VERSION_DEFAULT);
+ return createPolicy(mode, source, requireVersion);
+ }
+
+ /**
+ * Create a policy.
+ * @param mode mode of checks
+ * @param source source of change
+ * @param requireVersion throw exception when no version available?
+ * @return the policy
+ */
+ @VisibleForTesting
+ public static ChangeDetectionPolicy createPolicy(final Mode mode,
+ final Source source, final boolean requireVersion) {
+ switch (source) {
+ case ETag:
+ return new ETagChangeDetectionPolicy(mode, requireVersion);
+ case VersionId:
+ return new VersionIdChangeDetectionPolicy(mode, requireVersion);
+ default:
+ return new NoChangeDetection();
+ }
+ }
+
+ /**
+ * Pulls the attribute this policy uses to detect change out of the S3 object
+ * metadata. The policy generically refers to this attribute as
+ * {@code revisionId}.
+ *
+ * @param objectMetadata the s3 object metadata
+ * @param uri the URI of the object
+ * @return the revisionId string as interpreted by this policy, or potentially
+ * null if the attribute is unavailable (such as when the policy says to use
+ * versionId but object versioning is not enabled for the bucket).
+ */
+ public abstract String getRevisionId(ObjectMetadata objectMetadata,
+ String uri);
+
+ /**
+ * Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
+ * as a server-side qualification on the {@code GetObjectRequest}.
+ *
+ * @param request the request
+ * @param revisionId the revision id
+ */
+ public abstract void applyRevisionConstraint(GetObjectRequest request,
+ String revisionId);
+
+ /**
+ * Takes appropriate action based on {@link #getMode() mode} when a change has
+ * been detected.
+ *
+ * @param revisionId the expected revision id
+ * @param newRevisionId the detected revision id
+ * @param uri the URI of the object being accessed
+ * @param position the position being read in the object
+ * @param operation the operation being performed on the object (e.g. open or
+ * re-open) that triggered the change detection
+ * @param timesAlreadyDetected number of times a change has already been
+ * detected on the current stream
+ * @return a pair of: was a change detected, and any exception to throw.
+ * If a change was detected, the caller updates a counter in the stream
+ * statistics; if an exception was returned it is thrown after the counter
+ * update.
+ */
+ public ImmutablePair<Boolean, RemoteFileChangedException> onChangeDetected(
+ String revisionId,
+ String newRevisionId,
+ String uri,
+ long position,
+ String operation,
+ long timesAlreadyDetected) {
+ switch (mode) {
+ case None:
+ // something changed; we don't care.
+ return new ImmutablePair<>(false, null);
+ case Warn:
+ if (timesAlreadyDetected == 0) {
+ // only warn on the first detection to avoid a noisy log
+ LOG.warn(
+ String.format("%s change detected on %s %s at %d. Expected %s got %s",
+ getSource(), operation, uri, position, revisionId,
+ newRevisionId));
+ return new ImmutablePair<>(true, null);
+ }
+ return new ImmutablePair<>(false, null);
+ case Client:
+ case Server:
+ default:
+ // mode == Client (or Server, but really won't be called for Server)
+ return new ImmutablePair<>(true,
+ new RemoteFileChangedException(uri,
+ operation,
+ String.format("%s "
+ + CHANGE_DETECTED
+ + " while reading at position %s."
+ + " Expected %s got %s",
+ getSource(), position, revisionId, newRevisionId)));
+ }
+ }
+
+ /**
+ * Change detection policy based on {@link ObjectMetadata#getETag() eTag}.
+ */
+ static class ETagChangeDetectionPolicy extends ChangeDetectionPolicy {
+
+ ETagChangeDetectionPolicy(Mode mode, boolean requireVersion) {
+ super(mode, requireVersion);
+ }
+
+ @Override
+ public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
+ return objectMetadata.getETag();
+ }
+
+ @Override
+ public void applyRevisionConstraint(GetObjectRequest request,
+ String revisionId) {
+ LOG.debug("Restricting request to etag {}", revisionId);
+ request.withMatchingETagConstraint(revisionId);
+ }
+
+ @Override
+ public Source getSource() {
+ return Source.ETag;
+ }
+
+ @Override
+ public String toString() {
+ return "ETagChangeDetectionPolicy mode=" + getMode();
+ }
+
+ }
+
+ /**
+ * Change detection policy based on
+ * {@link ObjectMetadata#getVersionId() versionId}.
+ */
+ static class VersionIdChangeDetectionPolicy extends
+ ChangeDetectionPolicy {
+
+ VersionIdChangeDetectionPolicy(Mode mode, boolean requireVersion) {
+ super(mode, requireVersion);
+ }
+
+ @Override
+ public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
+ String versionId = objectMetadata.getVersionId();
+ if (versionId == null) {
+ // this policy doesn't work if the bucket doesn't have object versioning
+ // enabled (which it is not, by default)
+ getLogNoVersionSupport().warn(
+ CHANGE_DETECT_SOURCE + " set to " + Source.VersionId
+ + " but no versionId available while reading {}. "
+ + "Ensure your bucket has object versioning enabled. "
+ + "You may see inconsistent reads.",
+ uri);
+ }
+ return versionId;
+ }
+
+ @Override
+ public void applyRevisionConstraint(GetObjectRequest request,
+ String revisionId) {
+ LOG.debug("Restricting request to version {}", revisionId);
+ request.withVersionId(revisionId);
+ }
+
+ @Override
+ public Source getSource() {
+ return Source.VersionId;
+ }
+
+ @Override
+ public String toString() {
+ return "VersionIdChangeDetectionPolicy mode=" + getMode();
+ }
+ }
+
+ /**
+ * Don't check for changes.
+ */
+ static class NoChangeDetection extends ChangeDetectionPolicy {
+
+ NoChangeDetection() {
+ super(Mode.None, false);
+ }
+
+ @Override
+ public Source getSource() {
+ return Source.None;
+ }
+
+ @Override
+ public String getRevisionId(final ObjectMetadata objectMetadata,
+ final String uri) {
+ return null;
+ }
+
+ @Override
+ public void applyRevisionConstraint(final GetObjectRequest request,
+ final String revisionId) {
+
+ }
+
+ @Override
+ public String toString() {
+ return "NoChangeDetection";
+ }
+
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
new file mode 100644
index 00000000000..f76602b9532
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Change tracking for input streams: the revision ID (eTag or versionId)
+ * from the first request is recorded, and the value returned by each
+ * subsequent request is compared against it.
+ * Self-contained for testing and use in different streams.
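+ * <p>
+ * A typical interaction, as a sketch (the input stream wires the mismatch
+ * counter into its stream statistics; a bare counter is shown here):
+ * <pre>
+ *   ChangeTracker tracker = new ChangeTracker(uri, policy, new AtomicLong());
+ *   tracker.maybeApplyConstraint(request);  // server-side mode only
+ *   S3Object object = client.getObject(request);
+ *   tracker.processResponse(object, "open", pos);
+ * </pre>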
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ChangeTracker {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ChangeTracker.class);
+
+ public static final String CHANGE_REPORTED_BY_S3 = "reported by S3";
+
+ /** Policy to use. */
+ private final ChangeDetectionPolicy policy;
+
+ /**
+ * URI of file being read.
+ */
+ private final String uri;
+
+ /**
+ * Mismatch counter; expected to be wired up to StreamStatistics except
+ * during testing.
+ */
+ private final AtomicLong versionMismatches;
+
+ /**
+ * Revision identifier (e.g. eTag or versionId, depending on change
+ * detection policy).
+ */
+ private String revisionId;
+
+ /**
+ * Create a change tracker.
+ * @param uri URI of object being tracked
+ * @param policy policy to track.
+ * @param versionMismatches reference to the version mismatch counter
+ */
+ public ChangeTracker(final String uri,
+ final ChangeDetectionPolicy policy,
+ final AtomicLong versionMismatches) {
+ this.policy = checkNotNull(policy);
+ this.uri = uri;
+ this.versionMismatches = versionMismatches;
+ }
+
+ public String getRevisionId() {
+ return revisionId;
+ }
+
+ public ChangeDetectionPolicy.Source getSource() {
+ return policy.getSource();
+ }
+
+ @VisibleForTesting
+ public AtomicLong getVersionMismatches() {
+ return versionMismatches;
+ }
+
+ /**
+ * Apply any revision control set by the policy if it is to be
+ * enforced on the server.
+ * @param request request to modify
+ * @return true iff a constraint was added.
+ */
+ public boolean maybeApplyConstraint(
+ final GetObjectRequest request) {
+
+ if (policy.getMode() == ChangeDetectionPolicy.Mode.Server
+ && revisionId != null) {
+ policy.applyRevisionConstraint(request, revisionId);
+ return true;
+ }
+ return false;
+ }
+
+
+ /**
+ * Process the response from the server for validation against the
+ * change policy.
+ * @param object object returned; may be null.
+ * @param operation operation in progress.
+ * @param pos offset of read
+ * @throws PathIOException raised on failure
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processResponse(final S3Object object,
+ final String operation,
+ final long pos) throws PathIOException {
+ if (object == null) {
+ // no object returned. Either mismatch or something odd.
+ if (revisionId != null) {
+ // the requirements of the change detection policy weren't met: the
+ // object was not returned.
+ versionMismatches.incrementAndGet();
+ throw new RemoteFileChangedException(uri, operation,
+ String.format("%s change "
+ + CHANGE_REPORTED_BY_S3
+ + " while reading"
+ + " at position %s."
+ + " Version %s was unavailable",
+ getSource(),
+ pos,
+ getRevisionId()));
+ } else {
+ throw new PathIOException(uri, "No data returned from GET request");
+ }
+ }
+
+ final ObjectMetadata metadata = object.getObjectMetadata();
+ final String newRevisionId = policy.getRevisionId(metadata, uri);
+ if (newRevisionId == null && policy.isRequireVersion()) {
+ throw new NoVersionAttributeException(uri, String.format(
+ "Change detection policy requires %s",
+ policy.getSource()));
+ }
+ if (revisionId == null) {
+ // revisionId is null on first (re)open. Pin it so a change can be
+ // detected if the object is updated
+ LOG.debug("Setting revision ID for object at {}: {}",
+ uri, newRevisionId);
+ revisionId = newRevisionId;
+ } else if (!revisionId.equals(newRevisionId)) {
+ LOG.debug("Revision ID changed from {} to {}",
+ revisionId, newRevisionId);
+ ImmutablePair<Boolean, RemoteFileChangedException> pair =
+ policy.onChangeDetected(
+ revisionId,
+ newRevisionId,
+ uri,
+ pos,
+ operation,
+ versionMismatches.get());
+ if (pair.left) {
+ // a mismatch has occurred: note it.
+ versionMismatches.incrementAndGet();
+ }
+ if (pair.right != null) {
+ // there's an exception to raise: do it
+ throw pair.right;
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "ChangeTracker{");
+ sb.append("changeDetectionPolicy=").append(policy);
+ sb.append(", revisionId='").append(revisionId).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LogExactlyOnce.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LogExactlyOnce.java
new file mode 100644
index 00000000000..54a8836d02b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LogExactlyOnce.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+
+/**
+ * Log exactly once, even across threads.
+ */
+public class LogExactlyOnce {
+
+ private final AtomicBoolean logged = new AtomicBoolean(false);
+ private final Logger log;
+
+ public LogExactlyOnce(final Logger log) {
+ this.log = log;
+ }
+
+ public void warn(String format, Object...args) {
+ if (!logged.getAndSet(true)) {
+ log.warn(format, args);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/package-info.java
new file mode 100644
index 00000000000..2ef6db8a872
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Implementation classes private to the S3A store.
+ * Do not use outside of the hadoop-aws module.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 11ff6104e7d..17414719ae8 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1144,6 +1144,131 @@ the capacity through `hadoop s3guard set-capacity` (and pay more, obviously).
+## Handling Read-During-Overwrite
+
+Read-during-overwrite is the condition where a writer overwrites a file while
+a reader has an open input stream on the file. Depending on configuration,
+the S3AFileSystem may detect this and throw a `RemoteFileChangedException` in
+conditions where the reader's input stream might otherwise silently switch over
+from reading bytes from the original version of the file to reading bytes from
+the new version.
+
+The configuration items controlling this behavior are:
+
+```xml
+<property>
+  <name>fs.s3a.change.detection.source</name>
+  <value>etag</value>
+  <description>
+    Select which S3 object attribute to use for change detection.
+    Currently supported values are 'etag' for S3 object eTags and
+    'versionid' for S3 object version IDs. Use of version IDs requires
+    object versioning to be enabled on each S3 bucket used. Object
+    versioning is disabled on buckets by default. When version IDs are
+    used, the buckets should have versioning enabled before any data
+    is written.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>server</value>
+  <description>
+    Determines how change detection is applied to alert on S3 objects
+    rewritten while being read. Value 'server' indicates to apply the
+    attribute constraint directly on GetObject requests to S3. Value
+    'client' means to do a client-side comparison of the attribute value
+    returned in the response. Value 'server' would not work with third-party
+    S3 implementations that do not support these constraints on GetObject.
+    Values 'server' and 'client' generate RemoteFileChangedException when
+    a mismatch is detected. Value 'warn' works like 'client' but generates
+    only a warning. Value 'none' will ignore change detection completely.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.version.required</name>
+  <value>true</value>
+  <description>
+    Determines if the S3 object attribute defined by
+    fs.s3a.change.detection.source should be treated as required. If true
+    and the referred attribute is unavailable in an S3 GetObject response,
+    NoVersionAttributeException is thrown. Setting to 'true' is encouraged
+    to avoid the potential for inconsistent reads with third-party S3
+    implementations or against S3 buckets that have object versioning
+    disabled.
+  </description>
+</property>
+```
+
+In the default configuration, S3 object eTags are used to detect changes. When
+the filesystem retrieves a file from S3 using
+[Get Object](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html),
+it captures the eTag and uses that eTag in an 'If-Match' condition on each
+subsequent request. If a concurrent writer has overwritten the file, the
+'If-Match' condition will fail and a `RemoteFileChangedException` will be thrown.
+
+Even in this default configuration, a new write may not trigger this exception
+on an open reader. For example, if the reader only reads forward in the file
+then only a single S3 'Get Object' request is made and the full contents of the
+file are streamed from a single response. An overwrite of the file after the
+'Get Object' request would not be seen at all by a reader with an input stream
+that had already read the first byte. Seeks backward, on the other hand, can
+result in new 'Get Object' requests that can trigger the
+`RemoteFileChangedException`.
+
+Additionally, due to the eventual consistency of S3 in a read-after-overwrite
+scenario, visibility of a new write may be delayed, avoiding the
+`RemoteFileChangedException` for some readers. That said, if a reader does not
+see `RemoteFileChangedException`, they will have at least read a consistent view
+of a single version of the file (the version available when they started
+reading).
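+
+As an illustrative sketch (the bucket, path, and offsets are hypothetical;
+the exception class is `org.apache.hadoop.fs.s3a.RemoteFileChangedException`),
+the exception surfaces on whichever `read()` or `seek()` forces the stream to
+issue a new 'Get Object' request:
+
+```java
+Configuration conf = new Configuration();
+FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
+// enclosing method assumed to declare throws IOException
+try (FSDataInputStream in = fs.open(new Path("s3a://my-bucket/data.bin"))) {
+  in.seek(1024 * 1024);   // a later backward seek may force a re-open
+  in.read();
+} catch (RemoteFileChangedException e) {
+  // a concurrent writer replaced the object; retrying will not help,
+  // re-open the file to read the new version
+}
+```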
+
+### Change detection with S3 Versions
+
+It is possible to switch to using the
+[S3 object version id](https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html)
+instead of eTag as the change detection mechanism. Use of this option requires
+object versioning to be enabled on any S3 buckets used by the filesystem. The
+benefit of using version ID instead of eTag is a potentially reduced frequency
+of `RemoteFileChangedException`. With object versioning enabled, old versions
+of objects remain available after they have been overwritten.
+This means an open input stream will still be able to seek backwards after a
+concurrent writer has overwritten the file.
+The reader will retain their consistent view of the version of the file from
+which they read the first byte.
+Because the version ID is null for objects written prior to enablement of
+object versioning, **this option should only be used when the S3 buckets
+have object versioning enabled from the beginning.**
+
+Note: when you rename files, the copied files may have different version IDs.
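+
+For example, a sketch of enabling version ID checking for a single versioned
+bucket (name hypothetical) through S3A's per-bucket configuration, leaving the
+eTag default in place for all other buckets:
+
+```java
+Configuration conf = new Configuration();
+// applies only to s3a://versioned-bucket/; other buckets keep the default
+conf.set("fs.s3a.bucket.versioned-bucket.change.detection.source",
+    "versionid");
+```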
+
+### Change Detection Modes
+
+The change detection mode is the next configurable option. Different modes are
+available primarily for compatibility with third-party S3 implementations which
+may not support all change detection mechanisms.
+
+* `server`: the version/etag check is performed on the server by adding
+extra headers to the `GET` request. This is the default.
+* `client`: check on the client by comparing the eTag/version ID of a
+reopened file with the previous version.
+This is useful when the implementation doesn't support the `If-Match` header.
+* `warn`: check on the client, but only warn on a mismatch, rather than fail.
+* `none`: do not check. Useful if the implementation doesn't provide eTag
+or version ID support at all, or you would like to retain the previous behavior
+where the reader's input stream silently switches over to the new object version
+(not recommended).
+
+The final option (`fs.s3a.change.detection.version.required`) is present
+primarily to ensure the filesystem doesn't silently ignore the condition
+where it is configured to use version ID on a bucket that doesn't have
+object versioning enabled, or alternatively is configured to use eTag on
+an S3 implementation that doesn't return eTags.
+
+When `true` (default) and 'Get Object' doesn't return an eTag or
+version ID (depending on the configured 'source'), a `NoVersionAttributeException`
+will be thrown. When `false` and an eTag or version ID is not returned,
+the stream can be read, but without any version checking.
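+
+As a hedged sketch, one combination that can work against a third-party store
+which returns neither attribute reliably is client-side checking with the
+requirement relaxed:
+
+```java
+Configuration conf = new Configuration();
+conf.set("fs.s3a.change.detection.mode", "client");
+// don't fail when the store omits the eTag/version ID;
+// reads then proceed without any version checking
+conf.setBoolean("fs.s3a.change.detection.version.required", false);
+```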
+
## Configuring different S3 buckets with Per-Bucket Configuration
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index b3c3e38ef03..3123221bd82 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -715,36 +715,36 @@ org.apache.hadoop.fs.s3a.AWSS3IOException: copyFromLocalFile(file:/tmp/hello.txt
(Service: Amazon S3; Status Code: 400; Error Code: BadDigest; Request ID: 4018131225),
S3 Extended Request ID: null
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:127)
- at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:69)
- at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:1494)
- at org.apache.hadoop.tools.cloudup.Cloudup.uploadOneFile(Cloudup.java:466)
- at org.apache.hadoop.tools.cloudup.Cloudup.access$000(Cloudup.java:63)
- at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:353)
- at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:350)
- at java.util.concurrent.FutureTask.run(FutureTask.java:266)
- at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
- at java.util.concurrent.FutureTask.run(FutureTask.java:266)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
- at java.lang.Thread.run(Thread.java:748)
+ at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:69)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:1494)
+ at org.apache.hadoop.tools.cloudup.Cloudup.uploadOneFile(Cloudup.java:466)
+ at org.apache.hadoop.tools.cloudup.Cloudup.access$000(Cloudup.java:63)
+ at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:353)
+ at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:350)
+ at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
+ at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+ at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception:
The Content-MD5 you specified did not match what we received.
(Service: Amazon S3; Status Code: 400; Error Code: BadDigest; Request ID: 4018131225),
S3 Extended Request ID: null
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1307)
- at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:894)
- at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:597)
- at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:363)
- at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:329)
- at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:308)
- at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3659)
- at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1422)
- at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
- at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
- at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
- at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
- at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
- ... 4 more
+ at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:894)
+ at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:597)
+ at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:363)
+ at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:329)
+ at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:308)
+ at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3659)
+ at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1422)
+ at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
+ at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
+ at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
+ at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
+ at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
+ ... 4 more
```
This stack trace was seen when interacting with a third-party S3 store whose
@@ -966,6 +966,69 @@ if it is required that the data is persisted durably after every
This includes resilient logging, HBase-style journaling
and the like. The standard strategy here is to save to HDFS and then copy to S3.
+### `RemoteFileChangedException` and read-during-overwrite
+
+```
+org.apache.hadoop.fs.s3a.RemoteFileChangedException: re-open `s3a://my-bucket/test/file.txt':
+ ETag change reported by S3 while reading at position 1949.
+ Version f9c186d787d4de9657e99f280ba26555 was unavailable
+ at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:137)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
+ at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
+ at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:339)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:372)
+```
+
+If an S3 object is updated while an S3A filesystem reader has an open
+`InputStream` on it, the reader may encounter `RemoteFileChangedException`. This
+occurs if the S3A `InputStream` needs to re-open the object (e.g. during a
+`seek()`) and detects the change.
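+
+Change detection is configured via the `fs.s3a.change.detection.*` options.
+As a minimal sketch (the property names and values are those documented in
+`core-default.xml`; whether `server` or `client` mode is preferable depends on
+the store), client-side detection against eTags could be selected with:
+
+```
+<property>
+  <name>fs.s3a.change.detection.source</name>
+  <value>etag</value>
+</property>
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>client</value>
+</property>
+```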
+
+If the change detection mode is configured to `warn`, a warning like the
+following will be seen instead of `RemoteFileChangedException`:
+
+```
+WARN - ETag change detected on re-open s3a://my-bucket/test/readFileToChange.txt at 1949.
+ Expected f9c186d787d4de9657e99f280ba26555 got 043abff21b7bd068d2d2f27ccca70309
+```
+
+Using a third-party S3 implementation that does not support eTags may result in
+the following error:
+
+```
+org.apache.hadoop.fs.s3a.NoVersionAttributeException: `s3a://my-bucket/test/file.txt':
+ Change detection policy requires ETag
+ at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:153)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
+ at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
+ at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:339)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:372)
+```
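+
+Where the store genuinely cannot supply the configured attribute, one hedged
+mitigation (it trades away change detection, so apply it knowingly) is to
+relax or disable the check:
+
+```
+<!-- Sketch: do not insist on the version attribute being present... -->
+<property>
+  <name>fs.s3a.change.detection.version.required</name>
+  <value>false</value>
+</property>
+<!-- ...or disable change detection entirely. -->
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>none</value>
+</property>
+```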
+
+If the change policy is `versionid`, there are a number of possible causes (a
+fallback configuration sketch follows this list):
+
+* The bucket does not have object versioning enabled.
+* The bucket does have versioning enabled, but the object being read was created
+before versioning was enabled.
+* The bucket is on a third-party store which does not support object versioning.
+
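+If versioning cannot be enabled on the bucket (or the store), a fallback
+sketch is to point the detection source back at eTags:
+
+```
+<property>
+  <name>fs.s3a.change.detection.source</name>
+  <value>etag</value>
+</property>
+```
+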
+See [Handling Read-During-Overwrite](./index.html#handling_read-during-overwrite)
+for more information.
+
## S3 Server Side Encryption
### `AWSS3IOException` `KMS.NotFoundException` "Invalid arn" when using SSE-KMS
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 5c2b5a399d0..8f8d8605653 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -20,17 +20,14 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -40,8 +37,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath;
import static org.apache.hadoop.test.LambdaTestUtils.*;
/**
- * Test S3A Failure translation, including a functional test
- * generating errors during stream IO.
+ * Test S3A Failure translation.
*/
public class ITestS3AFailureHandling extends AbstractS3ATestBase {
private static final Logger LOG =
@@ -54,65 +50,6 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, true);
return conf;
}
- @Test
- public void testReadFileChanged() throws Throwable {
- describe("overwrite a file with a shorter one during a read, seek");
- final int fullLength = 8192;
- final byte[] fullDataset = dataset(fullLength, 'a', 32);
- final int shortLen = 4096;
- final byte[] shortDataset = dataset(shortLen, 'A', 32);
- final FileSystem fs = getFileSystem();
- final Path testpath = path("readFileToChange.txt");
- // initial write
- writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
- try(FSDataInputStream instream = fs.open(testpath)) {
- instream.seek(fullLength - 16);
- assertTrue("no data to read", instream.read() >= 0);
- // overwrite
- writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
- // here the file length is less. Probe the file to see if this is true,
- // with a spin and wait
- eventually(30 * 1000, 1000,
- () -> {
- assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
- });
-
- // here length is shorter. Assuming it has propagated to all replicas,
- // the position of the input stream is now beyond the EOF.
- // An attempt to seek backwards to a position greater than the
- // short length will raise an exception from AWS S3, which must be
- // translated into an EOF
-
- instream.seek(shortLen + 1024);
- int c = instream.read();
- assertIsEOF("read()", c);
-
- byte[] buf = new byte[256];
-
- assertIsEOF("read(buffer)", instream.read(buf));
- assertIsEOF("read(offset)",
- instream.read(instream.getPos(), buf, 0, buf.length));
-
- // now do a block read fully, again, backwards from the current pos
- intercept(EOFException.class, "", "readfully",
- () -> instream.readFully(shortLen + 512, buf));
-
- assertIsEOF("read(offset)",
- instream.read(shortLen + 510, buf, 0, buf.length));
-
- // seek somewhere useful
- instream.seek(shortLen - 256);
-
- // delete the file. Reads must fail
- fs.delete(testpath, false);
-
- intercept(FileNotFoundException.class, "", "read()",
- () -> instream.read());
- intercept(FileNotFoundException.class, "", "readfully",
- () -> instream.readFully(2048, buf));
-
- }
- }
/**
* Assert that a read operation returned an EOF value.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
new file mode 100644
index 00000000000..98dd2026f5f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test S3A remote file change detection.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3ARemoteFileChanged.class);
+
+ private final String changeDetectionSource;
+ private final String changeDetectionMode;
+ private final boolean expectChangeException;
+ private final boolean expectFileNotFoundException;
+
+ @Parameterized.Parameters
+ public static Collection