HDFS-7878. API - expose a unique file identifier.

(cherry picked from commit d015e0bbd5)
This commit is contained in:
Chris Douglas 2017-10-31 09:44:01 -07:00
parent 75605766fd
commit 1ce0eb7d23
20 changed files with 1102 additions and 40 deletions

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@ -950,6 +951,51 @@ public abstract class FileSystem extends Configured implements Closeable {
IO_FILE_BUFFER_SIZE_DEFAULT));
}
/**
* Open an FSDataInputStream matching the PathHandle instance. The
* implementation may encode metadata in PathHandle to address the
* resource directly and verify that the resource referenced
* satisfies constraints specified at its construciton.
* @param fd PathHandle object returned by the FS authority.
* @param bufferSize the size of the buffer to use
* @throws IOException IO failure
* @throws UnsupportedOperationException If not overridden by subclass
*/
public FSDataInputStream open(PathHandle fd, int bufferSize)
throws IOException {
throw new UnsupportedOperationException();
}
/**
* Create a durable, serializable handle to the referent of the given
* entity.
* @param stat Referent in the target FileSystem
* @param opt If absent, assume {@link HandleOpt#path()}.
* @throws IllegalArgumentException If the FileStatus does not belong to
* this FileSystem
* @throws UnsupportedOperationException If
* {@link #createPathHandle(FileStatus, HandleOpt[])}
* not overridden by subclass.
* @throws UnsupportedOperationException If this FileSystem cannot enforce
* the specified constraints.
*/
public final PathHandle getPathHandle(FileStatus stat, HandleOpt... opt) {
if (null == opt || 0 == opt.length) {
return createPathHandle(stat, HandleOpt.path());
}
return createPathHandle(stat, opt);
}
/**
* Hook to implement support for {@link PathHandle} operations.
* @param stat Referent in the target FileSystem
* @param opt Constraints that determine the validity of the
* {@link PathHandle} reference.
*/
protected PathHandle createPathHandle(FileStatus stat, HandleOpt... opt) {
throw new UnsupportedOperationException();
}
/**
* Create an FSDataOutputStream at the indicated Path.
* Files are overwritten by default.

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Progressable;
@ -163,6 +164,17 @@ public class FilterFileSystem extends FileSystem {
return fs.open(f, bufferSize);
}
@Override
public FSDataInputStream open(PathHandle fd, int bufferSize)
throws IOException {
return fs.open(fd, bufferSize);
}
@Override
protected PathHandle createPathHandle(FileStatus stat, HandleOpt... opts) {
return fs.getPathHandle(stat, opts);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
@ -699,6 +700,19 @@ public class HarFileSystem extends FileSystem {
hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
}
@Override
protected PathHandle createPathHandle(FileStatus stat, HandleOpt... opts) {
// har consistency managed through metadata cache
// could extend HarMetaData to track more explicitly
throw new UnsupportedOperationException();
}
@Override
public FSDataInputStream open(PathHandle fd, int bufferSize)
throws IOException {
throw new UnsupportedOperationException();
}
/**
* Used for delegation token related functionality. Must delegate to
* underlying file system.

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.fs;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.BiFunction;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.FsPermission;
@ -325,4 +329,180 @@ public final class Options {
return processChecksumOpt(defaultOpt, userOpt, -1);
}
}
/**
* Options for creating {@link PathHandle} references.
*/
public static class HandleOpt {
protected HandleOpt() {
}
/**
* Utility function for mapping
* {@link FileSystem#getPathHandle(FileStatus, HandleOpt[])} to a
* fixed set of handle options.
* @param fs Target filesystem
* @param opt Options to bind in partially evaluated function
* @return Function reference with options fixed
*/
public static Function<FileStatus, PathHandle> resolve(
FileSystem fs, HandleOpt... opt) {
return resolve(fs::getPathHandle, opt);
}
/**
* Utility function for partial evaluation of {@link FileStatus}
* instances to a fixed set of handle options.
* @param fsr Function reference
* @param opt Options to associate with {@link FileStatus} instances to
* produce {@link PathHandle} instances.
* @return Function reference with options fixed
*/
public static Function<FileStatus, PathHandle> resolve(
BiFunction<FileStatus, HandleOpt[], PathHandle> fsr,
HandleOpt... opt) {
return (stat) -> fsr.apply(stat, opt);
}
/**
* Handle is valid iff the referent is neither moved nor changed.
* Equivalent to changed(false), moved(false).
* @return Options requiring that the content and location of the entity
* be unchanged between calls.
*/
public static HandleOpt[] exact() {
return new HandleOpt[] {changed(false), moved(false) };
}
/**
* Handle is valid iff the content of the referent is the same.
* Equivalent to changed(false), moved(true).
* @return Options requiring that the content of the entity is unchanged,
* but it may be at a different location.
*/
public static HandleOpt[] content() {
return new HandleOpt[] {changed(false), moved(true) };
}
/**
* Handle is valid iff the referent is unmoved in the namespace.
* Equivalent to changed(true), moved(false).
* @return Options requiring that the referent exist in the same location,
* but its content may have changed.
*/
public static HandleOpt[] path() {
return new HandleOpt[] {changed(true), moved(false) };
}
/**
* Handle is valid iff the referent exists in the namespace.
* Equivalent to changed(true), moved(true).
* @return Options requiring that the implementation resolve a reference
* to this entity regardless of changes to content or location.
*/
public static HandleOpt[] reference() {
return new HandleOpt[] {changed(true), moved(true) };
}
/**
* @param allow If true, resolve references to this entity even if it has
* been modified.
* @return Handle option encoding parameter.
*/
public static Data changed(boolean allow) {
return new Data(allow);
}
/**
* @param allow If true, resolve references to this entity anywhere in
* the namespace.
* @return Handle option encoding parameter.
*/
public static Location moved(boolean allow) {
return new Location(allow);
}
/**
* Utility method to extract a HandleOpt from the set provided.
* @param c Target class
* @param opt List of options
* @param <T> Type constraint for exact match
* @throws IllegalArgumentException If more than one matching type is found.
* @return An option assignable from the specified type or null if either
* opt is null or a suitable match is not found.
*/
public static <T extends HandleOpt> Optional<T> getOpt(
Class<T> c, HandleOpt... opt) {
if (null == opt) {
return Optional.empty();
}
T ret = null;
for (HandleOpt o : opt) {
if (c.isAssignableFrom(o.getClass())) {
if (ret != null) {
throw new IllegalArgumentException("Duplicate option "
+ c.getSimpleName());
}
@SuppressWarnings("unchecked")
T tmp = (T) o;
ret = tmp;
}
}
return Optional.ofNullable(ret);
}
/**
* Option storing standard constraints on data.
*/
public static class Data extends HandleOpt {
private final boolean allowChanged;
Data(boolean allowChanged) {
this.allowChanged = allowChanged;
}
/**
* Tracks whether any changes to file content are permitted.
* @return True if content changes are allowed, false otherwise.
*/
public boolean allowChange() {
return allowChanged;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("data(allowChange=")
.append(allowChanged).append(")");
return sb.toString();
}
}
/**
* Option storing standard constraints on location.
*/
public static class Location extends HandleOpt {
private final boolean allowChanged;
Location(boolean allowChanged) {
this.allowChanged = allowChanged;
}
/**
* Tracks whether any changes to file location are permitted.
* @return True if relocation in the namespace is allowed, false
* otherwise.
*/
public boolean allowChange() {
return allowChanged;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("loc(allowChange=")
.append(allowChanged).append(")");
return sb.toString();
}
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.Serializable;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Opaque, serializable reference to an entity in the FileSystem. May contain
* metadata sufficient to resolve or verify subsequent accesses indepedent of
* other modifications to the FileSystem.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface PathHandle extends Serializable {
/**
* @return Serialized from in bytes.
*/
default byte[] toByteArray() {
ByteBuffer bb = bytes();
byte[] ret = new byte[bb.remaining()];
bb.get(ret);
return ret;
}
ByteBuffer bytes();
@Override
boolean equals(Object other);
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.nio.ByteBuffer;
/**
* Generic format of FileStatus objects. When the origin is unknown, the
* attributes of the handle are undefined.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class RawPathHandle implements PathHandle {
private static final long serialVersionUID = 0x12ba4689510L;
public static final int MAX_SIZE = 1 << 20;
private transient ByteBuffer fd;
/**
* Store a reference to the given bytes as the serialized form.
* @param fd serialized bytes
*/
public RawPathHandle(ByteBuffer fd) {
this.fd = null == fd
? ByteBuffer.allocate(0)
: fd.asReadOnlyBuffer();
}
/**
* Initialize using a copy of bytes from the serialized handle.
* @param handle PathHandle to preserve in serialized form.
*/
public RawPathHandle(PathHandle handle) {
ByteBuffer hb = null == handle
? ByteBuffer.allocate(0)
: handle.bytes();
fd = ByteBuffer.allocate(hb.remaining());
fd.put(hb);
fd.flip();
}
@Override
public ByteBuffer bytes() {
return fd.asReadOnlyBuffer();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof PathHandle)) {
return false;
}
PathHandle o = (PathHandle) other;
return bytes().equals(o.bytes());
}
@Override
public int hashCode() {
return bytes().hashCode();
}
@Override
public String toString() {
return bytes().toString();
}
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeInt(fd.remaining());
if (fd.hasArray()) {
out.write(fd.array(), fd.position(), fd.remaining());
} else {
byte[] x = new byte[fd.remaining()];
fd.slice().get(x);
out.write(x);
}
}
private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
int len = in.readInt();
if (len < 0 || len > MAX_SIZE) {
throw new IOException("Illegal buffer length " + len);
}
byte[] x = new byte[len];
in.readFully(x);
fd = ByteBuffer.wrap(x);
}
private void readObjectNoData() throws ObjectStreamException {
throw new InvalidObjectException("Stream data required");
}
}

