NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

NIFI-8120 Renamed exception variable and reordered log statements

This closes #4747.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
exceptionfactory 2021-01-07 10:15:44 -05:00 committed by Peter Turcsanyi
parent f330078fff
commit a4027e8e77
2 changed files with 113 additions and 82 deletions

View File

@ -65,8 +65,6 @@ import org.apache.nifi.util.StopWatch;
@SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"})
public class HandleHttpResponse extends AbstractProcessor {
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder()
.name("HTTP Status Code")
.description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.")
@ -136,25 +134,25 @@ public class HandleHttpResponse extends AbstractProcessor {
final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
if (contextIdentifier == null) {
session.transfer(flowFile, REL_FAILURE);
getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute",
new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
if (!isNumber(statusCodeValue)) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue});
session.transfer(flowFile, REL_FAILURE);
return;
}
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
final HttpServletResponse response = contextMap.getResponse(contextIdentifier);
if (response == null) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier",
new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier});
session.transfer(flowFile, REL_FAILURE);
return;
}
@ -192,27 +190,31 @@ public class HandleHttpResponse extends AbstractProcessor {
session.exportTo(flowFile, response.getOutputStream());
response.flushBuffer();
} catch (final ProcessException e) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
contextMap.complete(contextIdentifier);
try {
contextMap.complete(contextIdentifier);
} catch (final RuntimeException ce) {
getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
}
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final Exception e) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
try {
contextMap.complete(contextIdentifier);
} catch (final IllegalStateException ise) {
getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ise});
} catch (final RuntimeException ce) {
getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
session.transfer(flowFile, REL_FAILURE);
return;
}
session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode});
session.transfer(flowFile, REL_SUCCESS);
}
private static boolean isNumber(final String value) {

View File

@ -49,46 +49,52 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestHandleHttpResponse {
private static final String CONTEXT_MAP_ID = MockHttpContextMap.class.getSimpleName();
private static final String HTTP_REQUEST_ID = "HTTP-Request-Identifier";
private static final int HTTP_STATUS_CREATED = HttpServletResponse.SC_CREATED;
private static final String FLOW_FILE_CONTENT = "TESTING";
@Test
public void testEnsureCompleted() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
runner.addControllerService("http-context-map", contextMap);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("status.code", "201");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue("hello".getBytes(), attributes);
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
assertTrue(runner.getProvenanceEvents().size() == 1);
assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
assertEquals("hello", contextMap.baos.toString());
assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("no-valid-attr"));
assertEquals(201, contextMap.statusCode);
assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@ -97,15 +103,15 @@ public class TestHandleHttpResponse {
public void testRegexHeaders() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
runner.addControllerService("http-context-map", contextMap);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty(HandleHttpResponse.ATTRIBUTES_AS_HEADERS_REGEX, "^(my.*)$");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
@ -113,43 +119,43 @@ public class TestHandleHttpResponse {
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("my-blank-attr", "");
attributes.put("status.code", "201");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue("hello".getBytes(), attributes);
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
assertTrue(runner.getProvenanceEvents().size() == 1);
assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
assertEquals("hello", contextMap.baos.toString());
assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("my-blank-attr"));
assertEquals(201, contextMap.statusCode);
assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@Test
public void testWithExceptionThrown() throws InitializationException {
public void testResponseFlowFileAccessException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "FlowFileAccessException");
runner.addControllerService("http-context-map", contextMap);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new FlowFileAccessException("Access Problem"), null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
attributes.put("status.code", "201");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue("hello".getBytes(), attributes);
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@ -158,23 +164,23 @@ public class TestHandleHttpResponse {
}
@Test
public void testCannotWriteResponse() throws InitializationException {
public void testResponseProcessException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "ProcessException");
runner.addControllerService("http-context-map", contextMap);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
attributes.put("status.code", "201");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue("hello".getBytes(), attributes);
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@ -182,21 +188,46 @@ public class TestHandleHttpResponse {
assertEquals(1, contextMap.getCompletionCount());
}
@Test
public void testResponseProcessExceptionThenIllegalStateException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), new IllegalStateException());
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
assertEquals(0, contextMap.getCompletionCount());
}
@Test
public void testStatusCodeEmpty() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
runner.addControllerService("http-context-map", contextMap);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
runner.enqueue("hello".getBytes(), attributes);
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@ -208,16 +239,18 @@ public class TestHandleHttpResponse {
private final String id;
private final AtomicInteger completedCount = new AtomicInteger(0);
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
private final String shouldThrowExceptionClass;
private final Exception responseException;
private final RuntimeException completeException;
private volatile int statusCode = -1;
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
public MockHttpContextMap(final String expectedIdentifier, final String shouldThrowExceptionClass) {
public MockHttpContextMap(final String expectedIdentifier, final Exception responseException, final RuntimeException completeException) {
this.id = expectedIdentifier;
this.shouldThrowExceptionClass = shouldThrowExceptionClass;
this.responseException = responseException;
this.completeException = completeException;
}
@Override
@ -233,11 +266,7 @@ public class TestHandleHttpResponse {
try {
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("FlowFileAccessException")) {
Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception"));
} else if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("ProcessException")) {
Mockito.when(response.getOutputStream()).thenThrow(new ProcessException("exception"));
} else {
if (responseException == null) {
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
@Override
public boolean isReady() {
@ -249,43 +278,39 @@ public class TestHandleHttpResponse {
}
@Override
public void write(int b) throws IOException {
baos.write(b);
public void write(int b) {
outputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
baos.write(b);
outputStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
baos.write(b, off, len);
public void write(byte[] b, int off, int len) {
outputStream.write(b, off, len);
}
});
} else {
Mockito.when(response.getOutputStream()).thenThrow(responseException);
}
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final String key = invocation.getArgument(0);
final String value = invocation.getArgument(1);
if (value == null) {
headersWithNoValue.add(key);
} else {
headersSent.put(key, value);
}
return null;
Mockito.doAnswer(invocation -> {
final String key = invocation.getArgument(0);
final String value = invocation.getArgument(1);
if (value == null) {
headersWithNoValue.add(key);
} else {
headersSent.put(key, value);
}
return null;
}).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class));
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
statusCode = invocation.getArgument(0);
return null;
}
Mockito.doAnswer(invocation -> {
statusCode = invocation.getArgument(0);
return null;
}).when(response).setStatus(Mockito.anyInt());
return response;
@ -302,6 +327,10 @@ public class TestHandleHttpResponse {
Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
}
if (completeException != null) {
throw completeException;
}
completedCount.incrementAndGet();
}