HADOOP-16490. Avoid/handle cached 404s during S3A file creation.
Contributed by Steve Loughran. This patch avoids issuing any HEAD path request when creating a file with overwrite=true, so 404s will not end up in the S3 load balancers unless someone calls getFileStatus/exists/isFile in their own code. The Hadoop FsShell CommandWithDestination class is modified to not register uncreated files for deleteOnExit(), because that calls exists() and so can place the 404 in the cache, even after S3A is patched to not do it itself. Because S3Guard knows when a file should be present, it adds a special FileNotFound retry policy independently configurable from other retry policies; it is also exponential, but with different parameters. This is because every HEAD request will refresh any 404 cached in the S3 Load Balancers. It's not enough to retry: we have to have a suitable gap between attempts to (hopefully) ensure any cached entry wil be gone. The options and values are: fs.s3a.s3guard.consistency.retry.interval: 2s fs.s3a.s3guard.consistency.retry.limit: 7 The S3A copy() method used during rename() raises a RemoteFileChangedException which is not caught so not downgraded to false. Thus: when a rename is unrecoverable, this fact is propagated. Copy operations without S3Guard lack the confidence that the file exists, so don't retry the same way: it will fail fast with a different error message. However, because create(path, overwrite=false) no longer does HEAD path, we can at least be confident that S3A itself is not creating those cached 404 markers. Change-Id: Ia7807faad8b9a8546836cb19f816cccf17cca26d
This commit is contained in:
parent
5a381f73e9
commit
9221704f85
|
@ -458,7 +458,7 @@ abstract public class Command extends Configured {
|
|||
if (e instanceof InterruptedIOException) {
|
||||
throw new CommandInterruptException();
|
||||
}
|
||||
|
||||
LOG.debug("{} failure", getName(), e);
|
||||
String errorMessage = e.getLocalizedMessage();
|
||||
if (errorMessage == null) {
|
||||
// this is an unexpected condition, so dump the whole exception since
|
||||
|
|
|
@ -30,6 +30,9 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -59,6 +62,10 @@ import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
|||
* a destination directory.
|
||||
*/
|
||||
abstract class CommandWithDestination extends FsCommand {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(
|
||||
CommandWithDestination.class);
|
||||
|
||||
protected PathData dst;
|
||||
private boolean overwrite = false;
|
||||
private boolean verifyChecksum = true;
|
||||
|
@ -220,6 +227,7 @@ abstract class CommandWithDestination extends FsCommand {
|
|||
}
|
||||
} else if (dst.exists) {
|
||||
if (!dst.stat.isDirectory() && !overwrite) {
|
||||
LOG.debug("Destination file exists: {}", dst.stat);
|
||||
throw new PathExistsException(dst.toString());
|
||||
}
|
||||
} else if (!dst.parentExists()) {
|
||||
|
@ -407,6 +415,7 @@ abstract class CommandWithDestination extends FsCommand {
|
|||
targetFs.setWriteChecksum(writeChecksum);
|
||||
targetFs.writeStreamToFile(in, tempTarget, lazyPersist, direct);
|
||||
if (!direct) {
|
||||
targetFs.deleteOnExit(tempTarget.path);
|
||||
targetFs.rename(tempTarget, target);
|
||||
}
|
||||
} finally {
|
||||
|
@ -484,6 +493,15 @@ abstract class CommandWithDestination extends FsCommand {
|
|||
try {
|
||||
out = create(target, lazyPersist, direct);
|
||||
IOUtils.copyBytes(in, out, getConf(), true);
|
||||
} catch (IOException e) {
|
||||
// failure: clean up if we got as far as creating the file
|
||||
if (!direct && out != null) {
|
||||
try {
|
||||
fs.delete(target.path, false);
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
IOUtils.closeStream(out); // just in case copyBytes didn't
|
||||
}
|
||||
|
@ -493,7 +511,6 @@ abstract class CommandWithDestination extends FsCommand {
|
|||
FSDataOutputStream create(PathData item, boolean lazyPersist,
|
||||
boolean direct)
|
||||
throws IOException {
|
||||
try {
|
||||
if (lazyPersist) {
|
||||
long defaultBlockSize;
|
||||
try {
|
||||
|
@ -520,11 +537,6 @@ abstract class CommandWithDestination extends FsCommand {
|
|||
} else {
|
||||
return create(item.path, true);
|
||||
}
|
||||
} finally { // might have been created but stream was interrupted
|
||||
if (!direct) {
|
||||
deleteOnExit(item.path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void rename(PathData src, PathData target) throws IOException {
|
||||
|
|
|
@ -1663,7 +1663,7 @@
|
|||
<value>7</value>
|
||||
<description>
|
||||
Number of times to retry any repeatable S3 client request on failure,
|
||||
excluding throttling requests.
|
||||
excluding throttling requests and S3Guard inconsistency resolution.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -1672,7 +1672,7 @@
|
|||
<value>500ms</value>
|
||||
<description>
|
||||
Initial retry interval when retrying operations for any reason other
|
||||
than S3 throttle errors.
|
||||
than S3 throttle errors and S3Guard inconsistency resolution.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -1692,6 +1692,27 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.s3guard.consistency.retry.limit</name>
|
||||
<value>7</value>
|
||||
<description>
|
||||
Number of times to retry attempts to read/open/copy files when
|
||||
S3Guard believes a specific version of the file to be available,
|
||||
but the S3 request does not find any version of a file, or a different
|
||||
version.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.s3guard.consistency.retry.interval</name>
|
||||
<value>2s</value>
|
||||
<description>
|
||||
Initial interval between attempts to retry operations while waiting for S3
|
||||
to become consistent with the S3Guard data.
|
||||
An exponential back-off is used here: every failure doubles the delay.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.committer.name</name>
|
||||
<value>file</value>
|
||||
|
|
|
@ -558,7 +558,8 @@ public class ContractTestUtils extends Assert {
|
|||
*/
|
||||
public static void assertIsDirectory(FileSystem fs,
|
||||
Path path) throws IOException {
|
||||
FileStatus fileStatus = fs.getFileStatus(path);
|
||||
FileStatus fileStatus = verifyPathExists(fs,
|
||||
"Expected to find a directory", path);
|
||||
assertIsDirectory(fileStatus);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
@ -77,10 +78,19 @@ public class TestCopy {
|
|||
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
||||
|
||||
tryCopyStream(in, true);
|
||||
verify(in).close();
|
||||
verify(out, times(2)).close();
|
||||
// no data was written.
|
||||
verify(out, never()).write(any(byte[].class), anyInt(), anyInt());
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
verify(mockFs).rename(eq(tmpPath), eq(path));
|
||||
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
// temp path never had is existence checked. This is critical for S3 as it
|
||||
// avoids the successful path accidentally getting a 404 into the S3 load
|
||||
// balancer cache
|
||||
verify(mockFs, never()).exists(eq(tmpPath));
|
||||
verify(mockFs, never()).exists(eq(path));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -110,6 +120,31 @@ public class TestCopy {
|
|||
FSDataInputStream in = mock(FSDataInputStream.class);
|
||||
|
||||
tryCopyStream(in, false);
|
||||
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
|
||||
verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
verify(mockFs, never()).close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file but fail in the write.
|
||||
* The copy operation should attempt to clean up by
|
||||
* closing the output stream then deleting it.
|
||||
*/
|
||||
@Test
|
||||
public void testFailedWrite() throws Exception {
|
||||
FSDataOutputStream out = mock(FSDataOutputStream.class);
|
||||
doThrow(new IOException("mocked"))
|
||||
.when(out).write(any(byte[].class), anyInt(), anyInt());
|
||||
whenFsCreate().thenReturn(out);
|
||||
when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
||||
FSInputStream in = mock(FSInputStream.class);
|
||||
doReturn(0)
|
||||
.when(in).read(any(byte[].class), anyInt(), anyInt());
|
||||
Throwable thrown = tryCopyStream(in, false);
|
||||
assertExceptionContains("mocked", thrown);
|
||||
verify(in).close();
|
||||
verify(out, times(2)).close();
|
||||
verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
||||
verify(mockFs, never()).rename(any(Path.class), any(Path.class));
|
||||
verify(mockFs, never()).delete(eq(path), anyBoolean());
|
||||
|
@ -155,13 +190,20 @@ public class TestCopy {
|
|||
anyBoolean(), anyInt(), anyShort(), anyLong(), any()));
|
||||
}
|
||||
|
||||
private void tryCopyStream(InputStream in, boolean shouldPass) {
|
||||
private Throwable tryCopyStream(InputStream in, boolean shouldPass) {
|
||||
try {
|
||||
cmd.copyStreamToTarget(new FSDataInputStream(in), target);
|
||||
return null;
|
||||
} catch (InterruptedIOException e) {
|
||||
assertFalse("copy failed", shouldPass);
|
||||
if (shouldPass) {
|
||||
throw new AssertionError("copy failed", e);
|
||||
}
|
||||
return e;
|
||||
} catch (Throwable e) {
|
||||
assertFalse(e.getMessage(), shouldPass);
|
||||
if (shouldPass) {
|
||||
throw new AssertionError(e.getMessage(), e);
|
||||
}
|
||||
return e;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -761,4 +761,32 @@ public final class Constants {
|
|||
* Default change detection require version: true.
|
||||
*/
|
||||
public static final boolean CHANGE_DETECT_REQUIRE_VERSION_DEFAULT = true;
|
||||
|
||||
/**
|
||||
* Number of times to retry any repeatable S3 client request on failure,
|
||||
* excluding throttling requests: {@value}.
|
||||
*/
|
||||
public static final String S3GUARD_CONSISTENCY_RETRY_LIMIT =
|
||||
"fs.s3a.s3guard.consistency.retry.limit";
|
||||
|
||||
/**
|
||||
* Default retry limit: {@value}.
|
||||
*/
|
||||
public static final int S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT = 7;
|
||||
|
||||
/**
|
||||
* Initial retry interval: {@value}.
|
||||
*/
|
||||
public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL =
|
||||
"fs.s3a.s3guard.consistency.retry.interval";
|
||||
|
||||
/**
|
||||
* Default initial retry interval: {@value}.
|
||||
* The consistency retry probe uses exponential backoff, because
|
||||
* each probe can cause the S3 load balancers to retain any 404 in
|
||||
* its cache for longer. See HADOOP-16490.
|
||||
*/
|
||||
public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT =
|
||||
"2s";
|
||||
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.hadoop.fs.PathIOException;
|
|||
/**
|
||||
* Indicates the S3 object is out of sync with the expected version. Thrown in
|
||||
* cases such as when the object is updated while an {@link S3AInputStream} is
|
||||
* open.
|
||||
* open, or when a file expected was never found.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Public
|
||||
|
@ -35,6 +35,20 @@ public class RemoteFileChangedException extends PathIOException {
|
|||
public static final String PRECONDITIONS_FAILED =
|
||||
"Constraints of request were unsatisfiable";
|
||||
|
||||
/**
|
||||
* While trying to get information on a file known to S3Guard, the
|
||||
* file never became visible in S3.
|
||||
*/
|
||||
public static final String FILE_NEVER_FOUND =
|
||||
"File to rename not found on guarded S3 store after repeated attempts";
|
||||
|
||||
/**
|
||||
* The file wasn't found in rename after a single attempt -the unguarded
|
||||
* codepath.
|
||||
*/
|
||||
public static final String FILE_NOT_FOUND_SINGLE_ATTEMPT =
|
||||
"File to rename not found on unguarded S3 store";
|
||||
|
||||
/**
|
||||
* Constructs a RemoteFileChangedException.
|
||||
*
|
||||
|
|
|
@ -102,6 +102,7 @@ import org.apache.hadoop.fs.s3a.impl.InternalConstants;
|
|||
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
|
||||
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
|
||||
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
|
||||
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
||||
import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
|
||||
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
|
||||
|
@ -160,6 +161,7 @@ import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_SSE_KMS
|
|||
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
|
||||
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
|
||||
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
|
||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
|
||||
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
||||
|
||||
/**
|
||||
|
@ -1059,8 +1061,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
String key = pathToKey(path);
|
||||
FileStatus status = null;
|
||||
try {
|
||||
// get the status or throw an FNFE
|
||||
status = getFileStatus(path);
|
||||
// get the status or throw an FNFE.
|
||||
// when overwriting, there is no need to look for any existing file,
|
||||
// and attempting to do so can poison the load balancers with 404
|
||||
// entries.
|
||||
status = innerGetFileStatus(path, false,
|
||||
overwrite
|
||||
? StatusProbeEnum.DIRECTORIES
|
||||
: StatusProbeEnum.ALL);
|
||||
|
||||
// if the thread reaches here, there is something at the path
|
||||
if (status.isDirectory()) {
|
||||
|
@ -1216,7 +1224,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
|
||||
// get the source file status; this raises a FNFE if there is no source
|
||||
// file.
|
||||
S3AFileStatus srcStatus = innerGetFileStatus(src, true);
|
||||
S3AFileStatus srcStatus = innerGetFileStatus(src, true,
|
||||
StatusProbeEnum.ALL);
|
||||
|
||||
if (srcKey.equals(dstKey)) {
|
||||
LOG.debug("rename: src and dest refer to the same file or directory: {}",
|
||||
|
@ -1228,7 +1237,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
|
||||
S3AFileStatus dstStatus = null;
|
||||
try {
|
||||
dstStatus = innerGetFileStatus(dst, true);
|
||||
dstStatus = innerGetFileStatus(dst, true, StatusProbeEnum.ALL);
|
||||
// if there is no destination entry, an exception is raised.
|
||||
// hence this code sequence can assume that there is something
|
||||
// at the end of the path; the only detail being what it is and
|
||||
|
@ -1261,7 +1270,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
if (!pathToKey(parent).isEmpty()) {
|
||||
try {
|
||||
S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(),
|
||||
false);
|
||||
false, StatusProbeEnum.ALL);
|
||||
if (!dstParentStatus.isDirectory()) {
|
||||
throw new RenameFailedException(src, dst,
|
||||
"destination parent is not a directory");
|
||||
|
@ -1660,6 +1669,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
ObjectMetadata meta = changeInvoker.retryUntranslated("GET " + key, true,
|
||||
() -> {
|
||||
incrementStatistic(OBJECT_METADATA_REQUESTS);
|
||||
LOG.debug("HEAD {} with change tracker {}", key, changeTracker);
|
||||
if (changeTracker != null) {
|
||||
changeTracker.maybeApplyConstraint(request);
|
||||
}
|
||||
|
@ -2267,7 +2277,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
entryPoint(INVOCATION_DELETE);
|
||||
DeleteOperation deleteOperation = new DeleteOperation(
|
||||
createStoreContext(),
|
||||
innerGetFileStatus(f, true),
|
||||
innerGetFileStatus(f, true, StatusProbeEnum.ALL),
|
||||
recursive,
|
||||
operationCallbacks,
|
||||
InternalConstants.MAX_ENTRIES_TO_DELETE);
|
||||
|
@ -2297,13 +2307,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* Retry policy: retrying; untranslated.
|
||||
* @param f path to create
|
||||
* @throws IOException IO problem
|
||||
* @throws AmazonClientException untranslated AWS client problem
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
private void createFakeDirectoryIfNecessary(Path f)
|
||||
throws IOException, AmazonClientException {
|
||||
String key = pathToKey(f);
|
||||
if (!key.isEmpty() && !s3Exists(f)) {
|
||||
// we only make the LIST call; the codepaths to get here should not
|
||||
// be reached if there is an empty dir marker -and if they do, it
|
||||
// is mostly harmless to create a new one.
|
||||
if (!key.isEmpty() && !s3Exists(f, EnumSet.of(StatusProbeEnum.List))) {
|
||||
LOG.debug("Creating new fake directory at {}", f);
|
||||
createFakeDirectory(key);
|
||||
}
|
||||
|
@ -2314,7 +2326,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* That is: it parent is not the root path and does not yet exist.
|
||||
* @param path whose parent is created if needed.
|
||||
* @throws IOException IO problem
|
||||
* @throws AmazonClientException untranslated AWS client problem
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
void maybeCreateFakeParentDirectory(Path path)
|
||||
|
@ -2568,14 +2579,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
*/
|
||||
@Retries.RetryTranslated
|
||||
public FileStatus getFileStatus(final Path f) throws IOException {
|
||||
return innerGetFileStatus(f, false);
|
||||
entryPoint(INVOCATION_GET_FILE_STATUS);
|
||||
return innerGetFileStatus(f, false, StatusProbeEnum.ALL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the status of a file or directory, first through S3Guard and then
|
||||
* through S3.
|
||||
* The S3 probes can leave 404 responses in the S3 load balancers; if
|
||||
* a check is only needed for a directory, declaring this saves time and
|
||||
* avoids creating one for the object.
|
||||
* When only probing for directories, if an entry for a file is found in
|
||||
* S3Guard it is returned, but checks for updated values are skipped.
|
||||
* Internal version of {@link #getFileStatus(Path)}.
|
||||
* @param f The path we want information from
|
||||
* @param needEmptyDirectoryFlag if true, implementation will calculate
|
||||
* a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
|
||||
* @param probes probes to make
|
||||
* @return a S3AFileStatus object
|
||||
* @throws FileNotFoundException when the path does not exist
|
||||
* @throws IOException on other problems.
|
||||
|
@ -2583,9 +2603,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
@VisibleForTesting
|
||||
@Retries.RetryTranslated
|
||||
S3AFileStatus innerGetFileStatus(final Path f,
|
||||
boolean needEmptyDirectoryFlag) throws IOException {
|
||||
entryPoint(INVOCATION_GET_FILE_STATUS);
|
||||
checkNotClosed();
|
||||
final boolean needEmptyDirectoryFlag,
|
||||
final Set<StatusProbeEnum> probes) throws IOException {
|
||||
final Path path = qualify(f);
|
||||
String key = pathToKey(path);
|
||||
LOG.debug("Getting path status for {} ({})", path, key);
|
||||
|
@ -2602,7 +2621,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
OffsetDateTime deletedAt = OffsetDateTime.ofInstant(
|
||||
Instant.ofEpochMilli(pm.getFileStatus().getModificationTime()),
|
||||
ZoneOffset.UTC);
|
||||
throw new FileNotFoundException("Path " + f + " is recorded as " +
|
||||
throw new FileNotFoundException("Path " + path + " is recorded as " +
|
||||
"deleted by S3Guard at " + deletedAt);
|
||||
}
|
||||
|
||||
|
@ -2612,15 +2631,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
// dest is also a directory, there's no difference.
|
||||
// TODO After HADOOP-16085 the modification detection can be done with
|
||||
// etags or object version instead of modTime
|
||||
boolean allowAuthoritative = allowAuthoritative(f);
|
||||
boolean allowAuthoritative = allowAuthoritative(path);
|
||||
if (!pm.getFileStatus().isDirectory() &&
|
||||
!allowAuthoritative) {
|
||||
!allowAuthoritative &&
|
||||
probes.contains(StatusProbeEnum.Head)) {
|
||||
// a file has been found in a non-auth path and the caller has not said
|
||||
// they only care about directories
|
||||
LOG.debug("Metadata for {} found in the non-auth metastore.", path);
|
||||
final long msModTime = pm.getFileStatus().getModificationTime();
|
||||
|
||||
S3AFileStatus s3AFileStatus;
|
||||
try {
|
||||
s3AFileStatus = s3GetFileStatus(path, key, tombstones);
|
||||
s3AFileStatus = s3GetFileStatus(path, key, probes, tombstones);
|
||||
} catch (FileNotFoundException fne) {
|
||||
s3AFileStatus = null;
|
||||
}
|
||||
|
@ -2662,7 +2684,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
// S3 yet, we'll assume the empty directory is true;
|
||||
S3AFileStatus s3FileStatus;
|
||||
try {
|
||||
s3FileStatus = s3GetFileStatus(path, key, tombstones);
|
||||
s3FileStatus = s3GetFileStatus(path, key, probes, tombstones);
|
||||
} catch (FileNotFoundException e) {
|
||||
return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE,
|
||||
null, null);
|
||||
|
@ -2674,7 +2696,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
// there was no entry in S3Guard
|
||||
// retrieve the data and update the metadata store in the process.
|
||||
return S3Guard.putAndReturn(metadataStore,
|
||||
s3GetFileStatus(path, key, tombstones), instrumentation,
|
||||
s3GetFileStatus(path, key, StatusProbeEnum.ALL, tombstones),
|
||||
instrumentation,
|
||||
ttlTimeProvider);
|
||||
}
|
||||
}
|
||||
|
@ -2686,14 +2709,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* Retry policy: retry translated.
|
||||
* @param path Qualified path
|
||||
* @param key Key string for the path
|
||||
* @param probes probes to make
|
||||
* @param tombstones tombstones to filter
|
||||
* @return Status
|
||||
* @throws FileNotFoundException when the path does not exist
|
||||
* @throws IOException on other problems.
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
private S3AFileStatus s3GetFileStatus(final Path path, String key,
|
||||
Set<Path> tombstones) throws IOException {
|
||||
if (!key.isEmpty()) {
|
||||
private S3AFileStatus s3GetFileStatus(final Path path,
|
||||
String key,
|
||||
final Set<StatusProbeEnum> probes,
|
||||
final Set<Path> tombstones) throws IOException {
|
||||
if (!key.isEmpty() && probes.contains(StatusProbeEnum.Head)) {
|
||||
try {
|
||||
ObjectMetadata meta = getObjectMetadata(key);
|
||||
|
||||
|
@ -2711,15 +2738,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
meta.getVersionId());
|
||||
}
|
||||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
if (e.getStatusCode() != SC_404) {
|
||||
throw translateException("getFileStatus", path, e);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("getFileStatus", path, e);
|
||||
}
|
||||
|
||||
// Necessary?
|
||||
if (!key.endsWith("/")) {
|
||||
// Look for the dir marker
|
||||
if (!key.endsWith("/") && probes.contains(StatusProbeEnum.DirMarker)) {
|
||||
String newKey = key + "/";
|
||||
try {
|
||||
ObjectMetadata meta = getObjectMetadata(newKey);
|
||||
|
@ -2740,7 +2767,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
meta.getVersionId());
|
||||
}
|
||||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
if (e.getStatusCode() != SC_404) {
|
||||
throw translateException("getFileStatus", newKey, e);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
|
@ -2749,6 +2776,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
}
|
||||
}
|
||||
|
||||
// execute the list
|
||||
if (probes.contains(StatusProbeEnum.List)) {
|
||||
try {
|
||||
key = maybeAddTrailingSlash(key);
|
||||
S3ListRequest request = createListObjectsRequest(key, "/", 1);
|
||||
|
@ -2777,12 +2806,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
return new S3AFileStatus(Tristate.TRUE, path, username);
|
||||
}
|
||||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
if (e.getStatusCode() != SC_404) {
|
||||
throw translateException("getFileStatus", path, e);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("getFileStatus", path, e);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("Not Found: {}", path);
|
||||
throw new FileNotFoundException("No such file or directory: " + path);
|
||||
|
@ -2834,15 +2864,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
|
||||
* S3Guard MetadataStore, if any, will be skipped.
|
||||
* Retry policy: retrying; translated.
|
||||
* @param path qualified path to look for
|
||||
* @param probes probes to make
|
||||
* @return true if path exists in S3
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
private boolean s3Exists(final Path f) throws IOException {
|
||||
Path path = qualify(f);
|
||||
private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
|
||||
throws IOException {
|
||||
String key = pathToKey(path);
|
||||
try {
|
||||
s3GetFileStatus(path, key, null);
|
||||
s3GetFileStatus(path, key, probes, null);
|
||||
return true;
|
||||
} catch (FileNotFoundException e) {
|
||||
return false;
|
||||
|
@ -3160,10 +3192,31 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
String action = "copyFile(" + srcKey + ", " + dstKey + ")";
|
||||
Invoker readInvoker = readContext.getReadInvoker();
|
||||
|
||||
ObjectMetadata srcom =
|
||||
once(action, srcKey,
|
||||
ObjectMetadata srcom;
|
||||
try {
|
||||
srcom = once(action, srcKey,
|
||||
() ->
|
||||
getObjectMetadata(srcKey, changeTracker, readInvoker, "copy"));
|
||||
} catch (FileNotFoundException e) {
|
||||
// if rename fails at this point it means that the expected file was not
|
||||
// found.
|
||||
// The cause is believed to always be one of
|
||||
// - File was deleted since LIST/S3Guard metastore.list.() knew of it.
|
||||
// - S3Guard is asking for a specific version and it's been removed by
|
||||
// lifecycle rules.
|
||||
// - there's a 404 cached in the S3 load balancers.
|
||||
LOG.debug("getObjectMetadata({}) failed to find an expected file",
|
||||
srcKey, e);
|
||||
// We create an exception, but the text depends on the S3Guard state
|
||||
String message = hasMetadataStore()
|
||||
? RemoteFileChangedException.FILE_NEVER_FOUND
|
||||
: RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT;
|
||||
throw new RemoteFileChangedException(
|
||||
keyToQualifiedPath(srcKey).toString(),
|
||||
action,
|
||||
message,
|
||||
e);
|
||||
}
|
||||
ObjectMetadata dstom = cloneObjectMetadata(srcom);
|
||||
setOptionalObjectMetadata(dstom);
|
||||
|
||||
|
|
|
@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.InvalidRequestException;
|
||||
|
@ -80,15 +82,21 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
|
|||
@SuppressWarnings("visibilitymodifier") // I want a struct of finals, for real.
|
||||
public class S3ARetryPolicy implements RetryPolicy {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
S3ARetryPolicy.class);
|
||||
|
||||
private final Configuration configuration;
|
||||
|
||||
/** Final retry policy we end up with. */
|
||||
private final RetryPolicy retryPolicy;
|
||||
|
||||
// Retry policies for mapping exceptions to
|
||||
|
||||
/** Base policy from configuration. */
|
||||
protected final RetryPolicy fixedRetries;
|
||||
/** Exponential policy for the base of normal failures. */
|
||||
protected final RetryPolicy baseExponentialRetry;
|
||||
|
||||
/** Rejection of all non-idempotent calls except specific failures. */
|
||||
/** Idempotent calls which raise IOEs are retried.
|
||||
* */
|
||||
protected final RetryPolicy retryIdempotentCalls;
|
||||
|
||||
/** Policy for throttle requests, which are considered repeatable, even for
|
||||
|
@ -98,7 +106,10 @@ public class S3ARetryPolicy implements RetryPolicy {
|
|||
/** No retry on network and tangible API issues. */
|
||||
protected final RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
|
||||
/** Client connectivity: fixed retries without care for idempotency. */
|
||||
/**
|
||||
* Client connectivity: baseExponentialRetry without worrying about whether
|
||||
* or not the command is idempotent.
|
||||
*/
|
||||
protected final RetryPolicy connectivityFailure;
|
||||
|
||||
/**
|
||||
|
@ -107,19 +118,26 @@ public class S3ARetryPolicy implements RetryPolicy {
|
|||
*/
|
||||
public S3ARetryPolicy(Configuration conf) {
|
||||
Preconditions.checkArgument(conf != null, "Null configuration");
|
||||
this.configuration = conf;
|
||||
|
||||
// base policy from configuration
|
||||
fixedRetries = exponentialBackoffRetry(
|
||||
conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
|
||||
conf.getTimeDuration(RETRY_INTERVAL,
|
||||
int limit = conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT);
|
||||
long interval = conf.getTimeDuration(RETRY_INTERVAL,
|
||||
RETRY_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS),
|
||||
TimeUnit.MILLISECONDS);
|
||||
baseExponentialRetry = exponentialBackoffRetry(
|
||||
limit,
|
||||
interval,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
// which is wrapped by a rejection of all non-idempotent calls except
|
||||
// for specific failures.
|
||||
LOG.debug("Retrying on recoverable AWS failures {} times with an"
|
||||
+ " initial interval of {}ms", limit, interval);
|
||||
|
||||
// which is wrapped by a rejection of failures of non-idempotent calls
|
||||
// except for specific exceptions considered recoverable.
|
||||
// idempotent calls are retried on IOEs but not other exceptions
|
||||
retryIdempotentCalls = new FailNonIOEs(
|
||||
new IdempotencyRetryFilter(fixedRetries));
|
||||
new IdempotencyRetryFilter(baseExponentialRetry));
|
||||
|
||||
// and a separate policy for throttle requests, which are considered
|
||||
// repeatable, even for non-idempotent calls, as the service
|
||||
|
@ -127,7 +145,7 @@ public class S3ARetryPolicy implements RetryPolicy {
|
|||
throttlePolicy = createThrottleRetryPolicy(conf);
|
||||
|
||||
// client connectivity: fixed retries without care for idempotency
|
||||
connectivityFailure = fixedRetries;
|
||||
connectivityFailure = baseExponentialRetry;
|
||||
|
||||
Map<Class<? extends Exception>, RetryPolicy> policyMap =
|
||||
createExceptionMap();
|
||||
|
@ -239,6 +257,14 @@ public class S3ARetryPolicy implements RetryPolicy {
|
|||
return retryPolicy.shouldRetry(ex, retries, failovers, idempotent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the configuration this policy was created from.
|
||||
* @return the configuration.
|
||||
*/
|
||||
protected Configuration getConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Policy which fails fast any non-idempotent call; hands off
|
||||
* all idempotent calls to the next retry policy.
|
||||
|
|
|
@ -20,16 +20,29 @@ package org.apache.hadoop.fs.s3a;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT;
|
||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
|
||||
|
||||
/**
|
||||
* Slightly-modified retry policy for cases when the file is present in the
|
||||
* MetadataStore, but may be still throwing FileNotFoundException from S3.
|
||||
*/
|
||||
public class S3GuardExistsRetryPolicy extends S3ARetryPolicy {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
S3GuardExistsRetryPolicy.class);
|
||||
|
||||
/**
|
||||
* Instantiate.
|
||||
* @param conf configuration to read.
|
||||
|
@ -41,8 +54,23 @@ public class S3GuardExistsRetryPolicy extends S3ARetryPolicy {
|
|||
@Override
|
||||
protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
|
||||
Map<Class<? extends Exception>, RetryPolicy> b = super.createExceptionMap();
|
||||
b.put(FileNotFoundException.class, retryIdempotentCalls);
|
||||
b.put(RemoteFileChangedException.class, retryIdempotentCalls);
|
||||
Configuration conf = getConfiguration();
|
||||
|
||||
// base policy from configuration
|
||||
int limit = conf.getInt(S3GUARD_CONSISTENCY_RETRY_LIMIT,
|
||||
S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT);
|
||||
long interval = conf.getTimeDuration(S3GUARD_CONSISTENCY_RETRY_INTERVAL,
|
||||
S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
|
||||
limit,
|
||||
interval,
|
||||
TimeUnit.MILLISECONDS);
|
||||
LOG.debug("Retrying on recoverable S3Guard table/S3 inconsistencies {}"
|
||||
+ " times with an initial interval of {}ms", limit, interval);
|
||||
|
||||
b.put(FileNotFoundException.class, retryPolicy);
|
||||
b.put(RemoteFileChangedException.class, retryPolicy);
|
||||
return b;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,6 +190,15 @@ public abstract class ChangeDetectionPolicy {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* String value for logging.
|
||||
* @return source and mode.
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Policy " + getSource() + "/" + getMode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Pulls the attribute this policy uses to detect change out of the S3 object
|
||||
* metadata. The policy generically refers to this attribute as
|
||||
|
@ -342,6 +351,8 @@ public abstract class ChangeDetectionPolicy {
|
|||
if (revisionId != null) {
|
||||
LOG.debug("Restricting get request to etag {}", revisionId);
|
||||
request.withMatchingETagConstraint(revisionId);
|
||||
} else {
|
||||
LOG.debug("No etag revision ID to use as a constraint");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -351,13 +362,15 @@ public abstract class ChangeDetectionPolicy {
|
|||
if (revisionId != null) {
|
||||
LOG.debug("Restricting copy request to etag {}", revisionId);
|
||||
request.withMatchingETagConstraint(revisionId);
|
||||
} else {
|
||||
LOG.debug("No etag revision ID to use as a constraint");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyRevisionConstraint(GetObjectMetadataRequest request,
|
||||
String revisionId) {
|
||||
// GetObjectMetadataRequest doesn't support eTag qualification
|
||||
LOG.debug("Unable to restrict HEAD request to etag; will check later");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -415,6 +428,8 @@ public abstract class ChangeDetectionPolicy {
|
|||
if (revisionId != null) {
|
||||
LOG.debug("Restricting get request to version {}", revisionId);
|
||||
request.withVersionId(revisionId);
|
||||
} else {
|
||||
LOG.debug("No version ID to use as a constraint");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -424,6 +439,8 @@ public abstract class ChangeDetectionPolicy {
|
|||
if (revisionId != null) {
|
||||
LOG.debug("Restricting copy request to version {}", revisionId);
|
||||
request.withSourceVersionId(revisionId);
|
||||
} else {
|
||||
LOG.debug("No version ID to use as a constraint");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -433,6 +450,8 @@ public abstract class ChangeDetectionPolicy {
|
|||
if (revisionId != null) {
|
||||
LOG.debug("Restricting metadata request to version {}", revisionId);
|
||||
request.withVersionId(revisionId);
|
||||
} else {
|
||||
LOG.debug("No version ID to use as a constraint");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -97,7 +97,8 @@ public class ChangeTracker {
|
|||
this.versionMismatches = versionMismatches;
|
||||
this.revisionId = policy.getRevisionId(s3ObjectAttributes);
|
||||
if (revisionId != null) {
|
||||
LOG.debug("Revision ID for object at {}: {}", uri, revisionId);
|
||||
LOG.debug("Tracker {} has revision ID for object at {}: {}",
|
||||
policy, uri, revisionId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -307,7 +308,7 @@ public class ChangeTracker {
|
|||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
"ChangeTracker{");
|
||||
sb.append("changeDetectionPolicy=").append(policy);
|
||||
sb.append(policy);
|
||||
sb.append(", revisionId='").append(revisionId).append('\'');
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
|
|
|
@ -83,4 +83,6 @@ public final class InternalConstants {
|
|||
Arrays.asList(Constants.INPUT_FADVISE,
|
||||
Constants.READAHEAD_RANGE)));
|
||||
|
||||
/** 404 error code. */
|
||||
public static final int SC_404 = 404;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.impl;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Enum of probes which can be made of S3.
|
||||
*/
|
||||
public enum StatusProbeEnum {
|
||||
|
||||
/** The actual path. */
|
||||
Head,
|
||||
/** HEAD of the path + /. */
|
||||
DirMarker,
|
||||
/** LIST under the path. */
|
||||
List;
|
||||
|
||||
/** All probes. */
|
||||
public static final Set<StatusProbeEnum> ALL = EnumSet.allOf(
|
||||
StatusProbeEnum.class);
|
||||
|
||||
/** Skip the HEAD and only look for directories. */
|
||||
public static final Set<StatusProbeEnum> DIRECTORIES =
|
||||
EnumSet.of(DirMarker, List);
|
||||
|
||||
}
|
|
@ -1202,9 +1202,9 @@ The configurations items controlling this behavior are:
|
|||
In the default configuration, S3 object eTags are used to detect changes. When
|
||||
the filesystem retrieves a file from S3 using
|
||||
[Get Object](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html),
|
||||
it captures the eTag and uses that eTag in an 'If-Match' condition on each
|
||||
it captures the eTag and uses that eTag in an `If-Match` condition on each
|
||||
subsequent request. If a concurrent writer has overwritten the file, the
|
||||
'If-Match' condition will fail and a RemoteFileChangedException will be thrown.
|
||||
'If-Match' condition will fail and a `RemoteFileChangedException` will be thrown.
|
||||
|
||||
Even in this default configuration, a new write may not trigger this exception
|
||||
on an open reader. For example, if the reader only reads forward in the file
|
||||
|
@ -1229,7 +1229,7 @@ It is possible to switch to using the
|
|||
instead of eTag as the change detection mechanism. Use of this option requires
|
||||
object versioning to be enabled on any S3 buckets used by the filesystem. The
|
||||
benefit of using version id instead of eTag is potentially reduced frequency
|
||||
of RemoteFileChangedException. With object versioning enabled, old versions
|
||||
of `RemoteFileChangedException`. With object versioning enabled, old versions
|
||||
of objects remain available after they have been overwritten.
|
||||
This means an open input stream will still be able to seek backwards after a
|
||||
concurrent writer has overwritten the file.
|
||||
|
|
|
@ -1029,6 +1029,56 @@ before versioning was enabled.
|
|||
See [Handling Read-During-Overwrite](./index.html#handling_read-during-overwrite)
|
||||
for more information.
|
||||
|
||||
### `RemoteFileChangedException`: "File to rename not found on guarded S3 store after repeated attempts"
|
||||
|
||||
A file being renamed and listed in the S3Guard table could not be found
|
||||
in the S3 bucket even after multiple attempts.
|
||||
|
||||
```
|
||||
org.apache.hadoop.fs.s3a.RemoteFileChangedException: copyFile(/sourcedir/missing, /destdir/)
|
||||
`s3a://example/sourcedir/missing': File not found on S3 after repeated attempts: `s3a://example/sourcedir/missing'
|
||||
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:3231)
|
||||
at org.apache.hadoop.fs.s3a.S3AFileSystem.access$700(S3AFileSystem.java:177)
|
||||
at org.apache.hadoop.fs.s3a.S3AFileSystem$RenameOperationCallbacksImpl.copyFile(S3AFileSystem.java:1368)
|
||||
at org.apache.hadoop.fs.s3a.impl.RenameOperation.copySourceAndUpdateTracker(RenameOperation.java:448)
|
||||
at org.apache.hadoop.fs.s3a.impl.RenameOperation.lambda$initiateCopy$0(RenameOperation.java:412)
|
||||
```
|
||||
|
||||
Either the file has been deleted, or an attempt was made to read a file before it
|
||||
was created and the S3 load balancer has briefly cached the 404 returned by that
|
||||
operation. This is something which AWS S3 can do for short periods.
|
||||
|
||||
If error occurs and the file is on S3, consider increasing the value of
|
||||
`fs.s3a.s3guard.consistency.retry.limit`.
|
||||
|
||||
We also recommend using applications/application
|
||||
options which do not rename files when committing work or when copying data
|
||||
to S3, but instead write directly to the final destination.
|
||||
|
||||
### `RemoteFileChangedException`: "File to rename not found on unguarded S3 store"
|
||||
|
||||
```
|
||||
org.apache.hadoop.fs.s3a.RemoteFileChangedException: copyFile(/sourcedir/missing, /destdir/)
|
||||
`s3a://example/sourcedir/missing': File to rename not found on unguarded S3 store: `s3a://example/sourcedir/missing'
|
||||
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:3231)
|
||||
at org.apache.hadoop.fs.s3a.S3AFileSystem.access$700(S3AFileSystem.java:177)
|
||||
at org.apache.hadoop.fs.s3a.S3AFileSystem$RenameOperationCallbacksImpl.copyFile(S3AFileSystem.java:1368)
|
||||
at org.apache.hadoop.fs.s3a.impl.RenameOperation.copySourceAndUpdateTracker(RenameOperation.java:448)
|
||||
at org.apache.hadoop.fs.s3a.impl.RenameOperation.lambda$initiateCopy$0(RenameOperation.java:412)
|
||||
```
|
||||
|
||||
An attempt was made to rename a file in an S3 store not protected by SGuard,
|
||||
the directory list operation included the filename in its results but the
|
||||
actual operation to rename the file failed.
|
||||
|
||||
This can happen because S3 directory listings and the store itself are not
|
||||
consistent: the list operation tends to lag changes in the store.
|
||||
It is possible that the file has been deleted.
|
||||
|
||||
The fix here is to use S3Guard. We also recommend using applications/application
|
||||
options which do not rename files when committing work or when copying data
|
||||
to S3, but instead write directly to the final destination.
|
||||
|
||||
## <a name="encryption"></a> S3 Server Side Encryption
|
||||
|
||||
### `AWSS3IOException` `KMS.NotFoundException` "Invalid arn" when using SSE-KMS
|
||||
|
@ -1275,3 +1325,40 @@ Please don't do that. Given that the emulated directory rename and delete operat
|
|||
are not atomic, even without retries, multiple S3 clients working with the same
|
||||
paths can interfere with each other
|
||||
|
||||
### <a name="retries"></a> Tuning S3Guard open/rename Retry Policies
|
||||
|
||||
When the S3A connector attempts to open a file for which it has an entry in
|
||||
its database, it will retry if the desired file is not found. This is
|
||||
done if:
|
||||
|
||||
* No file is found in S3.
|
||||
* There is a file but its version or etag is not consistent with S3Guard table.
|
||||
|
||||
These can be symptoms of S3's eventual consistency, hence the retries.
|
||||
They can also be caused by changes having been made to the S3 Store without
|
||||
SGuard being kept up to date.
|
||||
|
||||
For this reason, the number of retry events are limited.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.s3a.s3guard.consistency.retry.limit</name>
|
||||
<value>7</value>
|
||||
<description>
|
||||
Number of times to retry attempts to read/open/copy files when
|
||||
S3Guard believes a specific version of the file to be available,
|
||||
but the S3 request does not find any version of a file, or a different
|
||||
version.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.s3guard.consistency.retry.interval</name>
|
||||
<value>2s</value>
|
||||
<description>
|
||||
Initial interval between attempts to retry operations while waiting for S3
|
||||
to become consistent with the S3Guard data.
|
||||
An exponential back-off is used here: every failure doubles the delay.
|
||||
</description>
|
||||
</property>
|
||||
```
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.fs.s3a;
|
|||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -77,7 +79,8 @@ public class ITestS3AEmptyDirectory extends AbstractS3ATestBase {
|
|||
|
||||
private S3AFileStatus getS3AFileStatus(S3AFileSystem fs, Path p) throws
|
||||
IOException {
|
||||
return fs.innerGetFileStatus(p, true /* want isEmptyDirectory value */);
|
||||
return fs.innerGetFileStatus(p, true,
|
||||
StatusProbeEnum.ALL);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -85,14 +87,23 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
|
|||
Path dir = path("empty");
|
||||
fs.mkdirs(dir);
|
||||
resetMetricDiffs();
|
||||
S3AFileStatus status = fs.innerGetFileStatus(dir, true);
|
||||
assertSame("not empty: " + status, status.isEmptyDirectory(),
|
||||
Tristate.TRUE);
|
||||
S3AFileStatus status = fs.innerGetFileStatus(dir, true,
|
||||
StatusProbeEnum.ALL);
|
||||
assertSame("not empty: " + status, Tristate.TRUE,
|
||||
status.isEmptyDirectory());
|
||||
|
||||
if (!fs.hasMetadataStore()) {
|
||||
metadataRequests.assertDiffEquals(2);
|
||||
}
|
||||
listRequests.assertDiffEquals(0);
|
||||
|
||||
// but now only ask for the directories and the file check is skipped.
|
||||
resetMetricDiffs();
|
||||
fs.innerGetFileStatus(dir, false,
|
||||
StatusProbeEnum.DIRECTORIES);
|
||||
if (!fs.hasMetadataStore()) {
|
||||
metadataRequests.assertDiffEquals(1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -128,7 +139,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
|
|||
Path simpleFile = new Path(dir, "simple.txt");
|
||||
touch(fs, simpleFile);
|
||||
resetMetricDiffs();
|
||||
S3AFileStatus status = fs.innerGetFileStatus(dir, true);
|
||||
S3AFileStatus status = fs.innerGetFileStatus(dir, true,
|
||||
StatusProbeEnum.ALL);
|
||||
if (status.isEmptyDirectory() == Tristate.TRUE) {
|
||||
// erroneous state
|
||||
String fsState = fs.toString();
|
||||
|
|
|
@ -129,6 +129,10 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
|
||||
private Optional<AmazonS3> originalS3Client = Optional.empty();
|
||||
|
||||
private static final String INCONSISTENT = "inconsistent";
|
||||
|
||||
private static final String CONSISTENT = "consistent";
|
||||
|
||||
private enum InteractionType {
|
||||
READ,
|
||||
READ_AFTER_DELETE,
|
||||
|
@ -280,15 +284,21 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
CHANGE_DETECT_MODE,
|
||||
RETRY_LIMIT,
|
||||
RETRY_INTERVAL,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
S3GUARD_CONSISTENCY_RETRY_LIMIT,
|
||||
S3GUARD_CONSISTENCY_RETRY_INTERVAL,
|
||||
METADATASTORE_AUTHORITATIVE,
|
||||
AUTHORITATIVE_PATH);
|
||||
conf.set(CHANGE_DETECT_SOURCE, changeDetectionSource);
|
||||
conf.set(CHANGE_DETECT_MODE, changeDetectionMode);
|
||||
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authMode);
|
||||
conf.set(AUTHORITATIVE_PATH, "");
|
||||
|
||||
// reduce retry limit so FileNotFoundException cases timeout faster,
|
||||
// speeding up the tests
|
||||
conf.setInt(RETRY_LIMIT, TEST_MAX_RETRIES);
|
||||
conf.set(RETRY_INTERVAL, TEST_RETRY_INTERVAL);
|
||||
conf.setInt(S3GUARD_CONSISTENCY_RETRY_LIMIT, TEST_MAX_RETRIES);
|
||||
conf.set(S3GUARD_CONSISTENCY_RETRY_INTERVAL, TEST_RETRY_INTERVAL);
|
||||
|
||||
if (conf.getClass(S3_METADATA_STORE_IMPL, MetadataStore.class) ==
|
||||
NullMetadataStore.class) {
|
||||
|
@ -697,10 +707,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
Path sourcedir = new Path(basedir, "sourcedir");
|
||||
fs.mkdirs(sourcedir);
|
||||
Path destdir = new Path(basedir, "destdir");
|
||||
String inconsistent = "inconsistent";
|
||||
String consistent = "consistent";
|
||||
Path inconsistentFile = new Path(sourcedir, inconsistent);
|
||||
Path consistentFile = new Path(sourcedir, consistent);
|
||||
Path inconsistentFile = new Path(sourcedir, INCONSISTENT);
|
||||
Path consistentFile = new Path(sourcedir, CONSISTENT);
|
||||
|
||||
// write the consistent data
|
||||
writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
||||
|
@ -723,6 +731,82 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
fs.rename(sourcedir, destdir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests doing a rename() on a file which is eventually visible.
|
||||
*/
|
||||
@Test
|
||||
public void testRenameEventuallyVisibleFile() throws Throwable {
|
||||
requireS3Guard();
|
||||
AmazonS3 s3ClientSpy = spyOnFilesystem();
|
||||
Path basedir = path();
|
||||
Path sourcedir = new Path(basedir, "sourcedir");
|
||||
fs.mkdirs(sourcedir);
|
||||
Path destdir = new Path(basedir, "destdir");
|
||||
Path inconsistentFile = new Path(sourcedir, INCONSISTENT);
|
||||
Path consistentFile = new Path(sourcedir, CONSISTENT);
|
||||
|
||||
// write the consistent data
|
||||
writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
||||
1024, true, true);
|
||||
|
||||
Pair<Integer, Integer> counts = renameInconsistencyCounts(0);
|
||||
int metadataInconsistencyCount = counts.getLeft();
|
||||
|
||||
writeDataset(fs, inconsistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
||||
1024, true, true);
|
||||
|
||||
stubTemporaryNotFound(s3ClientSpy, metadataInconsistencyCount,
|
||||
inconsistentFile);
|
||||
|
||||
// must not fail since the inconsistency doesn't last through the
|
||||
// configured retry limit
|
||||
fs.rename(sourcedir, destdir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests doing a rename() on a file which never quite appears will
|
||||
* fail with a RemoteFileChangedException rather than have the exception
|
||||
* downgraded to a failure.
|
||||
*/
|
||||
@Test
|
||||
public void testRenameMissingFile()
|
||||
throws Throwable {
|
||||
requireS3Guard();
|
||||
AmazonS3 s3ClientSpy = spyOnFilesystem();
|
||||
Path basedir = path();
|
||||
Path sourcedir = new Path(basedir, "sourcedir");
|
||||
fs.mkdirs(sourcedir);
|
||||
Path destdir = new Path(basedir, "destdir");
|
||||
Path inconsistentFile = new Path(sourcedir, INCONSISTENT);
|
||||
Path consistentFile = new Path(sourcedir, CONSISTENT);
|
||||
|
||||
// write the consistent data
|
||||
writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
||||
1024, true, true);
|
||||
|
||||
Pair<Integer, Integer> counts = renameInconsistencyCounts(0);
|
||||
int metadataInconsistencyCount = counts.getLeft();
|
||||
|
||||
writeDataset(fs, inconsistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
||||
1024, true, true);
|
||||
|
||||
stubTemporaryNotFound(s3ClientSpy, metadataInconsistencyCount + 1,
|
||||
inconsistentFile);
|
||||
|
||||
String expected = fs.hasMetadataStore()
|
||||
? RemoteFileChangedException.FILE_NEVER_FOUND
|
||||
: RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT;
|
||||
RemoteFileChangedException ex = intercept(
|
||||
RemoteFileChangedException.class,
|
||||
expected,
|
||||
() -> fs.rename(sourcedir, destdir));
|
||||
assertEquals("Path in " + ex,
|
||||
inconsistentFile, ex.getPath());
|
||||
if (!(ex.getCause() instanceof FileNotFoundException)) {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures a file can be renamed when there is no version metadata
|
||||
* (ETag, versionId).
|
||||
|
@ -910,6 +994,9 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
LOG.debug("Updated file info: {}: version={}, etag={}", testpath,
|
||||
newStatus.getVersionId(), newStatus.getETag());
|
||||
|
||||
LOG.debug("File {} will be inconsistent for {} HEAD and {} GET requests",
|
||||
testpath, getMetadataInconsistencyCount, getObjectInconsistencyCount);
|
||||
|
||||
stubTemporaryUnavailable(s3ClientSpy, getObjectInconsistencyCount,
|
||||
testpath, newStatus);
|
||||
|
||||
|
@ -919,6 +1006,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
if (versionCheckingIsOnServer()) {
|
||||
// only stub inconsistency when mode is server since no constraints that
|
||||
// should trigger inconsistency are passed in any other mode
|
||||
LOG.debug("File {} will be inconsistent for {} COPY operations",
|
||||
testpath, copyInconsistencyCount);
|
||||
stubTemporaryCopyInconsistency(s3ClientSpy, testpath, newStatus,
|
||||
copyInconsistencyCount);
|
||||
}
|
||||
|
@ -1230,6 +1319,18 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Match any getObjectMetadata request against a given path.
|
||||
* @param path path to to match.
|
||||
* @return the matching request.
|
||||
*/
|
||||
private GetObjectMetadataRequest matchingMetadataRequest(Path path) {
|
||||
return ArgumentMatchers.argThat(request -> {
|
||||
return request.getBucketName().equals(fs.getBucket())
|
||||
&& request.getKey().equals(fs.pathToKey(path));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip a test case if it needs S3Guard and the filesystem does
|
||||
* not have it.
|
||||
|
@ -1290,4 +1391,42 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|||
private boolean versionCheckingIsOnServer() {
|
||||
return fs.getChangeDetectionPolicy().getMode() == Mode.Server;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stubs {@link AmazonS3#getObject(GetObjectRequest)}
|
||||
* within s3ClientSpy to return throw a FileNotFoundException
|
||||
* until inconsistentCallCount calls have been made.
|
||||
* This simulates the condition where the S3 endpoint is caching
|
||||
* a 404 request, or there is a tombstone in the way which has yet
|
||||
* to clear.
|
||||
* @param s3ClientSpy the spy to stub
|
||||
* @param inconsistentCallCount the number of calls that should return the
|
||||
* null response
|
||||
* @param testpath the path of the object the stub should apply to
|
||||
*/
|
||||
private void stubTemporaryNotFound(AmazonS3 s3ClientSpy,
|
||||
int inconsistentCallCount, Path testpath) {
|
||||
Answer<ObjectMetadata> notFound = new Answer<ObjectMetadata>() {
|
||||
private int callCount = 0;
|
||||
|
||||
@Override
|
||||
public ObjectMetadata answer(InvocationOnMock invocation
|
||||
) throws Throwable {
|
||||
// simulates delayed visibility.
|
||||
callCount++;
|
||||
if (callCount <= inconsistentCallCount) {
|
||||
LOG.info("Temporarily unavailable {} count {} of {}",
|
||||
testpath, callCount, inconsistentCallCount);
|
||||
logLocationAtDebug();
|
||||
throw new FileNotFoundException(testpath.toString());
|
||||
}
|
||||
return (ObjectMetadata) invocation.callRealMethod();
|
||||
}
|
||||
};
|
||||
|
||||
// HEAD requests will fail
|
||||
doAnswer(notFound).when(s3ClientSpy).getObjectMetadata(
|
||||
matchingMetadataRequest(testpath));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.assertj.core.api.Assertions;
|
|||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
||||
import org.apache.hadoop.fs.s3a.impl.StoreContext;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.DDBPathMetadata;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
|
||||
|
@ -92,7 +93,7 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
|
|||
}
|
||||
|
||||
private S3AFileStatus getEmptyDirStatus(Path dir) throws IOException {
|
||||
return getFileSystem().innerGetFileStatus(dir, true);
|
||||
return getFileSystem().innerGetFileStatus(dir, true, StatusProbeEnum.ALL);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -118,21 +119,22 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
|
|||
Path newFile = path("existing-dir/new-file");
|
||||
touch(fs, newFile);
|
||||
|
||||
S3AFileStatus status = fs.innerGetFileStatus(existingDir, true);
|
||||
S3AFileStatus status = fs.innerGetFileStatus(existingDir, true,
|
||||
StatusProbeEnum.ALL);
|
||||
assertEquals("Should not be empty dir", Tristate.FALSE,
|
||||
status.isEmptyDirectory());
|
||||
|
||||
// 3. Assert that removing the only file the MetadataStore witnessed
|
||||
// being created doesn't cause it to think the directory is now empty.
|
||||
fs.delete(newFile, false);
|
||||
status = fs.innerGetFileStatus(existingDir, true);
|
||||
status = fs.innerGetFileStatus(existingDir, true, StatusProbeEnum.ALL);
|
||||
assertEquals("Should not be empty dir", Tristate.FALSE,
|
||||
status.isEmptyDirectory());
|
||||
|
||||
// 4. Assert that removing the final file, that existed "before"
|
||||
// MetadataStore started, *does* cause the directory to be marked empty.
|
||||
fs.delete(existingFile, false);
|
||||
status = fs.innerGetFileStatus(existingDir, true);
|
||||
status = fs.innerGetFileStatus(existingDir, true, StatusProbeEnum.ALL);
|
||||
assertEquals("Should be empty dir now", Tristate.TRUE,
|
||||
status.isEmptyDirectory());
|
||||
} finally {
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
|
|||
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
|
||||
|
@ -224,7 +225,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
|||
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE,
|
||||
METADATASTORE_METADATA_TTL);
|
||||
METADATASTORE_METADATA_TTL,
|
||||
AUTHORITATIVE_PATH);
|
||||
config.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMode);
|
||||
config.setLong(METADATASTORE_METADATA_TTL,
|
||||
DEFAULT_METADATASTORE_METADATA_TTL);
|
||||
|
@ -247,7 +249,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
|||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
S3_METADATA_STORE_IMPL);
|
||||
removeBaseAndBucketOverrides(uri.getHost(), config,
|
||||
METADATASTORE_AUTHORITATIVE);
|
||||
METADATASTORE_AUTHORITATIVE,
|
||||
AUTHORITATIVE_PATH);
|
||||
return createFS(uri, config);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -331,7 +332,7 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
|
|||
|
||||
// listing will contain the tombstone with oldtime
|
||||
when(mockTimeProvider.getNow()).thenReturn(oldTime);
|
||||
final DirListingMetadata fullDLM = ms.listChildren(baseDirPath);
|
||||
final DirListingMetadata fullDLM = getDirListingMetadata(ms, baseDirPath);
|
||||
List<Path> containedPaths = fullDLM.getListing().stream()
|
||||
.map(pm -> pm.getFileStatus().getPath())
|
||||
.collect(Collectors.toList());
|
||||
|
@ -342,7 +343,8 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
|
|||
|
||||
// listing will be filtered, and won't contain the tombstone with oldtime
|
||||
when(mockTimeProvider.getNow()).thenReturn(newTime);
|
||||
final DirListingMetadata filteredDLM = ms.listChildren(baseDirPath);
|
||||
final DirListingMetadata filteredDLM = getDirListingMetadata(ms,
|
||||
baseDirPath);
|
||||
containedPaths = filteredDLM.getListing().stream()
|
||||
.map(pm -> pm.getFileStatus().getPath())
|
||||
.collect(Collectors.toList());
|
||||
|
@ -356,4 +358,14 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
|
|||
}
|
||||
}
|
||||
|
||||
protected DirListingMetadata getDirListingMetadata(final MetadataStore ms,
|
||||
final Path baseDirPath) throws IOException {
|
||||
final DirListingMetadata fullDLM = ms.listChildren(baseDirPath);
|
||||
Assertions.assertThat(fullDLM)
|
||||
.describedAs("Metastrore directory listing of %s",
|
||||
baseDirPath)
|
||||
.isNotNull();
|
||||
return fullDLM;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -132,6 +132,7 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
|
|||
|
||||
conf.set(Constants.S3_METADATA_STORE_IMPL, metastore);
|
||||
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
|
||||
conf.unset(AUTHORITATIVE_PATH);
|
||||
S3AUtils.setBucketOption(conf, host,
|
||||
METADATASTORE_AUTHORITATIVE,
|
||||
Boolean.toString(authoritativeMeta));
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
|
|||
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
|
||||
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
|
||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||
|
@ -483,6 +484,7 @@ public final class S3ATestUtils {
|
|||
LOG.debug("Enabling S3Guard, authoritative={}, implementation={}",
|
||||
authoritative, implClass);
|
||||
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
|
||||
conf.set(AUTHORITATIVE_PATH, "");
|
||||
conf.set(S3_METADATA_STORE_IMPL, implClass);
|
||||
conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
|
||||
}
|
||||
|
@ -855,7 +857,7 @@ public final class S3ATestUtils {
|
|||
public static S3AFileStatus getStatusWithEmptyDirFlag(
|
||||
final S3AFileSystem fs,
|
||||
final Path dir) throws IOException {
|
||||
return fs.innerGetFileStatus(dir, true);
|
||||
return fs.innerGetFileStatus(dir, true, StatusProbeEnum.ALL);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Set;
|
|||
import java.util.UUID;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
@ -52,8 +53,10 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
|
|||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.util.DurationInfo;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
|
||||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
|
||||
|
||||
/**
|
||||
* Test for an MR Job with all the different committers.
|
||||
|
@ -112,6 +115,7 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
|
|||
String committerPath = "file:" + mockResultsFile;
|
||||
jobConf.set("mock-results-file", committerPath);
|
||||
jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
|
||||
jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "/staging");
|
||||
|
||||
mrJob.setInputFormatClass(TextInputFormat.class);
|
||||
FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI()));
|
||||
|
@ -143,6 +147,10 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
|
|||
}
|
||||
|
||||
waitForConsistency();
|
||||
verifyPathExists(fs,
|
||||
"MR job Output directory not found,"
|
||||
+ " even though the job did not report a failure",
|
||||
outputPath);
|
||||
assertIsDirectory(outputPath);
|
||||
FileStatus[] results = fs.listStatus(outputPath,
|
||||
S3AUtils.HIDDEN_FILE_FILTER);
|
||||
|
@ -160,16 +168,17 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
|
|||
fs, "MR job");
|
||||
List<String> successFiles = successData.getFilenames();
|
||||
String commitData = successData.toString();
|
||||
assertTrue("No filenames in " + commitData,
|
||||
!successFiles.isEmpty());
|
||||
assertFalse("No filenames in " + commitData,
|
||||
successFiles.isEmpty());
|
||||
|
||||
assertEquals("Should commit the expected files",
|
||||
expectedFiles, actualFiles);
|
||||
Assertions.assertThat(actualFiles)
|
||||
.describedAs("Committed files in the job output directory")
|
||||
.containsExactlyInAnyOrderElementsOf(expectedFiles);
|
||||
|
||||
Assertions.assertThat(successFiles)
|
||||
.describedAs("List of committed files in %s", commitData)
|
||||
.containsExactlyInAnyOrderElementsOf(expectedKeys);
|
||||
|
||||
Set<String> summaryKeys = Sets.newHashSet();
|
||||
summaryKeys.addAll(successFiles);
|
||||
assertEquals("Summary keyset doesn't list the the expected paths "
|
||||
+ commitData, expectedKeys, summaryKeys);
|
||||
assertPathDoesNotExist("temporary dir",
|
||||
new Path(outputPath, CommitConstants.TEMPORARY));
|
||||
customPostExecutionValidation(outputPath, successData);
|
||||
|
|
Loading…
Reference in New Issue