NIFI-5136 Ensure processor references are removed from LogRepository and from ProcessScheduler

- Forcing FileSystem statistics thread to be interrupted when HDFS processors are stopped
- Stop creating temp components during import from registry, use bundle info instead

This closes #2668.

Signed-off-by: Mark Payne <markap14@hotmail.com>
Bryan Bende authored on 2018-04-27 14:52:58 -04:00; committed by Mark Payne
parent 2094786ec8
commit 92b4a3208f
7 changed files with 106 additions and 14 deletions
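For context, a minimal standalone sketch of the reflection approach behind the first bullet above; it mirrors the AbstractHadoopProcessor change below and assumes only the Hadoop-internal field names ("statistics" on FileSystem, "STATS_DATA_CLEANER" on FileSystem.Statistics) that the change itself references. The class name is hypothetical.

import java.lang.reflect.Field;

import org.apache.hadoop.fs.FileSystem;

// Sketch only: the committed implementation lives in AbstractHadoopProcessor below.
// Each FileSystem instance holds a Statistics object, and the Statistics class runs a
// daemon cleaner thread; if that thread keeps running after the processor is stopped,
// it can keep the processor's InstanceClassLoader from being garbage collected.
public class StatisticsThreadInterrupter {

    public static void interrupt(final FileSystem fileSystem) throws Exception {
        final Field statsField = FileSystem.class.getDeclaredField("statistics");
        statsField.setAccessible(true);
        final Object statsObj = statsField.get(fileSystem);

        if (statsObj instanceof FileSystem.Statistics) {
            final Field cleanerField = statsObj.getClass().getDeclaredField("STATS_DATA_CLEANER");
            cleanerField.setAccessible(true);
            final Object cleaner = cleanerField.get(statsObj);
            if (cleaner instanceof Thread) {
                ((Thread) cleaner).interrupt();
            }
        }
    }
}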

AbstractHadoopProcessor.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslPlainServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -44,10 +45,12 @@ import javax.net.SocketFactory;
import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -257,9 +260,63 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
@OnStopped
public final void abstractOnStopped() {
final HdfsResources resources = hdfsResources.get();
if (resources != null) {
// Attempt to close the FileSystem
final FileSystem fileSystem = resources.getFileSystem();
try {
interruptStatisticsThread(fileSystem);
} catch (Exception e) {
getLogger().warn("Error stopping FileSystem statistics thread: " + e.getMessage(), e);
} finally {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
getLogger().warn("Error close FileSystem: " + e.getMessage(), e);
}
}
}
// Clean-up the static reference to the Configuration instance
UserGroupInformation.setConfiguration(new Configuration());
// Clean-up the reference to the InstanceClassLoader that was put into Configuration
final Configuration configuration = resources.getConfiguration();
configuration.setClassLoader(null);
// Need to remove the Provider instance from the JVM's Providers class so that InstanceClassLoader can be GC'd eventually
final SaslPlainServer.SecurityProvider saslProvider = new SaslPlainServer.SecurityProvider();
Security.removeProvider(saslProvider.getName());
}
// Clear out the reference to the resources
hdfsResources.set(new HdfsResources(null, null, null));
}
private void interruptStatisticsThread(final FileSystem fileSystem) throws NoSuchFieldException, IllegalAccessException {
final Field statsField = FileSystem.class.getDeclaredField("statistics");
statsField.setAccessible(true);
final Object statsObj = statsField.get(fileSystem);
if (statsObj != null && statsObj instanceof FileSystem.Statistics) {
final FileSystem.Statistics statistics = (FileSystem.Statistics) statsObj;
final Field statsThreadField = statistics.getClass().getDeclaredField("STATS_DATA_CLEANER");
statsThreadField.setAccessible(true);
final Object statsThreadObj = statsThreadField.get(statistics);
if (statsThreadObj != null && statsThreadObj instanceof Thread) {
final Thread statsThread = (Thread) statsThreadObj;
try {
statsThread.interrupt();
} catch (Exception e) {
getLogger().warn("Error interrupting thread: " + e.getMessage(), e);
}
}
}
}
private static Configuration getConfigurationFromResources(final Configuration config, String configResources) throws IOException {
boolean foundResources = false;
if (null != configResources) {

ProcessScheduler.java

@@ -83,6 +83,14 @@ public interface ProcessScheduler {
*/
void terminateProcessor(ProcessorNode procNode);
/**
* Notifies the scheduler that the given processor is being removed so the scheduler may clean up any resources
* related to the given processor.
*
* @param procNode the processor node being removed
*/
void onProcessorRemoved(ProcessorNode procNode);
/**
* Starts scheduling the given Port to run. If the Port is already scheduled
* to run, does nothing.
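A hedged sketch of how a removal path is expected to use this new hook; the concrete call site added by this commit is in StandardProcessGroup, shown further below, and the helper method name here is hypothetical.

// Hypothetical helper; in the commit itself these calls are made from StandardProcessGroup.
private void cleanUpRemovedProcessor(final ProcessorNode procNode, final ProcessScheduler scheduler) {
    // Let the scheduler discard any per-processor state it is still holding.
    scheduler.onProcessorRemoved(procNode);
    // Drop the component's LogRepository so nothing keeps referencing the removed processor.
    LogRepositoryFactory.removeRepository(procNode.getIdentifier());
}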

LogRepositoryFactory.java

@@ -58,4 +58,8 @@ public class LogRepositoryFactory {
return repository;
}
public static LogRepository removeRepository(final String componentId) {
return repositoryMap.remove(requireNonNull(componentId));
}
}

FlowController.java

@@ -1129,6 +1129,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
boolean creationSuccessful;
LoggableComponent<Processor> processor;
// make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
try {
processor = instantiateProcessor(type, id, coordinate, additionalUrls);
creationSuccessful = true;
@@ -1154,7 +1158,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
componentType, type, nifiProperties, componentVarRegistry, this, true);
}
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
if (registerLogObserver) {
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
}
@@ -3371,6 +3374,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LoggableComponent<ReportingTask> task = null;
boolean creationSuccessful = true;
// make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
try {
task = instantiateReportingTask(type, id, bundleCoordinate, additionalUrls);
} catch (final Exception e) {
@@ -3418,7 +3425,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
reportingTasks.put(id, taskNode);
// Register log observer to provide bulletins when reporting task logs anything at WARN level or above
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
}
@@ -3551,6 +3557,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier());
ExtensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier());
}
@@ -3565,10 +3572,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded) {
// make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, additionalUrls, firstTimeAdded);
// Register log observer to provide bulletins when controller service logs anything at WARN level or above
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
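The relocated getRepository(id) calls in the hunks above all follow the ordering described by the "first reference to LogRepository happens outside of a NarCloseable" comments. A minimal sketch of that ordering, assuming a NarCloseable-style try-with-resources helper along the lines of the one FlowController already uses around instantiation (the exact helper signature is not shown in this diff):

// Sketch of the ordering only; instantiation details are elided.
// The repository is created while the framework's ClassLoader is in effect...
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
// ...so that any getRepository(id) lookups made while the NAR's InstanceClassLoader is
// active (inside the NarCloseable block) resolve this existing instance instead of
// creating one that is tied to the component's ClassLoader.
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(componentClass, id)) {
    // component instantiation happens here
}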

StandardProcessScheduler.java

@@ -377,6 +377,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.info("Successfully terminated {} with {} active threads", procNode, tasksTerminated);
}
@Override
public void onProcessorRemoved(final ProcessorNode procNode) {
this.lifecycleStates.remove(procNode);
}
@Override
public void yield(final ProcessorNode procNode) {
// This exists in the ProcessScheduler so that the scheduler can take

StandardControllerServiceProvider.java

@@ -62,6 +62,7 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
@@ -703,6 +704,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
group.removeControllerService(serviceNode);
LogRepositoryFactory.removeRepository(serviceNode.getIdentifier());
ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier());
serviceCache.remove(serviceNode.getIdentifier());
}

StandardProcessGroup.java

@@ -914,8 +914,8 @@ public final class StandardProcessGroup implements ProcessGroup {
processors.remove(id);
onComponentModified();
scheduler.onProcessorRemoved(processor);
flowController.onProcessorRemoved(processor);
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
scheduler.submitFrameworkTask(new Runnable() {
@@ -936,6 +936,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} finally {
if (removed) {
try {
LogRepositoryFactory.removeRepository(processor.getIdentifier());
ExtensionManager.removeInstanceClassLoader(id);
} catch (Throwable t) {
}
@@ -4462,11 +4463,14 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get()));
for (final VersionedProcessor processorToAdd : proposedProcessors.values()) {
final BundleCoordinate coordinate = toCoordinate(processorToAdd.getBundle());
try {
flowController.createProcessor(processorToAdd.getType(), UUID.randomUUID().toString(), coordinate, false);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to create Processor of type " + processorToAdd.getType(), e);
final String processorToAddClass = processorToAdd.getType();
final BundleCoordinate processorToAddCoordinate = toCoordinate(processorToAdd.getBundle());
final boolean bundleExists = ExtensionManager.getBundles(processorToAddClass).stream()
.anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
if (!bundleExists) {
throw new IllegalArgumentException("Unknown bundle " + processorToAddCoordinate.toString() + " for processor type " + processorToAddClass);
}
}
@@ -4479,11 +4483,14 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(service -> proposedServices.remove(service.getVersionedComponentId().get()));
for (final VersionedControllerService serviceToAdd : proposedServices.values()) {
final BundleCoordinate coordinate = toCoordinate(serviceToAdd.getBundle());
try {
flowController.createControllerService(serviceToAdd.getType(), UUID.randomUUID().toString(), coordinate, Collections.emptySet(), false);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to create Controller Service of type " + serviceToAdd.getType(), e);
final String serviceToAddClass = serviceToAdd.getType();
final BundleCoordinate serviceToAddCoordinate = toCoordinate(serviceToAdd.getBundle());
final boolean bundleExists = ExtensionManager.getBundles(serviceToAddClass).stream()
.anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
if (!bundleExists) {
throw new IllegalArgumentException("Unknown bundle " + serviceToAddCoordinate.toString() + " for service type " + serviceToAddClass);
}
}