diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 21501d28f42..5534b5fb44a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -117,6 +117,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator; @@ -235,6 +236,7 @@ public String toString() { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); + sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); sb.append('}'); return sb.toString(); } @@ -1636,6 +1638,11 @@ public boolean hasPathCapability(final Path path, final String capability) new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat, listener)); + + // probe for presence of the HADOOP-18546 readahead fix. + case CAPABILITY_SAFE_READAHEAD: + return true; + default: return super.hasPathCapability(p, capability); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java new file mode 100644 index 00000000000..12d4f14d92a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.constants; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Constants which are used internally and which don't fit into the other + * classes. + * For use within the {@code hadoop-azure} module only. + */ +@InterfaceAudience.Private +public final class InternalConstants { + + private InternalConstants() { + } + + /** + * Does this version of the store have safe readahead? + * Possible combinations of this and the probe + * {@code "fs.capability.etags.available"}. + *
    + *
  1. {@value}: store is safe
  2. + *
  3. !etags: store is safe
  4. + *
  5. etags && !{@value}: store is UNSAFE
  6. + *
+ */ + public static final String CAPABILITY_SAFE_READAHEAD = + "fs.azure.capability.readahead.safe"; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 8f12484a55c..fdeaf701775 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -50,6 +50,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** @@ -828,11 +829,12 @@ public IOStatistics getIOStatistics() { @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); + sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); if (streamStatistics != null) { - sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); - sb.append(streamStatistics.toString()); - sb.append("}"); + sb.append(", ").append(streamStatistics); } + sb.append("}"); return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index ac84f0b27cf..031545f57a1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -101,6 +101,7 @@ private void init() { // hide instance constructor private ReadBufferManager() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java index 8b60dd801cb..f7d4a5b7a83 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java @@ -20,14 +20,22 @@ import java.net.URI; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.services.AuthType; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; +import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; +import static org.junit.Assume.assumeTrue; + /** * Test AzureBlobFileSystem initialization. */ @@ -74,4 +82,28 @@ public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception { assertNotNull("working directory", fs.getWorkingDirectory()); } } + + @Test + public void testFileSystemCapabilities() throws Throwable { + final AzureBlobFileSystem fs = getFileSystem(); + + final Path p = new Path("}"); + // etags always present + Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE)) + .describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs) + .isTrue(); + // readahead always correct + Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD)) + .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs) + .isTrue(); + + // etags-over-rename and ACLs are either both true or both false. + final boolean etagsAcrossRename = fs.hasPathCapability(p, ETAGS_PRESERVED_IN_RENAME); + final boolean acls = fs.hasPathCapability(p, FS_ACLS); + Assertions.assertThat(etagsAcrossRename) + .describedAs("capabilities %s=%s and %s=%s in %s", + ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename, + FS_ACLS, acls, fs) + .isEqualTo(acls); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index eca670fba90..a57430fa808 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -44,9 +44,24 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; +import static org.apache.hadoop.test.LambdaTestUtils.eventually; public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { + /** + * Time before the JUnit test times out for eventually() clauses + * to fail. This copes with slow network connections and debugging + * sessions, yet still allows for tests to fail with meaningful + * messages. + */ + public static final int TIMEOUT_OFFSET = 5 * 60_000; + + /** + * Interval between eventually preobes. + */ + public static final int PROBE_INTERVAL_MILLIS = 1_000; + public ITestReadBufferManager() throws Exception { } @@ -61,6 +76,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } ExecutorService executorService = Executors.newFixedThreadPool(4); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + // verify that the fs has the capability to validate the fix + Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD)) + .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs) + .isTrue(); + try { for (int i = 0; i < 4; i++) { final String fileName = methodName.getMethodName() + i; @@ -80,9 +100,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - // verify there is no work in progress or the readahead queue. - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); + // readahead queue is empty assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + // verify the in progress list eventually empties out. + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList())); } private void assertListEmpty(String listName, List list) {