HADOOP-15625. S3A input stream to use etags/version number to detect changed source files.

Author: Ben Roling <ben.roling@gmail.com>

Initial patch from Brahma Reddy Battula.
This commit is contained in:
Ben Roling 2019-03-13 20:31:13 +00:00 committed by Steve Loughran
parent 66357574ae
commit 6fa229891e
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
20 changed files with 1625 additions and 98 deletions

View File

@ -1874,6 +1874,48 @@
</description>
</property>
<property>
<name>fs.s3a.change.detection.source</name>
<value>etag</value>
<description>
Select which S3 object attribute to use for change detection.
Currently support 'etag' for S3 object eTags and 'versionid' for
S3 object version IDs. Use of version IDs requires object versioning to be
enabled for each S3 bucket utilized. Object versioning is disabled on
buckets by default. When version ID is used, the buckets utilized should
have versioning enabled before any data is written.
</description>
</property>
<property>
<name>fs.s3a.change.detection.mode</name>
<value>server</value>
<description>
Determines how change detection is applied to alert to S3 objects
rewritten while being read. Value 'server' indicates to apply the attribute
constraint directly on GetObject requests to S3. Value 'client' means to do a
client-side comparison of the attribute value returned in the response. Value
'server' would not work with third-party S3 implementations that do not
support these constraints on GetObject. Values 'server' and 'client' generate
RemoteObjectChangedException when a mismatch is detected. Value 'warn' works
like 'client' but generates only a warning. Value 'none' will ignore change
detection completely.
</description>
</property>
<property>
<name>fs.s3a.change.detection.version.required</name>
<value>true</value>
<description>
Determines if S3 object version attribute defined by
fs.s3a.change.detection.source should be treated as required. If true and the
referred attribute is unavailable in an S3 GetObject response,
NoVersionAttributeException is thrown. Setting to 'true' is encouraged to
avoid potential for inconsistent reads with third-party S3 implementations or
against S3 buckets that have object versioning disabled.
</description>
</property>
<!-- Azure file system properties -->
<property>
<name>fs.AbstractFileSystem.wasb.impl</name>

View File

