HADOOP-15297. Make S3A etag => checksum feature optional.
Contributed by Steve Loughran.
This commit is contained in:
parent
e1f5251f3c
commit
dd05871b8b
|
@ -1547,6 +1547,17 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.etag.checksum.enabled</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Should calls to getFileChecksum() return the etag value of the remote
|
||||
object.
|
||||
WARNING: if enabled, distcp operations between HDFS and S3 will fail unless
|
||||
-skipcrccheck is set.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<!-- Azure file system properties -->
|
||||
<property>
|
||||
<name>fs.wasb.impl</name>
|
||||
|
|
|
@ -542,4 +542,15 @@ public final class Constants {
|
|||
*/
|
||||
public static final String RETRY_THROTTLE_INTERVAL_DEFAULT = "500ms";
|
||||
|
||||
/**
|
||||
* Should etags be exposed as checksums?
|
||||
*/
|
||||
public static final String ETAG_CHECKSUM_ENABLED =
|
||||
"fs.s3a.etag.checksum.enabled";
|
||||
|
||||
/**
|
||||
* Default value: false.
|
||||
*/
|
||||
public static final boolean ETAG_CHECKSUM_ENABLED_DEFAULT = false;
|
||||
|
||||
}
|
||||
|
|
|
@ -2993,17 +2993,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the etag of a object at the path via HEAD request and return it
|
||||
* as a checksum object. This has the whatever guarantees about equivalence
|
||||
* the S3 implementation offers.
|
||||
* When enabled, get the etag of a object at the path via HEAD request and
|
||||
* return it as a checksum object.
|
||||
* <ol>
|
||||
* <li>If a tag has not changed, consider the object unchanged.</li>
|
||||
* <li>Two tags being different does not imply the data is different.</li>
|
||||
* </ol>
|
||||
* Different S3 implementations may offer different guarantees.
|
||||
*
|
||||
* This check is (currently) only made if
|
||||
* {@link Constants#ETAG_CHECKSUM_ENABLED} is set; turning it on
|
||||
* has caused problems with Distcp (HADOOP-15273).
|
||||
*
|
||||
* @param f The file path
|
||||
* @param length The length of the file range for checksum calculation
|
||||
* @return The EtagChecksum or null if checksums are not supported.
|
||||
* @return The EtagChecksum or null if checksums are not enabled or supported.
|
||||
* @throws IOException IO failure
|
||||
* @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
|
||||
*/
|
||||
|
@ -3012,15 +3016,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
public EtagChecksum getFileChecksum(Path f, final long length)
|
||||
throws IOException {
|
||||
Preconditions.checkArgument(length >= 0);
|
||||
Path path = qualify(f);
|
||||
LOG.debug("getFileChecksum({})", path);
|
||||
return once("getFileChecksum", path.toString(),
|
||||
() -> {
|
||||
// this always does a full HEAD to the object
|
||||
ObjectMetadata headers = getObjectMetadata(path);
|
||||
String eTag = headers.getETag();
|
||||
return eTag != null ? new EtagChecksum(eTag) : null;
|
||||
});
|
||||
entryPoint(INVOCATION_GET_FILE_CHECKSUM);
|
||||
|
||||
if (getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
|
||||
ETAG_CHECKSUM_ENABLED_DEFAULT)) {
|
||||
Path path = qualify(f);
|
||||
LOG.debug("getFileChecksum({})", path);
|
||||
return once("getFileChecksum", path.toString(),
|
||||
() -> {
|
||||
// this always does a full HEAD to the object
|
||||
ObjectMetadata headers = getObjectMetadata(path);
|
||||
String eTag = headers.getETag();
|
||||
return eTag != null ? new EtagChecksum(eTag) : null;
|
||||
});
|
||||
} else {
|
||||
// disabled
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -139,6 +139,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
|
|||
INVOCATION_CREATE_NON_RECURSIVE,
|
||||
INVOCATION_DELETE,
|
||||
INVOCATION_EXISTS,
|
||||
INVOCATION_GET_FILE_CHECKSUM,
|
||||
INVOCATION_GET_FILE_STATUS,
|
||||
INVOCATION_GLOB_STATUS,
|
||||
INVOCATION_IS_DIRECTORY,
|
||||
|
|
|
@ -57,6 +57,8 @@ public enum Statistic {
|
|||
"Calls of delete()"),
|
||||
INVOCATION_EXISTS(CommonStatisticNames.OP_EXISTS,
|
||||
"Calls of exists()"),
|
||||
INVOCATION_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM,
|
||||
"Calls of getFileChecksum()"),
|
||||
INVOCATION_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS,
|
||||
"Calls of getFileStatus()"),
|
||||
INVOCATION_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS,
|
||||
|
|
|
@ -1656,11 +1656,48 @@ in these metrics.
|
|||
|
||||
##<a name="further_reading"></a> Other Topics
|
||||
|
||||
### Copying Data with distcp
|
||||
### <a name="distcp"></a> Copying Data with distcp
|
||||
|
||||
Hadoop's `distcp` application can be used to copy data between a Hadoop
|
||||
Hadoop's `distcp` tool is often used to copy data between a Hadoop
|
||||
cluster and Amazon S3.
|
||||
See [Copying Data Between a Cluster and Amazon S3](https://hortonworks.github.io/hdp-aws/s3-copy-data/index.html)
|
||||
for details on S3 copying specifically.
|
||||
|
||||
The `distcp update` command tries to do incremental updates of data.
|
||||
It is straightforward to verify when files do not match when they are of
|
||||
different length, but not when they are the same size.
|
||||
|
||||
Distcp addresses this by comparing file checksums on the source and destination
|
||||
filesystems, which it tries to do *even if the filesystems have incompatible
|
||||
checksum algorithms*.
|
||||
|
||||
The S3A connector can provide the HTTP etag header to the caller as the
|
||||
checksum of the uploaded file. Doing so will break distcp operations
|
||||
between hdfs and s3a.
|
||||
|
||||
For this reason, the etag-as-checksum feature is disabled by default.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.s3a.etag.checksum.enabled</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Should calls to getFileChecksum() return the etag value of the remote
|
||||
object.
|
||||
WARNING: if enabled, distcp operations between HDFS and S3 will fail unless
|
||||
-skipcrccheck is set.
|
||||
</description>
|
||||
</property>
|
||||
```
|
||||
|
||||
If enabled, `distcp` between two S3 buckets can use the checksum to compare
|
||||
objects. Their checksums should be identical if they were either each uploaded
|
||||
as a single file PUT, or, if in a multipart PUT, in blocks of the same size,
|
||||
as configured by the value `fs.s3a.multipart.size`.
|
||||
|
||||
To disable checksum verification in `distcp`, use the `-skipcrccheck` option:
|
||||
|
||||
```bash
|
||||
hadoop distcp -update -skipcrccheck /user/alice/datasets s3a://alice-backup/datasets
|
||||
```
|
||||
|
||||
|
|
|
@ -39,12 +39,30 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
|||
|
||||
/**
|
||||
* Tests of the S3A FileSystem which don't have a specific home and can share
|
||||
* a filesystem instance with others..
|
||||
* a filesystem instance with others.
|
||||
* Checksums are turned on unless explicitly disabled for a test case.
|
||||
*/
|
||||
public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
||||
|
||||
private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
// checksums are forced on.
|
||||
enableChecksums(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn checksums on.
|
||||
* Relies on the FS not caching the configuration option
|
||||
* @param enabled enabled flag.
|
||||
*/
|
||||
protected void enableChecksums(final boolean enabled) {
|
||||
getFileSystem().getConf().setBoolean(Constants.ETAG_CHECKSUM_ENABLED,
|
||||
enabled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateNonRecursiveSuccess() throws IOException {
|
||||
Path shouldWork = path("nonrecursivenode");
|
||||
|
@ -124,12 +142,26 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
Path file1 = touchFile("file1");
|
||||
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
|
||||
LOG.info("Checksum for {}: {}", file1, checksum1);
|
||||
assertNotNull("file 1 checksum", checksum1);
|
||||
assertNotNull("Null file 1 checksum", checksum1);
|
||||
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
|
||||
assertEquals("checksums", checksum1,
|
||||
fs.getFileChecksum(touchFile("file2"), 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that when checksums are disabled, the caller
|
||||
* gets null back.
|
||||
*/
|
||||
@Test
|
||||
public void testChecksumDisabled() throws Throwable {
|
||||
// checksums are forced off.
|
||||
enableChecksums(false);
|
||||
final S3AFileSystem fs = getFileSystem();
|
||||
Path file1 = touchFile("file1");
|
||||
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
|
||||
assertNull("Checksums are being generated", checksum1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that different file contents have different
|
||||
* checksums, and that that they aren't the same as the empty file.
|
||||
|
@ -138,6 +170,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
@Test
|
||||
public void testNonEmptyFileChecksums() throws Throwable {
|
||||
final S3AFileSystem fs = getFileSystem();
|
||||
|
||||
final Path file3 = mkFile("file3", HELLO);
|
||||
final EtagChecksum checksum1 = fs.getFileChecksum(file3, 0);
|
||||
assertNotNull("file 3 checksum", checksum1);
|
||||
|
@ -178,12 +211,20 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testLengthPastEOF() throws Throwable {
|
||||
public void testNegativeLengthDisabledChecksum() throws Throwable {
|
||||
enableChecksums(false);
|
||||
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
||||
() -> getFileSystem().getFileChecksum(mkFile("negative", HELLO), -1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChecksumLengthPastEOF() throws Throwable {
|
||||
enableChecksums(true);
|
||||
final S3AFileSystem fs = getFileSystem();
|
||||
Path f = mkFile("file5", HELLO);
|
||||
assertEquals(
|
||||
fs.getFileChecksum(f, HELLO.length),
|
||||
fs.getFileChecksum(f, HELLO.length * 2));
|
||||
EtagChecksum l = fs.getFileChecksum(f, HELLO.length);
|
||||
assertNotNull("Null checksum", l);
|
||||
assertEquals(l, fs.getFileChecksum(f, HELLO.length * 2));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue