NIFI-11310: Provide appropriate classpath resources to the ReloadComponent when a processor is terminated

NIFI-11310: Fixed META-INF/services file that was mistakenly listing an extra extension point, due to rebase
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #7061
This commit is contained in:
Mark Payne 2023-03-20 15:42:34 -04:00 committed by Matthew Burgess
parent 63e9365449
commit 6adfb6131d
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
6 changed files with 337 additions and 2 deletions

View File

@ -56,8 +56,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.net.URL;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@ -440,7 +441,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
getSchedulingAgent(procNode).incrementMaxThreadCount(tasksTerminated);
try {
flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), Collections.emptySet());
final Set<URL> additionalUrls = procNode.getAdditionalClasspathResources(procNode.getPropertyDescriptors());
flowController.getReloadComponent().reload(procNode, procNode.getProcessor().getClass().getName(), procNode.getBundleCoordinate(), additionalUrls);
} catch (final ProcessorInstantiationException e) {
// This shouldn't happen because we already have been able to instantiate the processor before
LOG.error("Failed to replace instance of Processor for {} when terminating Processor", procNode);

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@RequiresInstanceClassLoading
public class DynamicallyModifyClasspath extends AbstractProcessor {
static final PropertyDescriptor URLS = new PropertyDescriptor.Builder()
.name("URLs to Load")
.description("URLs to load onto the classpath")
.required(false)
.dynamicallyModifiesClasspath(true)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.URL, ResourceType.FILE, ResourceType.DIRECTORY)
.build();
static final PropertyDescriptor CLASS_TO_LOAD = new PropertyDescriptor.Builder()
.name("Class to Load")
.description("The name of the Class to load")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SLEEP_DURATION = new PropertyDescriptor.Builder()
.name("Sleep Duration")
.description("Amount of time to sleep in the onTrigger method")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("0 sec")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to this relationship if the specified class can be loaded")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to this relationship if the specified class cannot be loaded")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(URLS, CLASS_TO_LOAD, SLEEP_DURATION);
}
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long sleepMillis = context.getProperty(SLEEP_DURATION).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
if (sleepMillis > 0) {
try {
Thread.sleep(sleepMillis);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
final String classToLoad = context.getProperty(CLASS_TO_LOAD).getValue();
try {
final Class<?> clazz = Class.forName(classToLoad);
try (final OutputStream out = session.write(flowFile);
final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
final BufferedWriter writer = new BufferedWriter(streamWriter)) {
writer.write(clazz.getName());
writer.newLine();
writer.write(clazz.getClassLoader().toString());
}
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -20,6 +20,7 @@ org.apache.nifi.processors.tests.system.ConcatenateRangeOfFlowFiles
org.apache.nifi.processors.tests.system.DependOnProperties
org.apache.nifi.processors.tests.system.DoNotTransferFlowFile
org.apache.nifi.processors.tests.system.Duplicate
org.apache.nifi.processors.tests.system.DynamicallyModifyClasspath
org.apache.nifi.processors.tests.system.EnsureProcessorConfigurationCorrect
org.apache.nifi.processors.tests.system.EvaluatePropertiesWithDifferentELScopes
org.apache.nifi.processors.tests.system.FakeProcessor

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.processor;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class DynamicClassPathModificationIT extends NiFiSystemIT {
private ProcessorEntity generate;
private ProcessorEntity modify;
private ConnectionEntity modifyInputConnection;
private ConnectionEntity successConnection;
private ConnectionEntity failureConnection;
@Test
public void testLoadsClassOnBaseClasspath() throws NiFiClientException, IOException, InterruptedException {
createFlow();
// Configure with a class that is always on the classpath
final Map<String, String> propertyMap = new HashMap<>();
propertyMap.put("Class to Load", "org.apache.nifi.flowfile.FlowFile");
getClientUtil().updateProcessorProperties(modify, propertyMap);
getClientUtil().waitForValidProcessor(modify.getId());
// Let Generate create a FlowFile
getClientUtil().startProcessor(generate);
waitForQueueCount(modifyInputConnection.getId(), 1);
// Start the processor and make sure that the FlowFile is routed to success
getClientUtil().startProcessor(modify);
waitForQueueCount(successConnection.getId(), 1);
assertEquals(0, getConnectionQueueSize(failureConnection.getId()));
}
@Test
public void testLoadsClassFromDynamicModification() throws NiFiClientException, IOException, InterruptedException {
createFlow();
final Map<String, String> propertyMap = new HashMap<>();
propertyMap.put("Class to Load", "org.apache.commons.lang3.StringUtils");
getClientUtil().updateProcessorProperties(modify, propertyMap);
getClientUtil().waitForValidProcessor(modify.getId());
// Let Generate create a FlowFile
getClientUtil().startProcessor(generate);
waitForQueueCount(modifyInputConnection.getId(), 1);
// Start the processor and we expect the FlowFile to go to failure because the StringUtils class should not be available
getClientUtil().startProcessor(modify);
waitForQueueCount(failureConnection.getId(), 1);
assertEquals(0, getConnectionQueueSize(successConnection.getId()));
getClientUtil().stopProcessor(modify);
getClientUtil().stopProcessor(generate);
getClientUtil().waitForStoppedProcessor(modify.getId());
getClientUtil().waitForStoppedProcessor(generate.getId());
// Update modify to have the appropriate URL
propertyMap.put("URLs to Load", getCommonsLangJar().toURI().toURL().toString());
getClientUtil().updateProcessorProperties(modify, propertyMap);
getClientUtil().waitForValidProcessor(modify.getId());
// Let Generate create another FlowFile
getClientUtil().startProcessor(generate);
waitForQueueCount(modifyInputConnection.getId(), 1);
// Wait for a FlowFile to be routed to success
getClientUtil().startProcessor(modify);
waitForQueueCount(successConnection.getId(), 1);
getClientUtil().stopProcessor(generate);
getClientUtil().waitForStoppedProcessor(generate.getId());
// Restart and ensure that everything works as expected after restart
getNiFiInstance().stop();
getNiFiInstance().start(true);
// Feed another FlowFile through. Upon restart, in order to modify, we need to get the most up-to-date revision so will first fetch the Processor
final ProcessorEntity generateAfterRestart = getNifiClient().getProcessorClient().getProcessor(generate.getId());
getClientUtil().startProcessor(generateAfterRestart);
// Depending on whether or not the flow was written out with the processor running, the Modify processor may or may not be running. Ensure that it is running.
getClientUtil().waitForValidationCompleted(modify);
final ProcessorEntity modifyAfterRestart = getNifiClient().getProcessorClient().getProcessor(modify.getId());
final String modifyRunStatus = modifyAfterRestart.getStatus().getRunStatus();
if (!"Running".equalsIgnoreCase(modifyRunStatus)) {
getClientUtil().startProcessor(modifyAfterRestart);
}
// We now expect 2 FlowFiles to be in the success route
waitForQueueCount(successConnection.getId(), 2);
}
@Test
public void testSuccessAfterTerminate() throws NiFiClientException, IOException, InterruptedException {
createFlow();
// Configure so that the processor should succeed but sleep for 5 mins so that we can terminate it
final Map<String, String> propertyMap = new HashMap<>();
propertyMap.put("Class to Load", "org.apache.commons.lang3.StringUtils");
propertyMap.put("URLs to Load", getCommonsLangJar().toURI().toURL().toString());
propertyMap.put("Sleep Duration", "${sleep}");
getClientUtil().updateProcessorProperties(modify, propertyMap);
getClientUtil().waitForValidProcessor(modify.getId());
// Tell Generate Processor to add an attribute named 'sleep' with 5 mins as the value
getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "5 mins"));
getClientUtil().waitForValidProcessor(generate.getId());
// Let Generate create a FlowFile
getClientUtil().startProcessor(generate);
waitForQueueCount(modifyInputConnection.getId(), 1);
// Start the processor, wait a bit, stop it and terminate it.
getClientUtil().startProcessor(modify);
Thread.sleep(2000L);
getNifiClient().getProcessorClient().stopProcessor(modify);
getNifiClient().getProcessorClient().terminateProcessor(modify.getId());
getClientUtil().waitForStoppedProcessor(modify.getId());
// Empty the queue and generate another FlowFile with a sleep of 0 sec
getClientUtil().emptyQueue(modifyInputConnection.getId());
getClientUtil().stopProcessor(generate);
getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "0 sec"));
getClientUtil().waitForValidProcessor(generate.getId());
getClientUtil().startProcessor(generate);
// Start processor and expect data to go to 'success'. This time the processor will not sleep in onTrigger because
// it is configured to sleep only on the first iteration after an update.
getClientUtil().startProcessor(modify);
waitForQueueCount(successConnection.getId(), 1);
}
private File getCommonsLangJar() {
final File bootstrapLib = new File(getNiFiInstance().getInstanceDirectory(), "lib/bootstrap");
final File[] commonsLangJars = bootstrapLib.listFiles(file -> file.getName().startsWith("commons-lang"));
if (commonsLangJars == null || commonsLangJars.length == 0) {
throw new IllegalStateException("Could not find commons-lang jar in bootstrap lib directory");
}
if (commonsLangJars.length > 1) {
throw new IllegalStateException("Found multiple commons-lang jars in bootstrap lib directory");
}
return commonsLangJars[0];
}
// We have several tests running the same flow but with different configuration. Since we need to reference the ProcessorEntities and ConnectionEntities, we have a method
// that creates the flow and stores the entities are member variables
private void createFlow() throws NiFiClientException, IOException {
generate = getClientUtil().createProcessor("GenerateFlowFile");
modify = getClientUtil().createProcessor("DynamicallyModifyClasspath");
ProcessorEntity terminateSuccess = getClientUtil().createProcessor("TerminateFlowFile");
ProcessorEntity terminateFailure = getClientUtil().createProcessor("TerminateFlowFile");
modifyInputConnection = getClientUtil().createConnection(generate, modify, "success");
successConnection = getClientUtil().createConnection(modify, terminateSuccess, "success");
failureConnection = getClientUtil().createConnection(modify, terminateFailure, "failure");
}
}

View File

@ -57,6 +57,8 @@ public interface ProcessorClient {
PropertyDescriptorEntity getPropertyDescriptor(String processorId, String propertyName, Boolean sensitive) throws NiFiClientException, IOException;
ProcessorEntity terminateProcessor(String processorId) throws NiFiClientException, IOException;
/**
* Indicates that mutable requests should indicate that the client has acknowledged that the node is disconnected.
*/

View File

@ -275,4 +275,18 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce
return getRequestBuilder(target).get(PropertyDescriptorEntity.class);
});
}
@Override
public ProcessorEntity terminateProcessor(final String processorId) throws NiFiClientException, IOException {
Objects.requireNonNull(processorId, "Processor ID required");
return executeAction("Error terminating Processor", () -> {
final WebTarget target = processorTarget
.path("/threads")
.resolveTemplate("id", processorId);
return getRequestBuilder(target).delete(ProcessorEntity.class);
});
}
}