NIFI-5922: Bug fixes; initialize, setup, and enable controller services; code cleanup

This commit is contained in:
Mark Payne 2019-05-14 11:37:50 -04:00
parent 146689b36f
commit 60b8fcac1b
10 changed files with 188 additions and 39 deletions

View File

@ -43,8 +43,9 @@ public class NiFiStateless {
public static void main(final String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
String nifi_home = System.getenv("NIFI_HOME");
if(nifi_home == null || nifi_home.equals(""))
if(nifi_home == null || nifi_home.equals("")) {
nifi_home = ".";
}
final File libDir = new File(nifi_home+"/lib");
final File statelesslibDir = new File(nifi_home+"/stateless-lib");

View File

@ -87,7 +87,7 @@ public abstract class AbstractStatelessComponent implements StatelessComponent {
boolean hasSuccessOutputPort = this.successOutputPorts.contains(relationship);
if (!(hasChildren || hasAutoterminate || hasFailureOutputPort || hasSuccessOutputPort)) {
getLogger().error("Component: {}, Relationship: {}, needs either auto terminate, child processors, or an output port", new Object[] {toString(), relationship.getName()});
getLogger().error("Component: {}, Relationship: {}, either needs to be auto-terminated or connected to another component", new Object[] {toString(), relationship.getName()});
return false;
}
}

View File

@ -21,7 +21,10 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.logging.ComponentLog;
@ -33,6 +36,8 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.net.URL;
@ -44,6 +49,7 @@ import java.util.Map;
import java.util.Set;
public class ComponentFactory {
private static final Logger logger = LoggerFactory.getLogger(ComponentFactory.class);
private final ExtensionManager extensionManager;
public ComponentFactory(final ExtensionManager extensionManager) {
@ -69,7 +75,7 @@ public class ComponentFactory {
final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle,
classpathUrls == null ? Collections.emptySet() : classpathUrls);
System.out.println("Setting context class loader to " + detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to create " + type);
logger.debug("Setting context class loader to {} (parent = {}) to create {}", detectedClassLoader, detectedClassLoader.getParent(), type);
final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoader);
@ -138,12 +144,14 @@ public class ComponentFactory {
}
public ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry) {
return createControllerService(versionedControllerService, variableRegistry, null);
public ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry,
final ControllerServiceLookup serviceLookup, final StateManager stateManager) {
return createControllerService(versionedControllerService, variableRegistry, null, serviceLookup, stateManager);
}
private ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, final Set<URL> classpathUrls) {
private ControllerService createControllerService(final VersionedControllerService versionedControllerService, final VariableRegistry variableRegistry, final Set<URL> classpathUrls,
final ControllerServiceLookup serviceLookup, final StateManager stateManager) {
final String type = versionedControllerService.getType();
final String identifier = versionedControllerService.getIdentifier();
@ -161,7 +169,7 @@ public class ComponentFactory {
final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle,
classpathUrls == null ? Collections.emptySet() : classpathUrls);
System.out.println("Setting context class loader to " + detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to create " + type);
logger.debug("Setting context class loader to {} (parent = {}) to create {}", detectedClassLoader, detectedClassLoader.getParent(), type);
final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoader);
@ -169,6 +177,8 @@ public class ComponentFactory {
final ComponentLog componentLog = new SLF4JComponentLog(extensionInstance);
final ControllerService service = (ControllerService) extensionInstance;
final ControllerServiceInitializationContext initializationContext = new StatelessControllerServiceInitializationContext(identifier, service, serviceLookup, stateManager);
service.initialize(initializationContext);
// If no classpath urls were provided, check if we need to add additional classpath URL's based on configured properties.
if (classpathUrls == null) {
@ -176,7 +186,7 @@ public class ComponentFactory {
variableRegistry, componentLog);
if (!additionalClasspathUrls.isEmpty()) {
return createControllerService(versionedControllerService, variableRegistry, additionalClasspathUrls);
return createControllerService(versionedControllerService, variableRegistry, additionalClasspathUrls, serviceLookup, stateManager);
}
}

View File

@ -27,12 +27,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class StatelessControllerServiceConfiguration {
private final ControllerService service;
private final String name;
private final AtomicBoolean enabled = new AtomicBoolean(false);
private String annotationData;
private Map<PropertyDescriptor, String> properties = new HashMap<>();
public StatelessControllerServiceConfiguration(final ControllerService service) {
public StatelessControllerServiceConfiguration(final ControllerService service, final String name) {
this.service = service;
this.name = name;
}
public ControllerService getService() {
@ -75,4 +78,8 @@ public class StatelessControllerServiceConfiguration {
public Map<PropertyDescriptor, String> getProperties() {
return Collections.unmodifiableMap(properties);
}
public String getName() {
return name;
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.core;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog;
import java.io.File;
public class StatelessControllerServiceInitializationContext implements ControllerServiceInitializationContext {
private final ComponentLog logger;
private final String processorId;
private final ControllerServiceLookup controllerServiceLookup;
private final StateManager stateManager;
public StatelessControllerServiceInitializationContext(final String id, final ControllerService controllerService, final ControllerServiceLookup serviceLookup, final StateManager stateManager) {
processorId = id;
logger = new SLF4JComponentLog(controllerService);
controllerServiceLookup = serviceLookup;
this.stateManager = stateManager;
}
public String getIdentifier() {
return processorId;
}
public ComponentLog getLogger() {
return logger;
}
@Override
public StateManager getStateManager() {
return stateManager;
}
public ControllerServiceLookup getControllerServiceLookup() {
return controllerServiceLookup;
}
public NodeTypeProvider getNodeTypeProvider() {
return new NodeTypeProvider() {
public boolean isClustered() {
return false;
}
public boolean isPrimary() {
return false;
}
};
}
public String getKerberosServicePrincipal() {
return null; //this needs to be wired in.
}
public File getKerberosServiceKeytab() {
return null; //this needs to be wired in.
}
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
@ -28,6 +29,7 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.InitializationException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -47,7 +49,7 @@ public class StatelessControllerServiceLookup implements ControllerServiceLookup
}
public void addControllerService(final ControllerService service) throws InitializationException {
public void addControllerService(final ControllerService service, final String serviceName) throws InitializationException {
final String identifier = service.getIdentifier();
final SLF4JComponentLog logger = new SLF4JComponentLog(service);
controllerServiceLoggers.put(identifier, logger);
@ -55,7 +57,7 @@ public class StatelessControllerServiceLookup implements ControllerServiceLookup
StatelessStateManager serviceStateManager = new StatelessStateManager();
controllerServiceStateManagers.put(identifier, serviceStateManager);
final StatelessProcessContext initContext = new StatelessProcessContext(requireNonNull(service), this, requireNonNull(identifier), logger, serviceStateManager);
final StatelessProcessContext initContext = new StatelessProcessContext(requireNonNull(service), this, serviceName, logger, serviceStateManager);
service.initialize(initContext);
try {
@ -64,7 +66,7 @@ public class StatelessControllerServiceLookup implements ControllerServiceLookup
throw new InitializationException(e);
}
final StatelessControllerServiceConfiguration config = new StatelessControllerServiceConfiguration(service);
final StatelessControllerServiceConfiguration config = new StatelessControllerServiceConfiguration(service, serviceName);
controllerServiceMap.put(identifier, config);
}
@ -116,7 +118,33 @@ public class StatelessControllerServiceLookup implements ControllerServiceLookup
return status == null ? null : serviceIdentifier;
}
public void enableControllerService(final ControllerService service, VariableRegistry registry) throws InvocationTargetException, IllegalAccessException {
public void enableControllerServices(final VariableRegistry variableRegistry) {
for (final StatelessControllerServiceConfiguration config : controllerServiceMap.values()) {
final ControllerService service = config.getService();
final Collection<ValidationResult> validationResults = validate(service, config.getName(), variableRegistry);
if (!validationResults.isEmpty()) {
throw new RuntimeException("Failed to enable Controller Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", type=" + service.getClass() + "} because " +
"validation failed: " + validationResults);
}
try {
enableControllerService(service, variableRegistry);
} catch (IllegalAccessException| InvocationTargetException e) {
throw new RuntimeException("Failed to enable Controller Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", type=" + service.getClass() + "}", e);
}
}
}
public Collection<ValidationResult> validate(final ControllerService service, final String serviceName, final VariableRegistry variableRegistry) {
final StateManager stateManager = controllerServiceStateManagers.get(service.getIdentifier());
final SLF4JComponentLog logger = controllerServiceLoggers.get(service.getIdentifier());
final StatelessProcessContext processContext = new StatelessProcessContext(service, this, serviceName, logger, stateManager, variableRegistry);
final StatelessValidationContext validationContext = new StatelessValidationContext(processContext, this, stateManager, variableRegistry);
return service.validate(validationContext);
}
private void enableControllerService(final ControllerService service, final VariableRegistry registry) throws InvocationTargetException, IllegalAccessException {
final StatelessControllerServiceConfiguration configuration = getConfiguration(service.getIdentifier());
if (configuration == null) {
throw new IllegalArgumentException("Controller Service " + service + " is not known");
@ -125,6 +153,7 @@ public class StatelessControllerServiceLookup implements ControllerServiceLookup
if (configuration.isEnabled()) {
throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled");
}
final ConfigurationContext configContext = new StatelessConfigurationContext(service, configuration.getProperties(), this, registry);
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext);
@ -150,8 +179,8 @@ public class StatelessControllerServiceLookup implements ControllerServiceLookup
return configuration;
}
public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final StatelessProcessContext context, final VariableRegistry registry, final
String value) {
public ValidationResult setControllerServiceProperty(final ControllerService service, final PropertyDescriptor property, final StatelessProcessContext context,
final VariableRegistry registry, final String value) {
final StatelessStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier());
if (serviceStateManager == null) {
throw new IllegalStateException("Controller service " + service + " has not been added to this TestRunner via the #addControllerService method");

View File

@ -19,11 +19,10 @@ package org.apache.nifi.stateless.core;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stateless.bootstrap.RunnableFlow;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
@ -40,6 +39,9 @@ import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
import org.apache.nifi.stateless.bootstrap.RunnableFlow;
import javax.net.ssl.SSLContext;
import java.io.File;
@ -117,12 +119,36 @@ public class StatelessFlow implements RunnableFlow {
final Set<VersionedControllerService> controllerServices = flow.getControllerServices();
for (final VersionedControllerService versionedControllerService : controllerServices) {
final ControllerService service = componentFactory.createControllerService(versionedControllerService, variableRegistry);
serviceLookup.addControllerService(service);
final StateManager stateManager = new StatelessStateManager();
final ControllerService service = componentFactory.createControllerService(versionedControllerService, variableRegistry, serviceLookup, stateManager);
serviceLookup.addControllerService(service, versionedControllerService.getName());
serviceLookup.setControllerServiceAnnotationData(service, versionedControllerService.getAnnotationData());
final SLF4JComponentLog logger = new SLF4JComponentLog(service);
final StatelessProcessContext processContext = new StatelessProcessContext(service, serviceLookup, versionedControllerService.getName(), logger, stateManager, variableRegistry);
final Map<String, String> versionedPropertyValues = versionedControllerService.getProperties();
for (final Map.Entry<String, String> entry : versionedPropertyValues.entrySet()) {
final String propertyName = entry.getKey();
final String propertyValue = entry.getValue();
final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName);
serviceLookup.setControllerServiceProperty(service, descriptor, processContext, variableRegistry, propertyValue);
}
for (final PropertyDescriptor descriptor : service.getPropertyDescriptors()) {
final String versionedPropertyValue = versionedPropertyValues.get(descriptor.getName());
if (versionedPropertyValue == null && descriptor.getDefaultValue() != null) {
serviceLookup.setControllerServiceProperty(service, descriptor, processContext, variableRegistry, descriptor.getDefaultValue());
}
}
}
final Map<String, StatelessComponent> componentMap = new HashMap<>();
serviceLookup.enableControllerServices(variableRegistry);
final Map<String, StatelessComponent> componentMap = new HashMap<>();
for (final VersionedConnection connection : connections) {
boolean isInputPortConnection = false;
@ -249,10 +275,9 @@ public class StatelessFlow implements RunnableFlow {
}
}
roots = componentMap.entrySet()
roots = componentMap.values()
.stream()
.filter(e -> e.getValue().getParents().isEmpty())
.map(Map.Entry::getValue)
.filter(statelessComponent -> statelessComponent.getParents().isEmpty())
.collect(Collectors.toList());
}

View File

@ -70,29 +70,25 @@ public class StatelessProcessContext implements SchedulingContext, ControllerSer
private final StatelessControllerServiceLookup lookup;
public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final StateManager stateManager, final VariableRegistry
variableRegistry) {
public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final StateManager stateManager,
final VariableRegistry variableRegistry) {
this(component, lookup, componentName, new SLF4JComponentLog(component), stateManager, variableRegistry);
}
public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final SLF4JComponentLog logger, final StatelessStateManager
statemanager) {
public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName, final SLF4JComponentLog logger,
final StatelessStateManager statemanager) {
this(component, lookup, componentName, logger, statemanager, VariableRegistry.EMPTY_REGISTRY);
}
public StatelessProcessContext(final ConfigurableComponent component,
final StatelessControllerServiceLookup lookup,
final String componentName,
final SLF4JComponentLog logger,
final StateManager stateManager,
final VariableRegistry variableRegistry) {
public StatelessProcessContext(final ConfigurableComponent component, final StatelessControllerServiceLookup lookup, final String componentName,
final SLF4JComponentLog logger, final StateManager stateManager, final VariableRegistry variableRegistry) {
this.component = Objects.requireNonNull(component);
this.componentName = componentName == null ? "" : componentName;
this.inputRequirement = component.getClass().getAnnotation(InputRequirement.class);
this.lookup = lookup;
this.stateManager = stateManager;
this.variableRegistry = variableRegistry;
this.identifier = "ProcessContext-" + this.hashCode();
this.identifier = component.getIdentifier();
this.logger = logger;
}

View File

@ -777,12 +777,13 @@ public class StatelessProcessSession implements ProcessSession {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
final StatelessFlowFile StatelessFlowFile = validateState(flowFile);
validateState(flowFile);
final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
@Override
public void close() throws IOException {
super.close();
final StatelessFlowFile newFlowFile = new StatelessFlowFile((StatelessFlowFile) flowFile, materializeContent);
newFlowFile.setData(toByteArray());
currentVersions.put(newFlowFile.getId(), newFlowFile);
}
};

View File

@ -107,8 +107,8 @@ public class StatelessProcessorWrapper extends AbstractStatelessComponent implem
//Validate context
final Collection<ValidationResult> validationResult = context.validate();
if (validationResult.stream().anyMatch(a -> !a.isValid()) || !this.validate()) {
throw new IllegalArgumentException("Processor is not valid: "
+ String.join("\n", validationResult.stream().map(ValidationResult::toString).collect(Collectors.toList())));
throw new IllegalArgumentException(processor + " is not valid: "
+ validationResult.stream().map(ValidationResult::toString).collect(Collectors.joining("\n")));
}
try (final CloseableNarLoader c = withNarClassLoader()) {
@ -147,7 +147,7 @@ public class StatelessProcessorWrapper extends AbstractStatelessComponent implem
final AtomicBoolean nextStepCalled = new AtomicBoolean(false);
try {
logger.info("Running " + this.processor.getClass().getSimpleName() + ".onTrigger with " + inputQueue.size() + " FlowFiles");
logger.debug("Running {}.onTrigger with {} FlowFiles", new Object[] {this.processor.getClass().getSimpleName(), inputQueue.size()});
try (final CloseableNarLoader c = withNarClassLoader()) { // Trigger processor with the appropriate class loader
processor.onTrigger(context, () -> {