NIFI-7314 HandleHttpRequest stops Jetty in OnUnscheduled instead of OnStopped. Also reject pending request and clean their queue when shutting down.

NIFI-7314 In HandleHttpRequest returning 503 when rejecting pending requests before shutdown.
NIFI-7314 In HandleHttpRequest add logs and better response message during cleanup.

This closes #4191.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2020-04-07 15:35:20 +02:00 committed by Peter Turcsanyi
parent 445efcfde6
commit 09cece8e99
2 changed files with 394 additions and 207 deletions

View File

@ -25,7 +25,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
@ -304,6 +304,7 @@ public class HandleHttpRequest extends AbstractProcessor {
}
private volatile Server server;
private volatile boolean ready;
private AtomicBoolean initialized = new AtomicBoolean(false);
private volatile BlockingQueue<HttpRequestContainer> containerQueue;
private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
@ -323,7 +324,7 @@ public class HandleHttpRequest extends AbstractProcessor {
initialized.set(false);
}
private synchronized void initializeServer(final ProcessContext context) throws Exception {
synchronized void initializeServer(final ProcessContext context) throws Exception {
if(initialized.get()){
return;
}
@ -461,6 +462,12 @@ public class HandleHttpRequest extends AbstractProcessor {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full");
return;
} else if (!ready) {
getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE",
new Object[]{request.getRemoteAddr()});
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
return;
}
// Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
@ -491,6 +498,7 @@ public class HandleHttpRequest extends AbstractProcessor {
getLogger().info("Server started and listening on port " + getPort());
initialized.set(true);
ready = true;
}
protected int getPort() {
@ -535,10 +543,13 @@ public class HandleHttpRequest extends AbstractProcessor {
return sslFactory;
}
@OnStopped
@OnUnscheduled
public void shutdown() throws Exception {
ready = false;
if (server != null) {
getLogger().debug("Shutting down server");
rejectPendingRequests();
server.stop();
server.destroy();
server.join();
@ -547,6 +558,35 @@ public class HandleHttpRequest extends AbstractProcessor {
}
}
void rejectPendingRequests() {
HttpRequestContainer container;
while ((container = getNextContainer()) != null) {
try {
getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE",
new Object[]{container.getRequest().getRemoteAddr()});
HttpServletResponse response = container.getResponse();
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
container.getContext().complete();
} catch (final IOException e) {
getLogger().warn("Failed to send HTTP response to {} due to {}",
new Object[]{container.getRequest().getRemoteAddr(), e});
}
}
}
private HttpRequestContainer getNextContainer() {
HttpRequestContainer container;
try {
container = containerQueue.poll(2, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup.");
container = null;
}
return container;
}
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {

View File

@ -20,6 +20,8 @@ import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
@ -28,17 +30,17 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -49,22 +51,31 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ITestHandleHttpRequest {
private HandleHttpRequest processor;
private static Map<String, String> getTruststoreProperties() {
final Map<String, String> props = new HashMap<>();
props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/truststore.jks");
@ -103,9 +114,21 @@ public class ITestHandleHttpRequest {
return service.createSSLContext(clientAuth);
}
@After
public void tearDown() throws Exception {
if (processor != null) {
processor.shutdown();
}
}
@Test(timeout=30000)
public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(1);
processor = createProcessor(serverReady, requestSent);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
@ -113,73 +136,65 @@ public class ITestHandleHttpRequest {
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(3000);
connection.setReadTimeout(3000);
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
} catch (final Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
}
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(30000);
connection.setReadTimeout(30000);
sendRequest(connection, requestSent);
} catch (final Throwable t) {
// Do nothing as HandleHttpRequest doesn't respond normally
}
});
httpThread.start();
while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
// process the request.
runner.run(1, false, false);
}
});
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
httpThread.start();
runner.run(1, false);
final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
mff.assertAttributeEquals("http.query.param.query", "true");
mff.assertAttributeEquals("http.query.param.value1", "value1");
mff.assertAttributeEquals("http.query.param.value2", "");
mff.assertAttributeEquals("http.query.param.value3", "");
mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
mff.assertAttributeEquals("http.headers.header1", "value1");
mff.assertAttributeEquals("http.headers.header3", "apple=orange");
} finally {
// shut down the server
runner.run(1, true);
}
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
mff.assertAttributeEquals("http.query.param.query", "true");
mff.assertAttributeEquals("http.query.param.value1", "value1");
mff.assertAttributeEquals("http.query.param.value2", "");
mff.assertAttributeEquals("http.query.param.value3", "");
mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
mff.assertAttributeEquals("http.headers.header1", "value1");
mff.assertAttributeEquals("http.headers.header3", "apple=orange");
}
@Test(timeout=30000)
public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(1);
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
processor = createProcessor(serverReady, requestSent);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
@ -202,21 +217,15 @@ public class ITestHandleHttpRequest {
.writeTimeout(3000, TimeUnit.MILLISECONDS)
.build();
try (Response response = client.newCall(request).execute()) {
Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
}
} catch (final Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
sendRequest(client, request, requestSent);
} catch (Exception e) {
// Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
httpThread.start();
while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
// process the request.
runner.run(1, false, false);
}
httpThread.start();
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 5);
assertEquals(1, contextMap.size());
@ -275,31 +284,30 @@ public class ITestHandleHttpRequest {
mff.assertAttributeExists("http.multipart.fragments.sequence.number");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
} finally {
// shut down the server
runner.run(1, true);
}
}
@Test(timeout=30000)
public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(1);
CountDownLatch resultReady = new CountDownLatch(1);
final MockHttpContextMap contextMap = new MockHttpContextMap();
contextMap.setRegisterSuccessfully(false);
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
processor = createProcessor(serverReady, requestSent);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
contextMap.setRegisterSuccessfully(false);
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
AtomicInteger responseCode = new AtomicInteger(0);
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
@ -322,29 +330,32 @@ public class ITestHandleHttpRequest {
.writeTimeout(20000, TimeUnit.MILLISECONDS)
.build();
try (Response response = client.newCall(request).execute()) {
responseCode.set(response.code());
}
Callback callback = new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// Not going to happen
}
@Override
public void onResponse(Call call, Response response) throws IOException {
responseCode.set(response.code());
resultReady.countDown();
}
};
sendRequest(client, request, callback, requestSent);
} catch (final Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
// Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
httpThread.start();
while (responseCode.get() == 0) {
// process the request.
runner.run(1, false, false);
}
httpThread.start();
runner.run(1, false, false);
resultReady.await();
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 0);
assertEquals(0, contextMap.size());
Assert.assertEquals(503, responseCode.get());
} finally {
// shut down the server
runner.run(1, true);
}
}
private byte[] generateRandomBinaryData(int i) {
@ -373,7 +384,12 @@ public class ITestHandleHttpRequest {
@Test(timeout=30000)
public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(1);
CountDownLatch resultReady = new CountDownLatch(1);
processor = createProcessor(serverReady, requestSent);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
@ -382,70 +398,160 @@ public class ITestHandleHttpRequest {
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
contextMap.setRegisterSuccessfully(false);
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
final int[] responseCode = new int[1];
responseCode[0] = 0;
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
HttpURLConnection connection = null;
try {
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(20000);
connection.setReadTimeout(20000);
final int[] responseCode = new int[1];
responseCode[0] = 0;
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
HttpURLConnection connection = null;
try {
serverReady.await();
StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
} catch (final Throwable t) {
t.printStackTrace();
if(connection != null ) {
try {
responseCode[0] = connection.getResponseCode();
} catch (IOException e) {
responseCode[0] = -1;
}
} else {
responseCode[0] = -2;
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(20000);
connection.setReadTimeout(20000);
sendRequest(connection, requestSent);
} catch (final Throwable t) {
if(connection != null ) {
try {
responseCode[0] = connection.getResponseCode();
} catch (IOException e) {
responseCode[0] = -1;
}
} else {
responseCode[0] = -2;
}
} finally {
resultReady.countDown();
}
});
httpThread.start();
while (responseCode[0] == 0) {
// process the request.
runner.run(1, false, false);
}
});
runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
assertEquals(503, responseCode[0]);
httpThread.start();
runner.run(1, false, false);
resultReady.await();
} finally {
// shut down the server
runner.run(1, true);
runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
assertEquals(503, responseCode[0]);
}
@Test
public void testCleanup() throws Exception {
// GIVEN
int nrOfRequests = 5;
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(nrOfRequests);
CountDownLatch cleanupDone = new CountDownLatch(nrOfRequests-1);
processor = new HandleHttpRequest() {
@Override
synchronized void initializeServer(ProcessContext context) throws Exception {
super.initializeServer(context);
serverReady.countDown();
requestSent.await();
while (getRequestQueueSize() < nrOfRequests) {
Thread.sleep(200);
}
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
List<Response> responses = new ArrayList<>(nrOfRequests);
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
OkHttpClient client =
new OkHttpClient.Builder()
.readTimeout(3000, TimeUnit.MILLISECONDS)
.writeTimeout(3000, TimeUnit.MILLISECONDS)
.build();
client.dispatcher().setMaxRequests(nrOfRequests);
client.dispatcher().setMaxRequestsPerHost(nrOfRequests);
Callback callback = new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// Will only happen once for the first non-rejected request, but not important
}
@Override
public void onResponse(Call call, Response response) throws IOException {
responses.add(response);
cleanupDone.countDown();
}
};
IntStream.rangeClosed(1, nrOfRequests).forEach(
requestCounter -> {
Request request = new Request.Builder()
.url(String.format("http://localhost:%s/my/" + requestCounter , port))
.get()
.build();
sendRequest(client, request, callback, requestSent);
}
);
} catch (final Throwable t) {
// Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
// WHEN
httpThread.start();
runner.run(1, false);
cleanupDone.await();
// THEN
int nrOfPendingRequests = processor.getRequestQueueSize();
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
assertEquals(0, nrOfPendingRequests);
assertEquals(responses.size(), nrOfRequests-1);
for (Response response : responses) {
assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down"));
}
}
@Test
public void testSecure() throws InitializationException {
public void testSecure() throws Exception {
secureTest(false);
}
@Test
public void testSecureTwoWaySsl() throws InitializationException {
public void testSecureTwoWaySsl() throws Exception {
secureTest(true);
}
private void secureTest(boolean twoWaySsl) throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
private void secureTest(boolean twoWaySsl) throws Exception {
CountDownLatch serverReady = new CountDownLatch(1);
CountDownLatch requestSent = new CountDownLatch(1);
processor = createProcessor(serverReady, requestSent);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
@ -458,76 +564,117 @@ public class ITestHandleHttpRequest {
sslProperties.put(StandardSSLContextService.SSL_ALGORITHM.getName(), "TLSv1.2");
useSSLContextService(runner, sslProperties, twoWaySsl ? SSLContextService.ClientAuth.WANT : SSLContextService.ClientAuth.NONE);
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
serverReady.await();
if (twoWaySsl) {
// use a client certificate, do not reuse the server's keystore
SSLContext clientSslContext = SslContextFactory.createSslContext(
getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()),
getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(),
"JKS",
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
"JKS",
null,
"TLSv1.2");
connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
} else {
// with one-way SSL, the client still needs a truststore
SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
"JKS",
"TLSv1.2");
connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
}
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(3000);
connection.setReadTimeout(3000);
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
final HttpsURLConnection connection = (HttpsURLConnection) new URL("https://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
} catch (final Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
if (twoWaySsl) {
// use a client certificate, do not reuse the server's keystore
SSLContext clientSslContext = SslContextFactory.createSslContext(
getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE.getName()),
getClientKeystoreProperties().get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()).toCharArray(),
"JKS",
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
"JKS",
null,
"TLSv1.2");
connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
} else {
// with one-way SSL, the client still needs a truststore
SSLContext clientSslContext = SslContextFactory.createTrustSslContext(
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE.getName()),
getTruststoreProperties().get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()).toCharArray(),
"JKS",
"TLSv1.2");
connection.setSSLSocketFactory(clientSslContext.getSocketFactory());
}
}
});
httpThread.start();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(3000);
connection.setReadTimeout(3000);
while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
// process the request.
runner.run(1, false, false);
sendRequest(connection, requestSent);
} catch (final Throwable t) {
// Do nothing as HandleHttpRequest doesn't respond normally
}
}
});
httpThread.start();
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
mff.assertAttributeEquals("http.query.param.query", "true");
mff.assertAttributeEquals("http.query.param.value1", "value1");
mff.assertAttributeEquals("http.query.param.value2", "");
mff.assertAttributeEquals("http.query.param.value3", "");
mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
mff.assertAttributeEquals("http.headers.header1", "value1");
mff.assertAttributeEquals("http.headers.header3", "apple=orange");
mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
}
private HandleHttpRequest createProcessor(CountDownLatch serverReady, CountDownLatch requestSent) {
return new HandleHttpRequest() {
@Override
synchronized void initializeServer(ProcessContext context) throws Exception {
super.initializeServer(context);
serverReady.countDown();
requestSent.await();
while (getRequestQueueSize() == 0) {
Thread.sleep(200);
}
}
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
@Override
void rejectPendingRequests() {
// Skip this, otherwise it would wait to make sure there are no more requests
}
};
}
final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
mff.assertAttributeEquals("http.query.param.query", "true");
mff.assertAttributeEquals("http.query.param.value1", "value1");
mff.assertAttributeEquals("http.query.param.value2", "");
mff.assertAttributeEquals("http.query.param.value3", "");
mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
mff.assertAttributeEquals("http.headers.header1", "value1");
mff.assertAttributeEquals("http.headers.header3", "apple=orange");
mff.assertAttributeEquals("http.protocol", "HTTP/1.1");
} finally {
// shut down the server
runner.run(1, true);
}
private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception {
Future<InputStream> executionFuture = Executors.newSingleThreadExecutor()
.submit(() -> connection.getInputStream());
requestSent.countDown();
executionFuture.get();
}
private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) {
Callback callback = new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// We (may) get a timeout as the processor doesn't answer unless there is some kind of error
}
@Override
public void onResponse(Call call, Response response) throws IOException {
// Not called as the processor doesn't answer unless there is some kind of error
}
};
sendRequest(client, request, callback, requestSent);
}
private void sendRequest(OkHttpClient client, Request request, Callback callback, CountDownLatch requestSent) {
client.newCall(request).enqueue(callback);
requestSent.countDown();
}
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {