NIFI-8770 Use queue drainTo() on shutdown in HandleHttpRequest

- Refactored response handling to use shared sendError() method
- Standardized request logging to include HTTP Method and URI

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5218.
This commit is contained in:
exceptionfactory 2021-07-15 14:19:57 -05:00 committed by Nathan Gough
parent 47eeabd8a5
commit cb020072f7
2 changed files with 210 additions and 177 deletions

View File

@ -56,10 +56,6 @@ import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
@ -73,7 +69,6 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.security.Principal; import java.security.Principal;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
@ -84,6 +79,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -93,6 +89,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
import static javax.servlet.http.HttpServletResponse.SC_METHOD_NOT_ALLOWED;
import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND;
import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"http", "https", "request", "listen", "ingress", "web service"}) @Tags({"http", "https", "request", "listen", "ingress", "web service"})
@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. " @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. "
@ -322,10 +323,10 @@ public class HandleHttpRequest extends AbstractProcessor {
private volatile Server server; private volatile Server server;
private volatile boolean ready; private volatile boolean ready;
private AtomicBoolean initialized = new AtomicBoolean(false);
private volatile BlockingQueue<HttpRequestContainer> containerQueue; private volatile BlockingQueue<HttpRequestContainer> containerQueue;
private AtomicBoolean runOnPrimary = new AtomicBoolean(false); private final AtomicBoolean initialized = new AtomicBoolean(false);
private AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null); private final AtomicBoolean runOnPrimary = new AtomicBoolean(false);
private final AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null);
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -357,10 +358,10 @@ public class HandleHttpRequest extends AbstractProcessor {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final boolean need; final boolean need;
final boolean want; final boolean want;
if (CLIENT_NEED.equals(clientAuthValue)) { if (CLIENT_NEED.getValue().equals(clientAuthValue)) {
need = true; need = true;
want = false; want = false;
} else if (CLIENT_WANT.equals(clientAuthValue)) { } else if (CLIENT_WANT.getValue().equals(clientAuthValue)) {
need = false; need = false;
want = true; want = true;
} else { } else {
@ -458,66 +459,40 @@ public class HandleHttpRequest extends AbstractProcessor {
server.setHandler(new AbstractHandler() { server.setHandler(new AbstractHandler() {
@Override @Override
public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) {
throws IOException, ServletException {
final String requestUri = request.getRequestURI(); final String requestUri = request.getRequestURI();
if (!allowedMethods.contains(request.getMethod().toUpperCase())) { final String method = request.getMethod().toUpperCase();
getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}", if (!allowedMethods.contains(method)) {
new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri}); sendError(SC_METHOD_NOT_ALLOWED, "Method Not Allowed", request, response);
response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
return; return;
} }
if (pathPattern != null) { if (pathPattern != null) {
final URI uri; final URI uri = URI.create(requestUri);
try {
uri = new URI(requestUri);
} catch (final URISyntaxException e) {
throw new ServletException(e);
}
if (!pathPattern.matcher(uri.getPath()).matches()) { if (!pathPattern.matcher(uri.getPath()).matches()) {
getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}", sendError(SC_NOT_FOUND, "Path Not Found", request, response);
new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri});
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return; return;
} }
} }
// If destination queues full, send back a 503: Service Unavailable.
if (context.getAvailableRelationships().isEmpty()) { if (context.getAvailableRelationships().isEmpty()) {
getLogger().warn("Request from {} cannot be processed, processor downstream queue is full; responding with SERVICE_UNAVAILABLE", sendError(SC_SERVICE_UNAVAILABLE, "No Available Relationships", request, response);
new Object[]{request.getRemoteAddr()});
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full");
return; return;
} else if (!ready) { } else if (!ready) {
getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE", sendError(SC_SERVICE_UNAVAILABLE, "Server Not Ready", request, response);
new Object[]{request.getRemoteAddr()});
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
return; return;
} }
// Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
// so it is not known to us. Should see if it can be added to the ProcessContext.
final AsyncContext async = baseRequest.startAsync(); final AsyncContext async = baseRequest.startAsync();
// disable timeout handling on AsyncContext, timeout will be handled in HttpContextMap // disable timeout handling on AsyncContext, timeout will be handled in HttpContextMap
async.setTimeout(0); async.setTimeout(0);
final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async)); final HttpRequestContainer container = new HttpRequestContainer(request, response, async);
final boolean added = containerQueue.offer(container);
if (added) { if (added) {
getLogger().debug("Added Http Request to queue for {} {} from {}", getLogger().debug("Request Queued: Method [{}] URI [{}] Address [{}]", method, requestUri, request.getRemoteAddr());
new Object[]{request.getMethod(), requestUri, request.getRemoteAddr()});
} else { } else {
getLogger().warn("Request from {} cannot be processed, container queue is full; responding with SERVICE_UNAVAILABLE", sendError(SC_SERVICE_UNAVAILABLE, "Request Queue Full", container);
new Object[]{request.getRemoteAddr()});
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Container queue is full");
async.complete();
} }
} }
}); });
@ -525,7 +500,9 @@ public class HandleHttpRequest extends AbstractProcessor {
this.server = server; this.server = server;
server.start(); server.start();
getLogger().info("Server started and listening on port " + getPort()); for (final Connector connector : server.getConnectors()) {
getLogger().info("Started Connector {}", connector);
}
initialized.set(true); initialized.set(true);
ready = true; ready = true;
@ -561,54 +538,50 @@ public class HandleHttpRequest extends AbstractProcessor {
public void shutdown() throws Exception { public void shutdown() throws Exception {
ready = false; ready = false;
if (server != null) { if (server == null) {
getLogger().debug("Shutting down server"); getLogger().debug("Server not configured");
rejectPendingRequests(); } else {
if (server.isStopped()) {
getLogger().debug("Server Stopped {}", server);
} else {
for (final Connector connector : server.getConnectors()) {
getLogger().debug("Stopping Connector {}", connector);
}
drainContainerQueue();
server.stop(); server.stop();
server.destroy(); server.destroy();
server.join(); server.join();
clearInit(); clearInit();
getLogger().info("Shut down {}", new Object[]{server});
}
}
void rejectPendingRequests() { for (final Connector connector : server.getConnectors()) {
HttpRequestContainer container; getLogger().info("Stopped Connector {}", connector);
while ((container = getNextContainer()) != null) { }
try {
getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE",
new Object[]{container.getRequest().getRemoteAddr()});
HttpServletResponse response = container.getResponse();
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
container.getContext().complete();
} catch (final IOException e) {
getLogger().warn("Failed to send HTTP response to {} due to {}",
new Object[]{container.getRequest().getRemoteAddr(), e});
} }
} }
} }
private HttpRequestContainer getNextContainer() { void drainContainerQueue() {
HttpRequestContainer container; if (containerQueue.isEmpty()) {
try { getLogger().debug("No Pending Requests Queued");
container = containerQueue.poll(2, TimeUnit.SECONDS); } else {
} catch (final InterruptedException e) { final List<HttpRequestContainer> pendingContainers = new ArrayList<>();
getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup."); containerQueue.drainTo(pendingContainers);
container = null; getLogger().warn("Pending Requests Queued [{}]", pendingContainers.size());
for (final HttpRequestContainer container : pendingContainers) {
sendError(SC_SERVICE_UNAVAILABLE, "Stopping Server", container);
}
} }
return container;
} }
@OnPrimaryNodeStateChange @OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) { public void onPrimaryNodeChange(final PrimaryNodeState state) {
if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) { if (runOnPrimary.get() && state.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
getLogger().info("Server Shutdown Started: Primary Node State Changed [{}]", state);
try { try {
shutdown(); shutdown();
} catch (final Exception shutdownException) { } catch (final Exception e) {
getLogger().warn("Processor is configured to run only on Primary Node, but failed to shutdown HTTP server following revocation of primary node status due to {}", getLogger().warn("Server Shutdown Failed: Primary Node State Changed [{}]", state, e);
shutdownException);
} }
} }
} }
@ -626,7 +599,7 @@ public class HandleHttpRequest extends AbstractProcessor {
// shutdown to release any resources allocated during the failed initialization // shutdown to release any resources allocated during the failed initialization
shutdown(); shutdown();
} catch (final Exception shutdownException) { } catch (final Exception shutdownException) {
getLogger().debug("Failed to shutdown following a failed initialization: " + shutdownException); getLogger().debug("Server Shutdown Failed after Initialization Failed", shutdownException);
} }
throw new ProcessException("Failed to initialize the server", e); throw new ProcessException("Failed to initialize the server", e);
@ -647,14 +620,14 @@ public class HandleHttpRequest extends AbstractProcessor {
final long start = System.nanoTime(); final long start = System.nanoTime();
final HttpServletRequest request = container.getRequest(); final HttpServletRequest request = container.getRequest();
if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) { if (StringUtils.contains(request.getContentType(), MIME_TYPE__MULTIPART_FORM_DATA)) {
final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue(); final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
String tempDir = System.getProperty("java.io.tmpdir"); String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize)); request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
List<Part> parts = null; List<Part> parts = null;
try { try {
parts = ImmutableList.copyOf(request.getParts()); parts = Collections.unmodifiableList(new ArrayList<>(request.getParts()));
int allPartsCount = parts.size(); int allPartsCount = parts.size();
final String contextIdentifier = UUID.randomUUID().toString(); final String contextIdentifier = UUID.randomUUID().toString();
for (int i = 0; i < allPartsCount; i++) { for (int i = 0; i < allPartsCount; i++) {
@ -663,22 +636,21 @@ public class HandleHttpRequest extends AbstractProcessor {
try (OutputStream flowFileOut = session.write(flowFile)) { try (OutputStream flowFileOut = session.write(flowFile)) {
StreamUtils.copy(part.getInputStream(), flowFileOut); StreamUtils.copy(part.getInputStream(), flowFileOut);
} catch (IOException e) { } catch (IOException e) {
handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e); handleFlowContentStreamingError(session, container, Optional.of(flowFile), e);
return; return;
} }
flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount); flowFile = savePartAttributes(session, part, flowFile, i, allPartsCount);
flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier); flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
if (i == 0) { if (i == 0) {
// each one of multipart comes from a single request, thus registering only once per loop. // each one of multipart comes from a single request, thus registering only once per loop.
boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile); boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile);
if (!requestRegistrationSuccess) if (!requestRegistrationSuccess)
break; break;
} }
forwardFlowFile(context, session, container, start, request, flowFile); forwardFlowFile(session, start, request, flowFile);
} }
} catch (IOException | ServletException | IllegalStateException e) { } catch (IOException | ServletException | IllegalStateException e) {
handleFlowContentStreamingError(session, container, request, Optional.absent(), e); handleFlowContentStreamingError(session, container, Optional.empty(), e);
return;
} finally { } finally {
if (parts != null) { if (parts != null) {
for (Part part : parts) { for (Part part : parts) {
@ -695,18 +667,18 @@ public class HandleHttpRequest extends AbstractProcessor {
try (OutputStream flowFileOut = session.write(flowFile)) { try (OutputStream flowFileOut = session.write(flowFile)) {
StreamUtils.copy(request.getInputStream(), flowFileOut); StreamUtils.copy(request.getInputStream(), flowFileOut);
} catch (final IOException e) { } catch (final IOException e) {
handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e); handleFlowContentStreamingError(session, container, Optional.of(flowFile), e);
return; return;
} }
final String contextIdentifier = UUID.randomUUID().toString(); final String contextIdentifier = UUID.randomUUID().toString();
flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier); flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile); boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile);
if (requestRegistrationSuccess) if (requestRegistrationSuccess)
forwardFlowFile(context, session, container, start, request, flowFile); forwardFlowFile(session, start, request, flowFile);
} }
} }
private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) { private FlowFile savePartAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
for (String headerName : part.getHeaderNames()) { for (String headerName : part.getHeaderNames()) {
final String headerValue = part.getHeader(headerName); final String headerValue = part.getHeader(headerName);
@ -817,76 +789,53 @@ public class HandleHttpRequest extends AbstractProcessor {
putAttribute(attributes, "http.principal.name", principal.getName()); putAttribute(attributes, "http.principal.name", principal.getName());
} }
final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
final String subjectDn;
if (certs != null && certs.length > 0) { if (certs != null && certs.length > 0) {
final X509Certificate cert = certs[0]; final X509Certificate cert = certs[0];
subjectDn = cert.getSubjectDN().getName(); final String subjectDn = cert.getSubjectDN().getName();
final String issuerDn = cert.getIssuerDN().getName(); final String issuerDn = cert.getIssuerDN().getName();
putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn); putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
putAttribute(attributes, "http.issuer.dn", issuerDn); putAttribute(attributes, "http.issuer.dn", issuerDn);
} else {
subjectDn = null;
} }
return session.putAllAttributes(flowFile, attributes); return session.putAllAttributes(flowFile, attributes);
} }
private void forwardFlowFile(final ProcessContext context, final ProcessSession session, private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) {
HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT); final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()),
"Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis); "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()}); getLogger().debug("Transferred {} to [{}] Remote Address [{}] ", flowFile, REL_SUCCESS, request.getRemoteAddr());
} }
private boolean registerRequest(final ProcessContext context, final ProcessSession session, private boolean registerRequest(final ProcessContext context, final ProcessSession session,
HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) { final HttpRequestContainer container, final FlowFile flowFile) {
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
final HttpServletRequest request = container.getRequest();
final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext()); final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext());
if (registered) if (registered) {
return true; return true;
getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
new Object[]{request.getRemoteAddr()});
try {
container.getResponse().sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "HttpContextMap is full");
container.getContext().complete();
} catch (final Exception e) {
getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
new Object[]{request.getRemoteAddr(), e});
} }
session.remove(flowFile); session.remove(flowFile);
sendError(SC_SERVICE_UNAVAILABLE, "Request Registration Failed", container);
return false; return false;
} }
protected void handleFlowContentStreamingError(final ProcessSession session, final HttpRequestContainer container, final Optional<FlowFile> flowFile, final Exception e) {
protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container,
final HttpServletRequest request, Optional<FlowFile> flowFile, final Exception e) {
// There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg. // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
// bad requests, the connection to the client is not closed. In order to address also these cases, we try // bad requests, the connection to the client is not closed. In order to address also these cases, we try
// and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
// processed and makes it aware that the connection can be closed. // processed and makes it aware that the connection can be closed.
getLogger().error("Failed to receive content from HTTP Request from {} due to {}", final HttpServletRequest request = container.getRequest();
new Object[]{request.getRemoteAddr(), e}); getLogger().error("Stream Processing Failed: Method [{}] URI [{}] Address [{}]", request.getMethod(), request.getRequestURI(), request.getRemoteAddr(), e);
if (flowFile.isPresent()) flowFile.ifPresent(session::remove);
session.remove(flowFile.get()); sendError(SC_BAD_REQUEST, "Stream Processing Failed", container);
try {
HttpServletResponse response = container.getResponse();
response.sendError(HttpServletResponse.SC_BAD_REQUEST);
container.getContext().complete();
} catch (final IOException ioe) {
getLogger().warn("Failed to send HTTP response to {} due to {}",
new Object[]{request.getRemoteAddr(), ioe});
}
} }
private void putAttribute(final Map<String, String> map, final String key, final Object value) { private void putAttribute(final Map<String, String> map, final String key, final Object value) {
@ -905,8 +854,34 @@ public class HandleHttpRequest extends AbstractProcessor {
map.put(key, value); map.put(key, value);
} }
private static class HttpRequestContainer { private void sendError(final int statusCode, final String message, final HttpRequestContainer container) {
sendError(statusCode, message, container.getRequest(), container.getResponse());
final AsyncContext asyncContext = container.getContext();
try {
asyncContext.complete();
} catch (final RuntimeException e) {
final HttpServletRequest request = container.getRequest();
final String method = request.getMethod();
final String uri = request.getRequestURI();
final String remoteAddr = request.getRemoteAddr();
getLogger().error("Complete Request Failed: Method [{}] URI [{}] Address [{}]", method, uri, remoteAddr, e);
}
}
private void sendError(final int statusCode, final String message, final HttpServletRequest request, final HttpServletResponse response) {
final String method = request.getMethod();
final String uri = request.getRequestURI();
final String remoteAddr = request.getRemoteAddr();
try {
response.sendError(statusCode, message);
getLogger().warn("Send Error Completed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr);
} catch (final Exception e) {
getLogger().error("Send Error Failed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr, e);
}
}
private static class HttpRequestContainer {
private final HttpServletRequest request; private final HttpServletRequest request;
private final HttpServletResponse response; private final HttpServletResponse response;
private final AsyncContext context; private final AsyncContext context;

View File

@ -17,23 +17,31 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -41,10 +49,6 @@ import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import okhttp3.Call; import okhttp3.Call;
import okhttp3.Callback; import okhttp3.Callback;
import okhttp3.MediaType; import okhttp3.MediaType;
@ -53,17 +57,21 @@ import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.RequestBody; import okhttp3.RequestBody;
import okhttp3.Response; import okhttp3.Response;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap; import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.HTTPUtils; import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.security.util.TlsException; import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils; import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -179,11 +187,11 @@ public class ITestHandleHttpRequest {
.addFormDataPart("p1", "v1") .addFormDataPart("p1", "v1")
.addFormDataPart("p2", "v2") .addFormDataPart("p2", "v2")
.addFormDataPart("file1", "my-file-text.txt", .addFormDataPart("file1", "my-file-text.txt",
RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) RequestBody.create(createTextFile("Hello", "World"), MediaType.parse("text/plain")))
.addFormDataPart("file2", "my-file-data.json", .addFormDataPart("file2", "my-file-data.json",
RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"))) RequestBody.create(createTextFile( "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json")))
.addFormDataPart("file3", "my-file-binary.bin", .addFormDataPart("file3", "my-file-binary.bin",
RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
.build(); .build();
Request request = new Request.Builder() Request request = new Request.Builder()
@ -324,7 +332,6 @@ public class ITestHandleHttpRequest {
// We cannot rely on the order we sent them in. // We cannot rely on the order we sent them in.
for (int i = 1; i < 4; i++) { for (int i = 1; i < 4; i++) {
MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", String.format("p%d", i)); MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", String.format("p%d", i));
String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
mff.assertAttributeEquals("http.multipart.name", String.format("p%d", i)); mff.assertAttributeEquals("http.multipart.name", String.format("p%d", i));
mff.assertAttributeExists("http.param.p1"); mff.assertAttributeExists("http.param.p1");
mff.assertAttributeEquals("http.param.p1", "v1"); mff.assertAttributeEquals("http.param.p1", "v1");
@ -364,11 +371,11 @@ public class ITestHandleHttpRequest {
.addFormDataPart("p1", "v1") .addFormDataPart("p1", "v1")
.addFormDataPart("p2", "v2") .addFormDataPart("p2", "v2")
.addFormDataPart("file1", "my-file-text.txt", .addFormDataPart("file1", "my-file-text.txt",
RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) RequestBody.create(createTextFile("my-file-text.txt", "Hello", "World"), MediaType.parse("text/plain")))
.addFormDataPart("file2", "my-file-data.json", .addFormDataPart("file2", "my-file-data.json",
RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"))) RequestBody.create(createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json")))
.addFormDataPart("file3", "my-file-binary.bin", .addFormDataPart("file3", "my-file-binary.bin",
RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
.build(); .build();
Request request = new Request.Builder() Request request = new Request.Builder()
@ -383,12 +390,12 @@ public class ITestHandleHttpRequest {
Callback callback = new Callback() { Callback callback = new Callback() {
@Override @Override
public void onFailure(Call call, IOException e) { public void onFailure(@NotNull Call call, @NotNull IOException e) {
// Not going to happen // Not going to happen
} }
@Override @Override
public void onResponse(Call call, Response response) { public void onResponse(@NotNull Call call, @NotNull Response response) {
responseCode.set(response.code()); responseCode.set(response.code());
resultReady.countDown(); resultReady.countDown();
} }
@ -409,25 +416,27 @@ public class ITestHandleHttpRequest {
Assert.assertEquals(503, responseCode.get()); Assert.assertEquals(503, responseCode.get());
} }
private byte[] generateRandomBinaryData(int i) { private byte[] generateRandomBinaryData() {
byte[] bytes = new byte[100]; byte[] bytes = new byte[100];
new Random().nextBytes(bytes); new Random().nextBytes(bytes);
return bytes; return bytes;
} }
private File createTextFile(String fileName, String... lines) throws IOException { private File createTextFile(String... lines) throws IOException {
File file = new File(fileName); File file = new File(getClass().getSimpleName());
file.deleteOnExit(); file.deleteOnExit();
for (String string : lines) { try (final PrintWriter writer = new PrintWriter(new FileWriter(file))) {
Files.append(string, file, Charsets.UTF_8); for (final String line : lines) {
writer.println(line);
}
} }
return file; return file;
} }
protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) { protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue)); Optional<MockFlowFile> optional = flowFilesForRelationship.stream().filter(ff -> ff.getAttribute(attributeName).equals(attributeValue)).findFirst();
Assert.assertTrue(optional.isPresent()); Assert.assertTrue(optional.isPresent());
return optional.get(); return optional.get();
} }
@ -450,7 +459,6 @@ public class ITestHandleHttpRequest {
contextMap.setRegisterSuccessfully(false); contextMap.setRegisterSuccessfully(false);
final int[] responseCode = new int[1]; final int[] responseCode = new int[1];
responseCode[0] = 0;
final Thread httpThread = new Thread(new Runnable() { final Thread httpThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -543,12 +551,12 @@ public class ITestHandleHttpRequest {
Callback callback = new Callback() { Callback callback = new Callback() {
@Override @Override
public void onFailure(Call call, IOException e) { public void onFailure(@NotNull Call call, @NotNull IOException e) {
// Will only happen once for the first non-rejected request, but not important // Will only happen once for the first non-rejected request, but not important
} }
@Override @Override
public void onResponse(Call call, Response response) throws IOException { public void onResponse(@NotNull Call call, @NotNull Response response) {
responses.add(response); responses.add(response);
cleanupDone.countDown(); cleanupDone.countDown();
} }
@ -583,10 +591,64 @@ public class ITestHandleHttpRequest {
assertEquals(responses.size(), nrOfRequests - 1); assertEquals(responses.size(), nrOfRequests - 1);
for (Response response : responses) { for (Response response : responses) {
assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code()); assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down"));
} }
} }
@Test(timeout = 15000)
public void testOnPrimaryNodeChangePrimaryNodeRevoked() throws Exception {
processor = new HandleHttpRequest();
final TestRunner runner = TestRunners.newTestRunner(processor);
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(HandleHttpRequest.PORT, Integer.toString(port));
final MockHttpContextMap contextMap = new MockHttpContextMap();
final String contextMapId = MockHttpContextMap.class.getSimpleName();
runner.addControllerService(contextMapId, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, contextMapId);
final ProcessContext processContext = spy(runner.getProcessContext());
when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
processor.initializeServer(processContext);
final OkHttpClient client = new OkHttpClient.Builder().build();
final String url = String.format("http://localhost:%d", port);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final CountDownLatch requestCompleted = new CountDownLatch(1);
final CountDownLatch requestStarted = new CountDownLatch(1);
final AtomicReference<IOException> requestException = new AtomicReference<>();
final AtomicInteger responseStatus = new AtomicInteger();
executorService.execute(() -> {
final Request request = new Request.Builder().url(url).get().build();
final Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
requestException.set(e);
requestCompleted.countDown();
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) {
responseStatus.set(response.code());
requestCompleted.countDown();
}
});
requestStarted.countDown();
});
requestStarted.await();
Thread.sleep(1000);
processor.onPrimaryNodeChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
requestCompleted.await();
assertNull("HTTP Request Exception found", requestException.get());
assertEquals("HTTP Status not matched", HttpServletResponse.SC_SERVICE_UNAVAILABLE, responseStatus.get());
}
@Test @Test
public void testSecure() throws Exception { public void testSecure() throws Exception {
secureTest(false); secureTest(false);
@ -610,7 +672,7 @@ public class ITestHandleHttpRequest {
runner.enableControllerService(contextMap); runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class); final RestrictedSSLContextService sslContextService = mock(RestrictedSSLContextService.class);
final String serviceIdentifier = RestrictedSSLContextService.class.getName(); final String serviceIdentifier = RestrictedSSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
Mockito.when(sslContextService.createContext()).thenReturn(keyStoreSslContext); Mockito.when(sslContextService.createContext()).thenReturn(keyStoreSslContext);
@ -683,7 +745,7 @@ public class ITestHandleHttpRequest {
} }
@Override @Override
void rejectPendingRequests() { void drainContainerQueue() {
// Skip this, otherwise it would wait to make sure there are no more requests // Skip this, otherwise it would wait to make sure there are no more requests
} }
}; };
@ -691,7 +753,7 @@ public class ITestHandleHttpRequest {
private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception { private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception {
Future<InputStream> executionFuture = Executors.newSingleThreadExecutor() Future<InputStream> executionFuture = Executors.newSingleThreadExecutor()
.submit(() -> connection.getInputStream()); .submit(connection::getInputStream);
requestSent.countDown(); requestSent.countDown();
@ -701,12 +763,12 @@ public class ITestHandleHttpRequest {
private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) { private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) {
Callback callback = new Callback() { Callback callback = new Callback() {
@Override @Override
public void onFailure(Call call, IOException e) { public void onFailure(@NotNull Call call, @NotNull IOException e) {
// We (may) get a timeout as the processor doesn't answer unless there is some kind of error // We (may) get a timeout as the processor doesn't answer unless there is some kind of error
} }
@Override @Override
public void onResponse(Call call, Response response) throws IOException { public void onResponse(@NotNull Call call, @NotNull Response response) {
// Not called as the processor doesn't answer unless there is some kind of error // Not called as the processor doesn't answer unless there is some kind of error
} }
}; };
@ -747,10 +809,6 @@ public class ITestHandleHttpRequest {
return responseMap.size(); return responseMap.size();
} }
public boolean isRegisterSuccessfully() {
return registerSuccessfully;
}
public void setRegisterSuccessfully(boolean registerSuccessfully) { public void setRegisterSuccessfully(boolean registerSuccessfully) {
this.registerSuccessfully = registerSuccessfully; this.registerSuccessfully = registerSuccessfully;
} }