View File

@ -702,6 +702,121 @@ symbolic links
exists in the metadata, but no copies of any its blocks can be located;
-`FileNotFoundException` would seem more accurate and useful.
### `PathHandle getPathHandle(FileStatus stat, HandleOpt... options)`
Implementaions without a compliant call MUST throw `UnsupportedOperationException`
#### Preconditions
let stat = getFileStatus(Path p)
let FS' where:
(FS.Directories', FS.Files', FS.Symlinks')
p' in paths(FS') where:
exists(FS, stat.path) implies exists(FS', p')
The referent of a `FileStatus` instance, at the time it was resolved, is the
same referent as the result of `getPathHandle(FileStatus)`. The `PathHandle`
may be used in subsequent operations to ensure invariants hold between
calls.
The `options` parameter specifies whether a subsequent call e.g.,
`open(PathHandle)` will succeed if the referent data or location changed. By
default, any modification results in an error. The caller MAY specify
relaxations that allow operations to succeed even if the referent exists at
a different path and/or its data are changed.
An implementation MUST throw `UnsupportedOperationException` if it cannot
support the semantics specified by the caller. The default set of options
are as follows.
| | Unmoved | Moved |
|-----------:|:--------:|:---------:|
| Unchanged | EXACT | CONTENT |
| Changed | PATH | REFERENCE |
Changes to ownership, extended attributes, and other metadata are not
required to match the `PathHandle`. Implementations can extend the set of
`HandleOpt` parameters with custom constraints.
##### Examples
A client specifies that the `PathHandle` should track the entity across
renames using `REFERENCE`. The implementation MUST throw an
`UnsupportedOperationException` when creating the `PathHandle` unless
failure to resolve the reference implies the entity no longer exists.
A client specifies that the `PathHandle` should resolve only if the entity
is unchanged using `PATH`. The implementation MUST throw an
`UnsupportedOperationException` when creating the `PathHandle` unless it can
distinguish between an identical entity located subsequently at the same
path.
#### Postconditions
result = PathHandle(p')
#### Implementation notes
The referent of a `PathHandle` is the namespace when the `FileStatus`
instance was created, _not_ its state when the `PathHandle` is created. An
implementation MAY reject attempts to create or resolve `PathHandle`
instances that are valid, but expensive to service.
Object stores that implement rename by copying objects MUST NOT claim to
support `CONTENT` and `REFERENCE` unless the lineage of the object is
resolved.
It MUST be possible to serialize a `PathHandle` instance and reinstantiate
it in one or more processes, on another machine, and arbitrarily far into
the future without changing its semantics. The implementation MUST refuse to
resolve instances if it can no longer guarantee its invariants.
#### HDFS implementation notes
HDFS does not support `PathHandle` references to directories or symlinks.
Support for `CONTENT` and `REFERENCE` looks up files by INode. INodes are
not unique across NameNodes, so federated clusters SHOULD include enough
metadata in the `PathHandle` to detect references from other namespaces.
### `FSDataInputStream open(PathHandle handle, int bufferSize)`
Implementaions without a compliant call MUST throw `UnsupportedOperationException`
#### Preconditions
let fd = getPathHandle(FileStatus stat)
if stat.isdir : raise IOException
let FS' where:
(FS.Directories', FS.Files', FS.Symlinks')
p' in FS.Files' where:
FS.Files'[p'] = fd
if not exists(FS', p') : raise FileNotFoundException
The implementation MUST resolve the referent of the `PathHandle` following
the constraints specified at its creation by `getPathHandle(FileStatus)`.
Metadata necessary for the `FileSystem` to satisfy this contract MAY be
encoded in the `PathHandle`.
#### Postconditions
result = FSDataInputStream(0, FS.Files'[p'])
The stream returned is subject to the constraints of a stream returned by
`open(Path)`. Constraints checked on open MAY hold to hold for the stream, but
this is not guaranteed.
For example, a `PathHandle` created with `CONTENT` constraints MAY return a
stream that ignores updates to the file after it is opened, if it was
unmodified when `open(PathHandle)` was resolved.
#### Implementation notes
An implementation MAY check invariants either at the server or before
returning the stream to the client. For example, an implementation may open
the file, then verify the invariants in the `PathHandle` using
`getFileStatus(Path)` to implement `CONTENT`. This could yield false
positives and it requires additional RPC traffic.
### `boolean delete(Path p, boolean recursive)`

