HADOOP-16665. Filesystems to be closed if they failed during initialize().

Contributed by Steve Loughran.

This FileSystem instantiation so if an IOException or RuntimeException is
raised in the invocation of FileSystem.initialize() then a best-effort
attempt is made to close the FS instance; exceptions raised that there
are swallowed.

The S3AFileSystem is also modified to do its own cleanup if an
IOException is raised during its initialize() process, it being the
FS we know has the "potential" to leak threads, especially in
extension points (e.g AWS Authenticators) which spawn threads.

Change-Id: Ib84073a606c9d53bf53cbfca4629876a03894f04
This commit is contained in:
Steve Loughran 2019-11-12 18:17:02 +00:00
parent b83b9ab418
commit 990063d2af
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
12 changed files with 338 additions and 63 deletions

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@ -3391,9 +3392,22 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
Tracer tracer = FsTracer.get(conf);
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
scope.addKVAnnotation("scheme", uri.getScheme());
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
Class<? extends FileSystem> clazz =
getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
try {
fs.initialize(uri, conf);
} catch (IOException | RuntimeException e) {
// exception raised during initialization.
// log summary at warn and full stack at debug
LOGGER.warn("Failed to initialize fileystem {}: {}",
uri, e.toString());
LOGGER.debug("Failed to initialize fileystem", e);
// then (robustly) close the FS, so as to invoke any
// cleanup code.
IOUtils.cleanupWithLogger(LOGGER, fs);
throw e;
}
return fs;
}
}

View File

