diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index b9a26f8421..96af6c2c7b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -56,10 +56,6 @@ import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.ssl.SslContextFactory; -import com.google.common.base.Optional; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; - import javax.net.ssl.SSLContext; import javax.servlet.AsyncContext; import javax.servlet.DispatcherType; @@ -73,7 +69,6 @@ import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URISyntaxException; import java.net.URLDecoder; import java.security.Principal; import java.security.cert.X509Certificate; @@ -84,6 +79,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -93,6 +89,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE; +import static javax.servlet.http.HttpServletResponse.SC_METHOD_NOT_ALLOWED; +import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND; +import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; + @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"http", "https", "request", "listen", "ingress", "web service"}) @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. " @@ -322,10 +323,10 @@ public class HandleHttpRequest extends AbstractProcessor { private volatile Server server; private volatile boolean ready; - private AtomicBoolean initialized = new AtomicBoolean(false); private volatile BlockingQueue containerQueue; - private AtomicBoolean runOnPrimary = new AtomicBoolean(false); - private AtomicReference> parameterToAttributesReference = new AtomicReference<>(null); + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean runOnPrimary = new AtomicBoolean(false); + private final AtomicReference> parameterToAttributesReference = new AtomicReference<>(null); @Override protected List getSupportedPropertyDescriptors() { @@ -343,7 +344,7 @@ public class HandleHttpRequest extends AbstractProcessor { } synchronized void initializeServer(final ProcessContext context) throws Exception { - if(initialized.get()){ + if (initialized.get()) { return; } runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY)); @@ -357,10 +358,10 @@ public class HandleHttpRequest extends AbstractProcessor { final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); final boolean need; final boolean want; - if (CLIENT_NEED.equals(clientAuthValue)) { + if (CLIENT_NEED.getValue().equals(clientAuthValue)) { need = true; want = false; - } else if (CLIENT_WANT.equals(clientAuthValue)) { + } else if (CLIENT_WANT.getValue().equals(clientAuthValue)) { need = false; want = true; } else { @@ -458,66 +459,40 @@ public class HandleHttpRequest extends AbstractProcessor { server.setHandler(new AbstractHandler() { @Override - public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) - throws IOException, ServletException { - + public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) { final String requestUri = request.getRequestURI(); - if (!allowedMethods.contains(request.getMethod().toUpperCase())) { - getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}", - new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri}); - response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); + final String method = request.getMethod().toUpperCase(); + if (!allowedMethods.contains(method)) { + sendError(SC_METHOD_NOT_ALLOWED, "Method Not Allowed", request, response); return; } if (pathPattern != null) { - final URI uri; - try { - uri = new URI(requestUri); - } catch (final URISyntaxException e) { - throw new ServletException(e); - } - + final URI uri = URI.create(requestUri); if (!pathPattern.matcher(uri.getPath()).matches()) { - getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}", - new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri}); - response.sendError(HttpServletResponse.SC_NOT_FOUND); + sendError(SC_NOT_FOUND, "Path Not Found", request, response); return; } } - // If destination queues full, send back a 503: Service Unavailable. if (context.getAvailableRelationships().isEmpty()) { - getLogger().warn("Request from {} cannot be processed, processor downstream queue is full; responding with SERVICE_UNAVAILABLE", - new Object[]{request.getRemoteAddr()}); - - response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full"); + sendError(SC_SERVICE_UNAVAILABLE, "No Available Relationships", request, response); return; } else if (!ready) { - getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE", - new Object[]{request.getRemoteAddr()}); - - response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down"); + sendError(SC_SERVICE_UNAVAILABLE, "Server Not Ready", request, response); return; } - // Right now, that information, though, is only in the ProcessSession, not the ProcessContext, - // so it is not known to us. Should see if it can be added to the ProcessContext. final AsyncContext async = baseRequest.startAsync(); - // disable timeout handling on AsyncContext, timeout will be handled in HttpContextMap async.setTimeout(0); - final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async)); - + final HttpRequestContainer container = new HttpRequestContainer(request, response, async); + final boolean added = containerQueue.offer(container); if (added) { - getLogger().debug("Added Http Request to queue for {} {} from {}", - new Object[]{request.getMethod(), requestUri, request.getRemoteAddr()}); + getLogger().debug("Request Queued: Method [{}] URI [{}] Address [{}]", method, requestUri, request.getRemoteAddr()); } else { - getLogger().warn("Request from {} cannot be processed, container queue is full; responding with SERVICE_UNAVAILABLE", - new Object[]{request.getRemoteAddr()}); - - response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Container queue is full"); - async.complete(); + sendError(SC_SERVICE_UNAVAILABLE, "Request Queue Full", container); } } }); @@ -525,7 +500,9 @@ public class HandleHttpRequest extends AbstractProcessor { this.server = server; server.start(); - getLogger().info("Server started and listening on port " + getPort()); + for (final Connector connector : server.getConnectors()) { + getLogger().info("Started Connector {}", connector); + } initialized.set(true); ready = true; @@ -561,54 +538,50 @@ public class HandleHttpRequest extends AbstractProcessor { public void shutdown() throws Exception { ready = false; - if (server != null) { - getLogger().debug("Shutting down server"); - rejectPendingRequests(); - server.stop(); - server.destroy(); - server.join(); - clearInit(); - getLogger().info("Shut down {}", new Object[]{server}); - } - } + if (server == null) { + getLogger().debug("Server not configured"); + } else { + if (server.isStopped()) { + getLogger().debug("Server Stopped {}", server); + } else { + for (final Connector connector : server.getConnectors()) { + getLogger().debug("Stopping Connector {}", connector); + } - void rejectPendingRequests() { - HttpRequestContainer container; - while ((container = getNextContainer()) != null) { - try { - getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE", - new Object[]{container.getRequest().getRemoteAddr()}); + drainContainerQueue(); + server.stop(); + server.destroy(); + server.join(); + clearInit(); - HttpServletResponse response = container.getResponse(); - response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down"); - container.getContext().complete(); - } catch (final IOException e) { - getLogger().warn("Failed to send HTTP response to {} due to {}", - new Object[]{container.getRequest().getRemoteAddr(), e}); + for (final Connector connector : server.getConnectors()) { + getLogger().info("Stopped Connector {}", connector); + } } } } - private HttpRequestContainer getNextContainer() { - HttpRequestContainer container; - try { - container = containerQueue.poll(2, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup."); - container = null; + void drainContainerQueue() { + if (containerQueue.isEmpty()) { + getLogger().debug("No Pending Requests Queued"); + } else { + final List pendingContainers = new ArrayList<>(); + containerQueue.drainTo(pendingContainers); + getLogger().warn("Pending Requests Queued [{}]", pendingContainers.size()); + for (final HttpRequestContainer container : pendingContainers) { + sendError(SC_SERVICE_UNAVAILABLE, "Stopping Server", container); + } } - - return container; } @OnPrimaryNodeStateChange - public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) { + public void onPrimaryNodeChange(final PrimaryNodeState state) { + if (runOnPrimary.get() && state.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) { + getLogger().info("Server Shutdown Started: Primary Node State Changed [{}]", state); try { shutdown(); - } catch (final Exception shutdownException) { - getLogger().warn("Processor is configured to run only on Primary Node, but failed to shutdown HTTP server following revocation of primary node status due to {}", - shutdownException); + } catch (final Exception e) { + getLogger().warn("Server Shutdown Failed: Primary Node State Changed [{}]", state, e); } } } @@ -616,7 +589,7 @@ public class HandleHttpRequest extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { try { - if(!initialized.get()) { + if (!initialized.get()) { initializeServer(context); } } catch (Exception e) { @@ -626,7 +599,7 @@ public class HandleHttpRequest extends AbstractProcessor { // shutdown to release any resources allocated during the failed initialization shutdown(); } catch (final Exception shutdownException) { - getLogger().debug("Failed to shutdown following a failed initialization: " + shutdownException); + getLogger().debug("Server Shutdown Failed after Initialization Failed", shutdownException); } throw new ProcessException("Failed to initialize the server", e); @@ -647,14 +620,14 @@ public class HandleHttpRequest extends AbstractProcessor { final long start = System.nanoTime(); final HttpServletRequest request = container.getRequest(); - if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) { + if (StringUtils.contains(request.getContentType(), MIME_TYPE__MULTIPART_FORM_DATA)) { final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue(); final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); String tempDir = System.getProperty("java.io.tmpdir"); request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize)); List parts = null; try { - parts = ImmutableList.copyOf(request.getParts()); + parts = Collections.unmodifiableList(new ArrayList<>(request.getParts())); int allPartsCount = parts.size(); final String contextIdentifier = UUID.randomUUID().toString(); for (int i = 0; i < allPartsCount; i++) { @@ -663,22 +636,21 @@ public class HandleHttpRequest extends AbstractProcessor { try (OutputStream flowFileOut = session.write(flowFile)) { StreamUtils.copy(part.getInputStream(), flowFileOut); } catch (IOException e) { - handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e); + handleFlowContentStreamingError(session, container, Optional.of(flowFile), e); return; } - flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount); + flowFile = savePartAttributes(session, part, flowFile, i, allPartsCount); flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier); if (i == 0) { // each one of multipart comes from a single request, thus registering only once per loop. - boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile); + boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile); if (!requestRegistrationSuccess) break; } - forwardFlowFile(context, session, container, start, request, flowFile); + forwardFlowFile(session, start, request, flowFile); } } catch (IOException | ServletException | IllegalStateException e) { - handleFlowContentStreamingError(session, container, request, Optional.absent(), e); - return; + handleFlowContentStreamingError(session, container, Optional.empty(), e); } finally { if (parts != null) { for (Part part : parts) { @@ -695,18 +667,18 @@ public class HandleHttpRequest extends AbstractProcessor { try (OutputStream flowFileOut = session.write(flowFile)) { StreamUtils.copy(request.getInputStream(), flowFileOut); } catch (final IOException e) { - handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e); + handleFlowContentStreamingError(session, container, Optional.of(flowFile), e); return; } final String contextIdentifier = UUID.randomUUID().toString(); flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier); - boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile); + boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile); if (requestRegistrationSuccess) - forwardFlowFile(context, session, container, start, request, flowFile); + forwardFlowFile(session, start, request, flowFile); } } - private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) { + private FlowFile savePartAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) { final Map attributes = new HashMap<>(); for (String headerName : part.getHeaderNames()) { final String headerValue = part.getHeader(headerName); @@ -817,76 +789,53 @@ public class HandleHttpRequest extends AbstractProcessor { putAttribute(attributes, "http.principal.name", principal.getName()); } - final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); - final String subjectDn; + final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); if (certs != null && certs.length > 0) { final X509Certificate cert = certs[0]; - subjectDn = cert.getSubjectDN().getName(); + final String subjectDn = cert.getSubjectDN().getName(); final String issuerDn = cert.getIssuerDN().getName(); putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn); putAttribute(attributes, "http.issuer.dn", issuerDn); - } else { - subjectDn = null; } return session.putAllAttributes(flowFile, attributes); } - private void forwardFlowFile(final ProcessContext context, final ProcessSession session, - HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) { + private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) { final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT); + final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT); session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()}); + getLogger().debug("Transferred {} to [{}] Remote Address [{}] ", flowFile, REL_SUCCESS, request.getRemoteAddr()); } private boolean registerRequest(final ProcessContext context, final ProcessSession session, - HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) { + final HttpRequestContainer container, final FlowFile flowFile) { final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); - String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); + final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); + final HttpServletRequest request = container.getRequest(); final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext()); - if (registered) - return true; - - getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE", - new Object[]{request.getRemoteAddr()}); - - try { - container.getResponse().sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "HttpContextMap is full"); - container.getContext().complete(); - } catch (final Exception e) { - getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}", - new Object[]{request.getRemoteAddr(), e}); + if (registered) { + return true; } session.remove(flowFile); + sendError(SC_SERVICE_UNAVAILABLE, "Request Registration Failed", container); return false; } - - protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container, - final HttpServletRequest request, Optional flowFile, final Exception e) { - // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg. - // bad requests, the connection to the client is not closed. In order to address also these cases, we try - // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly - // processed and makes it aware that the connection can be closed. - getLogger().error("Failed to receive content from HTTP Request from {} due to {}", - new Object[]{request.getRemoteAddr(), e}); - if (flowFile.isPresent()) - session.remove(flowFile.get()); - - try { - HttpServletResponse response = container.getResponse(); - response.sendError(HttpServletResponse.SC_BAD_REQUEST); - container.getContext().complete(); - } catch (final IOException ioe) { - getLogger().warn("Failed to send HTTP response to {} due to {}", - new Object[]{request.getRemoteAddr(), ioe}); - } + protected void handleFlowContentStreamingError(final ProcessSession session, final HttpRequestContainer container, final Optional flowFile, final Exception e) { + // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg. + // bad requests, the connection to the client is not closed. In order to address also these cases, we try + // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly + // processed and makes it aware that the connection can be closed. + final HttpServletRequest request = container.getRequest(); + getLogger().error("Stream Processing Failed: Method [{}] URI [{}] Address [{}]", request.getMethod(), request.getRequestURI(), request.getRemoteAddr(), e); + flowFile.ifPresent(session::remove); + sendError(SC_BAD_REQUEST, "Stream Processing Failed", container); } private void putAttribute(final Map map, final String key, final Object value) { @@ -905,8 +854,34 @@ public class HandleHttpRequest extends AbstractProcessor { map.put(key, value); } - private static class HttpRequestContainer { + private void sendError(final int statusCode, final String message, final HttpRequestContainer container) { + sendError(statusCode, message, container.getRequest(), container.getResponse()); + final AsyncContext asyncContext = container.getContext(); + try { + asyncContext.complete(); + } catch (final RuntimeException e) { + final HttpServletRequest request = container.getRequest(); + final String method = request.getMethod(); + final String uri = request.getRequestURI(); + final String remoteAddr = request.getRemoteAddr(); + getLogger().error("Complete Request Failed: Method [{}] URI [{}] Address [{}]", method, uri, remoteAddr, e); + } + } + private void sendError(final int statusCode, final String message, final HttpServletRequest request, final HttpServletResponse response) { + final String method = request.getMethod(); + final String uri = request.getRequestURI(); + final String remoteAddr = request.getRemoteAddr(); + + try { + response.sendError(statusCode, message); + getLogger().warn("Send Error Completed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr); + } catch (final Exception e) { + getLogger().error("Send Error Failed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr, e); + } + } + + private static class HttpRequestContainer { private final HttpServletRequest request; private final HttpServletResponse response; private final AsyncContext context; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java index 67954f1852..9e98ef331a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java @@ -17,23 +17,31 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -41,10 +49,6 @@ import javax.servlet.AsyncContext; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import com.google.api.client.util.Charsets; -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import com.google.common.io.Files; import okhttp3.Call; import okhttp3.Callback; import okhttp3.MediaType; @@ -53,17 +57,21 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.http.HttpContextMap; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.HTTPUtils; +import org.apache.nifi.remote.io.socket.NetworkUtils; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.security.util.TlsException; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.util.ssl.SslContextUtils; +import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -179,11 +187,11 @@ public class ITestHandleHttpRequest { .addFormDataPart("p1", "v1") .addFormDataPart("p2", "v2") .addFormDataPart("file1", "my-file-text.txt", - RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) + RequestBody.create(createTextFile("Hello", "World"), MediaType.parse("text/plain"))) .addFormDataPart("file2", "my-file-data.json", - RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"))) + RequestBody.create(createTextFile( "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json"))) .addFormDataPart("file3", "my-file-binary.bin", - RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) + RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream"))) .build(); Request request = new Request.Builder() @@ -324,7 +332,6 @@ public class ITestHandleHttpRequest { // We cannot rely on the order we sent them in. for (int i = 1; i < 4; i++) { MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", String.format("p%d", i)); - String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); mff.assertAttributeEquals("http.multipart.name", String.format("p%d", i)); mff.assertAttributeExists("http.param.p1"); mff.assertAttributeEquals("http.param.p1", "v1"); @@ -364,11 +371,11 @@ public class ITestHandleHttpRequest { .addFormDataPart("p1", "v1") .addFormDataPart("p2", "v2") .addFormDataPart("file1", "my-file-text.txt", - RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World"))) + RequestBody.create(createTextFile("my-file-text.txt", "Hello", "World"), MediaType.parse("text/plain"))) .addFormDataPart("file2", "my-file-data.json", - RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"))) + RequestBody.create(createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json"))) .addFormDataPart("file3", "my-file-binary.bin", - RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100))) + RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream"))) .build(); Request request = new Request.Builder() @@ -383,12 +390,12 @@ public class ITestHandleHttpRequest { Callback callback = new Callback() { @Override - public void onFailure(Call call, IOException e) { + public void onFailure(@NotNull Call call, @NotNull IOException e) { // Not going to happen } @Override - public void onResponse(Call call, Response response) { + public void onResponse(@NotNull Call call, @NotNull Response response) { responseCode.set(response.code()); resultReady.countDown(); } @@ -409,25 +416,27 @@ public class ITestHandleHttpRequest { Assert.assertEquals(503, responseCode.get()); } - private byte[] generateRandomBinaryData(int i) { + private byte[] generateRandomBinaryData() { byte[] bytes = new byte[100]; new Random().nextBytes(bytes); return bytes; } - private File createTextFile(String fileName, String... lines) throws IOException { - File file = new File(fileName); + private File createTextFile(String... lines) throws IOException { + File file = new File(getClass().getSimpleName()); file.deleteOnExit(); - for (String string : lines) { - Files.append(string, file, Charsets.UTF_8); + try (final PrintWriter writer = new PrintWriter(new FileWriter(file))) { + for (final String line : lines) { + writer.println(line); + } } return file; } protected MockFlowFile findFlowFile(List flowFilesForRelationship, String attributeName, String attributeValue) { - Optional optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue)); + Optional optional = flowFilesForRelationship.stream().filter(ff -> ff.getAttribute(attributeName).equals(attributeValue)).findFirst(); Assert.assertTrue(optional.isPresent()); return optional.get(); } @@ -450,7 +459,6 @@ public class ITestHandleHttpRequest { contextMap.setRegisterSuccessfully(false); final int[] responseCode = new int[1]; - responseCode[0] = 0; final Thread httpThread = new Thread(new Runnable() { @Override public void run() { @@ -543,12 +551,12 @@ public class ITestHandleHttpRequest { Callback callback = new Callback() { @Override - public void onFailure(Call call, IOException e) { + public void onFailure(@NotNull Call call, @NotNull IOException e) { // Will only happen once for the first non-rejected request, but not important } @Override - public void onResponse(Call call, Response response) throws IOException { + public void onResponse(@NotNull Call call, @NotNull Response response) { responses.add(response); cleanupDone.countDown(); } @@ -583,10 +591,64 @@ public class ITestHandleHttpRequest { assertEquals(responses.size(), nrOfRequests - 1); for (Response response : responses) { assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code()); - assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down")); } } + @Test(timeout = 15000) + public void testOnPrimaryNodeChangePrimaryNodeRevoked() throws Exception { + processor = new HandleHttpRequest(); + final TestRunner runner = TestRunners.newTestRunner(processor); + final int port = NetworkUtils.getAvailableTcpPort(); + runner.setProperty(HandleHttpRequest.PORT, Integer.toString(port)); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + final String contextMapId = MockHttpContextMap.class.getSimpleName(); + runner.addControllerService(contextMapId, contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, contextMapId); + + final ProcessContext processContext = spy(runner.getProcessContext()); + when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY); + processor.initializeServer(processContext); + + final OkHttpClient client = new OkHttpClient.Builder().build(); + + final String url = String.format("http://localhost:%d", port); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + final CountDownLatch requestCompleted = new CountDownLatch(1); + final CountDownLatch requestStarted = new CountDownLatch(1); + + final AtomicReference requestException = new AtomicReference<>(); + final AtomicInteger responseStatus = new AtomicInteger(); + executorService.execute(() -> { + final Request request = new Request.Builder().url(url).get().build(); + final Call call = client.newCall(request); + call.enqueue(new Callback() { + @Override + public void onFailure(@NotNull Call call, @NotNull IOException e) { + requestException.set(e); + requestCompleted.countDown(); + } + + @Override + public void onResponse(@NotNull Call call, @NotNull Response response) { + responseStatus.set(response.code()); + requestCompleted.countDown(); + } + }); + requestStarted.countDown(); + }); + + requestStarted.await(); + Thread.sleep(1000); + processor.onPrimaryNodeChange(PrimaryNodeState.PRIMARY_NODE_REVOKED); + requestCompleted.await(); + + assertNull("HTTP Request Exception found", requestException.get()); + assertEquals("HTTP Status not matched", HttpServletResponse.SC_SERVICE_UNAVAILABLE, responseStatus.get()); + } + @Test public void testSecure() throws Exception { secureTest(false); @@ -610,7 +672,7 @@ public class ITestHandleHttpRequest { runner.enableControllerService(contextMap); runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class); + final RestrictedSSLContextService sslContextService = mock(RestrictedSSLContextService.class); final String serviceIdentifier = RestrictedSSLContextService.class.getName(); Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); Mockito.when(sslContextService.createContext()).thenReturn(keyStoreSslContext); @@ -683,7 +745,7 @@ public class ITestHandleHttpRequest { } @Override - void rejectPendingRequests() { + void drainContainerQueue() { // Skip this, otherwise it would wait to make sure there are no more requests } }; @@ -691,7 +753,7 @@ public class ITestHandleHttpRequest { private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception { Future executionFuture = Executors.newSingleThreadExecutor() - .submit(() -> connection.getInputStream()); + .submit(connection::getInputStream); requestSent.countDown(); @@ -701,12 +763,12 @@ public class ITestHandleHttpRequest { private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) { Callback callback = new Callback() { @Override - public void onFailure(Call call, IOException e) { + public void onFailure(@NotNull Call call, @NotNull IOException e) { // We (may) get a timeout as the processor doesn't answer unless there is some kind of error } @Override - public void onResponse(Call call, Response response) throws IOException { + public void onResponse(@NotNull Call call, @NotNull Response response) { // Not called as the processor doesn't answer unless there is some kind of error } }; @@ -747,10 +809,6 @@ public class ITestHandleHttpRequest { return responseMap.size(); } - public boolean isRegisterSuccessfully() { - return registerSuccessfully; - } - public void setRegisterSuccessfully(boolean registerSuccessfully) { this.registerSuccessfully = registerSuccessfully; }