From 92b4a3208fddd44f0d7e3d618761b9db238b758c Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Fri, 27 Apr 2018 14:52:58 -0400 Subject: [PATCH] NIFI-5136 Ensure processor references are removed from LogRepository and from ProcessScheduler - Forcing FileSystem statistics thread to be interrupted when HDFS processors are stopped - Stop creating temp components during import from registry, use bundle info instead This closes #2668. Signed-off-by: Mark Payne --- .../hadoop/AbstractHadoopProcessor.java | 57 +++++++++++++++++++ .../nifi/controller/ProcessScheduler.java | 8 +++ .../nifi/logging/LogRepositoryFactory.java | 4 ++ .../nifi/controller/FlowController.java | 15 ++++- .../scheduling/StandardProcessScheduler.java | 5 ++ .../StandardControllerServiceProvider.java | 2 + .../nifi/groups/StandardProcessGroup.java | 29 ++++++---- 7 files changed, 106 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 1eca5af5ba..4104e3056c 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslPlainServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -44,10 +45,12 @@ import javax.net.SocketFactory; import java.io.File; import 
java.io.IOException; import java.lang.ref.WeakReference; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.Socket; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.security.Security; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -257,9 +260,63 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { @OnStopped public final void abstractOnStopped() { + final HdfsResources resources = hdfsResources.get(); + if (resources != null) { + // Attempt to close the FileSystem + final FileSystem fileSystem = resources.getFileSystem(); + try { + interruptStatisticsThread(fileSystem); + } catch (Exception e) { + getLogger().warn("Error stopping FileSystem statistics thread: " + e.getMessage(), e); + } finally { + if (fileSystem != null) { + try { + fileSystem.close(); + } catch (IOException e) { + getLogger().warn("Error close FileSystem: " + e.getMessage(), e); + } + } + } + + // Clean-up the static reference to the Configuration instance + UserGroupInformation.setConfiguration(new Configuration()); + + // Clean-up the reference to the InstanceClassLoader that was put into Configuration + final Configuration configuration = resources.getConfiguration(); + configuration.setClassLoader(null); + + // Need to remove the Provider instance from the JVM's Providers class so that InstanceClassLoader can be GC'd eventually + final SaslPlainServer.SecurityProvider saslProvider = new SaslPlainServer.SecurityProvider(); + Security.removeProvider(saslProvider.getName()); + } + + // Clear out the reference to the resources hdfsResources.set(new HdfsResources(null, null, null)); } + private void interruptStatisticsThread(final FileSystem fileSystem) throws NoSuchFieldException, IllegalAccessException { + final Field statsField = FileSystem.class.getDeclaredField("statistics"); + statsField.setAccessible(true); + + final Object statsObj = 
statsField.get(fileSystem); + if (statsObj != null && statsObj instanceof FileSystem.Statistics) { + final FileSystem.Statistics statistics = (FileSystem.Statistics) statsObj; + + final Field statsThreadField = statistics.getClass().getDeclaredField("STATS_DATA_CLEANER"); + statsThreadField.setAccessible(true); + + final Object statsThreadObj = statsThreadField.get(statistics); + if (statsThreadObj != null && statsThreadObj instanceof Thread) { + final Thread statsThread = (Thread) statsThreadObj; + try { + statsThread.interrupt(); + } catch (Exception e) { + getLogger().warn("Error interrupting thread: " + e.getMessage(), e); + } + } + } + } + private static Configuration getConfigurationFromResources(final Configuration config, String configResources) throws IOException { boolean foundResources = false; if (null != configResources) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index 9231382f13..de005e4b35 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -83,6 +83,14 @@ public interface ProcessScheduler { */ void terminateProcessor(ProcessorNode procNode); + /** + * Notifies the scheduler that the given processor is being removed so the scheduler may clean up any resources + * related to the given processor. + * + * @param procNode the processor node being removed + */ + void onProcessorRemoved(ProcessorNode procNode); + /** * Starts scheduling the given Port to run. If the Port is already scheduled * to run, does nothing. 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java index d7fa3fc4ef..530b6ef297 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java @@ -58,4 +58,8 @@ public class LogRepositoryFactory { return repository; } + + public static LogRepository removeRepository(final String componentId) { + return repositoryMap.remove(requireNonNull(componentId)); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ba4075e71b..92ab7b36b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1129,6 +1129,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R boolean creationSuccessful; LoggableComponent processor; + + // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + try { processor = instantiateProcessor(type, id, coordinate, additionalUrls); creationSuccessful = true; @@ -1154,7 +1158,6 @@ public class FlowController implements 
EventAccess, ControllerServiceProvider, R componentType, type, nifiProperties, componentVarRegistry, this, true); } - final LogRepository logRepository = LogRepositoryFactory.getRepository(id); if (registerLogObserver) { logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); } @@ -3371,6 +3374,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LoggableComponent task = null; boolean creationSuccessful = true; + + // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + try { task = instantiateReportingTask(type, id, bundleCoordinate, additionalUrls); } catch (final Exception e) { @@ -3418,7 +3425,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R reportingTasks.put(id, taskNode); // Register log observer to provide bulletins when reporting task logs anything at WARN level or above - final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ReportingTaskLogObserver(getBulletinRepository(), taskNode)); } @@ -3551,6 +3557,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } reportingTasks.remove(reportingTaskNode.getIdentifier()); + LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier()); ExtensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier()); } @@ -3565,10 +3572,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set additionalUrls, final boolean firstTimeAdded) { + // make sure the first reference to 
LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, additionalUrls, firstTimeAdded); // Register log observer to provide bulletins when reporting task logs anything at WARN level or above - final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ControllerServiceLogObserver(getBulletinRepository(), serviceNode)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 7315ef62e2..491abf0303 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -377,6 +377,11 @@ public final class StandardProcessScheduler implements ProcessScheduler { LOG.info("Successfully terminated {} with {} active threads", procNode, tasksTerminated); } + @Override + public void onProcessorRemoved(final ProcessorNode procNode) { + this.lifecycleStates.remove(procNode); + } + @Override public void yield(final ProcessorNode procNode) { // This exists in the ProcessScheduler so that the scheduler can take diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index cba19787f5..640fcf79e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -62,6 +62,7 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; @@ -703,6 +704,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } group.removeControllerService(serviceNode); + LogRepositoryFactory.removeRepository(serviceNode.getIdentifier()); ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier()); serviceCache.remove(serviceNode.getIdentifier()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 551d39e8bf..7357756bfb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -914,8 +914,8 @@ public final class StandardProcessGroup implements ProcessGroup { 
processors.remove(id); onComponentModified(); + scheduler.onProcessorRemoved(processor); flowController.onProcessorRemoved(processor); - LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider(); scheduler.submitFrameworkTask(new Runnable() { @@ -936,6 +936,7 @@ public final class StandardProcessGroup implements ProcessGroup { } finally { if (removed) { try { + LogRepositoryFactory.removeRepository(processor.getIdentifier()); ExtensionManager.removeInstanceClassLoader(id); } catch (Throwable t) { } @@ -4462,11 +4463,14 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get())); for (final VersionedProcessor processorToAdd : proposedProcessors.values()) { - final BundleCoordinate coordinate = toCoordinate(processorToAdd.getBundle()); - try { - flowController.createProcessor(processorToAdd.getType(), UUID.randomUUID().toString(), coordinate, false); - } catch (Exception e) { - throw new IllegalArgumentException("Unable to create Processor of type " + processorToAdd.getType(), e); + final String processorToAddClass = processorToAdd.getType(); + final BundleCoordinate processorToAddCoordinate = toCoordinate(processorToAdd.getBundle()); + + final boolean bundleExists = ExtensionManager.getBundles(processorToAddClass).stream() + .anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); + + if (!bundleExists) { + throw new IllegalArgumentException("Unknown bundle " + processorToAddCoordinate.toString() + " for processor type " + processorToAddClass); } } @@ -4479,11 +4483,14 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(service -> proposedServices.remove(service.getVersionedComponentId().get())); for (final VersionedControllerService serviceToAdd : proposedServices.values()) { - final BundleCoordinate 
coordinate = toCoordinate(serviceToAdd.getBundle()); - try { - flowController.createControllerService(serviceToAdd.getType(), UUID.randomUUID().toString(), coordinate, Collections.emptySet(), false); - } catch (Exception e) { - throw new IllegalArgumentException("Unable to create Controller Service of type " + serviceToAdd.getType(), e); + final String serviceToAddClass = serviceToAdd.getType(); + final BundleCoordinate serviceToAddCoordinate = toCoordinate(serviceToAdd.getBundle()); + + final boolean bundleExists = ExtensionManager.getBundles(serviceToAddClass).stream() + .anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); + + if (!bundleExists) { + throw new IllegalArgumentException("Unknown bundle " + serviceToAddCoordinate.toString() + " for service type " + serviceToAddClass); } }