NIFI-13713 Use Asynchronous Loading for Extension UIs in Jetty Server (#9231)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Bryan Bende 2024-09-09 08:47:35 -04:00 committed by GitHub
parent bb7cce50d2
commit 68b74547e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 56 additions and 1 deletions

View File

@ -32,6 +32,7 @@ import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -44,7 +45,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
@ -182,6 +187,8 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private static final String SPRING_SECURITY_FILTER_CHAIN = "springSecurityFilterChain";
private static final Duration EXTENSION_UI_POLL_INTERVAL = Duration.ofSeconds(5);
private final DeploymentManager deploymentManager = new DeploymentManager();
private Server server;
@ -211,6 +218,9 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private final Map<BundleCoordinate, List<App>> appsByBundleCoordinate = new ConcurrentHashMap<>();
private final BlockingQueue<Bundle> extensionUisToLoad = new LinkedBlockingQueue<>();
private final ExtensionUiLoadTask extensionUiLoadTask = new ExtensionUiLoadTask(extensionUisToLoad, this::processExtensionUiBundle);
/**
* Default no-arg constructor for ServiceLoader
*/
@ -328,8 +338,12 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
@Override
public synchronized void loadExtensionUis(final Set<Bundle> bundles) {
extensionUisToLoad.addAll(bundles);
}
private void processExtensionUiBundle(final Bundle bundle) {
// Find and load any WARs contained within the set of bundles...
final Map<File, Bundle> warToBundleLookup = findWars(bundles);
final Map<File, Bundle> warToBundleLookup = findWars(Set.of(bundle));
final ExtensionUiInfo extensionUiInfo = loadWars(warToBundleLookup);
final Map<BundleCoordinate, List<WebAppContext>> webappContextsByBundleCoordinate = extensionUiInfo.webAppContextsByBundleCoordinate();
@ -929,6 +943,10 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
webDocsServletContext.setAttribute("nifi-python-extension-mapping", pythonExtensionMapping);
}
// Start background task to process bundles that are submitted for loading extension UIs, this needs to be
// started after Jetty has been started to ensure the Spring WebApplicationContext is available
Thread.ofVirtual().name("Extension UI Loader").start(extensionUiLoadTask);
// if this nifi is a node in a cluster, start the flow service and load the flow - the
// flow service is loaded here for clustered nodes because the loading of the flow will
// initialize the connection between the node and the coordinator. if the node connects (starts
@ -1112,6 +1130,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
logger.warn("Failed to stop NAR provider", e);
}
extensionUiLoadTask.stop();
}
private ErrorPageErrorHandler getErrorHandler() {
@ -1170,5 +1189,41 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
}
}
/**
* Task that asynchronously processes any bundles that were submitted to have extension UIs loaded.
*/
private static class ExtensionUiLoadTask implements Runnable {
private final BlockingQueue<Bundle> extensionUiBundlesToLoad;
private final Consumer<Bundle> extensionUiLoadFunction;
private volatile boolean stopped = false;
public ExtensionUiLoadTask(final BlockingQueue<Bundle> extensionUiBundlesToLoad, final Consumer<Bundle> extensionUiLoadFunction) {
this.extensionUiBundlesToLoad = extensionUiBundlesToLoad;
this.extensionUiLoadFunction = extensionUiLoadFunction;
}
@Override
public void run() {
while (!stopped) {
try {
final Bundle bundle = extensionUiBundlesToLoad.poll(EXTENSION_UI_POLL_INTERVAL.getSeconds(), TimeUnit.SECONDS);
if (bundle != null) {
extensionUiLoadFunction.accept(bundle);
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final Exception e) {
logger.error("Failed to load extension UI", e);
}
}
}
public void stop() {
stopped = true;
}
}
}