@ -18,14 +18,24 @@
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.ServiceConfigurationError;
import org.junit.Test;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.*;
/**
* Tests related to filesystem creation and lifecycle.
*/
public class TestFileSystemInitialization {
/**
@ -55,4 +65,119 @@ public void testMissingLibraries() {
} catch (Exception | ServiceConfigurationError expected) {
}
}
@Test
public void testNewInstanceFailure() throws Throwable {
intercept(IOException.class, FailingFileSystem.INITIALIZE, () ->
FileSystem.newInstance(new URI("failing://localhost"), FailingFileSystem
.failingConf()));
assertThat(FailingFileSystem.initCount).describedAs("init count")
.isEqualTo(1);
assertThat(FailingFileSystem.closeCount).describedAs("close count")
.isEqualTo(1);
}
/**
* An FS which will fail on both init and close, and update
* counters of invocations as it does so.
*/
public static class FailingFileSystem extends FileSystem {
public static final String INITIALIZE = "initialize()";
public static final String CLOSE = "close()";
private static int initCount;
private static int closeCount;
private static Configuration failingConf() {
final Configuration conf = new Configuration(false);
conf.setClass("fs.failing.impl", FailingFileSystem.class,
FileSystem.class);
return conf;
}
@Override
public void initialize(final URI name, final Configuration conf)
throws IOException {
super.initialize(name, conf);
initCount++;
throw new IOException(INITIALIZE);
}
@Override
public void close() throws IOException {
closeCount++;
throw new IOException(CLOSE);
}
@Override
public URI getUri() {
return null;
}
@Override
public FSDataInputStream open(final Path f, final int bufferSize)
throws IOException {
return null;
}
@Override
public FSDataOutputStream create(final Path f,
final FsPermission permission,
final boolean overwrite,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress) throws IOException {
return null;
}
@Override
public FSDataOutputStream append(final Path f,
final int bufferSize,
final Progressable progress) throws IOException {
return null;
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
return false;
}
@Override
public boolean delete(final Path f, final boolean recursive)
throws IOException {
return false;
}
@Override
public FileStatus[] listStatus(final Path f)
throws FileNotFoundException, IOException {
return new FileStatus[0];
}
@Override
public void setWorkingDirectory(final Path new_dir) {
}
@Override
public Path getWorkingDirectory() {
return null;
}
@Override
public boolean mkdirs(final Path f, final FsPermission permission)
throws IOException {
return false;
}
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
return null;
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
/**
* A base class for JUnit5+ tests that sets a default timeout for all tests
* that subclass this test.
*
* Threads are named to the method being executed, for ease of diagnostics
* in logs and thread dumps.
*
* Unlike {@link HadoopTestBase} this class does not extend JUnit Assert
* so is easier to use with AssertJ.
*/
public abstract class AbstractHadoopTestBase {
/**
* System property name to set the test timeout: {@value}.
*/
public static final String PROPERTY_TEST_DEFAULT_TIMEOUT =
"test.default.timeout";
/**
* The default timeout (in milliseconds) if the system property
* {@link #PROPERTY_TEST_DEFAULT_TIMEOUT}
* is not set: {@value}.
*/
public static final int TEST_DEFAULT_TIMEOUT_VALUE = 100000;
/**
* The JUnit rule that sets the default timeout for tests.
*/
@Rule
public Timeout defaultTimeout = retrieveTestTimeout();
/**
* Retrieve the test timeout from the system property
* {@link #PROPERTY_TEST_DEFAULT_TIMEOUT}, falling back to
* the value in {@link #TEST_DEFAULT_TIMEOUT_VALUE} if the
* property is not defined.
* @return the recommended timeout for tests
*/
public static Timeout retrieveTestTimeout() {
String propval = System.getProperty(PROPERTY_TEST_DEFAULT_TIMEOUT,
Integer.toString(
TEST_DEFAULT_TIMEOUT_VALUE));
int millis;
try {
millis = Integer.parseInt(propval);
} catch (NumberFormatException e) {
//fall back to the default value, as the property cannot be parsed
millis = TEST_DEFAULT_TIMEOUT_VALUE;
}
return new Timeout(millis, TimeUnit.MILLISECONDS);
}
/**
* The method name.
*/
@Rule
public TestName methodName = new TestName();
/**
* Get the method name; defaults to the value of {@link #methodName}.
* Subclasses may wish to override it, which will tune the thread naming.
* @return the name of the method.
*/
protected String getMethodName() {
return methodName.getMethodName();
}
/**
* Static initializer names this thread "JUnit".
*/
@BeforeClass
public static void nameTestThread() {
Thread.currentThread().setName("JUnit");
}
/**
* Before each method, the thread is renamed to match the method name.
*/
@Before
public void nameThreadToMethod() {
Thread.currentThread().setName("JUnit-" + getMethodName());
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.test;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@ -70,7 +72,7 @@ public static Timeout retrieveTestTimeout() {
//fall back to the default value, as the property cannot be parsed
millis = TEST_DEFAULT_TIMEOUT_VALUE;
}
return new Timeout(millis);
return new Timeout(millis, TimeUnit.MILLISECONDS);
}
/**

View File

@ -53,6 +53,7 @@
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Upload files/parts directly via different buffering mechanisms:
@ -396,9 +397,9 @@ public void close() throws IOException {
writeOperationHelper.writeFailed(ioe);
throw ioe;
} finally {
closeAll(LOG, block, blockFactory);
cleanupWithLogger(LOG, block, blockFactory);
LOG.debug("Statistics: {}", statistics);
closeAll(LOG, statistics);
cleanupWithLogger(LOG, statistics);
clearActiveBlock();
}
// Note end of write. This does not change the state of the remote FS.
@ -437,7 +438,7 @@ private int putObject() throws IOException {
// stream afterwards.
return writeOperationHelper.putObject(putObjectRequest);
} finally {
closeAll(LOG, uploadData, block);
cleanupWithLogger(LOG, uploadData, block);
}
});
clearActiveBlock();
@ -614,7 +615,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
return partETag;
} finally {
// close the stream and block
closeAll(LOG, uploadData, block);
cleanupWithLogger(LOG, uploadData, block);
}
});
partETagsFutures.add(partETagFuture);

View File

@ -40,7 +40,7 @@
import org.apache.hadoop.util.DirectBufferPool;
import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Set of classes to support output streaming into blocks which are then
@ -155,7 +155,7 @@ InputStream getUploadStream() {
*/
@Override
public void close() throws IOException {
closeAll(LOG, uploadStream);
cleanupWithLogger(LOG, uploadStream);
}
}

View File

@ -306,22 +306,22 @@ public void initialize(URI name, Configuration originalConf)
throws IOException {
// get the host; this is guaranteed to be non-null, non-empty
bucket = name.getHost();
LOG.debug("Initializing S3AFileSystem for {}", bucket);
// clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket);
// patch the Hadoop security providers
patchSecurityCredentialProviders(conf);
// look for delegation token support early.
boolean delegationTokensEnabled = hasDelegationTokenBinding(conf);
if (delegationTokensEnabled) {
LOG.debug("Using delegation tokens");
}
// set the URI, this will do any fixup of the URI to remove secrets,
// canonicalize.
setUri(name, delegationTokensEnabled);
super.initialize(uri, conf);
setConf(conf);
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
// clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket);
// patch the Hadoop security providers
patchSecurityCredentialProviders(conf);
// look for delegation token support early.
boolean delegationTokensEnabled = hasDelegationTokenBinding(conf);
if (delegationTokensEnabled) {
LOG.debug("Using delegation tokens");
}
// set the URI, this will do any fixup of the URI to remove secrets,
// canonicalize.
setUri(name, delegationTokensEnabled);
super.initialize(uri, conf);
setConf(conf);
// look for encryption data
// DT Bindings may override this
@ -381,6 +381,9 @@ public void initialize(URI name, Configuration originalConf)
initCannedAcls(conf);
// This initiates a probe against S3 for the bucket existing.
// It is where all network and authentication configuration issues
// surface, and is potentially slow.
verifyBucketExists();
inputPolicy = S3AInputPolicy.getPolicy(
@ -436,7 +439,13 @@ public void initialize(URI name, Configuration originalConf)
initMultipartUploads(conf);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
throw translateException("initializing ", new Path(name), e);
} catch (IOException | RuntimeException e) {
// other exceptions: stop the services.
stopAllServices();
throw e;
}
}
@ -3118,27 +3127,43 @@ public void close() throws IOException {
try {
super.close();
} finally {
if (transfers != null) {
transfers.shutdownNow(true);
transfers = null;
}
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
S3AUtils.closeAll(LOG, metadataStore, instrumentation);
metadataStore = null;
instrumentation = null;
closeAutocloseables(LOG, credentials);
cleanupWithLogger(LOG, delegationTokens.orElse(null));
cleanupWithLogger(LOG, signerManager);
signerManager = null;
credentials = null;
stopAllServices();
}
}
/**
* Stop all services.
* This is invoked in close() and during failures of initialize()
* -make sure that all operations here are robust to failures in
* both the expected state of this FS and of failures while being stopped.
*/
protected synchronized void stopAllServices() {
if (transfers != null) {
try {
transfers.shutdownNow(true);
} catch (RuntimeException e) {
// catch and swallow for resilience.
LOG.debug("When shutting down", e);
}
transfers = null;
}
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
closeAutocloseables(LOG, credentials);
cleanupWithLogger(LOG,
metadataStore,
instrumentation,
delegationTokens.orElse(null),
signerManager);
delegationTokens = Optional.empty();
signerManager = null;
credentials = null;
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.

View File

@ -86,6 +86,7 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Utility methods for S3A code.
@ -1613,26 +1614,17 @@ private static String passwordDiagnostics(String pass, String description) {
/**
* Close the Closeable objects and <b>ignore</b> any Exception or
* null pointers.
* (This is the SLF4J equivalent of that in {@code IOUtils}).
* This is obsolete: use
* {@link org.apache.hadoop.io.IOUtils#cleanupWithLogger(Logger, Closeable...)}
* @param log the log to log at debug level. Can be null.
* @param closeables the objects to close
*/
@Deprecated
public static void closeAll(Logger log,
Closeable... closeables) {
if (log == null) {
log = LOG;
}
for (Closeable c : closeables) {
if (c != null) {
try {
log.debug("Closing {}", c);
c.close();
} catch (Exception e) {
log.debug("Exception in closing {}", c, e);
}
}
}
cleanupWithLogger(log, closeables);
}
/**
* Close the Closeable objects and <b>ignore</b> any Exception or
* null pointers.

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.util.Progressable;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Relays FS calls to the mocked FS, allows for some extra logging with
@ -148,6 +149,11 @@ public void initialize(URI name, Configuration originalConf)
writeHelper = new WriteOperationHelper(this, conf);
}
@Override
public void close() {
cleanupWithLogger(LOG, instrumentation);
}
@Override
public WriteOperationHelper getWriteOperationHelper() {
return writeHelper;

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
@ -63,6 +62,7 @@
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardToolTestHelper.exec;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.LambdaTestUtils.*;
@ -106,7 +106,7 @@ public void setup() throws Exception {
@Override
public void teardown() throws Exception {
S3AUtils.closeAll(LOG, roleFS);
cleanupWithLogger(LOG, roleFS);
super.teardown();
}

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.ITestCommitOperations;
import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
@ -34,6 +33,7 @@
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* Verify that the commit operations work with a restricted set of operations.
@ -84,7 +84,7 @@ public void setup() throws Exception {
@Override
public void teardown() throws Exception {
S3AUtils.closeAll(LOG, roleFS);
cleanupWithLogger(LOG, roleFS);
// switches getFileSystem() back to the full FS.
roleFS = null;
super.teardown();

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
@ -71,6 +70,7 @@
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.bindRolePolicyStatements;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.GenericTestUtils.failif;
@ -255,7 +255,7 @@ public void teardown() throws Exception {
try {
super.teardown();
} finally {
S3AUtils.closeAll(LOG, readonlyFS);
cleanupWithLogger(LOG, readonlyFS);
}
}