HADOOP-18577. ABFS: Add probes of readahead fix (#5205)

Followup patch to  HADOOP-18456 as part of HADOOP-18521,
ABFS ReadBufferManager buffer sharing across concurrent HTTP requests

Add probes of readahead fix aid in checking safety of
hadoop ABFS client across different releases.

* ReadBufferManager constructor logs the fact it is safe at TRACE
* AbfsInputStream declares it is fixed in toString()
  by including fs.azure.capability.readahead.safe" in the
  result.

The ABFS FileSystem hasPathCapability("fs.azure.capability.readahead.safe")
probe returns true to indicate the client's readahead manager has been fixed
to be safe when prefetching.

All Hadoop releases for which probe this returns false
and for which the probe "fs.capability.etags.available"
returns true at risk of returning invalid data when reading
ADLS Gen2/Azure storage data.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2022-12-15 17:08:25 +00:00 committed by GitHub
parent 5f08e51b72
commit cf1244492d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 5 deletions

View File

@ -117,6 +117,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@ -235,6 +236,7 @@ public String toString() {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
sb.append('}');
return sb.toString();
}
@ -1636,6 +1638,11 @@ public boolean hasPathCapability(final Path path, final String capability)
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
listener));
// probe for presence of the HADOOP-18546 readahead fix.
case CAPABILITY_SAFE_READAHEAD:
return true;
default:
return super.hasPathCapability(p, capability);
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.constants;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Constants which are used internally and which don't fit into the other
* classes.
* For use within the {@code hadoop-azure} module only.
*/
@InterfaceAudience.Private
public final class InternalConstants {
private InternalConstants() {
}
/**
* Does this version of the store have safe readahead?
* Possible combinations of this and the probe
* {@code "fs.capability.etags.available"}.
* <ol>
* <li>{@value}: store is safe</li>
* <li>!etags: store is safe</li>
* <li>etags && !{@value}: store is <i>UNSAFE</i></li>
* </ol>
*/
public static final String CAPABILITY_SAFE_READAHEAD =
"fs.azure.capability.readahead.safe";
}

View File

@ -50,6 +50,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
/**
@ -828,11 +829,12 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
if (streamStatistics != null) {
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append(streamStatistics.toString());
sb.append("}");
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
if (streamStatistics != null) {
sb.append(", ").append(streamStatistics);
}
sb.append("}");
return sb.toString();
}

View File

@ -101,6 +101,7 @@ private void init() {
// hide instance constructor
private ReadBufferManager() {
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
}

View File

@ -20,14 +20,22 @@
import java.net.URI;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.junit.Assume.assumeTrue;
/**
* Test AzureBlobFileSystem initialization.
*/
@ -74,4 +82,28 @@ public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception {
assertNotNull("working directory", fs.getWorkingDirectory());
}
}
@Test
public void testFileSystemCapabilities() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();
final Path p = new Path("}");
// etags always present
Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE))
.describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs)
.isTrue();
// readahead always correct
Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD))
.describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
.isTrue();
// etags-over-rename and ACLs are either both true or both false.
final boolean etagsAcrossRename = fs.hasPathCapability(p, ETAGS_PRESERVED_IN_RENAME);
final boolean acls = fs.hasPathCapability(p, FS_ACLS);
Assertions.assertThat(etagsAcrossRename)
.describedAs("capabilities %s=%s and %s=%s in %s",
ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename,
FS_ACLS, acls, fs)
.isEqualTo(acls);
}
}

View File

@ -44,9 +44,24 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
/**
* Time before the JUnit test times out for eventually() clauses
* to fail. This copes with slow network connections and debugging
* sessions, yet still allows for tests to fail with meaningful
* messages.
*/
public static final int TIMEOUT_OFFSET = 5 * 60_000;
/**
* Interval between eventually preobes.
*/
public static final int PROBE_INTERVAL_MILLIS = 1_000;
public ITestReadBufferManager() throws Exception {
}
@ -61,6 +76,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
// verify that the fs has the capability to validate the fix
Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
.describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
.isTrue();
try {
for (int i = 0; i < 4; i++) {
final String fileName = methodName.getMethodName() + i;
@ -80,9 +100,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
// verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
// readahead queue is empty
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
// verify the in progress list eventually empties out.
eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
}
private void assertListEmpty(String listName, List<ReadBuffer> list) {