HADOOP-17979. Add Interface EtagSource to allow FileStatus subclasses to provide etags (#3633)
Contributed by Steve Loughran
This commit is contained in:
parent
e8566b3812
commit
98fe0d0fc3
|
@ -146,4 +146,22 @@ public final class CommonPathCapabilities {
|
|||
*/
|
||||
public static final String ABORTABLE_STREAM =
|
||||
"fs.capability.outputstream.abortable";
|
||||
|
||||
/**
|
||||
* Does this FS support etags?
|
||||
* That is: will FileStatus entries from listing/getFileStatus
|
||||
* probes support EtagSource and return real values.
|
||||
*/
|
||||
public static final String ETAGS_AVAILABLE =
|
||||
"fs.capability.etags.available";
|
||||
|
||||
/**
|
||||
* Are etags guaranteed to be preserved across rename() operations?
|
||||
* FileSystems MUST NOT declare support for this feature
|
||||
* unless this holds.
|
||||
*/
|
||||
public static final String ETAGS_PRESERVED_IN_RENAME =
|
||||
"fs.capability.etags.preserved.in.rename";
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
/**
 * Optional interface which {@link FileStatus} subclasses can implement
 * in order to expose the etag of the entry.
 * A filesystem whose status entries implement this SHOULD also declare
 * the matching path capabilities:
 * <ul>
 *   <li>etags are supported:
 *   {@link CommonPathCapabilities#ETAGS_AVAILABLE}.</li>
 *   <li>etags are consistent over rename:
 *   {@link CommonPathCapabilities#ETAGS_PRESERVED_IN_RENAME}.</li>
 * </ul>
 */
public interface EtagSource {

  /**
   * Get the etag of this file status.
   * Callers must treat both null and the empty string as "no etag".
   * @return the etag, which may be null or empty.
   */
  String getEtag();

}
|
|
@ -1240,7 +1240,7 @@ Renaming a file where the destination is a directory moves the file as a child
|
|||
FS' where:
|
||||
not exists(FS', src)
|
||||
and exists(FS', dest)
|
||||
and data(FS', dest) == data (FS, dest)
|
||||
and data(FS', dest) == data (FS, source)
|
||||
result = True
|
||||
|
||||
|
||||
|
@ -1698,3 +1698,92 @@ in:readahead | READAHEAD | CanSetReadahead | Set the readahead on the input st
|
|||
dropbehind | DROPBEHIND | CanSetDropBehind | Drop the cache.
|
||||
in:unbuffer | UNBUFFER | CanUnbuffer | Reduce the buffering on the input stream.
|
||||
|
||||
## <a name="etagsource"></a> Etag probes through the interface `EtagSource`
|
||||
|
||||
FileSystem implementations MAY support querying HTTP etags from `FileStatus`
|
||||
entries. If so, the requirements are as follows
|
||||
|
||||
### Etag support MUST BE provided across all list/`getFileStatus()` calls.
|
||||
|
||||
That is: when adding etag support, all operations which return `FileStatus` or `LocatedFileStatus`
|
||||
entries MUST return subclasses which are instances of `EtagSource`.
|
||||
|
||||
### FileStatus instances MUST have etags whenever the remote store provides them.
|
||||
|
||||
To support etags, they MUST BE provided in both `getFileStatus()`
|
||||
and list calls.
|
||||
|
||||
Implementors note: the core APIs which MUST BE overridden to achieve this are as follows:
|
||||
|
||||
```java
|
||||
FileStatus getFileStatus(Path)
|
||||
FileStatus[] listStatus(Path)
|
||||
RemoteIterator<FileStatus> listStatusIterator(Path)
|
||||
RemoteIterator<LocatedFileStatus> listFiles(Path, boolean)
|
||||
```
|
||||
|
||||
|
||||
### Etags of files MUST BE Consistent across all list/getFileStatus operations.
|
||||
|
||||
The value of `EtagSource.getEtag()` MUST be the same for list* queries which return etags as for calls of `getFileStatus()` for the specific object.
|
||||
|
||||
```java
|
||||
((EtagSource)getFileStatus(path)).getEtag() == ((EtagSource)listStatus(path)[0]).getEtag()
|
||||
```
|
||||
|
||||
Similarly, the same value MUST BE returned for `listFiles()`, `listStatusIterator()` of the path and
|
||||
when listing the parent path, of all files in the listing.
|
||||
|
||||
### Etags MUST BE different for different file contents.
|
||||
|
||||
Two different arrays of data written to the same path MUST have different etag values when probed.
|
||||
This is a requirement of the HTTP specification.
|
||||
|
||||
### Etags of files SHOULD BE preserved across rename operations
|
||||
|
||||
After a file is renamed, the value of `((EtagSource)getFileStatus(dest)).getEtag()`
|
||||
SHOULD be the same as the value of `((EtagSource)getFileStatus(source)).getEtag()`
|
||||
was before the rename took place.
|
||||
|
||||
This is an implementation detail of the store; it does not hold for AWS S3.
|
||||
|
||||
If and only if the store consistently meets this requirement, the filesystem SHOULD
|
||||
declare in `hasPathCapability()` that it supports
|
||||
`fs.capability.etags.preserved.in.rename`
|
||||
|
||||
### Directories MAY have etags
|
||||
|
||||
Directory entries MAY return etags in listing/probe operations; these entries MAY be preserved across renames.
|
||||
|
||||
Equally, directory entries MAY NOT provide such entries, MAY NOT preserve them across renames,
|
||||
and MAY NOT guarantee consistency over time.
|
||||
|
||||
Note: special mention of the root path "/".
|
||||
As that isn't a real "directory", nobody should expect it to have an etag.
|
||||
|
||||
### All etag-aware `FileStatus` subclass MUST BE `Serializable`; MAY BE `Writable`
|
||||
|
||||
The base `FileStatus` class implements `Serializable` and `Writable` and marshalls its fields appropriately.
|
||||
|
||||
Subclasses MUST support java serialization (Some Apache Spark applications use it), preserving the etag.
|
||||
This is a matter of making the etag field non-static and adding a `serialVersionUID`.
|
||||
|
||||
The `Writable` support was used for marshalling status data over Hadoop IPC calls;
|
||||
in Hadoop 3 that is implemented through `org/apache/hadoop/fs/protocolPB/PBHelper.java` and the methods deprecated.
|
||||
Subclasses MAY override the deprecated methods to add etag marshalling.
|
||||
However, there is no expectation of this and such marshalling is unlikely to ever take place.
|
||||
|
||||
### Appropriate etag Path Capabilities SHOULD BE declared
|
||||
|
||||
1. `hasPathCapability(path, "fs.capability.etags.available")` MUST return true iff
|
||||
the filesystem returns valid (non-empty) etags on file status/listing operations.
|
||||
2. `hasPathCapability(path, "fs.capability.etags.preserved.in.rename")` MUST return
|
||||
true if and only if etags are preserved across renames.
|
||||
|
||||
### Non-requirements of etag support
|
||||
|
||||
* There is no requirement/expectation that `FileSystem.getFileChecksum(Path)` returns
|
||||
a checksum value related to the etag of an object, if any value is returned.
|
||||
* If the same data is uploaded twice to the same or a different path,
|
||||
the etag of the second upload MAY NOT match that of the first upload.
|
||||
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.EtagSource;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
|
||||
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
|
||||
|
||||
/**
|
||||
* For filesystems which support etags, validate correctness
|
||||
* of their implementation.
|
||||
*/
|
||||
public abstract class AbstractContractEtagTest extends
|
||||
AbstractFSContractTestBase {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractContractEtagTest.class);
|
||||
|
||||
/**
|
||||
* basic consistency across operations, as well as being non-empty.
|
||||
*/
|
||||
@Test
|
||||
public void testEtagConsistencyAcrossListAndHead() throws Throwable {
|
||||
describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
|
||||
Assertions.assertThat(fs.hasPathCapability(path, ETAGS_AVAILABLE))
|
||||
.describedAs("path capability %s of %s",
|
||||
ETAGS_AVAILABLE, path)
|
||||
.isTrue();
|
||||
|
||||
ContractTestUtils.touch(fs, path);
|
||||
|
||||
final FileStatus st = fs.getFileStatus(path);
|
||||
final String etag = etagFromStatus(st);
|
||||
LOG.info("etag of empty file is \"{}\"", etag);
|
||||
|
||||
final FileStatus[] statuses = fs.listStatus(path);
|
||||
Assertions.assertThat(statuses)
|
||||
.describedAs("List(%s)", path)
|
||||
.hasSize(1);
|
||||
final FileStatus lsStatus = statuses[0];
|
||||
Assertions.assertThat(etagFromStatus(lsStatus))
|
||||
.describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
|
||||
.isEqualTo(etag);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an etag from a FileStatus which MUST BE
|
||||
* an implementation of EtagSource and
|
||||
* whose etag MUST NOT BE null/empty.
|
||||
* @param st the status
|
||||
* @return the etag
|
||||
*/
|
||||
String etagFromStatus(FileStatus st) {
|
||||
Assertions.assertThat(st)
|
||||
.describedAs("FileStatus %s", st)
|
||||
.isInstanceOf(EtagSource.class);
|
||||
final String etag = ((EtagSource) st).getEtag();
|
||||
Assertions.assertThat(etag)
|
||||
.describedAs("Etag of %s", st)
|
||||
.isNotBlank();
|
||||
return etag;
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwritten data has different etags.
|
||||
*/
|
||||
@Test
|
||||
public void testEtagsOfDifferentDataDifferent() throws Throwable {
|
||||
describe("Verify that two different blocks of data written have different tags");
|
||||
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
Path src = new Path(path, "src");
|
||||
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"data1234".getBytes(StandardCharsets.UTF_8));
|
||||
final FileStatus srcStatus = fs.getFileStatus(src);
|
||||
final String srcTag = etagFromStatus(srcStatus);
|
||||
LOG.info("etag of file 1 is \"{}\"", srcTag);
|
||||
|
||||
// now overwrite with data of same length
|
||||
// (ensure that path or length aren't used exclusively as tag)
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"1234data".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// validate
|
||||
final String tag2 = etagFromStatus(fs.getFileStatus(src));
|
||||
LOG.info("etag of file 2 is \"{}\"", tag2);
|
||||
|
||||
Assertions.assertThat(tag2)
|
||||
.describedAs("etag of updated file")
|
||||
.isNotEqualTo(srcTag);
|
||||
}
|
||||
|
||||
/**
|
||||
* If supported, rename preserves etags.
|
||||
*/
|
||||
@Test
|
||||
public void testEtagConsistencyAcrossRename() throws Throwable {
|
||||
describe("Verify that when a file is renamed, the etag remains unchanged");
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
Assume.assumeTrue(
|
||||
"Filesystem does not declare that etags are preserved across renames",
|
||||
fs.hasPathCapability(path, ETAGS_PRESERVED_IN_RENAME));
|
||||
Path src = new Path(path, "src");
|
||||
Path dest = new Path(path, "dest");
|
||||
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"sample data".getBytes(StandardCharsets.UTF_8));
|
||||
final FileStatus srcStatus = fs.getFileStatus(src);
|
||||
LOG.info("located file status string value " + srcStatus);
|
||||
|
||||
final String srcTag = etagFromStatus(srcStatus);
|
||||
LOG.info("etag of short file is \"{}\"", srcTag);
|
||||
|
||||
Assertions.assertThat(srcTag)
|
||||
.describedAs("Etag of %s", srcStatus)
|
||||
.isNotBlank();
|
||||
|
||||
// rename
|
||||
fs.rename(src, dest);
|
||||
|
||||
// validate
|
||||
FileStatus destStatus = fs.getFileStatus(dest);
|
||||
final String destTag = etagFromStatus(destStatus);
|
||||
Assertions.assertThat(destTag)
|
||||
.describedAs("etag of list status (%s) compared to HEAD value of %s",
|
||||
destStatus, srcStatus)
|
||||
.isEqualTo(srcTag);
|
||||
}
|
||||
|
||||
/**
|
||||
* For effective use of etags, listLocatedStatus SHOULD return status entries
|
||||
* with consistent values.
|
||||
* This ensures that listing during query planning can collect and use the etags.
|
||||
*/
|
||||
@Test
|
||||
public void testLocatedStatusAlsoHasEtag() throws Throwable {
|
||||
describe("verify that listLocatedStatus() and listFiles() are etag sources");
|
||||
final Path path = methodPath();
|
||||
final FileSystem fs = getFileSystem();
|
||||
Path src = new Path(path, "src");
|
||||
ContractTestUtils.createFile(fs, src, true,
|
||||
"sample data".getBytes(StandardCharsets.UTF_8));
|
||||
final FileStatus srcStatus = fs.getFileStatus(src);
|
||||
final String srcTag = etagFromStatus(srcStatus);
|
||||
final LocatedFileStatus entry = fs.listLocatedStatus(path).next();
|
||||
LOG.info("located file status string value " + entry);
|
||||
final String listTag = etagFromStatus(entry);
|
||||
Assertions.assertThat(listTag)
|
||||
.describedAs("etag of listLocatedStatus (%s) compared to HEAD value of %s",
|
||||
entry, srcStatus)
|
||||
.isEqualTo(srcTag);
|
||||
|
||||
final LocatedFileStatus entry2 = fs.listFiles(path, false).next();
|
||||
Assertions.assertThat(etagFromStatus(entry2))
|
||||
.describedAs("etag of listFiles (%s) compared to HEAD value of %s",
|
||||
entry, srcStatus)
|
||||
.isEqualTo(srcTag);
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.EtagSource;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
|
@ -30,7 +31,7 @@ import org.apache.hadoop.fs.Path;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class S3AFileStatus extends FileStatus {
|
||||
public class S3AFileStatus extends FileStatus implements EtagSource {
|
||||
|
||||
private static final long serialVersionUID = -5955674081978903922L;
|
||||
|
||||
|
@ -166,8 +167,16 @@ public class S3AFileStatus extends FileStatus {
|
|||
|
||||
/**
|
||||
* @return the S3 object eTag when available, else null.
|
||||
* @deprecated use {@link EtagSource#getEtag()} for
|
||||
* public access.
|
||||
*/
|
||||
@Deprecated
|
||||
public String getETag() {
|
||||
return getEtag();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEtag() {
|
||||
return eTag;
|
||||
}
|
||||
|
||||
|
|
|
@ -1617,7 +1617,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
final S3AFileStatus fileStatus) {
|
||||
return createObjectAttributes(
|
||||
fileStatus.getPath(),
|
||||
fileStatus.getETag(),
|
||||
fileStatus.getEtag(),
|
||||
fileStatus.getVersionId(),
|
||||
fileStatus.getLen());
|
||||
}
|
||||
|
@ -5242,7 +5242,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
|
||||
return true;
|
||||
|
||||
/*
|
||||
// etags are available in listings, but they
|
||||
// are not consistent across renames.
|
||||
// therefore, only availability is declared
|
||||
case CommonPathCapabilities.ETAGS_AVAILABLE:
|
||||
return true;
|
||||
|
||||
/*
|
||||
* Marker policy capabilities are handed off.
|
||||
*/
|
||||
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
|
||||
|
@ -5329,7 +5335,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
changeDetectionPolicy, ra, auditSpan);
|
||||
|
||||
if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None
|
||||
&& fileStatus.getETag() != null) {
|
||||
&& fileStatus.getEtag() != null) {
|
||||
// if there is change detection, and the status includes at least an
|
||||
// etag,
|
||||
// check that the object metadata lines up with what is expected
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.EtagSource;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
|
||||
import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
||||
|
@ -26,7 +27,7 @@ import static org.apache.hadoop.util.Preconditions.checkNotNull;
|
|||
/**
|
||||
* {@link LocatedFileStatus} extended to also carry ETag and object version ID.
|
||||
*/
|
||||
public class S3ALocatedFileStatus extends LocatedFileStatus {
|
||||
public class S3ALocatedFileStatus extends LocatedFileStatus implements EtagSource {
|
||||
|
||||
private static final long serialVersionUID = 3597192103662929338L;
|
||||
|
||||
|
@ -37,12 +38,23 @@ public class S3ALocatedFileStatus extends LocatedFileStatus {
|
|||
|
||||
public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations) {
|
||||
super(checkNotNull(status), locations);
|
||||
this.eTag = status.getETag();
|
||||
this.eTag = status.getEtag();
|
||||
this.versionId = status.getVersionId();
|
||||
isEmptyDirectory = status.isEmptyDirectory();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the S3 object eTag when available, else null.
|
||||
* @deprecated use {@link EtagSource#getEtag()} for
|
||||
* public access.
|
||||
*/
|
||||
@Deprecated
|
||||
public String getETag() {
|
||||
return getEtag();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEtag() {
|
||||
return eTag;
|
||||
}
|
||||
|
||||
|
@ -77,7 +89,7 @@ public class S3ALocatedFileStatus extends LocatedFileStatus {
|
|||
getModificationTime(),
|
||||
getBlockSize(),
|
||||
getOwner(),
|
||||
getETag(),
|
||||
getEtag(),
|
||||
getVersionId());
|
||||
}
|
||||
|
||||
|
|
|
@ -593,7 +593,7 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
|
|||
S3ObjectAttributes sourceAttributes =
|
||||
callbacks.createObjectAttributes(
|
||||
source.getPath(),
|
||||
source.getETag(),
|
||||
source.getEtag(),
|
||||
source.getVersionId(),
|
||||
source.getLen());
|
||||
// queue the copy operation for execution in the thread pool
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.contract.s3a;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractEtagTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
|
||||
/**
|
||||
* Test S3A etag support.
|
||||
*/
|
||||
public class ITestS3AContractEtag extends AbstractContractEtagTest {
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new S3AContract(conf);
|
||||
}
|
||||
}
|
|
@ -63,8 +63,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathIOException;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
|
||||
|
@ -79,6 +81,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept
|
|||
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
||||
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsLocatedFileStatus;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.Listener;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
|
||||
|
@ -109,6 +112,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.B
|
|||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
|
||||
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
|
||||
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
|
||||
|
||||
/**
|
||||
* A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a
|
||||
|
@ -1196,6 +1201,36 @@ public class AzureBlobFileSystem extends FileSystem
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Incremental listing of located status entries,
|
||||
* preserving etags.
|
||||
* @param path path to list
|
||||
* @param filter a path filter
|
||||
* @return iterator of results.
|
||||
* @throws FileNotFoundException source path not found.
|
||||
* @throws IOException other values.
|
||||
*/
|
||||
@Override
|
||||
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(
|
||||
final Path path,
|
||||
final PathFilter filter)
|
||||
throws FileNotFoundException, IOException {
|
||||
|
||||
LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path);
|
||||
// get a paged iterator over the source data, filtering out non-matching
|
||||
// entries.
|
||||
final RemoteIterator<FileStatus> sourceEntries = filteringRemoteIterator(
|
||||
listStatusIterator(path),
|
||||
(st) -> filter.accept(st.getPath()));
|
||||
// and then map that to a remote iterator of located file status
|
||||
// entries, propagating any etags.
|
||||
return mappingRemoteIterator(sourceEntries,
|
||||
st -> new AbfsLocatedFileStatus(st,
|
||||
st.isFile()
|
||||
? getFileBlockLocations(st, 0, st.getLen())
|
||||
: null));
|
||||
}
|
||||
|
||||
private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext) {
|
||||
try {
|
||||
return getFileStatus(f, tracingContext);
|
||||
|
@ -1498,6 +1533,8 @@ public class AzureBlobFileSystem extends FileSystem
|
|||
switch (validatePathCapabilityArgs(p, capability)) {
|
||||
case CommonPathCapabilities.FS_PERMISSIONS:
|
||||
case CommonPathCapabilities.FS_APPEND:
|
||||
case CommonPathCapabilities.ETAGS_AVAILABLE:
|
||||
case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
|
||||
return true;
|
||||
case CommonPathCapabilities.FS_ACLS:
|
||||
return getIsNamespaceEnabled(
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.EtagSource;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -975,7 +976,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
final long blockSize = abfsConfiguration.getAzureBlockSize();
|
||||
final AbfsHttpOperation result = op.getResult();
|
||||
|
||||
final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
String eTag = extractEtagHeader(result);
|
||||
final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
|
||||
final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
|
||||
final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
|
||||
|
@ -1733,10 +1734,27 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
|
||||
}
|
||||
|
||||
private static class VersionedFileStatus extends FileStatus {
|
||||
private final String version;
|
||||
/**
|
||||
* A File status with version info extracted from the etag value returned
|
||||
* in a LIST or HEAD request.
|
||||
* The etag is included in the java serialization.
|
||||
*/
|
||||
private static final class VersionedFileStatus extends FileStatus
|
||||
implements EtagSource {
|
||||
|
||||
VersionedFileStatus(
|
||||
/**
|
||||
* The superclass is declared serializable; this subclass can also
|
||||
* be serialized.
|
||||
*/
|
||||
private static final long serialVersionUID = -2009013240419749458L;
|
||||
|
||||
/**
|
||||
* The etag of an object.
|
||||
* Not-final so that serialization via reflection will preserve the value.
|
||||
*/
|
||||
private String version;
|
||||
|
||||
private VersionedFileStatus(
|
||||
final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl,
|
||||
final long length, final boolean isdir, final int blockReplication,
|
||||
final long blocksize, final long modificationTime, final Path path,
|
||||
|
@ -1797,6 +1815,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
return this.version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEtag() {
|
||||
return getVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
|
@ -1902,4 +1925,30 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the etag header from a response, stripping any quotations.
|
||||
* see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
|
||||
* @param result response to process.
|
||||
* @return the quote-unwrapped etag.
|
||||
*/
|
||||
private static String extractEtagHeader(AbfsHttpOperation result) {
|
||||
String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
if (etag != null) {
|
||||
// strip out any wrapper "" quotes which come back, for consistency with
|
||||
// list calls
|
||||
if (etag.startsWith("W/\"")) {
|
||||
// Weak etag
|
||||
etag = etag.substring(3);
|
||||
} else if (etag.startsWith("\"")) {
|
||||
// strong etag
|
||||
etag = etag.substring(1);
|
||||
}
|
||||
if (etag.endsWith("\"")) {
|
||||
// trailing quote
|
||||
etag = etag.substring(0, etag.length() - 1);
|
||||
}
|
||||
}
|
||||
return etag;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.EtagSource;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
|
||||
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
/**
|
||||
* {@link LocatedFileStatus} extended to also carry an ETag.
|
||||
*/
|
||||
public class AbfsLocatedFileStatus extends LocatedFileStatus implements EtagSource {
|
||||
|
||||
private static final long serialVersionUID = -8185960773314341594L;
|
||||
|
||||
/**
|
||||
* etag; may be null.
|
||||
*/
|
||||
private final String etag;
|
||||
|
||||
public AbfsLocatedFileStatus(FileStatus status, BlockLocation[] locations) {
|
||||
super(checkNotNull(status), locations);
|
||||
if (status instanceof EtagSource) {
|
||||
this.etag = ((EtagSource) status).getEtag();
|
||||
} else {
|
||||
this.etag = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getEtag() {
|
||||
return etag;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AbfsLocatedFileStatus{"
|
||||
+ "etag='" + etag + '\'' + "} "
|
||||
+ super.toString();
|
||||
}
|
||||
// equals() and hashCode() overridden to avoid FindBugs warning.
|
||||
// Base implementation is equality on Path only, which is still appropriate.
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return super.equals(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.contract;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractEtagTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
|
||||
/**
|
||||
* Contract test for etag support.
|
||||
*/
|
||||
public class ITestAbfsFileSystemContractEtag extends AbstractContractEtagTest {
|
||||
private final boolean isSecure;
|
||||
private final ABFSContractTestBinding binding;
|
||||
|
||||
public ITestAbfsFileSystemContractEtag() throws Exception {
|
||||
binding = new ABFSContractTestBinding();
|
||||
this.isSecure = binding.isSecureMode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
binding.setup();
|
||||
super.setup();
|
||||
// Base rename contract test class re-uses the test folder
|
||||
// This leads to failures when the test is re-run as same ABFS test
|
||||
// containers are re-used for test run and creation of source and
|
||||
// destination test paths fail, as they are already present.
|
||||
binding.getFileSystem().delete(binding.getTestPath(), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(final Configuration conf) {
|
||||
return new AbfsFileSystemContract(conf, isSecure);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue