diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 6d6ed42eb8c..90743008298 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1547,6 +1547,17 @@ + + fs.s3a.etag.checksum.enabled + false + + Should calls to getFileChecksum() return the etag value of the remote + object. + WARNING: if enabled, distcp operations between HDFS and S3 will fail unless + -skipcrccheck is set. + + + fs.wasb.impl diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index faec784c606..4c958439b0c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -542,4 +542,15 @@ public final class Constants { */ public static final String RETRY_THROTTLE_INTERVAL_DEFAULT = "500ms"; + /** + * Should etags be exposed as checksums? + */ + public static final String ETAG_CHECKSUM_ENABLED = + "fs.s3a.etag.checksum.enabled"; + + /** + * Default value: false. + */ + public static final boolean ETAG_CHECKSUM_ENABLED_DEFAULT = false; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index eb65cfa6d1f..4b0c2088051 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2993,17 +2993,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { } /** - * Get the etag of a object at the path via HEAD request and return it - * as a checksum object. This has the whatever guarantees about equivalence - * the S3 implementation offers. + * When enabled, get the etag of a object at the path via HEAD request and + * return it as a checksum object. *
    *
  1. If a tag has not changed, consider the object unchanged.
  2. *
  3. Two tags being different does not imply the data is different.
  4. *
* Different S3 implementations may offer different guarantees. + * + * This check is (currently) only made if + * {@link Constants#ETAG_CHECKSUM_ENABLED} is set; turning it on + * has caused problems with Distcp (HADOOP-15273). + * * @param f The file path * @param length The length of the file range for checksum calculation - * @return The EtagChecksum or null if checksums are not supported. + * @return The EtagChecksum or null if checksums are not enabled or supported. * @throws IOException IO failure * @see Common Response Headers */ @@ -3012,15 +3016,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { public EtagChecksum getFileChecksum(Path f, final long length) throws IOException { Preconditions.checkArgument(length >= 0); - Path path = qualify(f); - LOG.debug("getFileChecksum({})", path); - return once("getFileChecksum", path.toString(), - () -> { - // this always does a full HEAD to the object - ObjectMetadata headers = getObjectMetadata(path); - String eTag = headers.getETag(); - return eTag != null ? new EtagChecksum(eTag) : null; - }); + entryPoint(INVOCATION_GET_FILE_CHECKSUM); + + if (getConf().getBoolean(ETAG_CHECKSUM_ENABLED, + ETAG_CHECKSUM_ENABLED_DEFAULT)) { + Path path = qualify(f); + LOG.debug("getFileChecksum({})", path); + return once("getFileChecksum", path.toString(), + () -> { + // this always does a full HEAD to the object + ObjectMetadata headers = getObjectMetadata(path); + String eTag = headers.getETag(); + return eTag != null ? new EtagChecksum(eTag) : null; + }); + } else { + // disabled + return null; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index b883455ec88..29ee0c53106 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -139,6 +139,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource { INVOCATION_CREATE_NON_RECURSIVE, INVOCATION_DELETE, INVOCATION_EXISTS, + INVOCATION_GET_FILE_CHECKSUM, INVOCATION_GET_FILE_STATUS, INVOCATION_GLOB_STATUS, INVOCATION_IS_DIRECTORY, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index bb30f1f0c26..799a5869c90 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -57,6 +57,8 @@ public enum Statistic { "Calls of delete()"), INVOCATION_EXISTS(CommonStatisticNames.OP_EXISTS, "Calls of exists()"), + INVOCATION_GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM, + "Calls of getFileChecksum()"), INVOCATION_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS, "Calls of getFileStatus()"), INVOCATION_GLOB_STATUS(CommonStatisticNames.OP_GLOB_STATUS, diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 88cab37eabb..1099d84118e 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1656,11 +1656,48 @@ in these metrics. ## Other Topics -### Copying Data with distcp +### Copying Data with distcp -Hadoop's `distcp` application can be used to copy data between a Hadoop +Hadoop's `distcp` tool is often used to copy data between a Hadoop cluster and Amazon S3. See [Copying Data Between a Cluster and Amazon S3](https://hortonworks.github.io/hdp-aws/s3-copy-data/index.html) for details on S3 copying specifically. +The `distcp update` command tries to do incremental updates of data. +It is straightforward to verify when files do not match when they are of +different length, but not when they are the same size. + +Distcp addresses this by comparing file checksums on the source and destination +filesystems, which it tries to do *even if the filesystems have incompatible +checksum algorithms*. + +The S3A connector can provide the HTTP etag header to the caller as the +checksum of the uploaded file. Doing so will break distcp operations +between hdfs and s3a. + +For this reason, the etag-as-checksum feature is disabled by default. + +```xml + + fs.s3a.etag.checksum.enabled + false + + Should calls to getFileChecksum() return the etag value of the remote + object. + WARNING: if enabled, distcp operations between HDFS and S3 will fail unless + -skipcrccheck is set. + + +``` + +If enabled, `distcp` between two S3 buckets can use the checksum to compare +objects. Their checksums should be identical if they were either each uploaded +as a single file PUT, or, if in a multipart PUT, in blocks of the same size, +as configured by the value `fs.s3a.multipart.size`. + +To disable checksum verification in `distcp`, use the `-skipcrccheck` option: + +```bash +hadoop distcp -update -skipcrccheck /user/alice/datasets s3a://alice-backup/datasets +``` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index 7a218763a02..fc8d872463c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -39,12 +39,30 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; /** * Tests of the S3A FileSystem which don't have a specific home and can share - * a filesystem instance with others.. + * a filesystem instance with others. + * Checksums are turned on unless explicitly disabled for a test case. */ public class ITestS3AMiscOperations extends AbstractS3ATestBase { private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8); + @Override + public void setup() throws Exception { + super.setup(); + // checksums are forced on. + enableChecksums(true); + } + + /** + * Turn checksums on. + * Relies on the FS not caching the configuration option + * @param enabled enabled flag. + */ + protected void enableChecksums(final boolean enabled) { + getFileSystem().getConf().setBoolean(Constants.ETAG_CHECKSUM_ENABLED, + enabled); + } + @Test public void testCreateNonRecursiveSuccess() throws IOException { Path shouldWork = path("nonrecursivenode"); @@ -124,12 +142,26 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { Path file1 = touchFile("file1"); EtagChecksum checksum1 = fs.getFileChecksum(file1, 0); LOG.info("Checksum for {}: {}", file1, checksum1); - assertNotNull("file 1 checksum", checksum1); + assertNotNull("Null file 1 checksum", checksum1); assertNotEquals("file 1 checksum", 0, checksum1.getLength()); assertEquals("checksums", checksum1, fs.getFileChecksum(touchFile("file2"), 0)); } + /** + * Make sure that when checksums are disabled, the caller + * gets null back. + */ + @Test + public void testChecksumDisabled() throws Throwable { + // checksums are forced off. + enableChecksums(false); + final S3AFileSystem fs = getFileSystem(); + Path file1 = touchFile("file1"); + EtagChecksum checksum1 = fs.getFileChecksum(file1, 0); + assertNull("Checksums are being generated", checksum1); + } + /** * Verify that different file contents have different * checksums, and that that they aren't the same as the empty file. @@ -138,6 +170,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { @Test public void testNonEmptyFileChecksums() throws Throwable { final S3AFileSystem fs = getFileSystem(); + final Path file3 = mkFile("file3", HELLO); final EtagChecksum checksum1 = fs.getFileChecksum(file3, 0); assertNotNull("file 3 checksum", checksum1); @@ -178,12 +211,20 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { } @Test - public void testLengthPastEOF() throws Throwable { + public void testNegativeLengthDisabledChecksum() throws Throwable { + enableChecksums(false); + LambdaTestUtils.intercept(IllegalArgumentException.class, + () -> getFileSystem().getFileChecksum(mkFile("negative", HELLO), -1)); + } + + @Test + public void testChecksumLengthPastEOF() throws Throwable { + enableChecksums(true); final S3AFileSystem fs = getFileSystem(); Path f = mkFile("file5", HELLO); - assertEquals( - fs.getFileChecksum(f, HELLO.length), - fs.getFileChecksum(f, HELLO.length * 2)); + EtagChecksum l = fs.getFileChecksum(f, HELLO.length); + assertNotNull("Null checksum", l); + assertEquals(l, fs.getFileChecksum(f, HELLO.length * 2)); } @Test