diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index 3717a33c6c..66b940aa32 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -25,7 +25,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.AllowableValue; @@ -304,6 +304,7 @@ public class HandleHttpRequest extends AbstractProcessor { } private volatile Server server; + private volatile boolean ready; private AtomicBoolean initialized = new AtomicBoolean(false); private volatile BlockingQueue containerQueue; private AtomicBoolean runOnPrimary = new AtomicBoolean(false); @@ -323,7 +324,7 @@ public class HandleHttpRequest extends AbstractProcessor { initialized.set(false); } - private synchronized void initializeServer(final ProcessContext context) throws Exception { + synchronized void initializeServer(final ProcessContext context) throws Exception { if(initialized.get()){ return; } @@ -461,6 +462,12 @@ public class HandleHttpRequest extends AbstractProcessor { response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full"); return; + } else if (!ready) { + getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE", + new Object[]{request.getRemoteAddr()}); + + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down"); + return; } // Right now, that information, though, is only in the ProcessSession, not the ProcessContext, @@ -491,6 +498,7 @@ public class HandleHttpRequest extends AbstractProcessor { getLogger().info("Server started and listening on port " + getPort()); initialized.set(true); + ready = true; } protected int getPort() { @@ -535,10 +543,13 @@ public class HandleHttpRequest extends AbstractProcessor { return sslFactory; } - @OnStopped + @OnUnscheduled public void shutdown() throws Exception { + ready = false; + if (server != null) { getLogger().debug("Shutting down server"); + rejectPendingRequests(); server.stop(); server.destroy(); server.join(); @@ -547,6 +558,35 @@ public class HandleHttpRequest extends AbstractProcessor { } } + void rejectPendingRequests() { + HttpRequestContainer container; + while ((container = getNextContainer()) != null) { + try { + getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE", + new Object[]{container.getRequest().getRemoteAddr()}); + + HttpServletResponse response = container.getResponse(); + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down"); + container.getContext().complete(); + } catch (final IOException e) { + getLogger().warn("Failed to send HTTP response to {} due to {}", + new Object[]{container.getRequest().getRemoteAddr(), e}); + } + } + } + + private HttpRequestContainer getNextContainer() { + HttpRequestContainer container; + try { + container = containerQueue.poll(2, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup."); + container = null; + } + + return container; + } + @OnPrimaryNodeStateChange public void onPrimaryNodeChange(final PrimaryNodeState newState) { if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java index f65c0b958e..391d22ddac 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java @@ -20,6 +20,8 @@ import com.google.api.client.util.Charsets; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.io.Files; +import okhttp3.Call; +import okhttp3.Callback; import okhttp3.MediaType; import okhttp3.MultipartBody; import okhttp3.OkHttpClient; @@ -28,17 +30,17 @@ import okhttp3.RequestBody; import okhttp3.Response; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.http.HttpContextMap; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.HTTPUtils; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardRestrictedSSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.stream.io.NullOutputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -49,22 +51,31 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ITestHandleHttpRequest { + private HandleHttpRequest processor; + private static Map getTruststoreProperties() { final Map props = new HashMap<>(); props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks"); @@ -103,9 +114,21 @@ public class ITestHandleHttpRequest { return service.createSSLContext(clientAuth); } + @After + public void tearDown() throws Exception { + if (processor != null) { + processor.shutdown(); + } + } + @Test(timeout=30000) public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + CountDownLatch serverReady = new CountDownLatch(1); + CountDownLatch requestSent = new CountDownLatch(1); + + processor = createProcessor(serverReady, requestSent); + + final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(HandleHttpRequest.PORT, "0"); final MockHttpContextMap contextMap = new MockHttpContextMap(); @@ -113,73 +136,65 @@ public class ITestHandleHttpRequest { runner.enableControllerService(contextMap); runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - // trigger processor to stop but not shutdown. - runner.run(1, false); - try { - final Thread httpThread = new Thread(new Runnable() { - @Override - public void run() { - try { - final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); - final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" - + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); - connection.setDoOutput(false); - connection.setRequestMethod("GET"); - connection.setRequestProperty("header1", "value1"); - connection.setRequestProperty("header2", ""); - connection.setRequestProperty("header3", "apple=orange"); - connection.setConnectTimeout(3000); - connection.setReadTimeout(3000); + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + try { + serverReady.await(); + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" + + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); - StreamUtils.copy(connection.getInputStream(), new NullOutputStream()); - } catch (final Throwable t) { - t.printStackTrace(); - Assert.fail(t.toString()); - } + connection.setDoOutput(false); + connection.setRequestMethod("GET"); + connection.setRequestProperty("header1", "value1"); + connection.setRequestProperty("header2", ""); + connection.setRequestProperty("header3", "apple=orange"); + connection.setConnectTimeout(30000); + connection.setReadTimeout(30000); + + sendRequest(connection, requestSent); + } catch (final Throwable t) { + // Do nothing as HandleHttpRequest doesn't respond normally } - }); - httpThread.start(); - - while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) { - // process the request. - runner.run(1, false, false); } + }); - runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); - assertEquals(1, contextMap.size()); + httpThread.start(); + runner.run(1, false); - final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); - mff.assertAttributeEquals("http.query.param.query", "true"); - mff.assertAttributeEquals("http.query.param.value1", "value1"); - mff.assertAttributeEquals("http.query.param.value2", ""); - mff.assertAttributeEquals("http.query.param.value3", ""); - mff.assertAttributeEquals("http.query.param.value4", "apple=orange"); - mff.assertAttributeEquals("http.headers.header1", "value1"); - mff.assertAttributeEquals("http.headers.header3", "apple=orange"); - } finally { - // shut down the server - runner.run(1, true); - } + runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); + assertEquals(1, contextMap.size()); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); + mff.assertAttributeEquals("http.query.param.query", "true"); + mff.assertAttributeEquals("http.query.param.value1", "value1"); + mff.assertAttributeEquals("http.query.param.value2", ""); + mff.assertAttributeEquals("http.query.param.value3", ""); + mff.assertAttributeEquals("http.query.param.value4", "apple=orange"); + mff.assertAttributeEquals("http.headers.header1", "value1"); + mff.assertAttributeEquals("http.headers.header3", "apple=orange"); } - @Test(timeout=30000) public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); - runner.setProperty(HandleHttpRequest.PORT, "0"); + CountDownLatch serverReady = new CountDownLatch(1); + CountDownLatch requestSent = new CountDownLatch(1); - final MockHttpContextMap contextMap = new MockHttpContextMap(); - runner.addControllerService("http-context-map", contextMap); - runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + processor = createProcessor(serverReady, requestSent); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(HandleHttpRequest.PORT, "0"); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - // trigger processor to stop but not shutdown. - runner.run(1, false); - try { final Thread httpThread = new Thread(new Runnable() { @Override public void run() { try { + serverReady.await(); final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); @@ -202,21 +217,15 @@ public class ITestHandleHttpRequest { .writeTimeout(3000, TimeUnit.MILLISECONDS) .build(); - try (Response response = client.newCall(request).execute()) { - Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful()); - } - } catch (final Throwable t) { - t.printStackTrace(); - Assert.fail(t.toString()); + sendRequest(client, request, requestSent); + } catch (Exception e) { + // Do nothing as HandleHttpRequest doesn't respond normally } } }); - httpThread.start(); - while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) { - // process the request. - runner.run(1, false, false); - } + httpThread.start(); + runner.run(1, false, false); runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5); assertEquals(1, contextMap.size()); @@ -275,31 +284,30 @@ public class ITestHandleHttpRequest { mff.assertAttributeExists("http.multipart.fragments.sequence.number"); mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); mff.assertAttributeExists("http.headers.multipart.content-disposition"); - } finally { - // shut down the server - runner.run(1, true); - } } @Test(timeout=30000) public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); - runner.setProperty(HandleHttpRequest.PORT, "0"); + CountDownLatch serverReady = new CountDownLatch(1); + CountDownLatch requestSent = new CountDownLatch(1); + CountDownLatch resultReady = new CountDownLatch(1); - final MockHttpContextMap contextMap = new MockHttpContextMap(); - contextMap.setRegisterSuccessfully(false); - runner.addControllerService("http-context-map", contextMap); - runner.enableControllerService(contextMap); - runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + processor = createProcessor(serverReady, requestSent); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(HandleHttpRequest.PORT, "0"); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + contextMap.setRegisterSuccessfully(false); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); - // trigger processor to stop but not shutdown. - runner.run(1, false); - try { AtomicInteger responseCode = new AtomicInteger(0); final Thread httpThread = new Thread(new Runnable() { @Override public void run() { try { + serverReady.await(); final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); @@ -322,29 +330,32 @@ public class ITestHandleHttpRequest { .writeTimeout(20000, TimeUnit.MILLISECONDS) .build(); - try (Response response = client.newCall(request).execute()) { - responseCode.set(response.code()); - } + Callback callback = new Callback() { + @Override + public void onFailure(Call call, IOException e) { + // Not going to happen + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + responseCode.set(response.code()); + resultReady.countDown(); + } + }; + sendRequest(client, request, callback, requestSent); } catch (final Throwable t) { - t.printStackTrace(); - Assert.fail(t.toString()); + // Do nothing as HandleHttpRequest doesn't respond normally } } }); - httpThread.start(); - while (responseCode.get() == 0) { - // process the request. - runner.run(1, false, false); - } + httpThread.start(); + runner.run(1, false, false); + resultReady.await(); runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0); assertEquals(0, contextMap.size()); Assert.assertEquals(503, responseCode.get()); - } finally { - // shut down the server - runner.run(1, true); - } } private byte[] generateRandomBinaryData(int i) { @@ -373,7 +384,12 @@ public class ITestHandleHttpRequest { @Test(timeout=30000) public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + CountDownLatch serverReady = new CountDownLatch(1); + CountDownLatch requestSent = new CountDownLatch(1); + CountDownLatch resultReady = new CountDownLatch(1); + + processor = createProcessor(serverReady, requestSent); + final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(HandleHttpRequest.PORT, "0"); final MockHttpContextMap contextMap = new MockHttpContextMap(); @@ -382,70 +398,160 @@ public class ITestHandleHttpRequest { runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); contextMap.setRegisterSuccessfully(false); - // trigger processor to stop but not shutdown. - runner.run(1, false); - try { - final int[] responseCode = new int[1]; - responseCode[0] = 0; - final Thread httpThread = new Thread(new Runnable() { - @Override - public void run() { - HttpURLConnection connection = null; - try { - final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); - connection = (HttpURLConnection) new URL("http://localhost:" - + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); - connection.setDoOutput(false); - connection.setRequestMethod("GET"); - connection.setRequestProperty("header1", "value1"); - connection.setRequestProperty("header2", ""); - connection.setRequestProperty("header3", "apple=orange"); - connection.setConnectTimeout(20000); - connection.setReadTimeout(20000); + final int[] responseCode = new int[1]; + responseCode[0] = 0; + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + HttpURLConnection connection = null; + try { + serverReady.await(); - StreamUtils.copy(connection.getInputStream(), new NullOutputStream()); - } catch (final Throwable t) { - t.printStackTrace(); - if(connection != null ) { - try { - responseCode[0] = connection.getResponseCode(); - } catch (IOException e) { - responseCode[0] = -1; - } - } else { - responseCode[0] = -2; + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + connection = (HttpURLConnection) new URL("http://localhost:" + + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); + connection.setDoOutput(false); + connection.setRequestMethod("GET"); + connection.setRequestProperty("header1", "value1"); + connection.setRequestProperty("header2", ""); + connection.setRequestProperty("header3", "apple=orange"); + connection.setConnectTimeout(20000); + connection.setReadTimeout(20000); + + sendRequest(connection, requestSent); + } catch (final Throwable t) { + if(connection != null ) { + try { + responseCode[0] = connection.getResponseCode(); + } catch (IOException e) { + responseCode[0] = -1; } + } else { + responseCode[0] = -2; } + } finally { + resultReady.countDown(); } - }); - httpThread.start(); - - while (responseCode[0] == 0) { - // process the request. - runner.run(1, false, false); } + }); - runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0); - assertEquals(503, responseCode[0]); + httpThread.start(); + runner.run(1, false, false); + resultReady.await(); - } finally { - // shut down the server - runner.run(1, true); + runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0); + assertEquals(503, responseCode[0]); + } + + @Test + public void testCleanup() throws Exception { + // GIVEN + int nrOfRequests = 5; + + CountDownLatch serverReady = new CountDownLatch(1); + CountDownLatch requestSent = new CountDownLatch(nrOfRequests); + CountDownLatch cleanupDone = new CountDownLatch(nrOfRequests-1); + + processor = new HandleHttpRequest() { + @Override + synchronized void initializeServer(ProcessContext context) throws Exception { + super.initializeServer(context); + serverReady.countDown(); + + requestSent.await(); + while (getRequestQueueSize() < nrOfRequests) { + Thread.sleep(200); + } + } + }; + + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(HandleHttpRequest.PORT, "0"); + + final MockHttpContextMap contextMap = new MockHttpContextMap(); + runner.addControllerService("http-context-map", contextMap); + runner.enableControllerService(contextMap); + runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map"); + + List responses = new ArrayList<>(nrOfRequests); + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + try { + serverReady.await(); + + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + + OkHttpClient client = + new OkHttpClient.Builder() + .readTimeout(3000, TimeUnit.MILLISECONDS) + .writeTimeout(3000, TimeUnit.MILLISECONDS) + .build(); + client.dispatcher().setMaxRequests(nrOfRequests); + client.dispatcher().setMaxRequestsPerHost(nrOfRequests); + + Callback callback = new Callback() { + @Override + public void onFailure(Call call, IOException e) { + // Will only happen once for the first non-rejected request, but not important + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + responses.add(response); + cleanupDone.countDown(); + } + }; + IntStream.rangeClosed(1, nrOfRequests).forEach( + requestCounter -> { + Request request = new Request.Builder() + .url(String.format("http://localhost:%s/my/" + requestCounter , port)) + .get() + .build(); + sendRequest(client, request, callback, requestSent); + } + ); + } catch (final Throwable t) { + // Do nothing as HandleHttpRequest doesn't respond normally + } + } + }); + + // WHEN + httpThread.start(); + runner.run(1, false); + cleanupDone.await(); + + // THEN + int nrOfPendingRequests = processor.getRequestQueueSize(); + + runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); + + assertEquals(1, contextMap.size()); + assertEquals(0, nrOfPendingRequests); + assertEquals(responses.size(), nrOfRequests-1); + for (Response response : responses) { + assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code()); + assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down")); } } @Test - public void testSecure() throws InitializationException { + public void testSecure() throws Exception { secureTest(false); } @Test - public void testSecureTwoWaySsl() throws InitializationException { + public void testSecureTwoWaySsl() throws Exception { secureTest(true); } - private void secureTest(boolean twoWaySsl) throws InitializationException { - final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class); + private void secureTest(boolean twoWaySsl) throws Exception { + CountDownLatch serverReady = new CountDownLatch(1); + CountDownLatch requestSent = new CountDownLatch(1); + + processor = createProcessor(serverReady, requestSent); + final TestRunner runner = TestRunners.newTestRunner(processor); runner.setProperty(HandleHttpRequest.PORT, "0"); final MockHttpContextMap contextMap = new MockHttpContextMap(); @@ -458,76 +564,117 @@ public class ITestHandleHttpRequest { sslProperties.put(StandardSSLContextService.SSL_ALGORITHM.getName(), "TLSv1.2"); useSSLContextService(runner, sslProperties, twoWaySsl ? SSLContextService.ClientAuth.WANT : SSLContextService.ClientAuth.NONE); - // trigger processor to stop but not shutdown. - runner.run(1, false); - try { - final Thread httpThread = new Thread(new Runnable() { - @Override - public void run() { - try { - final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); - final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:" - + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); + final Thread httpThread = new Thread(new Runnable() { + @Override + public void run() { + try { + serverReady.await(); - if (twoWaySsl) { - // use a client certificate, do not reuse the server's keystore - SSLContext clientSslContext = SslContextFactory.createSslContext( - getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()), - getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(), - "JKS", - getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()), - getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(), - "JKS", - null, - "TLSv1.2"); - connection.setSSLSocketFactory(clientSslContext.getSocketFactory()); - } else { - // with one-way SSL, the client still needs a truststore - SSLContext clientSslContext = SslContextFactory.createTrustSslContext( - getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()), - getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(), - "JKS", - "TLSv1.2"); - connection.setSSLSocketFactory(clientSslContext.getSocketFactory()); - } - connection.setDoOutput(false); - connection.setRequestMethod("GET"); - connection.setRequestProperty("header1", "value1"); - connection.setRequestProperty("header2", ""); - connection.setRequestProperty("header3", "apple=orange"); - connection.setConnectTimeout(3000); - connection.setReadTimeout(3000); + final int port = ((HandleHttpRequest) runner.getProcessor()).getPort(); + final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:" + + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection(); - StreamUtils.copy(connection.getInputStream(), new NullOutputStream()); - } catch (final Throwable t) { - t.printStackTrace(); - Assert.fail(t.toString()); + if (twoWaySsl) { + // use a client certificate, do not reuse the server's keystore + SSLContext clientSslContext = SslContextFactory.createSslContext( + getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()), + getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(), + "JKS", + getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()), + getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(), + "JKS", + null, + "TLSv1.2"); + connection.setSSLSocketFactory(clientSslContext.getSocketFactory()); + } else { + // with one-way SSL, the client still needs a truststore + SSLContext clientSslContext = SslContextFactory.createTrustSslContext( + getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()), + getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(), + "JKS", + "TLSv1.2"); + connection.setSSLSocketFactory(clientSslContext.getSocketFactory()); } - } - }); - httpThread.start(); + connection.setDoOutput(false); + connection.setRequestMethod("GET"); + connection.setRequestProperty("header1", "value1"); + connection.setRequestProperty("header2", ""); + connection.setRequestProperty("header3", "apple=orange"); + connection.setConnectTimeout(3000); + connection.setReadTimeout(3000); - while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) { - // process the request. - runner.run(1, false, false); + sendRequest(connection, requestSent); + } catch (final Throwable t) { + // Do nothing as HandleHttpRequest doesn't respond normally + } + } + }); + + httpThread.start(); + runner.run(1, false, false); + + runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); + assertEquals(1, contextMap.size()); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); + mff.assertAttributeEquals("http.query.param.query", "true"); + mff.assertAttributeEquals("http.query.param.value1", "value1"); + mff.assertAttributeEquals("http.query.param.value2", ""); + mff.assertAttributeEquals("http.query.param.value3", ""); + mff.assertAttributeEquals("http.query.param.value4", "apple=orange"); + mff.assertAttributeEquals("http.headers.header1", "value1"); + mff.assertAttributeEquals("http.headers.header3", "apple=orange"); + mff.assertAttributeEquals("http.protocol", "HTTP/1.1"); + } + + private HandleHttpRequest createProcessor(CountDownLatch serverReady, CountDownLatch requestSent) { + return new HandleHttpRequest() { + @Override + synchronized void initializeServer(ProcessContext context) throws Exception { + super.initializeServer(context); + serverReady.countDown(); + + requestSent.await(); + while (getRequestQueueSize() == 0) { + Thread.sleep(200); + } } - runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1); - assertEquals(1, contextMap.size()); + @Override + void rejectPendingRequests() { + // Skip this, otherwise it would wait to make sure there are no more requests + } + }; + } - final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0); - mff.assertAttributeEquals("http.query.param.query", "true"); - mff.assertAttributeEquals("http.query.param.value1", "value1"); - mff.assertAttributeEquals("http.query.param.value2", ""); - mff.assertAttributeEquals("http.query.param.value3", ""); - mff.assertAttributeEquals("http.query.param.value4", "apple=orange"); - mff.assertAttributeEquals("http.headers.header1", "value1"); - mff.assertAttributeEquals("http.headers.header3", "apple=orange"); - mff.assertAttributeEquals("http.protocol", "HTTP/1.1"); - } finally { - // shut down the server - runner.run(1, true); - } + private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception { + Future executionFuture = Executors.newSingleThreadExecutor() + .submit(() -> connection.getInputStream()); + + requestSent.countDown(); + + executionFuture.get(); + } + + private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) { + Callback callback = new Callback() { + @Override + public void onFailure(Call call, IOException e) { + // We (may) get a timeout as the processor doesn't answer unless there is some kind of error + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + // Not called as the processor doesn't answer unless there is some kind of error + } + }; + + sendRequest(client, request, callback, requestSent); + } + + private void sendRequest(OkHttpClient client, Request request, Callback callback, CountDownLatch requestSent) { + client.newCall(request).enqueue(callback); + requestSent.countDown(); } private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {