NIFI-883 Fixing issue HandleHttpRequest had with PrimaryNodeOnly scheduling

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Joseph Percivall 2015-11-02 10:17:32 -05:00 committed by Mark Payne
parent 37e2f178f8
commit 2ae49026e8
1 changed files with 22 additions and 1 deletions

View File

@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.security.cert.X509Certificate;
@ -226,6 +227,7 @@ public class HandleHttpRequest extends AbstractProcessor {
.build();
private volatile Server server;
private AtomicBoolean initialized = new AtomicBoolean(false);
private final BlockingQueue<HttpRequestContainer> containerQueue = new LinkedBlockingQueue<>(50);
@Override
@ -255,7 +257,15 @@ public class HandleHttpRequest extends AbstractProcessor {
}
@OnScheduled
public void initializeServer(final ProcessContext context) throws Exception {
public void clearInit(){
initialized.set(false);
}
private synchronized void initializeServer(final ProcessContext context) throws Exception {
if(initialized.get()){
return;
}
final String host = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger();
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
@ -402,6 +412,8 @@ public class HandleHttpRequest extends AbstractProcessor {
server.start();
getLogger().info("Server started and listening on port " + getPort());
initialized.set(true);
}
protected int getPort() {
@ -452,6 +464,15 @@ public class HandleHttpRequest extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
if(!initialized.get()) {
initializeServer(context);
}
} catch (Exception e) {
context.yield();
throw new ProcessException("Failed to initialize the server",e);
}
final HttpRequestContainer container = containerQueue.poll();
if (container == null) {
return;