diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index 49bad40b73..1be8dd941c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import javax.security.cert.X509Certificate; @@ -226,6 +227,7 @@ public class HandleHttpRequest extends AbstractProcessor { .build(); private volatile Server server; + private AtomicBoolean initialized = new AtomicBoolean(false); private final BlockingQueue containerQueue = new LinkedBlockingQueue<>(50); @Override @@ -255,7 +257,15 @@ public class HandleHttpRequest extends AbstractProcessor { } @OnScheduled - public void initializeServer(final ProcessContext context) throws Exception { + public void clearInit(){ + initialized.set(false); + } + + private synchronized void initializeServer(final ProcessContext context) throws Exception { + if(initialized.get()){ + return; + } + final String host = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); @@ -402,6 +412,8 @@ public class HandleHttpRequest extends AbstractProcessor { server.start(); getLogger().info("Server started and listening on port " + getPort()); + + initialized.set(true); } protected int getPort() { @@ -452,6 +464,15 @@ public class HandleHttpRequest extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + try { + if(!initialized.get()) { + initializeServer(context); + } + } catch (Exception e) { + context.yield(); + throw new ProcessException("Failed to initialize the server",e); + } + final HttpRequestContainer container = containerQueue.poll(); if (container == null) { return;