@ -641,4 +641,84 @@ public final class Constants {
*/
public static final boolean ETAG_CHECKSUM_ENABLED_DEFAULT = false;
/**
* Where to get the value to use in change detection. E.g. eTag, or
* versionId?
*/
public static final String CHANGE_DETECT_SOURCE
= "fs.s3a.change.detection.source";
/**
* eTag as the change detection mechanism.
*/
public static final String CHANGE_DETECT_SOURCE_ETAG = "etag";
/**
* Object versionId as the change detection mechanism.
*/
public static final String CHANGE_DETECT_SOURCE_VERSION_ID = "versionid";
/**
* Default change detection mechanism: eTag.
*/
public static final String CHANGE_DETECT_SOURCE_DEFAULT =
CHANGE_DETECT_SOURCE_ETAG;
/**
* Mode to run change detection in. Server side comparison? Client side
* comparison? Client side compare and warn rather than exception? Don't
* bother at all?
*/
public static final String CHANGE_DETECT_MODE =
"fs.s3a.change.detection.mode";
/**
* Change is detected on the client side by comparing the returned id with the
* expected id. A difference results in {@link RemoteFileChangedException}.
*/
public static final String CHANGE_DETECT_MODE_CLIENT = "client";
/**
* Change is detected by passing the expected value in the GetObject request.
* If the expected value is unavailable, {@link RemoteFileChangedException} is
* thrown.
*/
public static final String CHANGE_DETECT_MODE_SERVER = "server";
/**
* Change is detected on the client side by comparing the returned id with the
* expected id. A difference results in a WARN level message being logged.
*/
public static final String CHANGE_DETECT_MODE_WARN = "warn";
/**
* Change detection is turned off. Readers may see inconsistent results due
* to concurrent writes without any exception or warning messages. May be
* useful with third-party S3 API implementations that don't support one of
* the change detection modes.
*/
public static final String CHANGE_DETECT_MODE_NONE = "none";
/**
* Default change detection mode: server.
*/
public static final String CHANGE_DETECT_MODE_DEFAULT =
CHANGE_DETECT_MODE_SERVER;
/**
* If true, raises a {@link RemoteFileChangedException} exception when S3
* doesn't provide the attribute defined by fs.s3a.change.detection.source.
* For example, if source is versionId, but object versioning is not enabled
* on the bucket, or alternatively if source is eTag and a third-party S3
* implementation that doesn't return eTag is used.
* <p>
* When false, only a warning message will be logged for this condition.
*/
public static final String CHANGE_DETECT_REQUIRE_VERSION =
"fs.s3a.change.detection.version.required";
/**
* Default change detection require version: true.
*/
public static final boolean CHANGE_DETECT_REQUIRE_VERSION_DEFAULT = true;
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
/**
* Indicates the S3 object does not provide the versioning attribute required
* by the configured change detection policy.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class NoVersionAttributeException extends PathIOException {
/**
* Constructs a NoVersionAttributeException.
*
* @param path the path accessed when the condition was detected
* @param message a message providing more details about the condition
*/
public NoVersionAttributeException(String path,
String message) {
super(path, message);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
/**
* Indicates the S3 object is out of sync with the expected version. Thrown in
* cases such as when the object is updated while an {@link S3AInputStream} is
* open.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class RemoteFileChangedException extends PathIOException {
/**
* Constructs a RemoteFileChangedException.
*
* @param path the path accessed when the change was detected
* @param operation the operation (e.g. open, re-open) performed when the
* change was detected
* @param message a message providing more details about the condition
*/
public RemoteFileChangedException(String path,
String operation,
String message) {
super(path, message);
setOperation(operation);
}
}

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -214,6 +215,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
createStorageStatistics();
private long readAhead;
private S3AInputPolicy inputPolicy;
private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile boolean isClosed = false;
private MetadataStore metadataStore;
@ -361,6 +363,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
LOG.debug("Input fadvise policy = {}", inputPolicy);
changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
LOG.debug("Change detection policy = {}", changeDetectionPolicy);
boolean magicCommitterEnabled = conf.getBoolean(
CommitConstants.MAGIC_COMMITTER_ENABLED,
CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
@ -687,6 +691,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return inputPolicy;
}
/**
* Get the change detection policy for this FS instance.
* @return the change detection policy
*/
@VisibleForTesting
ChangeDetectionPolicy getChangeDetectionPolicy() {
return changeDetectionPolicy;
}
/**
* Get the encryption algorithm of this endpoint.
* @return the encryption algorithm.
@ -875,9 +888,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
S3AInputPolicy policy = S3AInputPolicy.getPolicy(
o.get(INPUT_FADVISE, inputPolicy.toString()));
long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
readContext = createReadContext(fileStatus, policy, readAheadRange2);
// TODO support change detection policy from options?
readContext = createReadContext(
fileStatus,
policy,
changeDetectionPolicy,
readAheadRange2);
} else {
readContext = createReadContext(fileStatus, inputPolicy, readAhead);
readContext = createReadContext(
fileStatus,
inputPolicy,
changeDetectionPolicy,
readAhead);
}
LOG.debug("Opening '{}'", readContext);
@ -900,6 +922,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private S3AReadOpContext createReadContext(
final FileStatus fileStatus,
final S3AInputPolicy seekPolicy,
final ChangeDetectionPolicy changePolicy,
final long readAheadRange) {
return new S3AReadOpContext(fileStatus.getPath(),
hasMetadataStore(),
@ -909,6 +932,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
instrumentation,
fileStatus,
seekPolicy,
changePolicy,
readAheadRange);
}
@ -3676,7 +3700,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
long ra = options.getLong(READAHEAD_RANGE, readAhead);
// build and execute the request
return selectBinding.select(
createReadContext(fileStatus, inputPolicy, ra),
createReadContext(fileStatus, inputPolicy, changeDetectionPolicy, ra),
expression,
options,
generateSSECustomerKey(),

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,6 +68,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
public static final String E_NEGATIVE_READAHEAD_VALUE
= "Negative readahead value";
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";
/**
* This is the public position; the one set in {@link #seek(long)}
* and returned in {@link #getPos()}.
@ -110,6 +115,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/
private long contentRangeStart;
/** change tracker. */
private final ChangeTracker changeTracker;
/**
* Create the stream.
* This does not attempt to open it; that is only done on the first
@ -138,6 +146,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
this.serverSideEncryptionAlgorithm =
s3Attributes.getServerSideEncryptionAlgorithm();
this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
this.changeTracker = new ChangeTracker(uri,
ctx.getChangeDetectionPolicy(),
streamStatistics.getVersionMismatchCounter());
setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead());
}
@ -182,15 +193,20 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
StringUtils.isNotBlank(serverSideEncryptionKey)){
request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
}
String text = String.format("Failed to %s %s at %d",
(opencount == 0 ? "open" : "re-open"), uri, targetPos);
String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN;
String text = String.format("%s %s at %d",
operation, uri, targetPos);
changeTracker.maybeApplyConstraint(request);
S3Object object = Invoker.once(text, uri,
() -> client.getObject(request));
changeTracker.processResponse(object, operation,
targetPos);
wrappedStream = object.getObjectContent();
contentRangeStart = targetPos;
if (wrappedStream == null) {
throw new IOException("Null IO stream from reopen of (" + reason + ") "
+ uri);
throw new PathIOException(uri,
"Null IO stream from " + operation + " of (" + reason + ") ");
}
this.pos = targetPos;
@ -670,6 +686,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
sb.append(" contentRangeFinish=").append(contentRangeFinish);
sb.append(" remainingInCurrentRequest=")
.append(remainingInCurrentRequest());
sb.append(changeTracker);
sb.append('\n').append(s);
sb.append('}');
return sb.toString();

View File

@ -161,6 +161,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
OBJECT_PUT_REQUESTS,
OBJECT_PUT_REQUESTS_COMPLETED,
OBJECT_SELECT_REQUESTS,
STREAM_READ_VERSION_MISMATCHES,
STREAM_WRITE_FAILURES,
STREAM_WRITE_BLOCK_UPLOADS,
STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
@ -594,6 +595,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
streamReadsIncomplete.incr(statistics.readsIncomplete);
streamBytesReadInClose.incr(statistics.bytesReadInClose);
streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
incrementCounter(STREAM_READ_VERSION_MISMATCHES,
statistics.versionMismatches.get());
}
@Override
@ -639,6 +642,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
public long bytesDiscardedInAbort;
public long policySetCount;
public long inputPolicy;
/** This is atomic so that it can be passed as a reference. */
private final AtomicLong versionMismatches = new AtomicLong(0);
private InputStreamStatistics() {
}
@ -763,6 +768,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
inputPolicy = updatedPolicy;
}
/**
* Get a reference to the version mismatch counter.
* @return a counter which can be incremented.
*/
public AtomicLong getVersionMismatchCounter() {
return versionMismatches;
}
/**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
@ -796,6 +809,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
sb.append(", InputPolicy=").append(inputPolicy);
sb.append(", InputPolicySetCount=").append(policySetCount);
sb.append(", versionMismatches=").append(versionMismatches.get());
sb.append('}');
return sb.toString();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import javax.annotation.Nullable;
@ -43,6 +44,11 @@ public class S3AReadOpContext extends S3AOpContext {
*/
private final S3AInputPolicy inputPolicy;
/**
* How to detect and deal with the object being updated during read.
*/
private final ChangeDetectionPolicy changeDetectionPolicy;
/**
* Readahead for GET operations/skip, etc.
*/
@ -59,6 +65,7 @@ public class S3AReadOpContext extends S3AOpContext {
* @param dstFileStatus target file status
* @param inputPolicy the input policy
* @param readahead readahead for GET operations/skip, etc.
* @param changeDetectionPolicy change detection policy.
*/
public S3AReadOpContext(
final Path path,
@ -69,6 +76,7 @@ public class S3AReadOpContext extends S3AOpContext {
S3AInstrumentation instrumentation,
FileStatus dstFileStatus,
S3AInputPolicy inputPolicy,
ChangeDetectionPolicy changeDetectionPolicy,
final long readahead) {
super(isS3GuardEnabled, invoker, s3guardInvoker, stats, instrumentation,
dstFileStatus);
@ -76,6 +84,7 @@ public class S3AReadOpContext extends S3AOpContext {
Preconditions.checkArgument(readahead >= 0,
"invalid readahead %d", readahead);
this.inputPolicy = checkNotNull(inputPolicy);
this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
this.readahead = readahead;
}
@ -110,6 +119,10 @@ public class S3AReadOpContext extends S3AOpContext {
return inputPolicy;
}
public ChangeDetectionPolicy getChangeDetectionPolicy() {
return changeDetectionPolicy;
}
/**
* Get the readahead for this operation.
* @return a value {@literal >=} 0
@ -125,6 +138,7 @@ public class S3AReadOpContext extends S3AOpContext {
sb.append("path=").append(path);
sb.append(", inputPolicy=").append(inputPolicy);
sb.append(", readahead=").append(readahead);
sb.append(", changeDetectionPolicy=").append(changeDetectionPolicy);
sb.append('}');
return sb.toString();
}

View File

@ -172,6 +172,13 @@ public class S3ARetryPolicy implements RetryPolicy {
policyMap.put(FileNotFoundException.class, fail);
policyMap.put(InvalidRequestException.class, fail);
// once the file has changed, trying again is not going to help
policyMap.put(RemoteFileChangedException.class, fail);
// likely only recovered by changing the policy configuration or s3
// implementation
policyMap.put(NoVersionAttributeException.class, fail);
// should really be handled by resubmitting to new location;
// that's beyond the scope of this retry policy
policyMap.put(AWSRedirectException.class, fail);

View File

@ -116,13 +116,15 @@ public enum Statistic {
STREAM_OPENED("stream_opened",
"Total count of times an input stream to object store was opened"),
STREAM_READ_EXCEPTIONS("stream_read_exceptions",
"Number of seek operations invoked on input streams"),
"Number of exceptions invoked on input streams"),
STREAM_READ_FULLY_OPERATIONS("stream_read_fully_operations",
"Count of readFully() operations in streams"),
STREAM_READ_OPERATIONS("stream_read_operations",
"Count of read() operations in streams"),
STREAM_READ_OPERATIONS_INCOMPLETE("stream_read_operations_incomplete",
"Count of incomplete read() operations in streams"),
STREAM_READ_VERSION_MISMATCHES("stream_read_version_mismatches",
"Count of version mismatches encountered while reading streams"),
STREAM_SEEK_BYTES_BACKWARDS("stream_bytes_backwards_on_seek",
"Count of bytes moved backwards during seek operations"),
STREAM_SEEK_BYTES_READ("stream_bytes_read",

View File

@ -0,0 +1,376 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.Locale;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Object change detection policy.
* Determines which attribute is used to detect change and what to do when
* change is detected.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class ChangeDetectionPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(ChangeDetectionPolicy.class);
@VisibleForTesting
public static final String CHANGE_DETECTED = "change detected";
private final Mode mode;
private final boolean requireVersion;
/**
* Version support is only warned about once per S3A instance.
* This still means that on a long-lived application which destroys
* filesystems it'll appear once-per-query in the logs, but at least
* it will not appear once per file read.
*/
private final LogExactlyOnce logNoVersionSupport = new LogExactlyOnce(LOG);
/**
* The S3 object attribute used to detect change.
*/
public enum Source {
ETag(CHANGE_DETECT_SOURCE_ETAG),
VersionId(CHANGE_DETECT_SOURCE_VERSION_ID),
/** you can't ask for this explicitly outside of tests. */
None("none");
private final String source;
Source(String source) {
this.source = source;
}
private static Source fromString(String trimmed) {
for (Source value : values()) {
if (value.source.equals(trimmed)) {
return value;
}
}
LOG.warn("Unrecognized " + CHANGE_DETECT_SOURCE + " value: \"{}\"",
trimmed);
return fromString(CHANGE_DETECT_SOURCE_DEFAULT);
}
static Source fromConfiguration(Configuration configuration) {
String trimmed = configuration.get(CHANGE_DETECT_SOURCE,
CHANGE_DETECT_SOURCE_DEFAULT).trim()
.toLowerCase(Locale.ENGLISH);
return fromString(trimmed);
}
}
/**
* What to do when change is detected.
*/
public enum Mode {
/** Client side validation. */
Client(CHANGE_DETECT_MODE_CLIENT),
/** Server side validation. */
Server(CHANGE_DETECT_MODE_SERVER),
/** Warn but continue. */
Warn(CHANGE_DETECT_MODE_WARN),
/** No checks. */
None(CHANGE_DETECT_MODE_NONE);
private final String mode;
Mode(String mode) {
this.mode = mode;
}
private static Mode fromString(String trimmed) {
for (Mode value : values()) {
if (value.mode.equals(trimmed)) {
return value;
}
}
LOG.warn("Unrecognized " + CHANGE_DETECT_MODE + " value: \"{}\"",
trimmed);
return fromString(CHANGE_DETECT_MODE_DEFAULT);
}
static Mode fromConfiguration(Configuration configuration) {
String trimmed = configuration.get(CHANGE_DETECT_MODE,
CHANGE_DETECT_MODE_DEFAULT)
.trim()
.toLowerCase(Locale.ENGLISH);
return fromString(trimmed);
}
}
protected ChangeDetectionPolicy(Mode mode, boolean requireVersion) {
this.mode = mode;
this.requireVersion = requireVersion;
}
public Mode getMode() {
return mode;
}
public abstract Source getSource();
public boolean isRequireVersion() {
return requireVersion;
}
public LogExactlyOnce getLogNoVersionSupport() {
return logNoVersionSupport;
}
/**
* Reads the change detection policy from Configuration.
*
* @param configuration the configuration
* @return the policy
*/
public static ChangeDetectionPolicy getPolicy(Configuration configuration) {
Mode mode = Mode.fromConfiguration(configuration);
Source source = Source.fromConfiguration(configuration);
boolean requireVersion = configuration.getBoolean(
CHANGE_DETECT_REQUIRE_VERSION, CHANGE_DETECT_REQUIRE_VERSION_DEFAULT);
return createPolicy(mode, source, requireVersion);
}
/**
* Create a policy.
* @param mode mode pf checks
* @param source source of change
* @param requireVersion throw exception when no version available?
* @return the policy
*/
@VisibleForTesting
public static ChangeDetectionPolicy createPolicy(final Mode mode,
final Source source, final boolean requireVersion) {
switch (source) {
case ETag:
return new ETagChangeDetectionPolicy(mode, requireVersion);
case VersionId:
return new VersionIdChangeDetectionPolicy(mode, requireVersion);
default:
return new NoChangeDetection();
}
}
/**
* Pulls the attribute this policy uses to detect change out of the S3 object
* metadata. The policy generically refers to this attribute as
* {@code revisionId}.
*
* @param objectMetadata the s3 object metadata
* @param uri the URI of the object
* @return the revisionId string as interpreted by this policy, or potentially
* null if the attribute is unavailable (such as when the policy says to use
* versionId but object versioning is not enabled for the bucket).
*/
public abstract String getRevisionId(ObjectMetadata objectMetadata,
String uri);
/**
* Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
* as a server-side qualification on the {@code GetObjectRequest}.
*
* @param request the request
* @param revisionId the revision id
*/
public abstract void applyRevisionConstraint(GetObjectRequest request,
String revisionId);
/**
* Takes appropriate action based on {@link #getMode() mode} when a change has
* been detected.
*
* @param revisionId the expected revision id
* @param newRevisionId the detected revision id
* @param uri the URI of the object being accessed
* @param position the position being read in the object
* @param operation the operation being performed on the object (e.g. open or
* re-open) that triggered the change detection
* @param timesAlreadyDetected number of times a change has already been
* detected on the current stream
* @return a pair of: was a change detected, and any exception to throw.
* If the change was detected, this updates a counter in the stream
* statistics; If an exception was returned it is thrown after the counter
* update.
*/
public ImmutablePair<Boolean, RemoteFileChangedException> onChangeDetected(
String revisionId,
String newRevisionId,
String uri,
long position,
String operation,
long timesAlreadyDetected) {
switch (mode) {
case None:
// something changed; we don't care.
return new ImmutablePair<>(false, null);
case Warn:
if (timesAlreadyDetected == 0) {
// only warn on the first detection to avoid a noisy log
LOG.warn(
String.format("%s change detected on %s %s at %d. Expected %s got %s",
getSource(), operation, uri, position, revisionId,
newRevisionId));
return new ImmutablePair<>(true, null);
}
return new ImmutablePair<>(false, null);
case Client:
case Server:
default:
// mode == Client (or Server, but really won't be called for Server)
return new ImmutablePair<>(true,
new RemoteFileChangedException(uri,
operation,
String.format("%s "
+ CHANGE_DETECTED
+ " while reading at position %s."
+ " Expected %s got %s",
getSource(), position, revisionId, newRevisionId)));
}
}
/**
* Change detection policy based on {@link ObjectMetadata#getETag() eTag}.
*/
static class ETagChangeDetectionPolicy extends ChangeDetectionPolicy {
ETagChangeDetectionPolicy(Mode mode, boolean requireVersion) {
super(mode, requireVersion);
}
@Override
public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
return objectMetadata.getETag();
}
@Override
public void applyRevisionConstraint(GetObjectRequest request,
String revisionId) {
LOG.debug("Restricting request to etag {}", revisionId);
request.withMatchingETagConstraint(revisionId);
}
@Override
public Source getSource() {
return Source.ETag;
}
@Override
public String toString() {
return "ETagChangeDetectionPolicy mode=" + getMode();
}
}
/**
* Change detection policy based on
* {@link ObjectMetadata#getVersionId() versionId}.
*/
static class VersionIdChangeDetectionPolicy extends
ChangeDetectionPolicy {
VersionIdChangeDetectionPolicy(Mode mode, boolean requireVersion) {
super(mode, requireVersion);
}
@Override
public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
String versionId = objectMetadata.getVersionId();
if (versionId == null) {
// this policy doesn't work if the bucket doesn't have object versioning
// enabled (which isn't by default)
getLogNoVersionSupport().warn(
CHANGE_DETECT_MODE + " set to " + Source.VersionId
+ " but no versionId available while reading {}. "
+ "Ensure your bucket has object versioning enabled. "
+ "You may see inconsistent reads.",
uri);
}
return versionId;
}
@Override
public void applyRevisionConstraint(GetObjectRequest request,
String revisionId) {
LOG.debug("Restricting request to version {}", revisionId);
request.withVersionId(revisionId);
}
@Override
public Source getSource() {
return Source.VersionId;
}
@Override
public String toString() {
return "VersionIdChangeDetectionPolicy mode=" + getMode();
}
}
/**
* Don't check for changes.
*/
static class NoChangeDetection extends ChangeDetectionPolicy {
NoChangeDetection() {
super(Mode.None, false);
}
@Override
public Source getSource() {
return Source.None;
}
@Override
public String getRevisionId(final ObjectMetadata objectMetadata,
final String uri) {
return null;
}
@Override
public void applyRevisionConstraint(final GetObjectRequest request,
final String revisionId) {
}
@Override
public String toString() {
return "NoChangeDetection";
}
}
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.concurrent.atomic.AtomicLong;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Change tracking for input streams: the revision ID/etag
* the previous request is recorded and when the next request comes in,
* it is compared.
* Self-contained for testing and use in different streams.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ChangeTracker {
private static final Logger LOG =
LoggerFactory.getLogger(ChangeTracker.class);
public static final String CHANGE_REPORTED_BY_S3 = "reported by S3";
/** Policy to use. */
private final ChangeDetectionPolicy policy;
/**
* URI of file being read.
*/
private final String uri;
/**
* Mismatch counter; expected to be wired up to StreamStatistics except
* during testing.
*/
private final AtomicLong versionMismatches;
/**
* Revision identifier (e.g. eTag or versionId, depending on change
* detection policy).
*/
private String revisionId;
/**
* Create a change tracker.
* @param uri URI of object being tracked
* @param policy policy to track.
* @param versionMismatches reference to the version mismatch counter
*/
public ChangeTracker(final String uri,
final ChangeDetectionPolicy policy,
final AtomicLong versionMismatches) {
this.policy = checkNotNull(policy);
this.uri = uri;
this.versionMismatches = versionMismatches;
}
public String getRevisionId() {
return revisionId;
}
public ChangeDetectionPolicy.Source getSource() {
return policy.getSource();
}
@VisibleForTesting
public AtomicLong getVersionMismatches() {
return versionMismatches;
}
/**
* Apply any revision control set by the policy if it is to be
* enforced on the server.
* @param request request to modify
* @return true iff a constraint was added.
*/
public boolean maybeApplyConstraint(
final GetObjectRequest request) {
if (policy.getMode() == ChangeDetectionPolicy.Mode.Server
&& revisionId != null) {
policy.applyRevisionConstraint(request, revisionId);
return true;
}
return false;
}
/**
* Process the response from the server for validation against the
* change policy.
* @param object object returned; may be null.
* @param operation operation in progress.
* @param pos offset of read
* @throws PathIOException raised on failure
* @throws RemoteFileChangedException if the remote file has changed.
*/
public void processResponse(final S3Object object,
final String operation,
final long pos) throws PathIOException {
if (object == null) {
// no object returned. Either mismatch or something odd.
if (revisionId != null) {
// the requirements of the change detection policy wasn't met: the
// object was not returned.
versionMismatches.incrementAndGet();
throw new RemoteFileChangedException(uri, operation,
String.format("%s change "
+ CHANGE_REPORTED_BY_S3
+ " while reading"
+ " at position %s."
+ " Version %s was unavailable",
getSource(),
pos,
getRevisionId()));
} else {
throw new PathIOException(uri, "No data returned from GET request");
}
}
final ObjectMetadata metadata = object.getObjectMetadata();
final String newRevisionId = policy.getRevisionId(metadata, uri);
if (newRevisionId == null && policy.isRequireVersion()) {
throw new NoVersionAttributeException(uri, String.format(
"Change detection policy requires %s",
policy.getSource()));
}
if (revisionId == null) {
// revisionId is null on first (re)open. Pin it so change can be detected
// if object has been updated
LOG.debug("Setting revision ID for object at {}: {}",
uri, newRevisionId);
revisionId = newRevisionId;
} else if (!revisionId.equals(newRevisionId)) {
LOG.debug("Revision ID changed from {} to {}",
revisionId, newRevisionId);
ImmutablePair<Boolean, RemoteFileChangedException> pair =
policy.onChangeDetected(
revisionId,
newRevisionId,
uri,
pos,
operation,
versionMismatches.get());
if (pair.left) {
// an mismatch has occurred: note it.
versionMismatches.incrementAndGet();
}
if (pair.right != null) {
// there's an exception to raise: do it
throw pair.right;
}
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"ChangeTracker{");
sb.append("changeDetectionPolicy=").append(policy);
sb.append(", revisionId='").append(revisionId).append('\'');
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
/**
* Log exactly once, even across threads.
*/
public class LogExactlyOnce {
private final AtomicBoolean logged = new AtomicBoolean(false);
private final Logger log;
public LogExactlyOnce(final Logger log) {
this.log = log;
}
public void warn(String format, Object...args) {
if (!logged.getAndSet(true)) {
log.warn(format, args);
}
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation classes private to the S3A store.
* Do not use outside of the hadoop-aws module.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.fs.s3a.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -1144,6 +1144,131 @@ the capacity through `hadoop s3guard set-capacity` (and pay more, obviously).
## Handling Read-During-Overwrite
Read-during-overwrite is the condition where a writer overwrites a file while
a reader has an open input stream on the file. Depending on configuration,
the S3AFileSystem may detect this and throw a `RemoteFileChangedException` in
conditions where the reader's input stream might otherwise silently switch over
from reading bytes from the original version of the file to reading bytes from
the new version.
The configurations items controlling this behavior are:
```xml
<property>
<name>fs.s3a.change.detection.source</name>
<value>etag</value>
<description>
Select which S3 object attribute to use for change detection.
Currently support 'etag' for S3 object eTags and 'versionid' for
S3 object version IDs. Use of version IDs requires object versioning to be
enabled for each S3 bucket utilized. Object versioning is disabled on
buckets by default. When version ID is used, the buckets utilized should
have versioning enabled before any data is written.
</description>
</property>
<property>
<name>fs.s3a.change.detection.mode</name>
<value>server</value>
<description>
Determines how change detection is applied to alert to S3 objects
rewritten while being read. Value 'server' indicates to apply the attribute
constraint directly on GetObject requests to S3. Value 'client' means to do a
client-side comparison of the attribute value returned in the response. Value
'server' would not work with third-party S3 implementations that do not
support these constraints on GetObject. Values 'server' and 'client' generate
RemoteObjectChangedException when a mismatch is detected. Value 'warn' works
like 'client' but generates only a warning. Value 'none' will ignore change
detection completely.
</description>
</property>
<property>
<name>fs.s3a.change.detection.version.required</name>
<value>true</value>
<description>
Determines if S3 object version attribute defined by
fs.s3.change.detection.source should be treated as required. If true and the
referred attribute is unavailable in an S3 GetObject response,
NoVersionAttributeException is thrown. Setting to 'true' is encouraged to
avoid potential for inconsistent reads with third-party S3 implementations or
against S3 buckets that have object versioning disabled.
</description>
</property>
```
In the default configuration, S3 object eTags are used to detect changes. When
the filesystem retrieves a file from S3 using
[Get Object](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html),
it captures the eTag and uses that eTag in an 'If-Match' condition on each
subsequent request. If a concurrent writer has overwritten the file, the
'If-Match' condition will fail and a RemoteFileChangedException will be thrown.
Even in this default configuration, a new write may not trigger this exception
on an open reader. For example, if the reader only reads forward in the file
then only a single S3 'Get Object' request is made and the full contents of the
file are streamed from a single response. An overwrite of the file after the
'Get Object' request would not be seen at all by a reader with an input stream
that had already read the first byte. Seeks backward on the other hand can
result in new 'Get Object' requests that can trigger the
`RemoteFileChangedException`.
Additionally, due to the eventual consistency of S3 in a read-after-overwrite
scenario, visibility of a new write may be delayed, avoiding the
`RemoteFileChangedException` for some readers. That said, if a reader does not
see `RemoteFileChangedException`, they will have at least read a consistent view
of a single version of the file (the version available when they started
reading).
### Change detection with S3 Versions.
It is possible to switch to using the
[S3 object version id](https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html)
instead of eTag as the change detection mechanism. Use of this option requires
object versioning to be enabled on any S3 buckets used by the filesystem. The
benefit of using version id instead of eTag is potentially reduced frequency
of RemoteFileChangedException. With object versioning enabled, old versions
of objects remain available after they have been overwritten.
This means an open input stream will still be able to seek backwards after a
concurrent writer has overwritten the file.
The reader will retain their consistent view of the version of the file from
which they read the first byte.
Because the version ID is null for objects written prior to enablement of
object versioning, **this option should only be used when the S3 buckets
have object versioning enabled from the beginning.**
Note: when you rename files the copied files may have a different version number.
### Change Detection Modes.
Configurable change detection mode is the next option. Different modes are
available primarily for compatibility with third-party S3 implementations which
may not support all change detection mechanisms.
* `server`: the version/etag check is performed on the server by adding
extra headers to the `GET` request. This is the default.
* `client` : check on the client by comparing the eTag/version ID of a
reopened file with the previous version.
This is useful when the implementation doesn't support the `If-Match` header.
* `warn`: check on the client, but only warn on a mismatch, rather than fail.
* `none` do not check. Useful if the implementation doesn't provide eTag
or version ID support at all or you would like to retain previous behavior
where the reader's input stream silently switches over to the new object version
(not recommended).
The final option (`fs.s3a.change.detection.version.required`) is present
primarily to ensure the filesystem doesn't silently ignore the condition
where it is configured to use version ID on a bucket that doesn't have
object versioning enabled or alternatively it is configured to use eTag on
an S3 implementation that doesn't return eTags.
When `true` (default) and 'Get Object' doesn't return eTag or
version ID (depending on configured 'source'), a `NoVersionAttributeException`
will be thrown. When `false` and and eTag or version ID is not returned,
the stream can be read, but without any version checking.
## <a name="per_bucket_configuration"></a>Configuring different S3 buckets with Per-Bucket Configuration

View File

@ -966,6 +966,69 @@ if it is required that the data is persisted durably after every
This includes resilient logging, HBase-style journaling
and the like. The standard strategy here is to save to HDFS and then copy to S3.
### `RemoteFileChangedException` and read-during-overwrite
```
org.apache.hadoop.fs.s3a.RemoteFileChangedException: re-open `s3a://my-bucket/test/file.txt':
ETag change reported by S3 while reading at position 1949.
Version f9c186d787d4de9657e99f280ba26555 was unavailable
at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:137)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:339)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:372)
```
If an S3 object is updated while an S3A filesystem reader has an open
`InputStream` on it, the reader may encounter `RemoteFileChangedException`. This
occurs if the S3A `InputStream` needs to re-open the object (e.g. during a seek())
and detects the change.
If the change detection mode is configured to 'warn', a warning like the
following will be seen instead of `RemoteFileChangedException`:
```
WARN - ETag change detected on re-open s3a://my-bucket/test/readFileToChange.txt at 1949.
Expected f9c186d787d4de9657e99f280ba26555 got 043abff21b7bd068d2d2f27ccca70309
```
Using a third-party S3 implementation that doesn't support eTags might result in
the following error.
```
org.apache.hadoop.fs.s3a.NoVersionAttributeException: `s3a://my-bucket/test/file.txt':
Change detection policy requires ETag
at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:153)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:339)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:372)
```
If the change policy is `versionid` there are a number of possible causes
* The bucket does not have object versioning enabled.
* The bucket does have versioning enabled, but the object being read was created
before versioning was enabled.
* The bucket is on a third-party store which does not support object versioning.
See [Handling Read-During-Overwrite](./index.html#handling_read-during-overwrite)
for more information.
## <a name="encryption"></a> S3 Server Side Encryption
### `AWSS3IOException` `KMS.NotFoundException` "Invalid arn" when using SSE-KMS

View File

@ -20,17 +20,14 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -40,8 +37,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath;
import static org.apache.hadoop.test.LambdaTestUtils.*;
/**
* Test S3A Failure translation, including a functional test
* generating errors during stream IO.
* Test S3A Failure translation.
*/
public class ITestS3AFailureHandling extends AbstractS3ATestBase {
private static final Logger LOG =
@ -54,65 +50,6 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, true);
return conf;
}
@Test
public void testReadFileChanged() throws Throwable {
describe("overwrite a file with a shorter one during a read, seek");
final int fullLength = 8192;
final byte[] fullDataset = dataset(fullLength, 'a', 32);
final int shortLen = 4096;
final byte[] shortDataset = dataset(shortLen, 'A', 32);
final FileSystem fs = getFileSystem();
final Path testpath = path("readFileToChange.txt");
// initial write
writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
try(FSDataInputStream instream = fs.open(testpath)) {
instream.seek(fullLength - 16);
assertTrue("no data to read", instream.read() >= 0);
// overwrite
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
// here the file length is less. Probe the file to see if this is true,
// with a spin and wait
eventually(30 * 1000, 1000,
() -> {
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
});
// here length is shorter. Assuming it has propagated to all replicas,
// the position of the input stream is now beyond the EOF.
// An attempt to seek backwards to a position greater than the
// short length will raise an exception from AWS S3, which must be
// translated into an EOF
instream.seek(shortLen + 1024);
int c = instream.read();
assertIsEOF("read()", c);
byte[] buf = new byte[256];
assertIsEOF("read(buffer)", instream.read(buf));
assertIsEOF("read(offset)",
instream.read(instream.getPos(), buf, 0, buf.length));
// now do a block read fully, again, backwards from the current pos
intercept(EOFException.class, "", "readfully",
() -> instream.readFully(shortLen + 512, buf));
assertIsEOF("read(offset)",
instream.read(shortLen + 510, buf, 0, buf.length));
// seek somewhere useful
instream.seek(shortLen - 256);
// delete the file. Reads must fail
fs.delete(testpath, false);
intercept(FileNotFoundException.class, "", "read()",
() -> instream.read());
intercept(FileNotFoundException.class, "", "readfully",
() -> instream.readFully(2048, buf));
}
}
/**
* Assert that a read operation returned an EOF value.

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collection;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test S3A remote file change detection.
*/
@RunWith(Parameterized.class)
public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3ARemoteFileChanged.class);
private final String changeDetectionSource;
private final String changeDetectionMode;
private final boolean expectChangeException;
private final boolean expectFileNotFoundException;
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
// make sure it works with invalid config
{"bogus", "bogus", true, true},
// test with etag
{CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_SERVER, true, true},
{CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_CLIENT, true, true},
{CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_WARN, false, true},
{CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_NONE, false, true},
// test with versionId
// when using server-side versionId, the exceptions shouldn't happen
// since the previous version will still be available
{CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_SERVER, false,
false},
// with client-side versionId it will behave similar to client-side eTag
{CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_CLIENT, true,
true},
{CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_WARN, false, true},
{CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_NONE, false, true}
});
}
public ITestS3ARemoteFileChanged(String changeDetectionSource,
String changeDetectionMode,
boolean expectException,
boolean expectFileNotFoundException) {
this.changeDetectionSource = changeDetectionSource;
this.changeDetectionMode = changeDetectionMode;
this.expectChangeException = expectException;
this.expectFileNotFoundException = expectFileNotFoundException;
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
String bucketName = getTestBucketName(conf);
removeBucketOverrides(bucketName, conf,
CHANGE_DETECT_SOURCE,
CHANGE_DETECT_MODE);
conf.set(CHANGE_DETECT_SOURCE, changeDetectionSource);
conf.set(CHANGE_DETECT_MODE, changeDetectionMode);
S3ATestUtils.disableFilesystemCaching(conf);
return conf;
}
@Test
public void testReadFileChanged() throws Throwable {
final int originalLength = 8192;
final byte[] originalDataset = dataset(originalLength, 'a', 32);
final int newLength = originalLength + 1;
final byte[] newDataset = dataset(newLength, 'A', 32);
final S3AFileSystem fs = getFileSystem();
final Path testpath = path("readFileToChange.txt");
// initial write
writeDataset(fs, testpath, originalDataset, originalDataset.length,
1024, false);
if (fs.getChangeDetectionPolicy().getSource() == Source.VersionId) {
// skip versionId tests if the bucket doesn't have object versioning
// enabled
Assume.assumeTrue(
"Target filesystem does not support versioning",
fs.getObjectMetadata(fs.pathToKey(testpath)).getVersionId() != null);
}
try(FSDataInputStream instream = fs.open(testpath)) {
// seek forward and read successfully
instream.seek(1024);
assertTrue("no data to read", instream.read() >= 0);
// overwrite
writeDataset(fs, testpath, newDataset, newDataset.length, 1024, true);
// here the new file length is larger. Probe the file to see if this is
// true, with a spin and wait
eventually(30 * 1000, 1000,
() -> {
assertEquals(newLength, fs.getFileStatus(testpath).getLen());
});
// With the new file version in place, any subsequent S3 read by
// eTag/versionId will fail. A new read by eTag/versionId will occur in
// reopen() on read after a seek() backwards. We verify seek backwards
// results in the expected exception and seek() forward works without
// issue.
// first check seek forward
instream.seek(2048);
assertTrue("no data to read", instream.read() >= 0);
// now check seek backward
instream.seek(instream.getPos() - 100);
if (expectChangeException) {
intercept(RemoteFileChangedException.class, "", "read",
() -> instream.read());
} else {
instream.read();
}
byte[] buf = new byte[256];
// seek backward
instream.seek(0);
if (expectChangeException) {
intercept(RemoteFileChangedException.class, "", "read",
() -> instream.read(buf));
intercept(RemoteFileChangedException.class, "", "read",
() -> instream.read(0, buf, 0, buf.length));
intercept(RemoteFileChangedException.class, "", "readfully",
() -> instream.readFully(0, buf));
} else {
instream.read(buf);
instream.read(0, buf, 0, buf.length);
instream.readFully(0, buf);
}
// delete the file. Reads must fail
fs.delete(testpath, false);
// seek backward
instream.seek(0);
if (expectFileNotFoundException) {
intercept(FileNotFoundException.class, "", "read()",
() -> instream.read());
intercept(FileNotFoundException.class, "", "readfully",
() -> instream.readFully(2048, buf));
} else {
instream.read();
instream.readFully(2048, buf);
}
}
}
}

