NIFI-6567 HandleHttpRequest does not shutdown HTTP server in some cir… (#3673)

* NIFI-6567 HandleHttpRequest does not shutdown HTTP server in some circumstances

signed-off by: Peter Wicks <patricker@gmail.com>
This commit is contained in:
Hsin-Ying Lee 2019-09-11 23:27:52 +08:00 committed by Peter Wicks
parent 9ec6314687
commit 758035b964
1 changed files with 18 additions and 0 deletions

View File

@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -39,6 +41,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.StreamUtils;
@ -303,6 +306,7 @@ public class HandleHttpRequest extends AbstractProcessor {
private volatile Server server;
private AtomicBoolean initialized = new AtomicBoolean(false);
private volatile BlockingQueue<HttpRequestContainer> containerQueue;
private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -323,6 +327,7 @@ public class HandleHttpRequest extends AbstractProcessor {
if(initialized.get()){
return;
}
runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY));
this.containerQueue = new LinkedBlockingQueue<>(context.getProperty(CONTAINER_QUEUE_SIZE).asInteger());
final String host = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
@ -537,10 +542,23 @@ public class HandleHttpRequest extends AbstractProcessor {
server.stop();
server.destroy();
server.join();
clearInit();
getLogger().info("Shut down {}", new Object[]{server});
}
}
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
try {
shutdown();
} catch (final Exception shutdownException) {
getLogger().warn("Processor is configured to run only on Primary Node, but failed to shutdown HTTP server following revocation of primary node status due to {}",
shutdownException);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {