NIFI-3904 Adding logic to only reload when incoming bundle is different and to use additional URLs from spec component

NIFI-3908 Changing UI to submit filterType for CSs and filter for processors and reporting tasks
This commit is contained in:
Bryan Bende 2017-05-16 13:52:10 -04:00 committed by Matt Gilman
parent 494a0e8928
commit 3a0004a665
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
7 changed files with 133 additions and 63 deletions

View File

@ -34,6 +34,7 @@ import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@ -119,6 +120,33 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data));
}
@Override
public Set<URL> getAdditionalClasspathResources(final List<PropertyDescriptor> propertyDescriptors) {
final Set<String> modulePaths = new LinkedHashSet<>();
for (final PropertyDescriptor descriptor : propertyDescriptors) {
if (descriptor.isDynamicClasspathModifier()) {
final String value = getProperty(descriptor);
if (!StringUtils.isEmpty(value)) {
final StandardPropertyValue propertyValue = new StandardPropertyValue(value, null, variableRegistry);
modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue());
}
}
}
final Set<URL> additionalUrls = new LinkedHashSet<>();
try {
final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true);
if (urls != null) {
for (final URL url : urls) {
additionalUrls.add(url);
}
}
} catch (MalformedURLException mfe) {
getLogger().error("Error processing classpath resources for " + id + ": " + mfe.getMessage(), mfe);
}
return additionalUrls;
}
@Override
public void setProperties(Map<String, String> properties) {
if (properties == null) {
@ -145,19 +173,15 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
}
}
// if at least one property with dynamicallyModifiesClasspath(true) was set, then re-calculate the module paths
// and reset the InstanceClassLoader to the new module paths
// if at least one property with dynamicallyModifiesClasspath(true) was set, then reload the component with the new urls
if (classpathChanged) {
final Set<String> modulePaths = new LinkedHashSet<>();
for (final Map.Entry<PropertyDescriptor, String> entry : this.properties.entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamicClasspathModifier() && !StringUtils.isEmpty(entry.getValue())) {
final StandardPropertyValue propertyValue = new StandardPropertyValue(entry.getValue(), null, variableRegistry);
modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue());
final Set<URL> additionalUrls = getAdditionalClasspathResources(getComponent().getPropertyDescriptors());
try {
reload(additionalUrls);
} catch (Exception e) {
getLogger().error("Error reloading component with id " + id + ": " + e.getMessage(), e);
}
}
processClasspathModifiers(modulePaths);
}
}
} finally {
lock.unlock();
@ -237,32 +261,6 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return false;
}
/**
* Triggers the reloading of the underlying component using a new InstanceClassLoader that includes the additional URL resources.
*
* @param modulePaths a list of module paths where each entry can be a comma-separated list of multiple module paths
*/
private void processClasspathModifiers(final Set<String> modulePaths) {
try {
// compute the URLs from all the modules paths
final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true);
// convert to a set of URLs
final Set<URL> additionalUrls = new LinkedHashSet<>();
if (urls != null) {
for (final URL url : urls) {
additionalUrls.add(url);
}
}
// reload the underlying component with a new InstanceClassLoader that includes the new URLs
reload(additionalUrls);
} catch (Exception e) {
getLogger().warn("Error processing classpath resources for " + id + ": " + e.getMessage(), e);
}
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) {
@ -285,6 +283,14 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return properties.get(property);
}
@Override
public void refreshProperties() {
// use setProperty instead of setProperties so we can bypass the class loading logic
getProperties().entrySet().stream()
.filter(e -> e.getKey() != null && e.getValue() != null)
.forEach(e -> setProperty(e.getKey().getName(), e.getValue()));
}
@Override
public int hashCode() {
return 273171 * id.hashCode();

View File

@ -32,6 +32,7 @@ import org.apache.nifi.logging.ComponentLog;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -57,6 +58,10 @@ public interface ConfiguredComponent extends ComponentAuthorizable {
void reload(Set<URL> additionalUrls) throws Exception;
void refreshProperties();
Set<URL> getAdditionalClasspathResources(List<PropertyDescriptor> propertyDescriptors);
BundleCoordinate getBundleCoordinate();
ConfigurableComponent getComponent();

View File

@ -1197,6 +1197,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String id = existingNode.getProcessor().getIdentifier();
// ghost components will have a null logger
if (existingNode.getLogger() != null) {
existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
}
// createProcessor will create a new instance class loader for the same id so
// save the instance class loader to use it for calling OnRemoved on the existing processor
final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
@ -1218,6 +1223,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), newNode.getLogger());
existingNode.setProcessor(newProcessor);
existingNode.setExtensionMissing(newNode.isExtensionMissing());
// need to refresh the properties in case we are changing from ghost component to real component
existingNode.refreshProperties();
}
/**
@ -3065,6 +3073,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String id = existingNode.getReportingTask().getIdentifier();
// ghost components will have a null logger
if (existingNode.getLogger() != null) {
existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
}
// createReportingTask will create a new instance class loader for the same id so
// save the instance class loader to use it for calling OnRemoved on the existing processor
final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
@ -3084,6 +3097,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final LoggableComponent<ReportingTask> newReportingTask = new LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), newNode.getLogger());
existingNode.setReportingTask(newReportingTask);
existingNode.setExtensionMissing(newNode.isExtensionMissing());
// need to refresh the properties in case we are changing from ghost component to real component
existingNode.refreshProperties();
}
@Override
@ -3175,6 +3191,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final String id = existingNode.getIdentifier();
// ghost components will have a null logger
if (existingNode.getLogger() != null) {
existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
}
// createControllerService will create a new instance class loader for the same id so
// save the instance class loader to use it for calling OnRemoved on the existing service
final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
@ -3203,6 +3224,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// set the new impl, proxy, and invocation handler into the existing node
existingNode.setControllerServiceAndProxy(loggableImplementation, loggableProxy, invocationHandler);
existingNode.setExtensionMissing(newNode.isExtensionMissing());
// need to refresh the properties in case we are changing from ghost component to real component
existingNode.refreshProperties();
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ConfiguredComponent;
@ -28,6 +29,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
@ -36,6 +38,7 @@ import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -147,6 +150,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
configureControllerService(controllerService, controllerServiceDTO);
// attempt to change the underlying controller service if an updated bundle is specified
// updating the bundle must happen after configuring so that any additional classpath resources are set first
updateBundle(controllerService, controllerServiceDTO);
// enable or disable as appropriate
@ -167,17 +171,23 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
}
private void updateBundle(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
BundleDTO bundleDTO = controllerServiceDTO.getBundle();
final BundleDTO bundleDTO = controllerServiceDTO.getBundle();
if (bundleDTO != null) {
final BundleCoordinate incomingCoordinate = BundleUtils.getBundle(controllerService.getCanonicalClassName(), bundleDTO);
final BundleCoordinate existingCoordinate = controllerService.getBundleCoordinate();
if (!existingCoordinate.getCoordinate().equals(incomingCoordinate.getCoordinate())) {
try {
flowController.reload(controllerService, controllerService.getCanonicalClassName(), incomingCoordinate, Collections.emptySet());
// we need to use the property descriptors from the temp component here in case we are changing from a ghost component to a real component
final ConfigurableComponent tempComponent = ExtensionManager.getTempComponent(controllerService.getCanonicalClassName(), incomingCoordinate);
final Set<URL> additionalUrls = controllerService.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
flowController.reload(controllerService, controllerService.getCanonicalClassName(), incomingCoordinate, additionalUrls);
} catch (ControllerServiceInstantiationException e) {
throw new NiFiCoreException(String.format("Unable to update controller service %s from %s to %s due to: %s",
controllerServiceDTO.getId(), controllerService.getBundleCoordinate().getCoordinate(), incomingCoordinate.getCoordinate(), e.getMessage()), e);
}
}
}
}
@Override
public Set<ConfiguredComponent> updateControllerServiceReferencingComponents(

View File

@ -18,6 +18,7 @@ package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connection;
@ -30,6 +31,7 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
@ -46,9 +48,9 @@ import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -406,6 +408,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
configureProcessor(processor, processorDTO);
// attempt to change the underlying processor if an updated bundle is specified
// updating the bundle must happen after configuring so that any additional classpath resources are set first
updateBundle(processor, processorDTO);
// see if an update is necessary
@ -450,17 +453,23 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
}
private void updateBundle(ProcessorNode processor, ProcessorDTO processorDTO) {
BundleDTO bundleDTO = processorDTO.getBundle();
final BundleDTO bundleDTO = processorDTO.getBundle();
if (bundleDTO != null) {
BundleCoordinate incomingCoordinate = BundleUtils.getBundle(processor.getCanonicalClassName(), bundleDTO);
final BundleCoordinate incomingCoordinate = BundleUtils.getBundle(processor.getCanonicalClassName(), bundleDTO);
final BundleCoordinate existingCoordinate = processor.getBundleCoordinate();
if (!existingCoordinate.getCoordinate().equals(incomingCoordinate.getCoordinate())) {
try {
flowController.reload(processor, processor.getCanonicalClassName(), incomingCoordinate, Collections.emptySet());
// we need to use the property descriptors from the temp component here in case we are changing from a ghost component to a real component
final ConfigurableComponent tempComponent = ExtensionManager.getTempComponent(processor.getCanonicalClassName(), incomingCoordinate);
final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
flowController.reload(processor, processor.getCanonicalClassName(), incomingCoordinate, additionalUrls);
} catch (ProcessorInstantiationException e) {
throw new NiFiCoreException(String.format("Unable to update processor %s from %s to %s due to: %s",
processorDTO.getId(), processor.getBundleCoordinate().getCoordinate(), incomingCoordinate.getCoordinate(), e.getMessage()), e);
}
}
}
}
@Override
public void verifyDelete(String processorId) {

View File

@ -18,6 +18,7 @@ package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ReloadComponent;
@ -27,6 +28,7 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FormatUtils;
@ -38,9 +40,9 @@ import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.quartz.CronExpression;
import java.net.URL;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -121,6 +123,7 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
configureReportingTask(reportingTask, reportingTaskDTO);
// attempt to change the underlying processor if an updated bundle is specified
// updating the bundle must happen after configuring so that any additional classpath resources are set first
updateBundle(reportingTask, reportingTaskDTO);
// configure scheduled state
@ -166,16 +169,21 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
}
private void updateBundle(ReportingTaskNode reportingTask, ReportingTaskDTO reportingTaskDTO) {
BundleDTO bundleDTO = reportingTaskDTO.getBundle();
final BundleDTO bundleDTO = reportingTaskDTO.getBundle();
if (bundleDTO != null) {
final BundleCoordinate incomingCoordinate = BundleUtils.getBundle(reportingTask.getCanonicalClassName(), bundleDTO);
final BundleCoordinate existingCoordinate = reportingTask.getBundleCoordinate();
if (!existingCoordinate.getCoordinate().equals(incomingCoordinate.getCoordinate())) {
try {
reloadComponent.reload(reportingTask, reportingTask.getCanonicalClassName(), incomingCoordinate, Collections.emptySet());
// we need to use the property descriptors from the temp component here in case we are changing from a ghost component to a real component
final ConfigurableComponent tempComponent = ExtensionManager.getTempComponent(reportingTask.getCanonicalClassName(), incomingCoordinate);
final Set<URL> additionalUrls = reportingTask.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
reloadComponent.reload(reportingTask, reportingTask.getCanonicalClassName(), incomingCoordinate, additionalUrls);
} catch (ReportingTaskInstantiationException e) {
throw new NiFiCoreException(String.format("Unable to update reporting task %s from %s to %s due to: %s",
reportingTaskDTO.getId(), reportingTask.getBundleCoordinate().getCoordinate(), incomingCoordinate.getCoordinate(), e.getMessage()), e);
}
}
}
}

View File

@ -282,13 +282,21 @@
* @param {object} componentEntity
*/
promptForVersionChange: function (componentEntity) {
var params = {
'bundleGroupFilter': componentEntity.component.bundle.group,
'bundleArtifactFilter': componentEntity.component.bundle.artifact
};
// special handling for incorrect query param
if (getTypeField(componentEntity) === 'controllerServiceTypes') {
params['typeFilter'] = componentEntity.component.type;
} else {
params['type'] = componentEntity.component.type;
}
return $.ajax({
type: 'GET',
url: getTypeUri(componentEntity) + '?' + $.param({
'bundleGroupFilter': componentEntity.component.bundle.group,
'bundleArtifactFilter': componentEntity.component.bundle.artifact,
'typeFilter': componentEntity.component.type
}),
url: getTypeUri(componentEntity) + '?' + $.param(params),
dataType: 'json'
}).done(function (response) {
var options = [];