NIFI-2326 This closes #685. fixed test breaking static class causing ordering issues. Addressed potential issue in NarClassLoaders for multi-init scenarios - now idempotent for a given config

This commit is contained in:
joewitt 2016-07-20 10:18:42 -04:00
parent 259f5bba47
commit 2a8be95480
12 changed files with 116 additions and 90 deletions

View File

@ -43,9 +43,9 @@ public class DocGeneratorTest {
NarUnpacker.unpackNars(properties); NarUnpacker.unpackNars(properties);
NarClassLoaders.load(properties); NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
ExtensionManager.discoverExtensions(); ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
DocGenerator.generate(properties); DocGenerator.generate(properties);

View File

@ -913,7 +913,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
@Override @Override
public void run() { public void run() {
final ClassLoader currentCl = Thread.currentThread().getContextClassLoader(); final ClassLoader currentCl = Thread.currentThread().getContextClassLoader();
final ClassLoader cl = NarClassLoaders.getFrameworkClassLoader(); final ClassLoader cl = NarClassLoaders.getInstance().getFrameworkClassLoader();
Thread.currentThread().setContextClassLoader(cl); Thread.currentThread().setContextClassLoader(cl);
try { try {
//Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again //Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again

View File

@ -68,6 +68,7 @@ import org.w3c.dom.Node;
* NOT THREAD-SAFE. * NOT THREAD-SAFE.
*/ */
public class StandardFlowSerializer implements FlowSerializer { public class StandardFlowSerializer implements FlowSerializer {
private static final String MAX_ENCODING_VERSION = "1.0"; private static final String MAX_ENCODING_VERSION = "1.0";
private final StringEncryptor encryptor; private final StringEncryptor encryptor;

View File

@ -17,7 +17,6 @@
package org.apache.nifi.persistence; package org.apache.nifi.persistence;
import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.TemplateDTO;
import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBContext;
@ -34,10 +33,13 @@ import com.sun.xml.txw2.output.IndentingXMLStreamWriter;
public final class TemplateSerializer { public final class TemplateSerializer {
/**
* This method when called assumes the Framework Nar ClassLoader is in the
* classloader hierarchy of the current context class loader.
* @param dto the template dto to serialize
* @return serialized representation of the DTO
*/
public static byte[] serialize(final TemplateDTO dto) { public static byte[] serialize(final TemplateDTO dto) {
final ClassLoader currentCl = Thread.currentThread().getContextClassLoader();
final ClassLoader cl = NarClassLoaders.getFrameworkClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try { try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final BufferedOutputStream bos = new BufferedOutputStream(baos); final BufferedOutputStream bos = new BufferedOutputStream(baos);
@ -49,13 +51,9 @@ public final class TemplateSerializer {
marshaller.marshal(dto, writer); marshaller.marshal(dto, writer);
bos.flush(); bos.flush();
return baos.toByteArray(); return baos.toByteArray(); //Note: For really large templates this could use a lot of heap space
} catch (final IOException | JAXBException | XMLStreamException e) { } catch (final IOException | JAXBException | XMLStreamException e) {
throw new FlowSerializationException(e); throw new FlowSerializationException(e);
} finally {
if (currentCl != null) {
Thread.currentThread().setContextClassLoader(currentCl);
}
} }
} }

View File

@ -38,8 +38,8 @@ public class StandardControllerServiceProviderTest {
public static void setupSuite() throws Exception { public static void setupSuite() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardFlowServiceTest.class.getResource("/conf/nifi.properties").getFile()); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardFlowServiceTest.class.getResource("/conf/nifi.properties").getFile());
NiFiProperties properties = NiFiProperties.getInstance(); NiFiProperties properties = NiFiProperties.getInstance();
NarClassLoaders.load(properties); NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
ExtensionManager.discoverExtensions(); ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
} }
@Before @Before

View File

@ -21,16 +21,11 @@ import static org.junit.Assert.assertEquals;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBContext;
@ -38,8 +33,6 @@ import javax.xml.bind.JAXBElement;
import javax.xml.bind.Unmarshaller; import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource; import javax.xml.transform.stream.StreamSource;
import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.util.TypeOneUUIDGenerator; import org.apache.nifi.util.TypeOneUUIDGenerator;
import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.ProcessorDTO;
@ -49,18 +42,9 @@ import org.eclipse.jgit.diff.EditList;
import org.eclipse.jgit.diff.HistogramDiff; import org.eclipse.jgit.diff.HistogramDiff;
import org.eclipse.jgit.diff.RawText; import org.eclipse.jgit.diff.RawText;
import org.eclipse.jgit.diff.RawTextComparator; import org.eclipse.jgit.diff.RawTextComparator;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TemplateSerializerTest { public class TemplateSerializerTest {
@Before
public void before() throws Exception {
Field initField = NarClassLoaders.class.getDeclaredField("initialized");
setFinalField(initField, new AtomicBoolean(true));
Field clField = NarClassLoaders.class.getDeclaredField("frameworkClassLoader");
NarClassLoader cl = new NarClassLoader(new File(""), Thread.currentThread().getContextClassLoader());
setFinalField(clField, new AtomicReference<NarClassLoader>(cl));
}
@Test @Test
public void validateDiffWithChangingComponentIdAndAdditionalElements() throws Exception { public void validateDiffWithChangingComponentIdAndAdditionalElements() throws Exception {
@ -130,11 +114,4 @@ public class TemplateSerializerTest {
} }
} }
public static void setFinalField(Field field, Object newValue) throws Exception {
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(null, newValue);
}
} }

View File

@ -68,8 +68,9 @@ public class ExtensionManager {
/** /**
* Loads all FlowFileProcessor, FlowFileComparator, ReportingTask class types that can be found on the bootstrap classloader and by creating classloaders for all NARs found within the classpath. * Loads all FlowFileProcessor, FlowFileComparator, ReportingTask class types that can be found on the bootstrap classloader and by creating classloaders for all NARs found within the classpath.
* @param extensionLoaders the loaders to scan through in search of extensions
*/ */
public static void discoverExtensions() { public static void discoverExtensions(final Set<ClassLoader> extensionLoaders) {
final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader(); final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
// get the current context class loader // get the current context class loader
@ -79,7 +80,7 @@ public class ExtensionManager {
loadExtensions(systemClassLoader); loadExtensions(systemClassLoader);
// consider each nar class loader // consider each nar class loader
for (final ClassLoader ncl : NarClassLoaders.getExtensionClassLoaders()) { for (final ClassLoader ncl : extensionLoaders) {
// Must set the context class loader to the nar classloader itself // Must set the context class loader to the nar classloader itself
// so that static initialization techniques that depend on the context class loader will work properly // so that static initialization techniques that depend on the context class loader will work properly

View File

@ -28,45 +28,100 @@ import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.Attributes; import java.util.jar.Attributes;
import java.util.jar.Manifest; import java.util.jar.Manifest;
import org.apache.nifi.util.FileUtils; import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* * A singleton class used to initialize the extension and framework
* classloaders.
*/ */
public final class NarClassLoaders { public final class NarClassLoaders {
public static final String FRAMEWORK_NAR_ID = "nifi-framework-nar"; public static final String FRAMEWORK_NAR_ID = "nifi-framework-nar";
public static final String JETTY_NAR_ID = "nifi-jetty-bundle"; public static final String JETTY_NAR_ID = "nifi-jetty-bundle";
private static volatile NarClassLoaders ncl;
private volatile InitContext initContext;
private static final Logger logger = LoggerFactory.getLogger(NarClassLoaders.class); private static final Logger logger = LoggerFactory.getLogger(NarClassLoaders.class);
private static final AtomicBoolean initialized = new AtomicBoolean(false);
private static final AtomicReference<Map<String, ClassLoader>> extensionClassLoaders = new AtomicReference<>(); private final static class InitContext {
private static final AtomicReference<ClassLoader> frameworkClassLoader = new AtomicReference<>();
private final File frameworkWorkingDir;
private final File extensionWorkingDir;
private final ClassLoader frameworkClassLoader;
private final Map<String, ClassLoader> extensionClassLoaders;
private InitContext(
final File frameworkDir,
final File extensionDir,
final ClassLoader frameworkClassloader,
final Map<String, ClassLoader> extensionClassLoaders) {
this.frameworkWorkingDir = frameworkDir;
this.extensionWorkingDir = extensionDir;
this.frameworkClassLoader = frameworkClassloader;
this.extensionClassLoaders = extensionClassLoaders;
}
}
private NarClassLoaders() {
}
/** /**
* Loads the extensions class loaders from the specified working directory. * @return The singleton instance of the NarClassLoaders
* Loading is only performed during the initial invocation of load.
* Subsequent attempts will be ignored.
*
*
* @param properties properties object to initialize with
* @throws java.io.IOException ioe
* @throws java.lang.ClassNotFoundException cfne
* @throws IllegalStateException if the class loaders have already been
* created
*/ */
public static void load(final NiFiProperties properties) throws IOException, ClassNotFoundException { public static NarClassLoaders getInstance() {
if (initialized.getAndSet(true)) { NarClassLoaders result = ncl;
throw new IllegalStateException("Extensions class loaders have already been loaded."); if (result == null) {
synchronized (NarClassLoaders.class) {
result = ncl;
if (result == null) {
ncl = result = new NarClassLoaders();
}
}
} }
return result;
}
/**
* Initializes and loads the NarClassLoaders. This method must be called
* before the rest of the methods to access the classloaders are called and
* it can be safely called any number of times provided the same framework
* and extension working dirs are used.
*
* @param frameworkWorkingDir where to find framework artifacts
* @param extensionsWorkingDir where to find extension artifacts
* @throws java.io.IOException if any issue occurs while exploding nar working directories.
* @throws java.lang.ClassNotFoundException if unable to load class definition
* @throws IllegalStateException already initialized with a given pair of
* directories cannot reinitialize or use a different pair of directories.
*/
public void init(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
if (frameworkWorkingDir == null || extensionsWorkingDir == null) {
throw new NullPointerException("cannot have empty arguments");
}
InitContext ic = initContext;
if (ic == null) {
synchronized (this) {
ic = initContext;
if (ic == null) {
initContext = ic = load(frameworkWorkingDir, extensionsWorkingDir);
}
}
}
boolean matching = initContext.extensionWorkingDir.equals(extensionsWorkingDir)
&& initContext.frameworkWorkingDir.equals(frameworkWorkingDir);
if (!matching) {
throw new IllegalStateException("Cannot reinitialize and extension/framework directories cannot change");
}
}
/**
* Should be called at most once.
*/
private InitContext load(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
// get the system classloader // get the system classloader
final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader(); final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
@ -74,19 +129,16 @@ public final class NarClassLoaders {
final Map<String, ClassLoader> extensionDirectoryClassLoaderLookup = new LinkedHashMap<>(); final Map<String, ClassLoader> extensionDirectoryClassLoaderLookup = new LinkedHashMap<>();
final Map<String, ClassLoader> narIdClassLoaderLookup = new HashMap<>(); final Map<String, ClassLoader> narIdClassLoaderLookup = new HashMap<>();
final File frameworkWorkingDirectory = properties.getFrameworkWorkingDirectory();
final File extensionsWorkingDirectory = properties.getExtensionsWorkingDirectory();
// make sure the nar directory is there and accessible // make sure the nar directory is there and accessible
FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDirectory); FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir);
FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDirectory); FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir);
final List<File> narWorkingDirContents = new ArrayList<>(); final List<File> narWorkingDirContents = new ArrayList<>();
final File[] frameworkWorkingDirContents = frameworkWorkingDirectory.listFiles(); final File[] frameworkWorkingDirContents = frameworkWorkingDir.listFiles();
if (frameworkWorkingDirContents != null) { if (frameworkWorkingDirContents != null) {
narWorkingDirContents.addAll(Arrays.asList(frameworkWorkingDirContents)); narWorkingDirContents.addAll(Arrays.asList(frameworkWorkingDirContents));
} }
final File[] extensionsWorkingDirContents = extensionsWorkingDirectory.listFiles(); final File[] extensionsWorkingDirContents = extensionsWorkingDir.listFiles();
if (extensionsWorkingDirContents != null) { if (extensionsWorkingDirContents != null) {
narWorkingDirContents.addAll(Arrays.asList(extensionsWorkingDirContents)); narWorkingDirContents.addAll(Arrays.asList(extensionsWorkingDirContents));
} }
@ -165,11 +217,7 @@ public final class NarClassLoaders {
} }
} }
// set the framework class loader return new InitContext(frameworkWorkingDir, extensionsWorkingDir, narIdClassLoaderLookup.get(FRAMEWORK_NAR_ID), new LinkedHashMap<>(extensionDirectoryClassLoaderLookup));
frameworkClassLoader.set(narIdClassLoaderLookup.get(FRAMEWORK_NAR_ID));
// set the extensions class loader map
extensionClassLoaders.set(new LinkedHashMap<>(extensionDirectoryClassLoaderLookup));
} }
/** /**
@ -219,12 +267,12 @@ public final class NarClassLoaders {
* @throws IllegalStateException if the frame class loader has not been * @throws IllegalStateException if the frame class loader has not been
* loaded * loaded
*/ */
public static ClassLoader getFrameworkClassLoader() { public ClassLoader getFrameworkClassLoader() {
if (!initialized.get()) { if (initContext == null) {
throw new IllegalStateException("Framework class loader has not been loaded."); throw new IllegalStateException("Framework class loader has not been loaded.");
} }
return frameworkClassLoader.get(); return initContext.frameworkClassLoader;
} }
/** /**
@ -233,14 +281,17 @@ public final class NarClassLoaders {
* null when no class loader exists for the specified working directory * null when no class loader exists for the specified working directory
* @throws IllegalStateException if the class loaders have not been loaded * @throws IllegalStateException if the class loaders have not been loaded
*/ */
public static ClassLoader getExtensionClassLoader(final File extensionWorkingDirectory) { public ClassLoader getExtensionClassLoader(final File extensionWorkingDirectory) {
if (!initialized.get()) { if (initContext == null) {
throw new IllegalStateException("Extensions class loaders have not been loaded."); throw new IllegalStateException("Extensions class loaders have not been loaded.");
} }
try { try {
return extensionClassLoaders.get().get(extensionWorkingDirectory.getCanonicalPath()); return initContext.extensionClassLoaders.get(extensionWorkingDirectory.getCanonicalPath());
} catch (final IOException ioe) { } catch (final IOException ioe) {
if(logger.isDebugEnabled()){
logger.debug("Unable to get extension classloader for working directory '{}'", extensionWorkingDirectory);
}
return null; return null;
} }
} }
@ -249,12 +300,12 @@ public final class NarClassLoaders {
* @return the extension class loaders * @return the extension class loaders
* @throws IllegalStateException if the class loaders have not been loaded * @throws IllegalStateException if the class loaders have not been loaded
*/ */
public static Set<ClassLoader> getExtensionClassLoaders() { public Set<ClassLoader> getExtensionClassLoaders() {
if (!initialized.get()) { if (initContext == null) {
throw new IllegalStateException("Extensions class loaders have not been loaded."); throw new IllegalStateException("Extensions class loaders have not been loaded.");
} }
return new LinkedHashSet<>(extensionClassLoaders.get().values()); return new LinkedHashSet<>(initContext.extensionClassLoaders.values());
} }
private static class NarDetails { private static class NarDetails {
@ -288,6 +339,4 @@ public final class NarClassLoaders {
} }
} }
private NarClassLoaders() {
}
} }

View File

@ -108,16 +108,16 @@ public class NiFi {
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties); final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties);
// load the extensions classloaders // load the extensions classloaders
NarClassLoaders.load(properties); NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader // load the framework classloader
final ClassLoader frameworkClassLoader = NarClassLoaders.getFrameworkClassLoader(); final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader();
if (frameworkClassLoader == null) { if (frameworkClassLoader == null) {
throw new IllegalStateException("Unable to find the framework NAR ClassLoader."); throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
} }
// discover the extensions // discover the extensions
ExtensionManager.discoverExtensions(); ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
ExtensionManager.logClassLoaderMapping(); ExtensionManager.logClassLoaderMapping();
DocGenerator.generate(properties); DocGenerator.generate(properties);

View File

@ -238,7 +238,7 @@ public class JettyServer implements NiFiServer {
String warContextPath = String.format("/%s", warName); String warContextPath = String.format("/%s", warName);
// attempt to locate the nar class loader for this war // attempt to locate the nar class loader for this war
ClassLoader narClassLoaderForWar = NarClassLoaders.getExtensionClassLoader(warToNarWorkingDirectoryLookup.get(war)); ClassLoader narClassLoaderForWar = NarClassLoaders.getInstance().getExtensionClassLoader(warToNarWorkingDirectoryLookup.get(war));
// this should never be null // this should never be null
if (narClassLoaderForWar == null) { if (narClassLoaderForWar == null) {

View File

@ -76,8 +76,8 @@ public class AccessControlHelper {
} }
// load extensions // load extensions
NarClassLoaders.load(props); NarClassLoaders.getInstance().init(props.getFrameworkWorkingDirectory(), props.getExtensionsWorkingDirectory());
ExtensionManager.discoverExtensions(); ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
// start the server // start the server
server = new NiFiTestServer("src/main/webapp", CONTEXT_PATH); server = new NiFiTestServer("src/main/webapp", CONTEXT_PATH);

View File

@ -71,8 +71,8 @@ public class ITAccessTokenEndpoint {
FileUtils.deleteDirectory(props.getDatabaseRepositoryPath().toFile()); FileUtils.deleteDirectory(props.getDatabaseRepositoryPath().toFile());
// load extensions // load extensions
NarClassLoaders.load(props); NarClassLoaders.getInstance().init(props.getFrameworkWorkingDirectory(), props.getExtensionsWorkingDirectory());
ExtensionManager.discoverExtensions(); ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());
// start the server // start the server
SERVER = new NiFiTestServer("src/main/webapp", CONTEXT_PATH); SERVER = new NiFiTestServer("src/main/webapp", CONTEXT_PATH);