NIFI-11493: Defaulted dynamically modified classpath fix

This closes #7201.

Co-authored-by: Peter Turcsanyi <turcsanyi@apache.org>
Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Lehel Boér 2023-04-27 21:43:12 +02:00 committed by Peter Turcsanyi
parent 01e72d6b51
commit ab20a93a90
4 changed files with 222 additions and 15 deletions

View File

@ -604,6 +604,14 @@ public abstract class AbstractComponentNode implements ComponentNode {
return getProperty(property).getEffectiveValue(getParameterContext());
}
private String getEffectivePropertyValueWithDefault(final PropertyDescriptor property) {
String value = getProperty(property).getEffectiveValue(getParameterContext());
if (value == null) {
value = property.getDefaultValue();
}
return value;
}
@Override
public String getRawPropertyValue(final PropertyDescriptor property) {
return getProperty(property).getRawValue();
@ -662,23 +670,23 @@ public abstract class AbstractComponentNode implements ComponentNode {
*/
@Override
public synchronized void reloadAdditionalResourcesIfNecessary() {
// Components that don't have any PropertyDescriptors marked `dynamicallyModifiesClasspath`
// won't have the fingerprint i.e. will be null, in such cases do nothing
if (additionalResourcesFingerprint == null) {
return;
}
final Set<PropertyDescriptor> descriptors = this.getProperties().keySet();
final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors);
final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
if(!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) {
setAdditionalResourcesFingerprint(newFingerprint);
try {
logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier());
reload(additionalUrls);
} catch (Exception e) {
logger.error("Error reloading component with id " + id + ": " + e.getMessage(), e);
final boolean dynamicallyModifiesClasspath = descriptors.stream()
.anyMatch(PropertyDescriptor::isDynamicClasspathModifier);
if (dynamicallyModifiesClasspath) {
final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors, this::getEffectivePropertyValueWithDefault);
final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
if (!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) {
setAdditionalResourcesFingerprint(newFingerprint);
try {
logger.info("Updating classpath for [{}] with the ID [{}]", this.componentType, this.getIdentifier());
reload(additionalUrls);
} catch (Exception e) {
logger.error("Error reloading component with id [{}]: {}", id, e.getMessage(), e);
}
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@RequiresInstanceClassLoading
public class DefaultedDynamicallyModifyClasspath extends AbstractProcessor {
static final PropertyDescriptor URLS = new PropertyDescriptor.Builder()
.name("URLs to Load")
.description("URLs to load onto the classpath")
.required(false)
.defaultValue("lib/bootstrap/commons-lang3-3.12.0.jar")
.dynamicallyModifiesClasspath(true)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY)
.build();
static final PropertyDescriptor CLASS_TO_LOAD = new PropertyDescriptor.Builder()
.name("Class to Load")
.description("The name of the Class to load")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to this relationship if the specified class can be loaded")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to this relationship if the specified class cannot be loaded")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(URLS, CLASS_TO_LOAD);
}
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String classToLoad = context.getProperty(CLASS_TO_LOAD).getValue();
try {
final Class<?> clazz = Class.forName(classToLoad);
try (final OutputStream out = session.write(flowFile);
final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
final BufferedWriter writer = new BufferedWriter(streamWriter)) {
writer.write(clazz.getName());
writer.newLine();
writer.write(clazz.getClassLoader().toString());
}
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -21,6 +21,7 @@ org.apache.nifi.processors.tests.system.DependOnProperties
org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
org.apache.nifi.processors.tests.system.Duplicate
org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath
org.apache.nifi.processors.tests.system.DefaultedDynamicallyModifyClasspath
org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
org.apache.nifi.processors.tests.system.FakeProcessor

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.processor;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
class DefaultedDynamicClassPathModificationIT extends NiFiSystemIT {
private ProcessorEntity generateFlowFileProcessor;
private ProcessorEntity defaultedModifyClasspathProcessor;
private ConnectionEntity defaultedModifyClasspathInputConnection;
private ConnectionEntity successConnection;
private ConnectionEntity failureConnection;
@Test
void testLoadsClassFromDefaultedDynamicModification() throws NiFiClientException, IOException, InterruptedException {
createFlow();
// Update modify to have the appropriate URL, don't update URL to load to let it on default value
final Map<String, String> propertyMap = new HashMap<>();
propertyMap.put("Class to Load", "org.apache.commons.lang3.StringUtils");
getClientUtil().updateProcessorProperties(defaultedModifyClasspathProcessor, propertyMap);
getClientUtil().waitForValidProcessor(defaultedModifyClasspathProcessor.getId());
// Create a FlowFile
getClientUtil().waitForValidProcessor(generateFlowFileProcessor.getId());
getClientUtil().startProcessor(generateFlowFileProcessor);
waitForQueueCount(defaultedModifyClasspathInputConnection.getId(), 1);
// Wait for a FlowFile to be routed to success
getClientUtil().startProcessor(defaultedModifyClasspathProcessor);
waitForQueueCount(successConnection.getId(), 1);
getClientUtil().stopProcessor(generateFlowFileProcessor);
getClientUtil().waitForStoppedProcessor(generateFlowFileProcessor.getId());
// Restart and ensure that everything works as expected after restart
getNiFiInstance().stop();
getNiFiInstance().start(true);
// Feed another FlowFile through. Upon restart, in order to modify, we need to get the most up-to-date revision so will first fetch the Processor
final ProcessorEntity generateAfterRestart = getNifiClient().getProcessorClient().getProcessor(generateFlowFileProcessor.getId());
getClientUtil().waitForValidProcessor(generateAfterRestart.getId());
getClientUtil().startProcessor(generateAfterRestart);
// Depending on whether or not the flow was written out with the processor running, the Modify processor may or may not be running. Ensure that it is running.
getClientUtil().waitForValidationCompleted(defaultedModifyClasspathProcessor);
final ProcessorEntity modifyAfterRestart = getNifiClient().getProcessorClient().getProcessor(defaultedModifyClasspathProcessor.getId());
final String modifyRunStatus = modifyAfterRestart.getStatus().getRunStatus();
if (!"Running".equalsIgnoreCase(modifyRunStatus)) {
getClientUtil().startProcessor(modifyAfterRestart);
}
// We now expect 2 FlowFiles to be in the success route
waitForQueueCount(successConnection.getId(), 2);
}
// We have several tests running the same flow but with different configuration. Since we need to reference the ProcessorEntities and ConnectionEntities, we have a method
// that creates the flow and stores the entities are member variables
private void createFlow() throws NiFiClientException, IOException {
generateFlowFileProcessor = getClientUtil().createProcessor("GenerateFlowFile");
defaultedModifyClasspathProcessor = getClientUtil().createProcessor("DefaultedDynamicallyModifyClasspath");
ProcessorEntity terminateSuccess = getClientUtil().createProcessor("TerminateFlowFile");
ProcessorEntity terminateFailure = getClientUtil().createProcessor("TerminateFlowFile");
defaultedModifyClasspathInputConnection = getClientUtil().createConnection(generateFlowFileProcessor, defaultedModifyClasspathProcessor, "success");
successConnection = getClientUtil().createConnection(defaultedModifyClasspathProcessor, terminateSuccess, "success");
failureConnection = getClientUtil().createConnection(defaultedModifyClasspathProcessor, terminateFailure, "failure");
}
}