View File

@ -15,29 +15,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.RawPathHandle;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.appendFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import org.junit.Test;
/**
* Test Seek operations
*/
public abstract class AbstractContractOpenTest extends AbstractFSContractTestBase {
public abstract class AbstractContractOpenTest
extends AbstractFSContractTestBase {
private FSDataInputStream instream;
@ -163,5 +172,229 @@ public abstract class AbstractContractOpenTest extends AbstractFSContractTestBas
instream.close();
}
/**
* Skip a test case if the FS doesn't support file references.
* The feature is assumed to be unsupported unless stated otherwise.
*/
protected void assumeSupportsFileReference() throws IOException {
if (getContract().isSupported(SUPPORTS_FILE_REFERENCE, false)) {
return;
}
skip("Skipping as unsupported feature: " + SUPPORTS_FILE_REFERENCE);
}
/**
* Skip a test case if the FS doesn't support content validation.
* The feature is assumed to be unsupported unless stated otherwise.
*/
protected void assumeSupportsContentCheck() throws IOException {
if (getContract().isSupported(SUPPORTS_CONTENT_CHECK, false)) {
return;
}
skip("Skipping as unsupported feature: " + SUPPORTS_CONTENT_CHECK);
}
private PathHandle getHandleOrSkip(FileStatus stat, HandleOpt... opts) {
try {
return getFileSystem().getPathHandle(stat, opts);
} catch (UnsupportedOperationException e) {
skip("FileSystem does not support " + Arrays.toString(opts));
}
// unreachable
return null;
}
/**
* Verify {@link HandleOpt#exact()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByExact() throws Throwable {
describe("verify open(getPathHandle(FileStatus, exact())) operations" +
"detect changes");
assumeSupportsContentCheck();
assumeSupportsFileReference();
Path path1 = path("testopenfilebyexact1");
Path path2 = path("testopenfilebyexact2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
// create identical file at same location, orig still exists at path2
createFile(getFileSystem(), path1, false, file1);
PathHandle fd = getHandleOrSkip(stat, HandleOpt.exact());
// verify path1, path2 contents identical
verifyFileContents(getFileSystem(), path1, file1);
verifyFileContents(getFileSystem(), path2, file1);
try {
// the PathHandle will not resolve, even though
// the original entity exists, it has not been modified, and an
// identical file exists at the old path. The handle would also
// fail to resolve if path1 had been modified
instream = getFileSystem().open(fd, 1 << 15);
fail("Expected an exception");
} catch (IOException e) {
// expected
}
}
/**
* Verify {@link HandleOpt#content()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByContent() throws Throwable {
describe("verify open(getPathHandle(FileStatus, content())) operations" +
"follow relocation");
assumeSupportsContentCheck();
assumeSupportsFileReference();
Path path1 = path("testopenfilebycontent1");
Path path2 = path("testopenfilebycontent2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
// rename the file after obtaining FileStatus
ContractTestUtils.rename(getFileSystem(), path1, path2);
// obtain handle to entity from #getFileStatus call
PathHandle fd = getHandleOrSkip(stat, HandleOpt.content());
try (FSDataInputStream in = getFileSystem().open(fd, 1 << 15)) {
// verify read of consistent content at new location
verifyRead(in, file1, 0, TEST_FILE_LEN);
}
// modify the file at its new location by appending data
byte[] file1a = dataset(TEST_FILE_LEN, 44, 255);
appendFile(getFileSystem(), path2, file1a);
byte[] file1x = Arrays.copyOf(file1, file1.length + file1a.length);
System.arraycopy(file1a, 0, file1x, file1.length, file1a.length);
// verify fd entity contains contents of file1 + appended bytes
verifyFileContents(getFileSystem(), path2, file1x);
try {
// handle should not resolve when content changed
instream = getFileSystem().open(fd, 1 << 15);
fail("Failed to detect change to content");
} catch (IOException e) {
// expected
}
}
/**
* Verify {@link HandleOpt#path()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByPath() throws Throwable {
describe("verify open(getPathHandle(FileStatus, path())) operations" +
"detect changes");
assumeSupportsContentCheck();
Path path1 = path("testopenfilebypath1");
Path path2 = path("testopenfilebypath2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
// create identical file at same location, orig still exists at path2
createFile(getFileSystem(), path1, false, file1);
PathHandle fd = getHandleOrSkip(stat, HandleOpt.path());
// verify path1, path2 contents identical
verifyFileContents(getFileSystem(), path1, file1);
verifyFileContents(getFileSystem(), path2, file1);
try {
// verify attempt to resolve the handle fails
instream = getFileSystem().open(fd, 1 << 15);
fail("Expected an exception");
} catch (IOException e) {
// expected
}
}
/**
* Verify {@link HandleOpt#reference()} handle semantics.
* @throws Throwable on error
*/
@Test
public void testOpenFileByReference() throws Throwable {
describe("verify open(getPathHandle(FileStatus, reference())) operations" +
" are independent of rename");
assumeSupportsFileReference();
Path path1 = path("testopenfilebyref1");
Path path2 = path("testopenfilebyref2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
byte[] file2 = dataset(TEST_FILE_LEN, 44, 255);
createFile(getFileSystem(), path1, false, file2);
byte[] file1a = dataset(TEST_FILE_LEN, 42, 255);
appendFile(getFileSystem(), path2, file1a);
byte[] file1x = Arrays.copyOf(file1, file1.length + file1a.length);
System.arraycopy(file1a, 0, file1x, file1.length, file1a.length);
PathHandle fd = getHandleOrSkip(stat, HandleOpt.reference());
// verify path2 contains contents of file1 + appended bytes
verifyFileContents(getFileSystem(), path2, file1x);
// verify path1 contents contents of file2
verifyFileContents(getFileSystem(), path1, file2);
// verify fd contains contents of file1 + appended bytes
instream = getFileSystem().open(fd, 1 << 15);
verifyRead(instream, file1x, 0, TEST_FILE_LEN);
}
/**
* Verify {@link PathHandle} may be serialized and restored.
* @throws Throwable on error
*/
@Test
public void testOpenFileBySerializedReference() throws Throwable {
describe("verify PathHandle supports generic serialization");
assumeSupportsFileReference();
Path path1 = path("testopenfilebyref1");
Path path2 = path("testopenfilebyref2");
byte[] file1 = dataset(TEST_FILE_LEN, 43, 255);
createFile(getFileSystem(), path1, false, file1);
FileStatus stat = getFileSystem().getFileStatus(path1);
assertNotNull(stat);
assertEquals(path1, stat.getPath());
ContractTestUtils.rename(getFileSystem(), path1, path2);
byte[] file2 = dataset(TEST_FILE_LEN, 44, 255);
createFile(getFileSystem(), path1, false, file2);
PathHandle fd = getHandleOrSkip(stat, HandleOpt.reference());
// serialize PathHandle
ByteBuffer sb = fd.bytes();
PathHandle fdb = new RawPathHandle(sb);
instream = getFileSystem().open(fdb, 1 << 15);
// verify stat contains contents of file1
verifyRead(instream, file1, 0, TEST_FILE_LEN);
// verify path2 contains contents of file1
verifyFileContents(getFileSystem(), path2, file1);
// verify path1 contents contents of file2
verifyFileContents(getFileSystem(), path1, file2);
}
}

View File

@ -191,6 +191,16 @@ public interface ContractOptions {
*/
String SUPPORTS_POSITIONED_READABLE = "supports-positioned-readable";
/**
* Indicates that FS exposes durable references to files.
*/
String SUPPORTS_FILE_REFERENCE = "supports-file-reference";
/**
* Indicates that FS supports content checks on open.
*/
String SUPPORTS_CONTENT_CHECK = "supports-content-check";
/**
* Maximum path length
* {@value}

View File

@ -402,6 +402,21 @@ public class ContractTestUtils extends Assert {
}
/**
* Rename operation. Safety check for attempts to rename the root directory.
* Verifies that src no longer exists after rename.
* @param fileSystem filesystem to work with
* @param src source path
* @param dst destination path
* @throws IOException If rename fails or src is the root directory.
*/
public static void rename(FileSystem fileSystem, Path src, Path dst)
throws IOException {
rejectRootOperation(src, false);
assertTrue(fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed", src);
}
/**
* Block any operation on the root path. This is a safety check
* @param path path in the filesystem
@ -622,6 +637,23 @@ public class ContractTestUtils extends Assert {
}
}
/**
* Append to an existing file.
* @param fs filesystem
* @param path path to file
* @param data data to append. Can be null
* @throws IOException On any error
*/
public static void appendFile(FileSystem fs,
Path path,
byte[] data) throws IOException {
try (FSDataOutputStream stream = fs.appendFile(path).build()) {
if (data != null && data.length > 0) {
stream.write(data);
}
}
}
/**
* Touch a file.
* @param fs filesystem

View File

@ -860,4 +860,25 @@ public class DFSUtilClient {
}
return threadPoolExecutor;
}
private static final int INODE_PATH_MAX_LENGTH = 3 * Path.SEPARATOR.length()
+ HdfsConstants.DOT_RESERVED_STRING.length()
+ HdfsConstants.DOT_INODES_STRING.length()
+ (int)Math.ceil(Math.log10(Long.MAX_VALUE)) + 1;
/**
* Create the internal unique file path from HDFS file ID (inode ID). Unlike
* a regular file path, this one is guaranteed to refer to the same file at
* all times, across overwrites, etc.
* @param fileId File ID.
* @return The internal ID-based path.
*/
public static Path makePathFromFileId(long fileId) {
StringBuilder sb = new StringBuilder(INODE_PATH_MAX_LENGTH);
sb.append(Path.SEPARATOR).append(HdfsConstants.DOT_RESERVED_STRING)
.append(Path.SEPARATOR).append(HdfsConstants.DOT_INODES_STRING)
.append(Path.SEPARATOR).append(fileId);
return new Path(sb.toString());
}
}

View File

@ -44,9 +44,11 @@ import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.QuotaUsage;
@ -81,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
@ -105,6 +108,7 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
@ -319,6 +323,56 @@ public class DistributedFileSystem extends FileSystem {
}.resolve(this, absF);
}
/**
* Opens an FSDataInputStream with the indicated file ID extracted from
* the {@link PathHandle}.
* @param fd Reference to entity in this FileSystem.
* @param bufferSize the size of the buffer to be used.
*/
@Override
public FSDataInputStream open(PathHandle fd, int bufferSize)
throws IOException {
if (!(fd instanceof HdfsPathHandle)) {
fd = new HdfsPathHandle(fd.bytes());
}
HdfsPathHandle id = (HdfsPathHandle) fd;
return open(DFSUtilClient.makePathFromFileId(id.getInodeId()), bufferSize);
}
/**
* Create a handle to an HDFS file.
* @param st HdfsFileStatus instance from NameNode
* @param opts Standard handle arguments
* @throws IllegalArgumentException If the FileStatus instance refers to a
* directory, symlink, or another namesystem.
* @throws UnsupportedOperationException If opts are not specified or both
* data and location are not allowed to change.
* @return A handle to the file.
*/
@Override
protected PathHandle createPathHandle(FileStatus st, HandleOpt... opts) {
if (!(st instanceof HdfsFileStatus)) {
throw new IllegalArgumentException("Invalid FileStatus "
+ st.getClass().getSimpleName());
}
if (st.isDirectory() || st.isSymlink()) {
throw new IllegalArgumentException("PathHandle only available for files");
}
if (!getUri().getAuthority().equals(st.getPath().toUri().getAuthority())) {
throw new IllegalArgumentException("Wrong FileSystem: " + st.getPath());
}
HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
.orElse(HandleOpt.changed(false));
HandleOpt.Location loc = HandleOpt.getOpt(HandleOpt.Location.class, opts)
.orElse(HandleOpt.moved(false));
if (!data.allowChange() || !loc.allowChange()) {
throw new UnsupportedOperationException("Unsupported opts "
+ Arrays.stream(opts)
.map(HandleOpt::toString).collect(Collectors.joining(",")));
}
return new HdfsPathHandle((HdfsFileStatus)st);
}
@Override
public FSDataOutputStream append(Path f, final int bufferSize,
final Progressable progress) throws IOException {

View File

@ -58,6 +58,10 @@ public final class HdfsConstants {
= Path.SEPARATOR + DOT_SNAPSHOT_DIR;
public static final String SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR
= Path.SEPARATOR + DOT_SNAPSHOT_DIR + Path.SEPARATOR;
public final static String DOT_RESERVED_STRING = ".reserved";
public final static String DOT_RESERVED_PATH_PREFIX = Path.SEPARATOR
+ DOT_RESERVED_STRING;
public final static String DOT_INODES_STRING = ".inodes";
/**
* Generation stamp of blocks that pre-date the introduction

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsPathHandleProto;
import com.google.protobuf.ByteString;
/**
* Opaque handle to an entity in HDFS.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class HdfsPathHandle implements PathHandle {
private static final long serialVersionUID = 0xc5308795428L;
private final long inodeId;
public HdfsPathHandle(HdfsFileStatus hstat) {
this(hstat.getFileId());
}
public HdfsPathHandle(long inodeId) {
this.inodeId = inodeId;
}
public HdfsPathHandle(ByteBuffer bytes) throws IOException {
if (null == bytes) {
throw new IOException("Missing PathHandle");
}
HdfsPathHandleProto p =
HdfsPathHandleProto.parseFrom(ByteString.copyFrom(bytes));
inodeId = p.getInodeId();
}
public long getInodeId() {
return inodeId;
}
@Override
public ByteBuffer bytes() {
return HdfsPathHandleProto.newBuilder()
.setInodeId(getInodeId())
.build()
.toByteString()
.asReadOnlyByteBuffer();
}
@Override
public boolean equals(Object other) {
if (null == other) {
return false;
}
if (!HdfsPathHandle.class.equals(other.getClass())) {
// require exact match
return false;
}
HdfsPathHandle o = (HdfsPathHandle)other;
return getInodeId() == o.getInodeId();
}
@Override
public int hashCode() {
return Long.hashCode(inodeId);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ ");
sb.append("inodeId : ").append(Long.toString(getInodeId()));
sb.append(" }");
return sb.toString();
}
}

View File

@ -82,13 +82,14 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@ -159,6 +160,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsServerDefaultsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto.FileType;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsPathHandleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
@ -1561,6 +1563,19 @@ public class PBHelperClient {
return FsPermissionProto.newBuilder().setPerm(p.toShort()).build();
}
public static HdfsPathHandle convert(HdfsPathHandleProto fd) {
if (null == fd) {
return null;
}
return new HdfsPathHandle(fd.getInodeId());
}
public static HdfsPathHandleProto convert(HdfsPathHandle fd) {
return HdfsPathHandleProto.newBuilder()
.setInodeId(fd.getInodeId())
.build();
}
public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
if (fs == null) {
return null;

View File

@ -396,6 +396,13 @@ message AddErasureCodingPolicyResponseProto {
optional string errorMsg = 3;
}
/**
* Placeholder type for consistent HDFS operations.
*/
message HdfsPathHandleProto {
optional uint64 inodeId = 1;
}
/**
* Status of a file, directory or symlink
* Optionally includes a file's block locations if requested by client on the rpc call.

View File

@ -122,14 +122,16 @@ public class FSDirectory implements Closeable {
@VisibleForTesting
static boolean CHECK_RESERVED_FILE_NAMES = true;
public final static String DOT_RESERVED_STRING = ".reserved";
public final static String DOT_RESERVED_PATH_PREFIX = Path.SEPARATOR
+ DOT_RESERVED_STRING;
public final static String DOT_RESERVED_STRING =
HdfsConstants.DOT_RESERVED_STRING;
public final static String DOT_RESERVED_PATH_PREFIX =
HdfsConstants.DOT_RESERVED_PATH_PREFIX;
public final static byte[] DOT_RESERVED =
DFSUtil.string2Bytes(DOT_RESERVED_STRING);
private final static String RAW_STRING = "raw";
private final static byte[] RAW = DFSUtil.string2Bytes(RAW_STRING);
public final static String DOT_INODES_STRING = ".inodes";
public final static String DOT_INODES_STRING =
HdfsConstants.DOT_INODES_STRING;
public final static byte[] DOT_INODES =
DFSUtil.string2Bytes(DOT_INODES_STRING);
private final static byte[] DOT_DOT =

View File

@ -54,6 +54,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@ -1049,4 +1050,19 @@ public class TestDFSUtil {
DFSUtilClient.isHDFSEncryptionEnabled(conf));
}
@Test
public void testFileIdPath() throws Throwable {
// /.reserved/.inodes/
String prefix = Path.SEPARATOR + HdfsConstants.DOT_RESERVED_STRING +
Path.SEPARATOR + HdfsConstants.DOT_INODES_STRING +
Path.SEPARATOR;
Random r = new Random();
for (int i = 0; i < 100; ++i) {
long inode = r.nextLong() & Long.MAX_VALUE;
assertEquals(new Path(prefix + inode),
DFSUtilClient.makePathFromFileId(inode));
}
}
}

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URI;
import org.apache.hadoop.fs.FSProtos.FileStatusProto;
@ -33,7 +37,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import com.google.protobuf.ByteString;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
/**
* Verify compatible FileStatus/HdfsFileStatus serialization.
@ -53,6 +57,27 @@ public class TestFileStatusSerialization {
assertEquals(expected.getBlockSize(), actual.getBlockSize());
}
private static final URI BASEURI = new Path("hdfs://foobar").toUri();
private static final Path BASEPATH = new Path("/dingos");
private static final String FILE = "zot";
private static final Path FULLPATH = new Path("hdfs://foobar/dingos/zot");
private static HdfsFileStatusProto.Builder baseStatus() {
FsPermission perm = FsPermission.getFileDefault();
HdfsFileStatusProto.Builder hspb = HdfsFileStatusProto.newBuilder()
.setFileType(FileType.IS_FILE)
.setPath(ByteString.copyFromUtf8("zot"))
.setLength(4344)
.setPermission(PBHelperClient.convert(perm))
.setOwner("hadoop")
.setGroup("unqbbc")
.setModificationTime(12345678L)
.setAccessTime(87654321L)
.setBlockReplication(10)
.setBlocksize(1L << 33)
.setFlags(0);
return hspb;
}
/**
* Test API backwards-compatibility with 2.x applications w.r.t. FsPermission.
*/
@ -65,21 +90,12 @@ public class TestFileStatusSerialization {
// test verifies.
for (int i = 0; i < flagmask; ++i) {
FsPermission perm = FsPermission.createImmutable((short) 0013);
HdfsFileStatusProto.Builder hspb = HdfsFileStatusProto.newBuilder()
.setFileType(FileType.IS_FILE)
.setPath(ByteString.copyFromUtf8("hdfs://foobar/dingos/zot"))
.setLength(4344)
HdfsFileStatusProto.Builder hspb = baseStatus()
.setPermission(PBHelperClient.convert(perm))
.setOwner("hadoop")
.setGroup("unqbbc")
.setModificationTime(12345678L)
.setAccessTime(87654321L)
.setBlockReplication(10)
.setBlocksize(1L << 33)
.setFlags(i);
HdfsFileStatus stat = PBHelperClient.convert(hspb.build());
stat.makeQualified(new URI("hdfs://foobar"), new Path("/dingos"));
assertEquals(new Path("hdfs://foobar/dingos/zot"), stat.getPath());
stat.makeQualified(BASEURI, BASEPATH);
assertEquals(FULLPATH, stat.getPath());
// verify deprecated FsPermissionExtension methods
FsPermission sp = stat.getPermission();
@ -103,23 +119,29 @@ public class TestFileStatusSerialization {
assertEquals(sp.getErasureCodedBit(), fstat.isErasureCoded());
}
}
// param for LocatedFileStatus, HttpFileStatus
@Test
public void testJavaSerialization() throws Exception {
HdfsFileStatusProto hsp = baseStatus().build();
HdfsFileStatus hs = PBHelperClient.convert(hsp);
hs.makeQualified(BASEURI, BASEPATH);
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(hs);
}
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
try (ObjectInputStream ois = new ObjectInputStream(bais)) {
FileStatus deser = (FileStatus) ois.readObject();
assertEquals(hs, deser);
checkFields(hs, deser);
}
}
@Test
public void testCrossSerializationProto() throws Exception {
FsPermission perm = FsPermission.getFileDefault();
for (FileType t : FileType.values()) {
HdfsFileStatusProto.Builder hspb = HdfsFileStatusProto.newBuilder()
.setFileType(t)
.setPath(ByteString.copyFromUtf8("hdfs://foobar/dingos"))
.setLength(4344)
.setPermission(PBHelperClient.convert(perm))
.setOwner("hadoop")
.setGroup("unqbbc")
.setModificationTime(12345678L)
.setAccessTime(87654321L)
.setBlockReplication(10)
.setBlocksize(1L << 33);
HdfsFileStatusProto.Builder hspb = baseStatus()
.setFileType(t);
if (FileType.IS_SYMLINK.equals(t)) {
hspb.setSymlink(ByteString.copyFromUtf8("hdfs://yaks/dingos"));
}
@ -146,7 +168,9 @@ public class TestFileStatusSerialization {
byte[] dst = fsp.toByteArray();
HdfsFileStatusProto hsp2 = HdfsFileStatusProto.parseFrom(dst);
assertEquals(hsp, hsp2);
checkFields(PBHelperClient.convert(hsp), PBHelperClient.convert(hsp2));
FileStatus hstat = PBHelperClient.convert(hsp);
FileStatus hstat2 = PBHelperClient.convert(hsp2);
checkFields(hstat, hstat2);
}
}

View File

@ -101,4 +101,14 @@
<value>true</value>
</property>
</configuration>
<property>
<name>fs.contract.supports-file-reference</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-content-check</name>
<value>false</value>
</property>
</configuration>