diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index 98158166f5..6e955344c1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -39,6 +41,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.HTTPUtils; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.stream.io.StreamUtils; @@ -303,6 +306,7 @@ public class HandleHttpRequest extends AbstractProcessor { private volatile Server server; private AtomicBoolean initialized = new AtomicBoolean(false); private volatile BlockingQueue containerQueue; + private AtomicBoolean runOnPrimary = new AtomicBoolean(false); @Override protected List getSupportedPropertyDescriptors() { @@ -323,6 +327,7 @@ public class HandleHttpRequest extends AbstractProcessor { if(initialized.get()){ return; } + runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY)); this.containerQueue = new LinkedBlockingQueue<>(context.getProperty(CONTAINER_QUEUE_SIZE).asInteger()); final String host = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); @@ -537,10 +542,23 @@ public class HandleHttpRequest extends AbstractProcessor { server.stop(); server.destroy(); server.join(); + clearInit(); getLogger().info("Shut down {}", new Object[]{server}); } } + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange(final PrimaryNodeState newState) { + if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) { + try { + shutdown(); + } catch (final Exception shutdownException) { + getLogger().warn("Processor is configured to run only on Primary Node, but failed to shutdown HTTP server following revocation of primary node status due to {}", + shutdownException); + } + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { try {