diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 14019dca2d..87a5ac558f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -56,8 +56,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; -import java.util.Collections; +import java.net.URL; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -440,7 +441,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { getSchedulingAgent(procNode).incrementMaxThreadCount(tasksTerminated); try { - flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), Collections.emptySet()); + final Set additionalUrls = procNode.getAdditionalClasspathResources(procNode.getPropertyDescriptors()); + flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), additionalUrls); } catch (final ProcessorInstantiationException e) { // This shouldn't happen because we already have been able to instantiate the processor before LOG.error("Failed to replace instance of Processor for {} when terminating Processor", procNode); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java new file mode 100644 index 0000000000..3e77c010d4 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/DynamicallyModifyClasspath.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.BufferedWriter; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@RequiresInstanceClassLoading +public class DynamicallyModifyClasspath extends AbstractProcessor { + + static final PropertyDescriptor URLS = new PropertyDescriptor.Builder() + .name("URLs to Load") + .description("URLs to load onto the classpath") + .required(false) + .dynamicallyModifiesClasspath(true) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY) + .build(); + + static final PropertyDescriptor CLASS_TO_LOAD = new PropertyDescriptor.Builder() + .name("Class to Load") + .description("The name of the Class to load") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor SLEEP_DURATION = new PropertyDescriptor.Builder() + .name("Sleep Duration") + .description("Amount of time to sleep in the onTrigger method") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("0 sec") + .build(); + + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if the specified class can be loaded") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if the specified class cannot be loaded") + .build(); + + + + @Override + protected List getSupportedPropertyDescriptors() { + return Arrays.asList(URLS, CLASS_TO_LOAD, SLEEP_DURATION); + } + + @Override + public Set getRelationships() { + return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)); + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long sleepMillis = context.getProperty(SLEEP_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS); + if (sleepMillis > 0) { + try { + Thread.sleep(sleepMillis); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + final String classToLoad = context.getProperty(CLASS_TO_LOAD).getValue(); + try { + final Class clazz = Class.forName(classToLoad); + try (final OutputStream out = session.write(flowFile); + final OutputStreamWriter streamWriter = new OutputStreamWriter(out); + final BufferedWriter writer = new BufferedWriter(streamWriter)) { + + writer.write(clazz.getName()); + writer.newLine(); + writer.write(clazz.getClassLoader().toString()); + } + + session.transfer(flowFile, REL_SUCCESS); + } catch (final Exception e) { + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b0262a6dce..83e1f6a3fd 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -20,6 +20,7 @@ org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles org.apache.nifi.processors.tests.system.DependOnProperties org.apache.nifi.processors.tests.system.DoNotTransferFlowFile org.apache.nifi.processors.tests.system.Duplicate +org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes org.apache.nifi.processors.tests.system.FakeProcessor diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java new file mode 100644 index 0000000000..a5561e40bf --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.processor; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DynamicClassPathModificationIT extends NiFiSystemIT { + + private ProcessorEntity generate; + private ProcessorEntity modify; + + private ConnectionEntity modifyInputConnection; + private ConnectionEntity successConnection; + private ConnectionEntity failureConnection; + + + @Test + public void testLoadsClassOnBaseClasspath() throws NiFiClientException, IOException, InterruptedException { + createFlow(); + + // Configure with a class that is always on the classpath + final Map propertyMap = new HashMap<>(); + propertyMap.put("Class to Load", "org.apache.nifi.flowfile.FlowFile"); + getClientUtil().updateProcessorProperties(modify, propertyMap); + getClientUtil().waitForValidProcessor(modify.getId()); + + // Let Generate create a FlowFile + getClientUtil().startProcessor(generate); + waitForQueueCount(modifyInputConnection.getId(), 1); + + // Start the processor and make sure that the FlowFile is routed to success + getClientUtil().startProcessor(modify); + waitForQueueCount(successConnection.getId(), 1); + assertEquals(0, getConnectionQueueSize(failureConnection.getId())); + } + + @Test + public void testLoadsClassFromDynamicModification() throws NiFiClientException, IOException, InterruptedException { + createFlow(); + + final Map propertyMap = new HashMap<>(); + propertyMap.put("Class to Load", "org.apache.commons.lang3.StringUtils"); + getClientUtil().updateProcessorProperties(modify, propertyMap); + getClientUtil().waitForValidProcessor(modify.getId()); + + // Let Generate create a FlowFile + getClientUtil().startProcessor(generate); + waitForQueueCount(modifyInputConnection.getId(), 1); + + // Start the processor and we expect the FlowFile to go to failure because the StringUtils class should not be available + getClientUtil().startProcessor(modify); + waitForQueueCount(failureConnection.getId(), 1); + assertEquals(0, getConnectionQueueSize(successConnection.getId())); + + getClientUtil().stopProcessor(modify); + getClientUtil().stopProcessor(generate); + + getClientUtil().waitForStoppedProcessor(modify.getId()); + getClientUtil().waitForStoppedProcessor(generate.getId()); + + // Update modify to have the appropriate URL + propertyMap.put("URLs to Load", getCommonsLangJar().toURI().toURL().toString()); + getClientUtil().updateProcessorProperties(modify, propertyMap); + getClientUtil().waitForValidProcessor(modify.getId()); + + // Let Generate create another FlowFile + getClientUtil().startProcessor(generate); + waitForQueueCount(modifyInputConnection.getId(), 1); + + // Wait for a FlowFile to be routed to success + getClientUtil().startProcessor(modify); + waitForQueueCount(successConnection.getId(), 1); + + getClientUtil().stopProcessor(generate); + getClientUtil().waitForStoppedProcessor(generate.getId()); + + // Restart and ensure that everything works as expected after restart + getNiFiInstance().stop(); + getNiFiInstance().start(true); + + // Feed another FlowFile through. Upon restart, in order to modify, we need to get the most up-to-date revision so will first fetch the Processor + final ProcessorEntity generateAfterRestart = getNifiClient().getProcessorClient().getProcessor(generate.getId()); + getClientUtil().startProcessor(generateAfterRestart); + + // Depending on whether or not the flow was written out with the processor running, the Modify processor may or may not be running. Ensure that it is running. + getClientUtil().waitForValidationCompleted(modify); + final ProcessorEntity modifyAfterRestart = getNifiClient().getProcessorClient().getProcessor(modify.getId()); + final String modifyRunStatus = modifyAfterRestart.getStatus().getRunStatus(); + if (!"Running".equalsIgnoreCase(modifyRunStatus)) { + getClientUtil().startProcessor(modifyAfterRestart); + } + + // We now expect 2 FlowFiles to be in the success route + waitForQueueCount(successConnection.getId(), 2); + } + + + @Test + public void testSuccessAfterTerminate() throws NiFiClientException, IOException, InterruptedException { + createFlow(); + + // Configure so that the processor should succeed but sleep for 5 mins so that we can terminate it + final Map propertyMap = new HashMap<>(); + propertyMap.put("Class to Load", "org.apache.commons.lang3.StringUtils"); + propertyMap.put("URLs to Load", getCommonsLangJar().toURI().toURL().toString()); + propertyMap.put("Sleep Duration", "${sleep}"); + getClientUtil().updateProcessorProperties(modify, propertyMap); + getClientUtil().waitForValidProcessor(modify.getId()); + + // Tell Generate Processor to add an attribute named 'sleep' with 5 mins as the value + getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "5 mins")); + getClientUtil().waitForValidProcessor(generate.getId()); + + // Let Generate create a FlowFile + getClientUtil().startProcessor(generate); + waitForQueueCount(modifyInputConnection.getId(), 1); + + // Start the processor, wait a bit, stop it and terminate it. + getClientUtil().startProcessor(modify); + Thread.sleep(2000L); + getNifiClient().getProcessorClient().stopProcessor(modify); + getNifiClient().getProcessorClient().terminateProcessor(modify.getId()); + getClientUtil().waitForStoppedProcessor(modify.getId()); + + // Empty the queue and generate another FlowFile with a sleep of 0 sec + getClientUtil().emptyQueue(modifyInputConnection.getId()); + getClientUtil().stopProcessor(generate); + getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "0 sec")); + getClientUtil().waitForValidProcessor(generate.getId()); + getClientUtil().startProcessor(generate); + + // Start processor and expect data to go to 'success'. This time the processor will not sleep in onTrigger because + // it is configured to sleep only on the first iteration after an update. + getClientUtil().startProcessor(modify); + waitForQueueCount(successConnection.getId(), 1); + } + + private File getCommonsLangJar() { + final File bootstrapLib = new File(getNiFiInstance().getInstanceDirectory(), "lib/bootstrap"); + final File[] commonsLangJars = bootstrapLib.listFiles(file -> file.getName().startsWith("commons-lang")); + if (commonsLangJars == null || commonsLangJars.length == 0) { + throw new IllegalStateException("Could not find commons-lang jar in bootstrap lib directory"); + } + + if (commonsLangJars.length > 1) { + throw new IllegalStateException("Found multiple commons-lang jars in bootstrap lib directory"); + } + + return commonsLangJars[0]; + } + + // We have several tests running the same flow but with different configuration. Since we need to reference the ProcessorEntities and ConnectionEntities, we have a method + // that creates the flow and stores the entities are member variables + private void createFlow() throws NiFiClientException, IOException { + generate = getClientUtil().createProcessor("GenerateFlowFile"); + modify = getClientUtil().createProcessor("DynamicallyModifyClasspath"); + ProcessorEntity terminateSuccess = getClientUtil().createProcessor("TerminateFlowFile"); + ProcessorEntity terminateFailure = getClientUtil().createProcessor("TerminateFlowFile"); + + modifyInputConnection = getClientUtil().createConnection(generate, modify, "success"); + successConnection = getClientUtil().createConnection(modify, terminateSuccess, "success"); + failureConnection = getClientUtil().createConnection(modify, terminateFailure, "failure"); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java index 1b355fbe8c..40a6c6537c 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessorClient.java @@ -57,6 +57,8 @@ public interface ProcessorClient { PropertyDescriptorEntity getPropertyDescriptor(String processorId, String propertyName, Boolean sensitive) throws NiFiClientException, IOException; + ProcessorEntity terminateProcessor(String processorId) throws NiFiClientException, IOException; + /** * Indicates that mutable requests should indicate that the client has acknowledged that the node is disconnected. */ diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java index 3de97619b7..bcd22ed046 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java @@ -275,4 +275,18 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce return getRequestBuilder(target).get(PropertyDescriptorEntity.class); }); } + + @Override + public ProcessorEntity terminateProcessor(final String processorId) throws NiFiClientException, IOException { + Objects.requireNonNull(processorId, "Processor ID required"); + + return executeAction("Error terminating Processor", () -> { + final WebTarget target = processorTarget + .path("/threads") + .resolveTemplate("id", processorId); + + return getRequestBuilder(target).delete(ProcessorEntity.class); + }); + } + }