NIFI-4864 Fixing Additional Resources property pointing at a directory won't find new JARs

NIFI-4864: Code refactor and improved additionalUrlsFingerprint implementation
Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
zenfenan 2018-02-14 21:39:04 +05:30 committed by Bryan Bende
parent f1d7518547
commit fad152f389
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
10 changed files with 114 additions and 4 deletions

View File

@ -19,17 +19,24 @@ package org.apache.nifi.util.file.classloader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.FilenameFilter;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class ClassLoaderUtils {
@ -131,6 +138,39 @@ public class ClassLoaderUtils {
return additionalClasspath.toArray(new URL[additionalClasspath.size()]);
}
public static String generateAdditionalUrlsFingerprint(Set<URL> urls) {
MessageDigest md;
List<String> listOfUrls = urls.stream().map(Object::toString).collect(Collectors.toList());
byte[] bytesOfAdditionalUrls, bytesOfDigest;
StringBuffer urlBuffer = new StringBuffer();
//Sorting so that the order is maintained for generating the fingerprint
Collections.sort(listOfUrls);
try {
md = MessageDigest.getInstance("MD5");
listOfUrls.forEach(url -> {
urlBuffer.append(url).append("-").append(getLastModified(url)).append(";");
});
bytesOfAdditionalUrls = urlBuffer.toString().getBytes("UTF-8");
bytesOfDigest = md.digest(bytesOfAdditionalUrls);
return DatatypeConverter.printHexBinary(bytesOfDigest);
} catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
LOGGER.error("Unable to generate fingerprint for the provided additional resources {}", new Object[]{urls, e});
}
return null;
}
private static long getLastModified(String url) {
File file = null;
try {
file = new File(new URI(url));
} catch (URISyntaxException e) {
e.printStackTrace();
}
return file != null ? file.lastModified() : 0;
}
protected static ClassLoader createModuleClassLoader(URL[] modules, ClassLoader parentClassLoader) {
return new URLClassLoader(modules, parentClassLoader);
}

View File

@ -18,7 +18,9 @@ package org.apache.nifi.util.file.classloader;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
@ -120,6 +122,15 @@ public class TestClassLoaderUtils {
assertEquals(1, urls.length);
}
@Test
public void testGenerateAdditionalUrlsFingerprint() throws MalformedURLException, URISyntaxException {
final Set<URL> urls = new HashSet<>();
URL testUrl = Paths.get("src/test/resources/TestClassLoaderUtils/TestSuccess.jar").toUri().toURL();
urls.add(testUrl);
String testFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(urls);
assertNotNull(testFingerprint);
}
protected FilenameFilter getJarFilenameFilter(){
return (dir, name) -> name != null && name.endsWith(".jar");
}

View File

@ -71,6 +71,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
private final Lock lock = new ReentrantLock();
private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();
private volatile String additionalResourcesFingerprint;
public AbstractConfiguredComponent(final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
@ -297,6 +298,33 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
.forEach(e -> setProperty(e.getKey().getName(), e.getValue()));
}
/**
* Generates fingerprint for the additional urls and compares it with the previous
* fingerprint value. If the fingerprint values don't match, the function calls the
* component's reload() to load the newly found resources.
*/
public void reloadAdditionalResourcesIfNecessary(){
String oldFingerprint, newFingerprint;
final List<PropertyDescriptor> descriptors = new ArrayList<>(this.getProperties().keySet());
final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors);
newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
if(this.hasAdditionalResourcesFingerprint()){
oldFingerprint = this.getAdditionalResourcesFingerprint();
if(!oldFingerprint.equals(newFingerprint)) {
this.setAdditionalResourcesFingerprint(newFingerprint);
try {
logger.info("Adding new resources found to classpath for the component"+ this.componentType +" with the ID "+this.getIdentifier());
reload(additionalUrls);
} catch (Exception e) {
logger.error("Error reloading component with id " + id + ": " + e.getMessage(), e);
}
}
}
}
@Override
public int hashCode() {
return 273171 * id.hashCode();
@ -568,4 +596,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
}
}
}
public String getAdditionalResourcesFingerprint() {
return additionalResourcesFingerprint;
}
public boolean hasAdditionalResourcesFingerprint() {
return !StringUtils.isEmpty(additionalResourcesFingerprint);
}
public void setAdditionalResourcesFingerprint(String additionalResourcesFingerprint) {
this.additionalResourcesFingerprint = additionalResourcesFingerprint;
}
}

View File

@ -81,6 +81,14 @@ public interface ConfiguredComponent extends ComponentAuthorizable {
void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws IllegalStateException;
boolean hasAdditionalResourcesFingerprint();
String getAdditionalResourcesFingerprint();
void setAdditionalResourcesFingerprint(String newFingerprint);
void reloadAdditionalResourcesIfNecessary();
/**
* @return the any validation errors for this connectable
*/

View File

@ -3447,7 +3447,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (isTerminated()) {
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
}
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
reportingTaskNode.verifyCanStart();
processScheduler.schedule(reportingTaskNode);
}
@ -3571,6 +3571,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanEnable();
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
processScheduler.enableReportingTask(reportingTaskNode);
}

View File

@ -86,6 +86,7 @@ import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -890,7 +891,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
if (isRunning()) {
throw new IllegalStateException("Cannot reload Processor while the Processor is running");
}
String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
setAdditionalResourcesFingerprint(additionalResourcesFingerprint);
getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls);
}

View File

@ -46,6 +46,7 @@ import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
@ -161,7 +162,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
if (isRunning()) {
throw new IllegalStateException("Cannot reload Reporting Task while Reporting Task is running");
}
String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
setAdditionalResourcesFingerprint(additionalResourcesFingerprint);
getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls);
}

View File

@ -63,6 +63,7 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -195,7 +196,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
if (isActive()) {
throw new IllegalStateException("Cannot reload Controller Service while service is active");
}
String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
setAdditionalResourcesFingerprint(additionalResourcesFingerprint);
getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls);
}
}

View File

@ -72,6 +72,7 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -341,6 +342,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public CompletableFuture<Void> enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
serviceNode.reloadAdditionalResourcesIfNecessary();
return processScheduler.enableControllerService(serviceNode);
}

View File

@ -1220,6 +1220,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} else if (state == ScheduledState.RUNNING) {
return CompletableFuture.completedFuture(null);
}
processor.reloadAdditionalResourcesIfNecessary();
return scheduler.startProcessor(processor, failIfStopping);
} finally {