HADOOP-15117. open(PathHandle) contract test should be exhaustive for default options

This commit is contained in:
Chris Douglas 2017-12-30 17:58:35 -08:00
parent 4bb765ee2e
commit 7fe6f83c8f
4 changed files with 304 additions and 258 deletions

View File

@ -19,27 +19,17 @@ package org.apache.hadoop.fs.contract;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.InvalidPathHandleException;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.RawPathHandle;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.appendFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import org.junit.Test;
@ -173,252 +163,4 @@ public abstract class AbstractContractOpenTest
instream.close();
}
/**
* Skip a test case if the FS doesn't support file references.
* The feature is assumed to be unsupported unless stated otherwise.
*/
protected void assumeSupportsFileReference() throws IOException {
if (getContract().isSupported(SUPPORTS_FILE_REFERENCE, false)) {
return;
}
skip("Skipping as unsupported feature: " + SUPPORTS_FILE_REFERENCE);
}
/**
* Skip a test case if the FS doesn't support content validation.
* The feature is assumed to be unsupported unless stated otherwise.
*/
protected void assumeSupportsContentCheck() throws IOException {
if (getContract().isSupported(SUPPORTS_CONTENT_CHECK, false)) {
return;
}
skip("Skipping as unsupported feature: " + SUPPORTS_CONTENT_CHECK);
}
private PathHandle getHandleOrSkip(FileStatus stat, HandleOpt... opts) {
try {
return getFileSystem().getPathHandle(stat, opts);
} catch (UnsupportedOperationException e) {
skip("FileSystem does not support " + Arrays.toString(opts));
}
// unreachable
return null;
}
/**
* Verify {@link HandleOpt#exact()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByExact() throws Throwable {
describe("verify open(getPathHandle(FileStatus, exact())) operations" +
"detect changes");
assumeSupportsContentCheck();
assumeSupportsFileReference();
Path path1 = path("testopenfilebyexact1");
Path path2 = path("testopenfilebyexact2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat1 = getFileSystem().getFileStatus(path1);
assertNotNull(stat1);
assertEquals(path1, stat1.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
FileStatus stat2 = getFileSystem().getFileStatus(path2);
assertNotNull(stat2);
assertEquals(path2, stat2.getPath());
// create identical file at same location, orig still exists at path2
createFile(getFileSystem(), path1, false, file1);
PathHandle fd1 = getHandleOrSkip(stat1, HandleOpt.exact());
PathHandle fd2 = getHandleOrSkip(stat2, HandleOpt.exact());
// verify path1, path2 contents identical
verifyFileContents(getFileSystem(), path1, file1);
verifyFileContents(getFileSystem(), path2, file1);
try {
// the PathHandle will not resolve, even though
// the original entity exists, it has not been modified, and an
// identical file exists at the old path. The handle would also
// fail to resolve if path1 had been modified
instream = getFileSystem().open(fd1);
fail("Expected an exception");
} catch (InvalidPathHandleException e) {
// expected
}
// verify unchanged resolves
instream = getFileSystem().open(fd2);
verifyRead(instream, file1, 0, TEST_FILE_LEN);
}
/**
* Verify {@link HandleOpt#content()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByContent() throws Throwable {
describe("verify open(getPathHandle(FileStatus, content())) operations" +
"follow relocation");
assumeSupportsContentCheck();
assumeSupportsFileReference();
Path path1 = path("testopenfilebycontent1");
Path path2 = path("testopenfilebycontent2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
// rename the file after obtaining FileStatus
ContractTestUtils.rename(getFileSystem(), path1, path2);
// obtain handle to entity from #getFileStatus call
PathHandle fd = getHandleOrSkip(stat, HandleOpt.content());
try (FSDataInputStream in = getFileSystem().open(fd)) {
// verify read of consistent content at new location
verifyRead(in, file1, 0, TEST_FILE_LEN);
}
// modify the file at its new location by appending data
byte[] file1a = dataset(TEST_FILE_LEN, 44, 255);
appendFile(getFileSystem(), path2, file1a);
byte[] file1x = Arrays.copyOf(file1, file1.length + file1a.length);
System.arraycopy(file1a, 0, file1x, file1.length, file1a.length);
// verify fd entity contains contents of file1 + appended bytes
verifyFileContents(getFileSystem(), path2, file1x);
try {
// handle should not resolve when content changed
instream = getFileSystem().open(fd);
fail("Failed to detect change to content");
} catch (InvalidPathHandleException e) {
// expected
}
}
/**
* Verify {@link HandleOpt#path()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByPath() throws Throwable {
describe("verify open(getPathHandle(FileStatus, path())) operations" +
"detect changes");
assumeSupportsContentCheck();
Path path1 = path("testopenfilebypath1");
Path path2 = path("testopenfilebypath2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat1 = getFileSystem().getFileStatus(path1);
assertNotNull(stat1);
assertEquals(path1, stat1.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
FileStatus stat2 = getFileSystem().getFileStatus(path2);
assertNotNull(stat2);
assertEquals(path2, stat2.getPath());
// create identical file at same location, orig still exists at path2
createFile(getFileSystem(), path1, false, file1);
PathHandle fd1 = getHandleOrSkip(stat1, HandleOpt.path());
PathHandle fd2 = getHandleOrSkip(stat2, HandleOpt.path());
// verify path1, path2 contents identical
verifyFileContents(getFileSystem(), path1, file1);
verifyFileContents(getFileSystem(), path2, file1);
try {
// verify attempt to resolve the handle fails
instream = getFileSystem().open(fd1);
fail("Expected an exception");
} catch (InvalidPathHandleException e) {
// expected
}
// verify content change OK
byte[] file2a = dataset(TEST_FILE_LEN, 44, 255);
ContractTestUtils.appendFile(getFileSystem(), path2, file2a);
byte[] file2x = Arrays.copyOf(file1, file1.length + file2a.length);
System.arraycopy(file2a, 0, file2x, file1.length, file2a.length);
// verify path2 contains contents of orig + appended bytes
verifyFileContents(getFileSystem(), path2, file2x);
// verify open by fd succeeds
instream = getFileSystem().open(fd2);
verifyRead(instream, file2x, 0, 2 * TEST_FILE_LEN);
}
/**
* Verify {@link HandleOpt#reference()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByReference() throws Throwable {
describe("verify open(getPathHandle(FileStatus, reference())) operations" +
" are independent of rename");
assumeSupportsFileReference();
Path path1 = path("testopenfilebyref1");
Path path2 = path("testopenfilebyref2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
byte[] file2 = dataset(TEST_FILE_LEN, 44, 255);
createFile(getFileSystem(), path1, false, file2);
byte[] file1a = dataset(TEST_FILE_LEN, 42, 255);
appendFile(getFileSystem(), path2, file1a);
byte[] file1x = Arrays.copyOf(file1, file1.length + file1a.length);
System.arraycopy(file1a, 0, file1x, file1.length, file1a.length);
PathHandle fd = getHandleOrSkip(stat, HandleOpt.reference());
// verify path2 contains contents of file1 + appended bytes
verifyFileContents(getFileSystem(), path2, file1x);
// verify path1 contents contents of file2
verifyFileContents(getFileSystem(), path1, file2);
// verify fd contains contents of file1 + appended bytes
instream = getFileSystem().open(fd);
verifyRead(instream, file1x, 0, 2 * TEST_FILE_LEN);
}
/**
* Verify {@link PathHandle} may be serialized and restored.
* @throws Throwable on error
*/
@Test
public void testOpenFileBySerializedReference() throws Throwable {
describe("verify PathHandle supports generic serialization");
assumeSupportsFileReference();
Path path1 = path("testopenfilebyref1");
Path path2 = path("testopenfilebyref2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
byte[] file2 = dataset(TEST_FILE_LEN, 44, 255);
createFile(getFileSystem(), path1, false, file2);
PathHandle fd = getHandleOrSkip(stat, HandleOpt.reference());
// serialize PathHandle
ByteBuffer sb = fd.bytes();
PathHandle fdb = new RawPathHandle(sb);
instream = getFileSystem().open(fdb);
// verify stat contains contents of file1
verifyRead(instream, file1, 0, TEST_FILE_LEN);
// verify path2 contains contents of file1
verifyFileContents(getFileSystem(), path2, file1);
// verify path1 contents contents of file2
verifyFileContents(getFileSystem(), path1, file2);
}
}

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.InvalidPathHandleException;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import static org.apache.hadoop.fs.contract.ContractTestUtils.appendFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.RawPathHandle;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Test {@link PathHandle} operations and semantics.
* @see ContractOptions#SUPPORTS_FILE_REFERENCE
* @see ContractOptions#SUPPORTS_CONTENT_CHECK
* @see org.apache.hadoop.fs.FileSystem#getPathHandle(FileStatus, HandleOpt...)
* @see org.apache.hadoop.fs.FileSystem#open(PathHandle)
* @see org.apache.hadoop.fs.FileSystem#open(PathHandle, int)
*/
@RunWith(Parameterized.class)
public abstract class AbstractContractPathHandleTest
extends AbstractFSContractTestBase {
private final HandleOpt[] opts;
private final boolean serialized;
private static final byte[] B1 = dataset(TEST_FILE_LEN, 43, 255);
private static final byte[] B2 = dataset(TEST_FILE_LEN, 44, 255);
/**
* Create an instance of the test from {@link #params()}.
* @param testname Name of the set of options under test
* @param opts Set of {@link HandleOpt} params under test.
* @param serialized Serialize the handle before using it.
*/
public AbstractContractPathHandleTest(String testname, HandleOpt[] opts,
boolean serialized) {
this.opts = opts;
this.serialized = serialized;
}
/**
* Run test against all combinations of default options. Also run each
* after converting the PathHandle to bytes and back.
* @return
*/
@Parameterized.Parameters(name="Test{0}")
public static Collection<Object[]> params() {
return Arrays.asList(
Arrays.asList("Exact", HandleOpt.exact()),
Arrays.asList("Content", HandleOpt.content()),
Arrays.asList("Path", HandleOpt.path()),
Arrays.asList("Reference", HandleOpt.reference())
).stream()
.flatMap((x) -> Arrays.asList(true, false).stream()
.map((b) -> {
ArrayList<Object> y = new ArrayList<>(x);
y.add(b);
return y;
}))
.map(ArrayList::toArray)
.collect(Collectors.toList());
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, 4096);
return conf;
}
@Test
public void testIdent() throws IOException {
describe("verify simple open, no changes");
FileStatus stat = testFile(B1);
PathHandle fd = getHandleOrSkip(stat);
verifyFileContents(getFileSystem(), stat.getPath(), B1);
try (FSDataInputStream in = getFileSystem().open(fd)) {
verifyRead(in, B1, 0, TEST_FILE_LEN);
}
}
@Test
public void testChanged() throws IOException {
describe("verify open(PathHandle, changed(*))");
assumeSupportsContentCheck();
HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
.orElseThrow(IllegalArgumentException::new);
FileStatus stat = testFile(B1);
// modify the file by appending data
appendFile(getFileSystem(), stat.getPath(), B2);
byte[] b12 = Arrays.copyOf(B1, B1.length + B2.length);
System.arraycopy(B2, 0, b12, B1.length, B2.length);
// verify fd entity contains contents of file1 + appended bytes
verifyFileContents(getFileSystem(), stat.getPath(), b12);
// get the handle *after* the file has been modified
PathHandle fd = getHandleOrSkip(stat);
try (FSDataInputStream in = getFileSystem().open(fd)) {
assertTrue("Failed to detect content change", data.allowChange());
verifyRead(in, b12, 0, b12.length);
} catch (InvalidPathHandleException e) {
assertFalse("Failed to allow content change", data.allowChange());
}
}
@Test
public void testMoved() throws IOException {
describe("verify open(PathHandle, moved(*))");
assumeSupportsFileReference();
HandleOpt.Location loc = HandleOpt.getOpt(HandleOpt.Location.class, opts)
.orElseThrow(IllegalArgumentException::new);
FileStatus stat = testFile(B1);
// rename the file after obtaining FileStatus
ContractTestUtils.rename(getFileSystem(), stat.getPath(),
path(stat.getPath() + "2"));
// obtain handle to entity from #getFileStatus call
PathHandle fd = getHandleOrSkip(stat);
try (FSDataInputStream in = getFileSystem().open(fd)) {
assertTrue("Failed to detect location change", loc.allowChange());
verifyRead(in, B1, 0, B1.length);
} catch (InvalidPathHandleException e) {
assertFalse("Failed to allow location change", loc.allowChange());
}
}
@Test
public void testChangedAndMoved() throws IOException {
describe("verify open(PathHandle, changed(*), moved(*))");
assumeSupportsFileReference();
assumeSupportsContentCheck();
HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
.orElseThrow(IllegalArgumentException::new);
HandleOpt.Location loc = HandleOpt.getOpt(HandleOpt.Location.class, opts)
.orElseThrow(IllegalArgumentException::new);
FileStatus stat = testFile(B1);
Path dst = path(stat.getPath() + "2");
ContractTestUtils.rename(getFileSystem(), stat.getPath(), dst);
appendFile(getFileSystem(), dst, B2);
PathHandle fd = getHandleOrSkip(stat);
byte[] b12 = Arrays.copyOf(B1, B1.length + B2.length);
System.arraycopy(B2, 0, b12, B1.length, B2.length);
try (FSDataInputStream in = getFileSystem().open(fd)) {
assertTrue("Failed to detect location change", loc.allowChange());
assertTrue("Failed to detect content change", data.allowChange());
verifyRead(in, b12, 0, b12.length);
} catch (InvalidPathHandleException e) {
if (data.allowChange()) {
assertFalse("Failed to allow location change", loc.allowChange());
}
if (loc.allowChange()) {
assertFalse("Failed to allow content change", data.allowChange());
}
}
}
private FileStatus testFile(byte[] content) throws IOException {
Path path = path(methodName.getMethodName());
createFile(getFileSystem(), path, false, content);
FileStatus stat = getFileSystem().getFileStatus(path);
assertNotNull(stat);
assertEquals(path, stat.getPath());
return stat;
}
/**
* Skip a test case if the FS doesn't support file references.
* The feature is assumed to be unsupported unless stated otherwise.
*/
protected void assumeSupportsFileReference() throws IOException {
if (getContract().isSupported(SUPPORTS_FILE_REFERENCE, false)) {
return;
}
skip("Skipping as unsupported feature: " + SUPPORTS_FILE_REFERENCE);
}
/**
* Skip a test case if the FS doesn't support content validation.
* The feature is assumed to be unsupported unless stated otherwise.
*/
protected void assumeSupportsContentCheck() throws IOException {
if (getContract().isSupported(SUPPORTS_CONTENT_CHECK, false)) {
return;
}
skip("Skipping as unsupported feature: " + SUPPORTS_CONTENT_CHECK);
}
/**
* Utility method to obtain a handle or skip the test if the set of opts
* are not supported.
* @param stat Target file status
* @return Handle to the indicated entity or skip the test
*/
protected PathHandle getHandleOrSkip(FileStatus stat) {
try {
PathHandle fd = getFileSystem().getPathHandle(stat, opts);
if (serialized) {
ByteBuffer sb = fd.bytes();
return new RawPathHandle(sb);
}
return fd;
} catch (UnsupportedOperationException e) {
skip("FileSystem does not support " + Arrays.toString(opts));
}
// unreachable
return null;
}
}

View File

@ -66,6 +66,9 @@ public final class HdfsPathHandle implements PathHandle {
public void verify(HdfsLocatedFileStatus stat)
throws InvalidPathHandleException {
if (null == stat) {
throw new InvalidPathHandleException("Could not resolve handle");
}
if (mtime != null && mtime != stat.getModificationTime()) {
throw new InvalidPathHandleException("Content changed");
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.contract.AbstractContractPathHandleTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Verify HDFS compliance with {@link org.apache.hadoop.fs.PathHandle}
* semantics.
*/
public class TestHDFSContractPathHandle
extends AbstractContractPathHandleTest {
public TestHDFSContractPathHandle(String testname, Options.HandleOpt[] opts,
boolean serialized) {
super(testname, opts, serialized);
}
@BeforeClass
public static void createCluster() throws IOException {
HDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
HDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new HDFSContract(conf);
}
}