NIFI-11924 Closing FileSystem after using in HDFSExternalResourceProvider

This closes #7588.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Bence Simon 2023-08-09 13:36:26 +02:00 committed by Peter Turcsanyi
parent baea7ffee9
commit 7340bb8153
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
4 changed files with 198 additions and 21 deletions

View File

@ -376,20 +376,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen
if (resources != null) {
// Attempt to close the FileSystem
final FileSystem fileSystem = resources.getFileSystem();
try {
interruptStatisticsThread(fileSystem);
} catch (Exception e) {
getLogger().warn("Error stopping FileSystem statistics thread: " + e.getMessage());
getLogger().debug("", e);
} finally {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
getLogger().warn("Error close FileSystem: " + e.getMessage(), e);
}
}
}
HDFSResourceHelper.closeFileSystem(fileSystem);
}
// Clear out the reference to the resources

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Field;
public final class HDFSResourceHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(HDFSResourceHelper.class);
private HDFSResourceHelper() {
// Not to be instantiated
}
public static void closeFileSystem(final FileSystem fileSystem) {
try {
interruptStatisticsThread(fileSystem);
} catch (Exception e) {
LOGGER.warn("Error stopping FileSystem statistics thread: " + e.getMessage());
LOGGER.debug("", e);
} finally {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
LOGGER.warn("Error close FileSystem: " + e.getMessage(), e);
}
}
}
}
private static void interruptStatisticsThread(final FileSystem fileSystem) throws NoSuchFieldException, IllegalAccessException {
final Field statsField = FileSystem.class.getDeclaredField("statistics");
statsField.setAccessible(true);
final Object statsObj = statsField.get(fileSystem);
if (statsObj instanceof FileSystem.Statistics) {
final FileSystem.Statistics statistics = (FileSystem.Statistics) statsObj;
final Field statsThreadField = statistics.getClass().getDeclaredField("STATS_DATA_CLEANER");
statsThreadField.setAccessible(true);
final Object statsThreadObj = statsThreadField.get(statistics);
if (statsThreadObj instanceof Thread) {
final Thread statsThread = (Thread) statsThreadObj;
try {
statsThread.interrupt();
} catch (Exception e) {
LOGGER.warn("Error interrupting thread: " + e.getMessage(), e);
}
}
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.nifi.flow.resource.ExternalResourceProviderInitializationConte
import org.apache.nifi.flow.resource.ImmutableExternalResourceDescriptor;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
import org.apache.nifi.processors.hadoop.HDFSResourceHelper;
import org.apache.nifi.processors.hadoop.HdfsResources;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
@ -51,6 +52,11 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
// Implementation considerations: The public methods are considered as steps orchestrated by clients. As of this, there is no direct dependency
// or connection between {@code listResources} and {@code fetchExternalResource}: both are self-sufficing actions. As of this they do not share
// a {@code FileSystem} instance but every method is responsible for collecting and maintaining one. This comes with a minimal overhead but due to
// the nature of the service the method calls are relatively rare. Alternatively a provider could have a FileService instance maintained during its
// lifecycle but that is considered a more error-prone approach as it comes with logic regularly checking for the state of the maintained instance.
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class HDFSExternalResourceProvider implements ExternalResourceProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(HDFSExternalResourceProvider.class);
@ -104,7 +110,6 @@ public class HDFSExternalResourceProvider implements ExternalResourceProvider {
final HdfsResources hdfsResources = getHdfsResources();
try {
final FileStatus[] fileStatuses = hdfsResources.getUserGroupInformation()
.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfsResources.getFileSystem().listStatus(sourceDirectory));
@ -122,6 +127,8 @@ public class HDFSExternalResourceProvider implements ExternalResourceProvider {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Provider cannot list resources", e);
} finally {
HDFSResourceHelper.closeFileSystem(hdfsResources.getFileSystem());
}
}
@ -140,13 +147,18 @@ public class HDFSExternalResourceProvider implements ExternalResourceProvider {
final HdfsResources hdfsResources = getHdfsResources();
try {
return hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> {
if (!hdfsResources.getFileSystem().exists(path)) {
throw new IOException("Cannot find file in HDFS at location " + location);
}
final FSDataInputStream fsDataInputStream =
hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> {
if (!hdfsResources.getFileSystem().exists(path)) {
throw new IOException("Cannot find file in HDFS at location " + location);
}
return hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT);
});
return hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT);
});
// The acquired InputStream is used by the client and for this reason the FileSystem cannot be closed here.
// The closing of the file system is delegated to the decorator (HDFSResourceInputStream) which will close
// it when the decorated input stream is closed.
return new HDFSResourceInputStream(hdfsResources.getFileSystem(), fsDataInputStream);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Error during acquiring file", e);

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flow.resource.hadoop;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.nifi.processors.hadoop.HDFSResourceHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
final class HDFSResourceInputStream extends InputStream {
private final FileSystem fileSystem;
private final FSDataInputStream inputStream;
HDFSResourceInputStream(final FileSystem fileSystem, final FSDataInputStream inputStream) {
this.fileSystem = fileSystem;
this.inputStream = inputStream;
}
@Override
public int read() throws IOException {
return inputStream.read();
}
@Override
public int read(final byte[] b) throws IOException {
return inputStream.read(b);
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
return inputStream.read(b, off, len);
}
@Override
public byte[] readAllBytes() throws IOException {
return inputStream.readAllBytes();
}
@Override
public byte[] readNBytes(final int len) throws IOException {
return inputStream.readNBytes(len);
}
@Override
public int readNBytes(final byte[] b, final int off, final int len) throws IOException {
return inputStream.readNBytes(b, off, len);
}
@Override
public long skip(final long n) throws IOException {
return inputStream.skip(n);
}
@Override
public void skipNBytes(final long n) throws IOException {
inputStream.skipNBytes(n);
}
@Override
public int available() throws IOException {
return inputStream.available();
}
@Override
public void close() throws IOException {
inputStream.close();
HDFSResourceHelper.closeFileSystem(fileSystem);
}
@Override
public synchronized void mark(final int readlimit) {
inputStream.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
inputStream.reset();
}
@Override
public boolean markSupported() {
return inputStream.markSupported();
}
@Override
public long transferTo(final OutputStream out) throws IOException {
return inputStream.transferTo(out);
}
}