From 929004074fe99e39922c52517ac743deac537d00 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 24 Feb 2020 18:45:34 +0000 Subject: [PATCH] HADOOP-16853. ITestS3GuardOutOfBandOperations failing on versioned S3 buckets (#1840) Contributed by Steve Loughran. Signed-off-by: Mingliang Liu --- .../s3a/ITestS3GuardOutOfBandOperations.java | 95 ++++++++++++++++--- 1 file changed, 81 insertions(+), 14 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java index 3cdb4e6eeee..70c62bf49ca 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java @@ -51,6 +51,8 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString; +import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.toChar; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH; @@ -305,11 +307,6 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { overwriteFileInListing("THE TEXT", "THE LONGER TEXT"); } - @Test - public void testListingDelete() throws Exception { - deleteFileInListing(); - } - /** * Tests that tombstone expiry is implemented. If a file is created raw * while the tombstone exist in ms for with the same name then S3Guard will @@ -660,8 +657,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { LOG.info("Authoritative: {} status path: {}", allowAuthoritative, status.getPath()); final boolean versionedChangeDetection = - getFileSystem().getChangeDetectionPolicy().getSource() - == Source.VersionId; + isVersionedChangeDetection(); if (!versionedChangeDetection) { expectExceptionWhenReading(testFilePath, text); expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); @@ -939,8 +935,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { /** * Delete a file and use listStatus to build up the S3Guard cache. */ - private void deleteFileInListing() - throws Exception { + @Test + public void testListingDelete() throws Exception { boolean allowAuthoritative = authoritative; LOG.info("Authoritative mode enabled: {}", allowAuthoritative); @@ -969,16 +965,44 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { deleteFile(rawFS, testFilePath); // File status will be still readable from s3guard - FileStatus status = guardedFs.getFileStatus(testFilePath); + S3AFileStatus status = (S3AFileStatus) + guardedFs.getFileStatus(testFilePath); LOG.info("authoritative: {} status: {}", allowAuthoritative, status); - expectExceptionWhenReading(testFilePath, text); - expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); - expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status); + if (isVersionedChangeDetection() && status.getVersionId() != null) { + // when the status entry has a version ID, then that may be used + // when opening the file on what is clearly a versioned store. + int length = text.length(); + byte[] bytes = readOpenFileAPI(guardedFs, testFilePath, length, null); + Assertions.assertThat(toChar(bytes)) + .describedAs("openFile(%s)", testFilePath) + .isEqualTo(text); + // reading the rawFS with status will also work. + bytes = readOpenFileAPI(rawFS, testFilePath, length, status); + Assertions.assertThat(toChar(bytes)) + .describedAs("openFile(%s)", testFilePath) + .isEqualTo(text); + bytes = readDataset(guardedFs, testFilePath, length); + Assertions.assertThat(toChar(bytes)) + .describedAs("open(%s)", testFilePath) + .isEqualTo(text); + expectExceptionWhenReadingOpenFileAPI(rawFS, testFilePath, text, + null); + } else { + // unversioned sequence + expectExceptionWhenReading(testFilePath, text); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status); + } } finally { guardedFs.delete(testDirPath, true); } } + private boolean isVersionedChangeDetection() { + return getFileSystem().getChangeDetectionPolicy().getSource() + == Source.VersionId; + } + /** * We expect the read to fail with an FNFE: open will be happy. * @param testFilePath path of the test file @@ -1005,8 +1029,26 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { private void expectExceptionWhenReadingOpenFileAPI( Path testFilePath, String text, FileStatus status) throws Exception { + expectExceptionWhenReadingOpenFileAPI(guardedFs, + testFilePath, text, status); + } + + /** + * We expect the read to fail with an FNFE: open will be happy. + * @param fs filesystem + * @param testFilePath path of the test file + * @param text the context in the file. + * @param status optional status for the withFileStatus operation. + * @throws Exception failure other than the FNFE + */ + private void expectExceptionWhenReadingOpenFileAPI( + final S3AFileSystem fs, + final Path testFilePath + , final String text, + final FileStatus status) + throws Exception { final FutureDataInputStreamBuilder builder - = guardedFs.openFile(testFilePath); + = fs.openFile(testFilePath); if (status != null) { builder.withFileStatus(status); } @@ -1018,6 +1060,31 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase { } } + /** + * Open and read a file with the openFile API. + * @param fs FS to read from + * @param testFilePath path of the test file + * @param len data length to read + * @param status optional status for the withFileStatus operation. + * @throws Exception failure + * @return the data + */ + private byte[] readOpenFileAPI( + S3AFileSystem fs, + Path testFilePath, + int len, + FileStatus status) throws Exception { + FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); + if (status != null) { + builder.withFileStatus(status); + } + try (FSDataInputStream in = builder.build().get()) { + byte[] bytes = new byte[len]; + in.readFully(0, bytes); + return bytes; + } + } + /** * Wait for a deleted file to no longer be visible. * @param fs filesystem