View File

@ -61,6 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
@ -695,6 +696,18 @@ public final class S3ATestUtils {
return after;
}
/**
* Get the name of the test bucket.
* @param conf configuration to scan.
* @return the bucket name from the config.
* @throws NullPointerException: no test bucket
*/
public static String getTestBucketName(final Configuration conf) {
String bucket = checkNotNull(conf.get(TEST_FS_S3A_NAME),
"No test bucket");
return URI.create(bucket).getHost();
}
/**
* Remove any values from a bucket.
* @param bucket bucket whose overrides are to be removed. Can be null/empty

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.util.concurrent.atomic.AtomicLong;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.hadoop.fs.PathIOException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.createPolicy;
import static org.apache.hadoop.fs.s3a.impl.ChangeTracker.CHANGE_REPORTED_BY_S3;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test {@link ChangeTracker}.
*/
public class TestStreamChangeTracker extends HadoopTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestStreamChangeTracker.class);
public static final String BUCKET = "bucket";
public static final String OBJECT = "object";
public static final String URI = "s3a://" + BUCKET + "/" + OBJECT;
@Test
public void testVersionCheckingHandlingNoVersions() throws Throwable {
LOG.info("If an endpoint doesn't return versions, that's OK");
ChangeTracker tracker = newTracker(
ChangeDetectionPolicy.Mode.Client,
ChangeDetectionPolicy.Source.VersionId,
false);
assertFalse("Tracker should not have applied contraints " + tracker,
tracker.maybeApplyConstraint(newGetObjectRequest()));
tracker.processResponse(
newResponse(null, null),
"", 0);
assertTrackerMismatchCount(tracker, 0);
}
@Test
public void testVersionCheckingHandlingNoVersionsVersionRequired()
throws Throwable {
LOG.info("If an endpoint doesn't return versions but we are configured to"
+ "require them");
ChangeTracker tracker = newTracker(
ChangeDetectionPolicy.Mode.Client,
ChangeDetectionPolicy.Source.VersionId,
true);
expectNoVersionAttributeException(tracker, newResponse(null, null),
"policy requires VersionId");
}
@Test
public void testEtagCheckingWarn() throws Throwable {
LOG.info("If an endpoint doesn't return errors, that's OK");
ChangeTracker tracker = newTracker(
ChangeDetectionPolicy.Mode.Warn,
ChangeDetectionPolicy.Source.ETag,
false);
assertFalse("Tracker should not have applied constraints " + tracker,
tracker.maybeApplyConstraint(newGetObjectRequest()));
tracker.processResponse(
newResponse("e1", null),
"", 0);
tracker.processResponse(
newResponse("e1", null),
"", 0);
tracker.processResponse(
newResponse("e2", null),
"", 0);
assertTrackerMismatchCount(tracker, 1);
// subsequent error triggers doesn't trigger another warning
tracker.processResponse(
newResponse("e2", null),
"", 0);
assertTrackerMismatchCount(tracker, 1);
}
@Test
public void testVersionCheckingOnClient() throws Throwable {
LOG.info("Verify the client-side version checker raises exceptions");
ChangeTracker tracker = newTracker(
ChangeDetectionPolicy.Mode.Client,
ChangeDetectionPolicy.Source.VersionId,
false);
assertFalse("Tracker should not have applied constraints " + tracker,
tracker.maybeApplyConstraint(newGetObjectRequest()));
tracker.processResponse(
newResponse(null, "rev1"),
"", 0);
assertTrackerMismatchCount(tracker, 0);
assertRevisionId(tracker, "rev1");
GetObjectRequest request = newGetObjectRequest();
expectChangeException(tracker,
newResponse(null, "rev2"), "change detected");
// mismatch was noted (so gets to FS stats)
assertTrackerMismatchCount(tracker, 1);
// another read causes another exception
expectChangeException(tracker,
newResponse(null, "rev2"), "change detected");
// mismatch was noted again
assertTrackerMismatchCount(tracker, 2);
}
@Test
public void testVersionCheckingOnServer() throws Throwable {
LOG.info("Verify the client-side version checker handles null-ness");
ChangeTracker tracker = newTracker(
ChangeDetectionPolicy.Mode.Server,
ChangeDetectionPolicy.Source.VersionId,
false);
assertFalse("Tracker should not have applied contraints " + tracker,
tracker.maybeApplyConstraint(newGetObjectRequest()));
tracker.processResponse(
newResponse(null, "rev1"),
"", 0);
assertTrackerMismatchCount(tracker, 0);
assertRevisionId(tracker, "rev1");
GetObjectRequest request = newGetObjectRequest();
assertConstraintApplied(tracker, request);
// now, the tracker expects a null response
expectChangeException(tracker, null, CHANGE_REPORTED_BY_S3);
assertTrackerMismatchCount(tracker, 1);
// now, imagine the server doesn't trigger a failure due to some
// bug in its logic
// we should still react to the reported value
expectChangeException(tracker,
newResponse(null, "rev2"),
CHANGE_DETECTED);
}
protected void assertConstraintApplied(final ChangeTracker tracker,
final GetObjectRequest request) {
assertTrue("Tracker should have applied contraints " + tracker,
tracker.maybeApplyConstraint(request));
}
protected RemoteFileChangedException expectChangeException(
final ChangeTracker tracker,
final S3Object response,
final String message) throws Exception {
return expectException(tracker, response, message,
RemoteFileChangedException.class);
}
protected PathIOException expectNoVersionAttributeException(
final ChangeTracker tracker,
final S3Object response,
final String message) throws Exception {
return expectException(tracker, response, message,
NoVersionAttributeException.class);
}
protected <T extends Exception> T expectException(
final ChangeTracker tracker,
final S3Object response,
final String message,
final Class<T> clazz) throws Exception {
return intercept(
clazz,
message,
() -> {
tracker.processResponse(response, "", 0);
return tracker;
});
}
protected void assertRevisionId(final ChangeTracker tracker,
final String revId) {
assertEquals("Wrong revision ID in " + tracker,
revId, tracker.getRevisionId());
}
protected void assertTrackerMismatchCount(
final ChangeTracker tracker,
final int expectedCount) {
assertEquals("counter in tracker " + tracker,
expectedCount, tracker.getVersionMismatches().get());
}
/**
* Create tracker.
* Contains standard assertions(s).
* @return the tracker.
*/
protected ChangeTracker newTracker(final ChangeDetectionPolicy.Mode mode,
final ChangeDetectionPolicy.Source source, boolean requireVersion) {
ChangeDetectionPolicy policy = createPolicy(
mode,
source,
requireVersion);
ChangeTracker tracker = new ChangeTracker(URI, policy,
new AtomicLong(0));
assertFalse("Tracker should not have applied constraints " + tracker,
tracker.maybeApplyConstraint(newGetObjectRequest()));
return tracker;
}
private GetObjectRequest newGetObjectRequest() {
return new GetObjectRequest(BUCKET, OBJECT);
}
private S3Object newResponse(String etag, String versionId) {
ObjectMetadata md = new ObjectMetadata();
if (etag != null) {
md.setHeader(Headers.ETAG, etag);
}
if (versionId != null) {
md.setHeader(Headers.S3_VERSION_ID, versionId);
}
S3Object response = emptyResponse();
response.setObjectMetadata(md);
return response;
}
private S3Object emptyResponse() {
S3Object response = new S3Object();
response.setBucketName(BUCKET);
response.setKey(OBJECT);
return response;
}
}