HADOOP-16853. ITestS3GuardOutOfBandOperations failing on versioned S3 buckets (#1840)
Contributed by Steve Loughran. Signed-off-by: Mingliang Liu <liuml07@apache.org>
This commit is contained in:
parent
42dfd270a1
commit
929004074f
|
@ -51,6 +51,8 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.toChar;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
|
||||||
|
@ -305,11 +307,6 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||||
overwriteFileInListing("THE TEXT", "THE LONGER TEXT");
|
overwriteFileInListing("THE TEXT", "THE LONGER TEXT");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testListingDelete() throws Exception {
|
|
||||||
deleteFileInListing();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that tombstone expiry is implemented. If a file is created raw
|
* Tests that tombstone expiry is implemented. If a file is created raw
|
||||||
* while the tombstone exist in ms for with the same name then S3Guard will
|
* while the tombstone exist in ms for with the same name then S3Guard will
|
||||||
|
@ -660,8 +657,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||||
LOG.info("Authoritative: {} status path: {}",
|
LOG.info("Authoritative: {} status path: {}",
|
||||||
allowAuthoritative, status.getPath());
|
allowAuthoritative, status.getPath());
|
||||||
final boolean versionedChangeDetection =
|
final boolean versionedChangeDetection =
|
||||||
getFileSystem().getChangeDetectionPolicy().getSource()
|
isVersionedChangeDetection();
|
||||||
== Source.VersionId;
|
|
||||||
if (!versionedChangeDetection) {
|
if (!versionedChangeDetection) {
|
||||||
expectExceptionWhenReading(testFilePath, text);
|
expectExceptionWhenReading(testFilePath, text);
|
||||||
expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null);
|
expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null);
|
||||||
|
@ -939,8 +935,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||||
/**
|
/**
|
||||||
* Delete a file and use listStatus to build up the S3Guard cache.
|
* Delete a file and use listStatus to build up the S3Guard cache.
|
||||||
*/
|
*/
|
||||||
private void deleteFileInListing()
|
@Test
|
||||||
throws Exception {
|
public void testListingDelete() throws Exception {
|
||||||
|
|
||||||
boolean allowAuthoritative = authoritative;
|
boolean allowAuthoritative = authoritative;
|
||||||
LOG.info("Authoritative mode enabled: {}", allowAuthoritative);
|
LOG.info("Authoritative mode enabled: {}", allowAuthoritative);
|
||||||
|
@ -969,16 +965,44 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||||
deleteFile(rawFS, testFilePath);
|
deleteFile(rawFS, testFilePath);
|
||||||
|
|
||||||
// File status will be still readable from s3guard
|
// File status will be still readable from s3guard
|
||||||
FileStatus status = guardedFs.getFileStatus(testFilePath);
|
S3AFileStatus status = (S3AFileStatus)
|
||||||
|
guardedFs.getFileStatus(testFilePath);
|
||||||
LOG.info("authoritative: {} status: {}", allowAuthoritative, status);
|
LOG.info("authoritative: {} status: {}", allowAuthoritative, status);
|
||||||
|
if (isVersionedChangeDetection() && status.getVersionId() != null) {
|
||||||
|
// when the status entry has a version ID, then that may be used
|
||||||
|
// when opening the file on what is clearly a versioned store.
|
||||||
|
int length = text.length();
|
||||||
|
byte[] bytes = readOpenFileAPI(guardedFs, testFilePath, length, null);
|
||||||
|
Assertions.assertThat(toChar(bytes))
|
||||||
|
.describedAs("openFile(%s)", testFilePath)
|
||||||
|
.isEqualTo(text);
|
||||||
|
// reading the rawFS with status will also work.
|
||||||
|
bytes = readOpenFileAPI(rawFS, testFilePath, length, status);
|
||||||
|
Assertions.assertThat(toChar(bytes))
|
||||||
|
.describedAs("openFile(%s)", testFilePath)
|
||||||
|
.isEqualTo(text);
|
||||||
|
bytes = readDataset(guardedFs, testFilePath, length);
|
||||||
|
Assertions.assertThat(toChar(bytes))
|
||||||
|
.describedAs("open(%s)", testFilePath)
|
||||||
|
.isEqualTo(text);
|
||||||
|
expectExceptionWhenReadingOpenFileAPI(rawFS, testFilePath, text,
|
||||||
|
null);
|
||||||
|
} else {
|
||||||
|
// unversioned sequence
|
||||||
expectExceptionWhenReading(testFilePath, text);
|
expectExceptionWhenReading(testFilePath, text);
|
||||||
expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null);
|
expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null);
|
||||||
expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status);
|
expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
guardedFs.delete(testDirPath, true);
|
guardedFs.delete(testDirPath, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isVersionedChangeDetection() {
|
||||||
|
return getFileSystem().getChangeDetectionPolicy().getSource()
|
||||||
|
== Source.VersionId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We expect the read to fail with an FNFE: open will be happy.
|
* We expect the read to fail with an FNFE: open will be happy.
|
||||||
* @param testFilePath path of the test file
|
* @param testFilePath path of the test file
|
||||||
|
@ -1005,8 +1029,26 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||||
private void expectExceptionWhenReadingOpenFileAPI(
|
private void expectExceptionWhenReadingOpenFileAPI(
|
||||||
Path testFilePath, String text, FileStatus status)
|
Path testFilePath, String text, FileStatus status)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
expectExceptionWhenReadingOpenFileAPI(guardedFs,
|
||||||
|
testFilePath, text, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We expect the read to fail with an FNFE: open will be happy.
|
||||||
|
* @param fs filesystem
|
||||||
|
* @param testFilePath path of the test file
|
||||||
|
* @param text the context in the file.
|
||||||
|
* @param status optional status for the withFileStatus operation.
|
||||||
|
* @throws Exception failure other than the FNFE
|
||||||
|
*/
|
||||||
|
private void expectExceptionWhenReadingOpenFileAPI(
|
||||||
|
final S3AFileSystem fs,
|
||||||
|
final Path testFilePath
|
||||||
|
, final String text,
|
||||||
|
final FileStatus status)
|
||||||
|
throws Exception {
|
||||||
final FutureDataInputStreamBuilder builder
|
final FutureDataInputStreamBuilder builder
|
||||||
= guardedFs.openFile(testFilePath);
|
= fs.openFile(testFilePath);
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
builder.withFileStatus(status);
|
builder.withFileStatus(status);
|
||||||
}
|
}
|
||||||
|
@ -1018,6 +1060,31 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open and read a file with the openFile API.
|
||||||
|
* @param fs FS to read from
|
||||||
|
* @param testFilePath path of the test file
|
||||||
|
* @param len data length to read
|
||||||
|
* @param status optional status for the withFileStatus operation.
|
||||||
|
* @throws Exception failure
|
||||||
|
* @return the data
|
||||||
|
*/
|
||||||
|
private byte[] readOpenFileAPI(
|
||||||
|
S3AFileSystem fs,
|
||||||
|
Path testFilePath,
|
||||||
|
int len,
|
||||||
|
FileStatus status) throws Exception {
|
||||||
|
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
|
||||||
|
if (status != null) {
|
||||||
|
builder.withFileStatus(status);
|
||||||
|
}
|
||||||
|
try (FSDataInputStream in = builder.build().get()) {
|
||||||
|
byte[] bytes = new byte[len];
|
||||||
|
in.readFully(0, bytes);
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for a deleted file to no longer be visible.
|
* Wait for a deleted file to no longer be visible.
|
||||||
* @param fs filesystem
|
* @param fs filesystem
|
||||||
|
|
Loading…
Reference in New Issue