+ final Pair<Boolean, Duration> result =
+ resilientCommitByRename.commitSingleFileByRename(
+ entry.getSourcePath(),
+ entry.getDestPath(),
+ entry.getEtag());
+ return CommitFileResult.fromResilientCommit(result.getLeft(),
+ result.getRight());
+ } else {
+ return super.commitFile(entry);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
new file mode 100644
index 00000000000..b760fa7a4ac
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * A factory for the manifest committer which performs all bindings needed
+ * to work best with abfs.
+ * This includes, at a minimum, switching to the abfs-specific manifest store operations.
+ *
+ * This classname is referenced in configurations, so MUST NOT change.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AzureManifestCommitterFactory extends ManifestCommitterFactory {
+
+ /**
+ * Classname, which can be declared in job configurations.
+ */
+ public static final String NAME = AzureManifestCommitterFactory.class.getName();
+
+ @Override
+ public ManifestCommitter createOutputCommitter(final Path outputPath,
+ final TaskAttemptContext context) throws IOException {
+ final Configuration conf = context.getConfiguration();
+ // use ABFS Store operations
+ conf.set(OPT_STORE_OPERATIONS_CLASS,
+ AbfsManifestStoreOperations.NAME);
+ return super.createOutputCommitter(outputPath, context);
+ }
+}
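
For context, a minimal sketch of how a job configuration might bind abfs:// output
paths to this factory, assuming the standard per-scheme committer factory key
defined by PathOutputCommitterFactory ("mapreduce.outputcommitter.factory.scheme."
plus the filesystem scheme). Illustrative commentary, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory;

    public class CommitterBindingExample {
      public static Configuration bind(Configuration conf) {
        // route output committed to abfs:// paths through the Azure factory
        conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
            AzureManifestCommitterFactory.NAME);
        return conf;
      }
    }
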
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
new file mode 100644
index 00000000000..2e91392a661
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * API exclusively for committing files.
+ *
+ * This is only for use by {@link AbfsManifestStoreOperations},
+ * and is intended to be implemented by ABFS.
+ * To ensure that there is no need to add mapreduce JARs to the
+ * classpath just to work with ABFS, this interface
+ * MUST NOT refer to anything in the
+ * {@code org.apache.hadoop.mapreduce} package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ResilientCommitByRename extends IOStatisticsSource {
+
+ /**
+ * Rename source file to dest path *Exactly*; no subdirectory games here.
+ * If the method does not raise an exception, then
+ * the data at dest is the data which was at source.
+ *
+ * Requirements:
+ *
+ * <pre>
+ *   exists(FS, source) else raise FileNotFoundException
+ *   source != dest else raise PathIOException
+ *   not exists(FS, dest)
+ *   isDir(FS, dest.getParent)
+ * </pre>
+ * <ol>
+ *   <li>source != dest else raise PathIOException</li>
+ *   <li>source must exist else raise FileNotFoundException</li>
+ *   <li>source must exist and be a file</li>
+ *   <li>dest must not exist</li>
+ *   <li>dest.getParent() must be a dir</li>
+ *   <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
+ * </ol>
+ *
+ * The outcome of the operation is undefined if source is not a file, dest exists,
+ * or dest.getParent() doesn't exist or is a file.
+ * That is: implementations SHOULD assume that the code calling this method has
+ * set up the destination directory tree and is only invoking this call on a file.
+ * Accordingly: implementations MAY skip validation checks.
+ *
+ * Post-conditions on a successful operation:
+ * <pre>
+ * FS' where:
+ *     not exists(FS', source)
+ *     and exists(FS', dest)
+ *     and data(FS', dest) == data(FS, source)
+ * </pre>
+ *
+ * This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
+ * are met. This API call simply restricts the operation to file rename with strict
+ * conditions (no need to be 'clever' about dest path calculation), and the ability
+ * to pass in etags, modtimes and file status values.
+ *
+ * @param source path to source file
+ * @param dest destination of rename.
+ * @param sourceEtag etag of source file. may be null or empty
+ * @return pair of (recovery flag: true if recovery was needed, wait duration).
+ * @throws FileNotFoundException source file not found
+ * @throws PathIOException failure, including source and dest being the same path
+ * @throws IOException any other exception
+ */
+ Pair<Boolean, Duration> commitSingleFileByRename(
+ Path source,
+ Path dest,
+ @Nullable String sourceEtag) throws IOException;
+
+
+}
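
A minimal sketch of a caller of this interface, assuming access to an instance
implementing ResilientCommitByRename (in this patch, the caller is
AbfsManifestStoreOperations); the wrapper class and names are illustrative:

    import java.io.IOException;
    import java.time.Duration;

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;

    final class ResilientCommitExample {
      /** Commit one file; returns true if etag-based recovery was needed. */
      static boolean commit(ResilientCommitByRename committer,
          Path source, Path dest, String sourceEtag) throws IOException {
        Pair<Boolean, Duration> outcome =
            committer.commitSingleFileByRename(source, dest, sourceEtag);
        // left: recovery flag; right: duration reported by the implementation
        return outcome.getLeft();
      }
    }
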
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
new file mode 100644
index 00000000000..3567377350d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for manifest committer.
+ * Unless otherwise stated: classes are private.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 12beb5a9bba..9d3b2d5e82c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -220,6 +220,9 @@ public final class ConfigurationKeys {
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
+ /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
+ public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
+
public static String accountProperty(String property, String account) {
return property + "." + account;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f58c61e8908..63d62a33b18 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -133,5 +133,10 @@ public final class FileSystemConfigurations {
public static final String DATA_BLOCKS_BUFFER_DEFAULT =
DATA_BLOCKS_BUFFER_DISK;
+ /**
+ * IO rate limit. Value: {@value}
+ */
+ public static final int RATE_LIMIT_DEFAULT = 10_000;
+
private FileSystemConfigurations() {}
}
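
Putting the new key and its default together: a sketch of how store code might
read the IO rate limit capacity, falling back to the default of 10,000. This is
a hypothetical helper for illustration, not code from the patch:

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_IO_RATE_LIMIT;
    import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.RATE_LIMIT_DEFAULT;

    final class RateLimitExample {
      /** Capacity used by IO operations which try to throttle themselves. */
      static int rateLimitCapacity(Configuration conf) {
        return conf.getInt(FS_AZURE_ABFS_IO_RATE_LIMIT, RATE_LIMIT_DEFAULT);
      }
    }
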
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 69ef0d01c78..dacce9a335b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFact
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -67,6 +68,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -478,8 +481,30 @@ public class AbfsClient implements Closeable {
return op;
}
- public AbfsRestOperation renamePath(String source, final String destination,
- final String continuation, TracingContext tracingContext)
+
+ /**
+ * Rename a file or directory.
+ * If a source etag is passed in, the operation will attempt to recover
+ * from a missing source file by probing the destination for
+ * existence and comparing etags.
+ * The second value in the result will be true to indicate that this
+ * took place.
+ * As rename recovery is only attempted if the source etag is non-empty,
+ * in normal rename operations rename recovery will never happen.
+ * @param source path to source file
+ * @param destination destination of rename.
+ * @param continuation continuation.
+ * @param tracingContext trace context
+ * @param sourceEtag etag of source file. may be null or empty
+ * @return pair of (the rename operation, flag indicating recovery took place)
+ * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
+ */
+ public Pair<AbfsRestOperation, Boolean> renamePath(
+ final String source,
+ final String destination,
+ final String continuation,
+ final TracingContext tracingContext,
+ final String sourceEtag)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@@ -505,9 +530,73 @@ public class AbfsClient implements Closeable {
HTTP_METHOD_PUT,
url,
requestHeaders);
- // no attempt at recovery using timestamps as it was not reliable.
- op.execute(tracingContext);
- return op;
+ try {
+ op.execute(tracingContext);
+ return Pair.of(op, false);
+ } catch (AzureBlobFileSystemException e) {
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw e;
+ }
+ boolean etagCheckSucceeded = renameIdempotencyCheckOp(
+ source,
+ sourceEtag, op, destination, tracingContext);
+ if (!etagCheckSucceeded) {
+ // the etag check did not establish success;
+ // throw back the original exception
+ throw e;
+ }
+ return Pair.of(op, true);
+ }
+ }
+
+ /**
+ * Check whether a rename failure occurred after a retry, and whether an
+ * earlier rename attempt might have succeeded at the back-end.
+ *
+ * If a source etag was passed in, and the error was 404, get the
+ * etag of any file at the destination.
+ * If it matches the source etag, then the rename is considered
+ * a success.
+ * Exceptions raised in the probe of the destination are swallowed,
+ * so that they do not interfere with the original rename failures.
+ * @param source source path
+ * @param sourceEtag etag of source file; may be null or empty
+ * @param op rename request REST operation response with non-null HTTP response
+ * @param destination rename destination path
+ * @param tracingContext tracks identifiers for request header
+ * @return true if the rename is considered to have succeeded
+ */
+ public boolean renameIdempotencyCheckOp(
+ final String source,
+ final String sourceEtag,
+ final AbfsRestOperation op,
+ final String destination,
+ TracingContext tracingContext) {
+ Preconditions.checkArgument(op.hasResult(), "Operation has null HTTP response");
+
+ if ((op.isARetriedRequest())
+ && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
+ && isNotEmpty(sourceEtag)) {
+
+ // Server has returned HTTP 404, which means rename source no longer
+ // exists. Check on destination status and if its etag matches
+ // that of the source, consider it to be a success.
+ LOG.debug("rename {} to {} failed, checking etag of destination",
+ source, destination);
+
+ try {
+ final AbfsRestOperation destStatusOp = getPathStatus(destination,
+ false, tracingContext);
+ final AbfsHttpOperation result = destStatusOp.getResult();
+
+ return result.getStatusCode() == HttpURLConnection.HTTP_OK
+ && sourceEtag.equals(extractEtagHeader(result));
+ } catch (AzureBlobFileSystemException ignored) {
+ // GetFileStatus on the destination failed; assume the rename did not take place
+ }
+ }
+ return false;
}
public AbfsRestOperation append(final String path, final byte[] buffer,
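
A sketch of how a caller might consume the new renamePath() signature,
distinguishing a plain success from an etag-based recovery; the wrapper class
and method below are hypothetical:

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
    import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

    final class RenameRecoveryExample {
      /** Rename; returns true only if success was established via recovery. */
      static boolean rename(AbfsClient client, String src, String dst,
          TracingContext tracingContext, String sourceEtag)
          throws AzureBlobFileSystemException {
        Pair<AbfsRestOperation, Boolean> result =
            client.renamePath(src, dst, null, tracingContext, sourceEtag);
        // right is true when the source vanished mid-retry but the destination
        // etag matched sourceEtag, so the rename is treated as successful
        return result.getRight();
      }
    }
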
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
index bb1ec9e4a3f..e3adc59afac 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
@@ -48,9 +48,15 @@ public class SimpleKeyProvider implements KeyProvider {
// Validating the key.
validateStorageAccountKey(key);
} catch (IllegalAccessException | InvalidConfigurationValueException e) {
- throw new KeyProviderException("Failure to initialize configuration", e);
+ LOG.debug("Failure to retrieve storage account key for {}", accountName,
+ e);
+ throw new KeyProviderException("Failure to initialize configuration for "
+ + accountName
+ + " key =\"" + key + "\""
+ + ": " + e, e);
} catch(IOException ioe) {
- LOG.warn("Unable to get key from credential providers. {}", ioe);
+ LOG.warn("Unable to get key for {} from credential providers. {}",
+ accountName, ioe, ioe);
}
return key;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index a57e9bcdcd8..3bc83385d1e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -270,11 +270,12 @@ public abstract class AbstractAbfsIntegrationTest extends
// The SAS tests do not have permission to create a filesystem
// so first create temporary instance of the filesystem using SharedKey
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
- AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
- ContractTestUtils.assertPathExists(tempFs, "This path should exist",
- new Path("/"));
- abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
- usingFilesystemForSASTests = true;
+ try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)) {
+ ContractTestUtils.assertPathExists(tempFs, "This path should exist",
+ new Path("/"));
+ abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
+ usingFilesystemForSASTests = true;
+ }
}
public AzureBlobFileSystem getFileSystem() throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
index b0e82444afb..5bd6eaff42e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
@@ -76,13 +79,19 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
new Random().nextBytes(b);
Path testPath = path(TEST_PATH);
- try (FSDataOutputStream stream = fs.create(testPath)) {
+ FSDataOutputStream stream = fs.create(testPath);
+ try {
stream.write(b);
+ } finally {
+ stream.close();
}
+ IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
final byte[] readBuffer = new byte[2 * bufferSize];
int result;
+ IOStatisticsSource statisticsSource = null;
try (FSDataInputStream inputStream = fs.open(testPath)) {
+ statisticsSource = inputStream;
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
@@ -100,6 +109,8 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
}
+ IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+
assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
index ea9fba62579..965e02a0a3e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
@@ -401,7 +401,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
fs.create(new Path(src)).close();
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
.renamePath(src, "/testABC" + "/abc.txt", null,
- getTestTracingContext(fs, false));
+ getTestTracingContext(fs, false), null)
+ .getLeft();
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
String url = result.getMaskedUrl();
String encodedUrl = result.getMaskedEncodedUrl();
@@ -418,7 +419,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
intercept(IOException.class, "sig=XXXX",
() -> getFileSystem().getAbfsClient()
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
- getTestTracingContext(getFileSystem(), false)));
+ getTestTracingContext(getFileSystem(), false), null));
}
@Test
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
index 0873b8e24b5..a361581ccd1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
@@ -526,7 +526,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.renamePath(testFileName, newName, null,
- getTestTracingContext(fs, false));
+ getTestTracingContext(fs, false), null)
+ .getLeft();
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
new file mode 100644
index 00000000000..8160cdc64c5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * Helper methods for committer tests on ABFS.
+ */
+final class AbfsCommitTestHelper {
+ private AbfsCommitTestHelper() {
+ }
+
+ /**
+ * Prepare the test configuration.
+ * @param contractTestBinding test binding
+ * @return an extracted and patched configuration.
+ */
+ static Configuration prepareTestConfiguration(
+ ABFSContractTestBinding contractTestBinding) {
+ final Configuration conf =
+ contractTestBinding.getRawConfiguration();
+
+ // use ABFS Store operations
+ conf.set(OPT_STORE_OPERATIONS_CLASS,
+ AbfsManifestStoreOperations.NAME);
+
+ return conf;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
new file mode 100644
index 00000000000..55752055f0c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.DurationInfo;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+import static org.apache.hadoop.io.IOUtils.closeStream;
+
+/**
+ * Tests which create a yarn minicluster.
+ * These are all considered scale tests; the probe for
+ * scale tests being enabled is executed before the cluster
+ * is set up to avoid wasting time on non-scale runs.
+ */
+public abstract class AbstractAbfsClusterITest extends
+ AbstractManifestCommitterTest {
+
+ public static final int NO_OF_NODEMANAGERS = 2;
+
+ private final ABFSContractTestBinding binding;
+
+
+ /**
+ * The static cluster binding with the lifecycle of this test; served
+ * through instance-level methods for sharing across methods in the
+ * suite.
+ */
+ @SuppressWarnings("StaticNonFinalField")
+ private static ClusterBinding clusterBinding;
+
+ protected AbstractAbfsClusterITest() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ protected int getTestTimeoutMillis() {
+ return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ requireScaleTestsEnabled();
+ if (getClusterBinding() == null) {
+ clusterBinding = demandCreateClusterBinding();
+ }
+ assertNotNull("cluster is not bound", getClusterBinding());
+ }
+
+ @AfterClass
+ public static void teardownClusters() throws IOException {
+ terminateCluster(clusterBinding);
+ clusterBinding = null;
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ /**
+ * This is the cluster binding which every subclass must create.
+ */
+ protected static final class ClusterBinding {
+
+ private final String clusterName;
+
+ private final MiniMRYarnCluster yarn;
+
+ public ClusterBinding(
+ final String clusterName,
+ final MiniMRYarnCluster yarn) {
+ this.clusterName = clusterName;
+ this.yarn = requireNonNull(yarn);
+ }
+
+
+ /**
+ * Get the cluster FS, which will either be HDFS or the local FS.
+ * @return a filesystem.
+ * @throws IOException failure
+ */
+ public FileSystem getClusterFS() throws IOException {
+ return FileSystem.getLocal(yarn.getConfig());
+ }
+
+ public MiniMRYarnCluster getYarn() {
+ return yarn;
+ }
+
+ public Configuration getConf() {
+ return getYarn().getConfig();
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void terminate() {
+ closeStream(getYarn());
+ }
+ }
+
+ /**
+ * Create the cluster binding.
+ * The configuration will be patched by propagating down options
+ * from the maven build and turning off unwanted
+ * YARN features.
+ *
+ * If an HDFS cluster is requested,
+ * the HDFS and YARN clusters will share the same configuration, so
+ * the HDFS cluster binding is implicitly propagated to YARN.
+ * If one is not requested, the local filesystem is used as the cluster FS.
+ * @param conf configuration to start with.
+ * @return the cluster binding.
+ * @throws IOException failure.
+ */
+ protected static ClusterBinding createCluster(
+ final JobConf conf) throws IOException {
+ try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
+ conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+ // create a unique cluster name based on the current time in millis.
+ String timestamp = LocalDateTime.now().format(
+ DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
+ String clusterName = "yarn-" + timestamp;
+ MiniMRYarnCluster yarnCluster =
+ new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ return new ClusterBinding(clusterName, yarnCluster);
+ }
+ }
+
+ /**
+ * Terminate the cluster if it is not null.
+ * @param cluster the cluster
+ */
+ protected static void terminateCluster(ClusterBinding cluster) {
+ if (cluster != null) {
+ cluster.terminate();
+ }
+ }
+
+ /**
+ * Get the cluster binding for this subclass.
+ * @return the cluster binding
+ */
+ protected ClusterBinding getClusterBinding() {
+ return clusterBinding;
+ }
+
+ protected MiniMRYarnCluster getYarn() {
+ return getClusterBinding().getYarn();
+ }
+
+
+ /**
+ * We stage work into a temporary directory rather than directly under
+ * the user's home directory, as that is often rejected by CI test
+ * runners.
+ */
+ @Rule
+ public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
+
+
+ /**
+ * binding on demand rather than in a BeforeClass static method.
+ * Subclasses can override this to change the binding options.
+ * @return the cluster binding
+ */
+ protected ClusterBinding demandCreateClusterBinding() throws Exception {
+ return createCluster(new JobConf());
+ }
+
+ /**
+ * Create a job configuration.
+ * This creates a new job conf from the yarn
+ * cluster configuration then calls
+ * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
+ * @return the new job configuration.
+ * @throws IOException failure
+ */
+ protected JobConf newJobConf() throws IOException {
+ JobConf jobConf = new JobConf(getYarn().getConfig());
+ jobConf.addResource(getConfiguration());
+ applyCustomConfigOptions(jobConf);
+ return jobConf;
+ }
+
+ /**
+ * Patch the (job) configuration for this committer.
+ * @param jobConf configuration to patch
+ * @return the patched configuration.
+ */
+ protected Configuration patchConfigurationForCommitter(
+ final Configuration jobConf) {
+ enableManifestCommitter(jobConf);
+ return jobConf;
+ }
+
+ /**
+ * Override point to let implementations tune the MR Job conf.
+ * @param jobConf configuration
+ * @throws IOException failure
+ */
+ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
+
+ }
+
+
+ /**
+ * Assume that scale tests are enabled.
+ */
+ protected void requireScaleTestsEnabled() {
+ assumeScaleTestsEnabled(getConfiguration());
+ }
+
+}
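
To show the intended extension pattern, a hypothetical subclass which uses the
applyCustomConfigOptions() override point above to tune the job configuration;
the class name and the tuning value are illustrative only:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;

    public class ITestExampleAbfsCommitJob extends AbstractAbfsClusterITest {

      public ITestExampleAbfsCommitJob() throws Exception {
      }

      @Override
      protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
        // hypothetical tuning: raise the IO rate limit for this job
        jobConf.setInt("fs.azure.io.rate.limit", 20_000);
      }
    }
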
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
new file mode 100644
index 00000000000..a597c35376a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCleanupStage;
+
+/**
+ * Cleanup logic on ABFS.
+ */
+public class ITestAbfsCleanupStage extends TestCleanupStage {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsCleanupStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
new file mode 100644
index 00000000000..a0aaec85328
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCommitTaskStage;
+
+/**
+ * ABFS storage test of task committer.
+ */
+public class ITestAbfsCommitTaskStage extends TestCommitTaskStage {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsCommitTaskStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
new file mode 100644
index 00000000000..6621b80da00
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCreateOutputDirectoriesStage;
+
+/**
+ * ABFS storage test of directory creation.
+ */
+public class ITestAbfsCreateOutputDirectoriesStage extends TestCreateOutputDirectoriesStage {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsCreateOutputDirectoriesStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
new file mode 100644
index 00000000000..4e4c4f5996b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.FixMethodOrder;
+import org.junit.runners.MethodSorters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestJobThroughManifestCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+
+/**
+ * Test the Manifest committer stages against ABFS.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestAbfsJobThroughManifestCommitter
+ extends TestJobThroughManifestCommitter {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsJobThroughManifestCommitter() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return enableManifestCommitter(prepareTestConfiguration(binding));
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ @Override
+ protected boolean shouldDeleteTestRootAtEndOfTestRun() {
+ return true;
+ }
+
+ /**
+ * In addition to the superclass checks, read back the manifest and
+ * validate the etags of its file entries.
+ * @param attemptId attempt ID
+ * @param files files which were created.
+ * @param manifest manifest
+ * @throws IOException failure
+ */
+ @Override
+ protected void validateTaskAttemptManifest(String attemptId,
+ List<Path> files,
+ TaskManifest manifest) throws IOException {
+ super.validateTaskAttemptManifest(attemptId, files, manifest);
+ final List<FileEntry> commit = manifest.getFilesToCommit();
+ final ManifestStoreOperations operations = getStoreOperations();
+ for (FileEntry entry : commit) {
+ Assertions.assertThat(entry.getEtag())
+ .describedAs("Etag of %s", entry)
+ .isNotEmpty();
+ final FileStatus sourceStatus = operations.getFileStatus(entry.getSourcePath());
+ final String etag = ManifestCommitterSupport.getEtag(sourceStatus);
+ Assertions.assertThat(etag)
+ .describedAs("Etag of %s", sourceStatus)
+ .isEqualTo(entry.getEtag());
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
new file mode 100644
index 00000000000..acd693e39a0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage;
+
+/**
+ * ABFS storage test of saving and loading a large number
+ * of manifests.
+ */
+public class ITestAbfsLoadManifestsStage extends TestLoadManifestsStage {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsLoadManifestsStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
new file mode 100644
index 00000000000..aac06f952da
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol;
+
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+
+/**
+ * Test the Manifest protocol against ABFS.
+ */
+public class ITestAbfsManifestCommitProtocol extends
+ TestManifestCommitProtocol {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsManifestCommitProtocol() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return enableManifestCommitter(prepareTestConfiguration(binding));
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+
+ @Override
+ protected String suitename() {
+ return "ITestAbfsManifestCommitProtocol";
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
new file mode 100644
index 00000000000..922782da29c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test {@link AbfsManifestStoreOperations}.
+ * As this looks at etag handling through FS operations, it's actually testing how etags work
+ * in ABFS (preservation across renames) and in the client (are they consistent
+ * in LIST and HEAD calls).
+ *
+ * Skipped when tested against wasb-compatible stores.
+ */
+public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class);
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsManifestStoreOperations() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+
+ // skip tests against stores which do not preserve etags across renames
+ assumeTrue("Resilient rename not available",
+ getFileSystem().hasPathCapability(getContract().getTestPath(),
+ ETAGS_PRESERVED_IN_RENAME));
+
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return enableManifestCommitter(prepareTestConfiguration(binding));
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ /**
+ * Basic consistency of etags across operations, as well as being non-empty.
+ */
+ @Test
+ public void testEtagConsistencyAcrossListAndHead() throws Throwable {
+ describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
+ final Path path = methodPath();
+ final FileSystem fs = getFileSystem();
+ ContractTestUtils.touch(fs, path);
+ final ManifestStoreOperations operations = createManifestStoreOperations();
+ Assertions.assertThat(operations)
+ .describedAs("Store operations class loaded via Configuration")
+ .isInstanceOf(AbfsManifestStoreOperations.class);
+
+ final FileStatus st = operations.getFileStatus(path);
+ final String etag = operations.getEtag(st);
+ Assertions.assertThat(etag)
+ .describedAs("Etag of %s", st)
+ .isNotBlank();
+ LOG.info("etag of empty file is \"{}\"", etag);
+
+ final FileStatus[] statuses = fs.listStatus(path);
+ Assertions.assertThat(statuses)
+ .describedAs("List(%s)", path)
+ .hasSize(1);
+ final FileStatus lsStatus = statuses[0];
+ Assertions.assertThat(operations.getEtag(lsStatus))
+ .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
+ .isEqualTo(etag);
+ }
+
+ @Test
+ public void testEtagsOfDifferentDataDifferent() throws Throwable {
+ describe("Verify that two different blocks of data written have different tags");
+
+ final Path path = methodPath();
+ final FileSystem fs = getFileSystem();
+ Path src = new Path(path, "src");
+
+ ContractTestUtils.createFile(fs, src, true,
+ "data1234".getBytes(StandardCharsets.UTF_8));
+ final ManifestStoreOperations operations = createManifestStoreOperations();
+ final FileStatus srcStatus = operations.getFileStatus(src);
+ final String srcTag = operations.getEtag(srcStatus);
+ LOG.info("etag of file 1 is \"{}\"", srcTag);
+
+ // now overwrite with data of same length
+ // (ensure that path or length aren't used exclusively as tag)
+ ContractTestUtils.createFile(fs, src, true,
+ "1234data".getBytes(StandardCharsets.UTF_8));
+
+ // validate
+ final String tag2 = operations.getEtag(operations.getFileStatus(src));
+ LOG.info("etag of file 2 is \"{}\"", tag2);
+
+ Assertions.assertThat(tag2)
+ .describedAs("etag of updated file")
+ .isNotEqualTo(srcTag);
+ }
+
+ @Test
+ public void testEtagConsistencyAcrossRename() throws Throwable {
+ describe("Verify that when a file is renamed, the etag remains unchanged");
+ final Path path = methodPath();
+ final FileSystem fs = getFileSystem();
+ Path src = new Path(path, "src");
+ Path dest = new Path(path, "dest");
+
+ ContractTestUtils.createFile(fs, src, true,
+ "sample data".getBytes(StandardCharsets.UTF_8));
+ final ManifestStoreOperations operations = createManifestStoreOperations();
+ final FileStatus srcStatus = operations.getFileStatus(src);
+ final String srcTag = operations.getEtag(srcStatus);
+ LOG.info("etag of short file is \"{}\"", srcTag);
+
+ Assertions.assertThat(srcTag)
+ .describedAs("Etag of %s", srcStatus)
+ .isNotBlank();
+
+ // rename
+ operations.commitFile(new FileEntry(src, dest, 0, srcTag));
+
+ // validate
+ FileStatus destStatus = operations.getFileStatus(dest);
+ final String destTag = operations.getEtag(destStatus);
+ Assertions.assertThat(destTag)
+ .describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus)
+ .isEqualTo(srcTag);
+ }
+
+}
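The assertions in the suite above rely on ABFS exposing etags through the FileStatus entries it returns. For reference, here is a minimal client-side sketch of the same probe, assuming the store's FileStatus implements org.apache.hadoop.fs.EtagSource (as the ABFS statuses exercised here do); the helper class and method names are illustrative only, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.fs.EtagSource;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Illustrative helper: read an etag from a FileStatus, if the store provides one. */
    public final class EtagProbe {
      private EtagProbe() {
      }

      public static String etagOf(FileSystem fs, Path path) throws IOException {
        FileStatus st = fs.getFileStatus(path);
        // ABFS status entries implement EtagSource; stores without
        // etag support simply yield null here.
        return st instanceof EtagSource
            ? ((EtagSource) st).getEtag()
            : null;
      }
    }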
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
new file mode 100644
index 00000000000..5547d081c96
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure;
+
+/**
+ * Test rename failure logic on ABFS.
+ * This exercises the resilient rename operation.
+ */
+public class ITestAbfsRenameStageFailure extends TestRenameStageFailure {
+
+ /**
+ * How many files to create.
+ */
+ private static final int FILES_TO_CREATE = 20;
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsRenameStageFailure() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ @Override
+ protected boolean requireRenameResilience() {
+ return true;
+ }
+
+ @Override
+ protected int filesToCreate() {
+ return FILES_TO_CREATE;
+ }
+}
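The requireRenameResilience() override above declares that ABFS offers resilient (etag-checked) rename, so the parent suite exercises the recovery paths. Under the same contract, a suite for a store without that capability would return false; a hypothetical sketch, with an invented class name:

    import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure;

    /** Hypothetical suite for a store without resilient rename support. */
    public class ITestPlainStoreRenameStageFailure extends TestRenameStageFailure {

      @Override
      protected boolean requireRenameResilience() {
        // no etag-checked rename recovery on this store
        return false;
      }
    }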
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
new file mode 100644
index 00000000000..d2fe9de115c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestTaskManifestFileIO;
+
+/**
+ * Test reading/writing the task manifest file through ABFS.
+ */
+public class ITestAbfsTaskManifestFileIO extends TestTaskManifestFileIO {
+
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsTaskManifestFileIO() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
new file mode 100644
index 00000000000..4b21b838dec
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import static java.util.Optional.empty;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
+
+/**
+ * Runs Terasort against ABFS using the manifest committer.
+ * The tests run in sequence; each stage depends on the output of the one before.
+ * Scale test only: it is big and slow.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@SuppressWarnings({"StaticNonFinalField", "OptionalUsedAsFieldOrParameterType"})
+public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestAbfsTerasort.class);
+
+ public static final int EXPECTED_PARTITION_COUNT = 10;
+
+ public static final int PARTITION_SAMPLE_SIZE = 1000;
+
+ public static final int ROW_COUNT = 1000;
+
+ /**
+ * This has to be common across all test methods.
+ */
+ private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort");
+
+ /**
+ * Duration tracker created in the first of the test cases and closed
+ * in {@link #test_140_teracomplete()}.
+ */
+ private static Optional<DurationInfo> terasortDuration = empty();
+
+ /**
+ * Tracker of which stages are completed and how long they took.
+ */
+ private static final Map<String, DurationInfo> COMPLETED_STAGES = new HashMap<>();
+
+ /**
+ * FileSystem statistics are collected from the _SUCCESS markers.
+ */
+ protected static final IOStatisticsSnapshot JOB_IOSTATS =
+ snapshotIOStatistics();
+
+ /** Base path for all the terasort input and output paths. */
+ private Path terasortPath;
+
+ /** Input (teragen) path. */
+ private Path sortInput;
+
+ /** Path where sorted data goes. */
+ private Path sortOutput;
+
+ /** Path for validated job's output. */
+ private Path sortValidate;
+
+ public ITestAbfsTerasort() throws Exception {
+ }
+
+
+ @Override
+ public void setup() throws Exception {
+ // superclass calls requireScaleTestsEnabled();
+ super.setup();
+ prepareToTerasort();
+ }
+
+ /**
+ * Set up the job conf with the terasort options used in these scale tests.
+ * @param conf configuration
+ */
+ @Override
+ protected void applyCustomConfigOptions(JobConf conf) {
+ // small sample size for faster runs
+ conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
+ getSampleSizeForEachPartition());
+ conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
+ getExpectedPartitionCount());
+ conf.setBoolean(
+ TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+ false);
+ }
+
+ private int getExpectedPartitionCount() {
+ return EXPECTED_PARTITION_COUNT;
+ }
+
+ private int getSampleSizeForEachPartition() {
+ return PARTITION_SAMPLE_SIZE;
+ }
+
+ protected int getRowCount() {
+ return ROW_COUNT;
+ }
+
+ /**
+ * Set up the terasort by initializing the path variables.
+ * The paths must be unique across parameterized runs but
+ * common to all test cases within a single run.
+ */
+ private void prepareToTerasort() {
+ terasortPath = getFileSystem().makeQualified(TERASORT_PATH);
+ sortInput = new Path(terasortPath, "sortin");
+ sortOutput = new Path(terasortPath, "sortout");
+ sortValidate = new Path(terasortPath, "validate");
+ }
+
+ /**
+ * Declare that a stage has completed.
+ * @param stage stage name/key in the map
+ * @param d duration.
+ */
+ private static void completedStage(final String stage,
+ final DurationInfo d) {
+ COMPLETED_STAGES.put(stage, d);
+ }
+
+ /**
+ * Declare a stage which is required for this test case.
+ * @param stage stage name
+ */
+ private static void requireStage(final String stage) {
+ Assume.assumeTrue(
+ "Required stage was not completed: " + stage,
+ COMPLETED_STAGES.get(stage) != null);
+ }
+
+ /**
+ * Execute a single stage in the terasort.
+ * Updates the completed stages map with the stage duration, if successful.
+ * @param stage Stage name for the stages map.
+ * @param jobConf job conf
+ * @param dest destination directory; the _SUCCESS file is expected here.
+ * @param tool tool to run.
+ * @param args args for the tool.
+ * @param minimumFileCount minimum number of files to have been created
+ * @throws Exception any failure
+ */
+ private void executeStage(
+ final String stage,
+ final JobConf jobConf,
+ final Path dest,
+ final Tool tool,
+ final String[] args,
+ final int minimumFileCount) throws Exception {
+ int result;
+
+ // the duration info is created outside a try-with-resources
+ // clause as it is used later.
+ DurationInfo d = new DurationInfo(LOG, stage);
+ try {
+ result = ToolRunner.run(jobConf, tool, args);
+ } finally {
+ d.close();
+ }
+ dumpOutputTree(dest);
+ assertEquals(stage
+ + "(" + StringUtils.join(", ", args) + ")"
+ + " failed", 0, result);
+ final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest,
+ minimumFileCount, "");
+ JOB_IOSTATS.aggregate(successFile.getIOStatistics());
+
+ completedStage(stage, d);
+ }
+
+ /**
+ * Set up terasort by cleaning out the destination and noting the start
+ * time before any of the jobs are executed.
+ *
+ * This is executed first in each parameterized run; all state which must
+ * be reset between runs is reset here.
+ */
+ @Test
+ public void test_100_terasort_setup() throws Throwable {
+ describe("Setting up for a terasort");
+
+ getFileSystem().delete(terasortPath, true);
+ terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
+ }
+
+ @Test
+ public void test_110_teragen() throws Throwable {
+ describe("Teragen to %s", sortInput);
+ getFileSystem().delete(sortInput, true);
+
+ JobConf jobConf = newJobConf();
+ patchConfigurationForCommitter(jobConf);
+ executeStage("teragen",
+ jobConf,
+ sortInput,
+ new TeraGen(),
+ new String[]{Integer.toString(getRowCount()), sortInput.toString()},
+ 1);
+ }
+
+
+ @Test
+ public void test_120_terasort() throws Throwable {
+ describe("Terasort from %s to %s", sortInput, sortOutput);
+ requireStage("teragen");
+ getFileSystem().delete(sortOutput, true);
+
+ loadSuccessFile(getFileSystem(), sortInput);
+ JobConf jobConf = newJobConf();
+ patchConfigurationForCommitter(jobConf);
+ executeStage("terasort",
+ jobConf,
+ sortOutput,
+ new TeraSort(),
+ new String[]{sortInput.toString(), sortOutput.toString()},
+ 1);
+ }
+
+ @Test
+ public void test_130_teravalidate() throws Throwable {
+ describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+ requireStage("terasort");
+ getFileSystem().delete(sortValidate, true);
+ loadSuccessFile(getFileSystem(), sortOutput);
+ JobConf jobConf = newJobConf();
+ patchConfigurationForCommitter(jobConf);
+ executeStage("teravalidate",
+ jobConf,
+ sortValidate,
+ new TeraValidate(),
+ new String[]{sortOutput.toString(), sortValidate.toString()},
+ 1);
+ }
+
+ /**
+ * Print the results and save them to a temporary CSV file,
+ * which makes them easy to list and compare across runs.
+ */
+ @Test
+ public void test_140_teracomplete() throws Throwable {
+ terasortDuration.ifPresent(d -> {
+ d.close();
+ completedStage("overall", d);
+ });
+
+ // IO Statistics
+ IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, JOB_IOSTATS);
+
+ // and the summary
+ final StringBuilder results = new StringBuilder();
+ results.append("\"Operation\"\t\"Duration\"\n");
+
+ // define a lambda here for reuse below.
+ // This is safe because no IOExceptions are raised in this sequence.
+ Consumer<String> stage = (s) -> {
+ DurationInfo duration = COMPLETED_STAGES.get(s);
+ results.append(String.format("\"%s\"\t\"%s\"\n",
+ s,
+ duration == null ? "" : duration));
+ };
+
+ stage.accept("teragen");
+ stage.accept("terasort");
+ stage.accept("teravalidate");
+ stage.accept("overall");
+ String text = results.toString();
+ File resultsFile = File.createTempFile("results", ".csv");
+ FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
+ LOG.info("Results are in {}\n{}", resultsFile, text);
+ }
+
+ /**
+ * Reset the duration tracker for the case where two committer test suites
+ * run sequentially. Without this, the total execution time is reported as
+ * spanning from the start of the first suite to the end of the second.
+ */
+ @Test
+ public void test_150_teracleanup() throws Throwable {
+ terasortDuration = Optional.empty();
+ }
+
+ @Test
+ public void test_200_directory_deletion() throws Throwable {
+ getFileSystem().delete(terasortPath, true);
+ }
+
+ /**
+ * Dump the files under a path, without failing if the path is not present.
+ * @param path path to dump
+ * @throws Exception any failure.
+ */
+ protected void dumpOutputTree(Path path) throws Exception {
+ LOG.info("Files under output directory {}", path);
+ try {
+ RemoteIterators.foreach(getFileSystem().listFiles(path, true),
+ (status) -> LOG.info("{}", status));
+ } catch (FileNotFoundException e) {
+ LOG.info("Output directory {} not found", path);
+ }
+ }
+}
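To bind a production job to this committer the same way these terasort stages are bound, the usual route is scheme-based factory binding; a sketch, assuming the standard mapreduce.outputcommitter.factory.scheme.<scheme> convention of PathOutputCommitterFactory (the wrapper class name here is invented for illustration):

    import org.apache.hadoop.mapred.JobConf;

    /** Sketch: route abfs:// output paths to the manifest committer factory. */
    public final class ManifestCommitterBinding {
      private ManifestCommitterBinding() {
      }

      public static JobConf bind(JobConf conf) {
        // scheme-specific factory binding; the value is the factory classname
        conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
            "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory");
        return conf;
      }
    }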
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
new file mode 100644
index 00000000000..3d49d62eaa8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit and integration tests for the manifest committer.
+ * JSON job reports will be saved to
+ * {@code target/reports}.
+ */
+package org.apache.hadoop.fs.azurebfs.commit;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
index 62bcca174ef..1319ea44c7c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
@@ -34,7 +34,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "abfs.xml";
private final boolean isSecure;
- protected AbfsFileSystemContract(final Configuration conf, boolean secure) {
+ public AbfsFileSystemContract(final Configuration conf, boolean secure) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);
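The constructor is widened from protected to public so that suites outside the contract package, such as the commit tests above, can build the contract directly, for example:

    // fragment, as used by the new commit test suites:
    AbstractFSContract contract =
        new AbfsFileSystemContract(conf, binding.isSecureMode());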
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
new file mode 100644
index 00000000000..7d2d11c04ef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <include xmlns="http://www.w3.org/2001/XInclude"
+ href="azure-test.xml">
+ <fallback/>
+ </include>
+</configuration>