mirror of
https://github.com/apache/nifi.git
synced 2025-02-16 06:55:28 +00:00
NIFI-3517 If HandleHttpResponse cannot write response remove entry from HttpContextMap.
* Add test for failure not clear context map. * Add handler to remove context map entry if ProcessException occurs during while exporting response. Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #1567.
This commit is contained in:
parent
1376e5fbd0
commit
90ff275bb9
@ -165,6 +165,11 @@ public class HandleHttpResponse extends AbstractProcessor {
|
||||
try {
|
||||
session.exportTo(flowFile, response.getOutputStream());
|
||||
response.flushBuffer();
|
||||
} catch (final ProcessException e) {
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
|
||||
contextMap.complete(contextIdentifier);
|
||||
return;
|
||||
} catch (final Exception e) {
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
|
||||
|
@ -39,6 +39,7 @@ import javax.servlet.http.HttpServletResponse;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.http.HttpContextMap;
|
||||
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.standard.util.HTTPUtils;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
@ -56,7 +57,7 @@ public class TestHandleHttpResponse {
|
||||
public void testEnsureCompleted() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
|
||||
|
||||
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", false);
|
||||
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
|
||||
runner.addControllerService("http-context-map", contextMap);
|
||||
runner.enableControllerService(contextMap);
|
||||
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
|
||||
@ -95,7 +96,7 @@ public class TestHandleHttpResponse {
|
||||
public void testWithExceptionThrown() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
|
||||
|
||||
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", true);
|
||||
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "FlowFileAccessException");
|
||||
runner.addControllerService("http-context-map", contextMap);
|
||||
runner.enableControllerService(contextMap);
|
||||
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
|
||||
@ -113,6 +114,32 @@ public class TestHandleHttpResponse {
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
|
||||
assertEquals(0, contextMap.getCompletionCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCannotWriteResponse() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
|
||||
|
||||
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "ProcessException");
|
||||
runner.addControllerService("http-context-map", contextMap);
|
||||
runner.enableControllerService(contextMap);
|
||||
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
|
||||
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
|
||||
runner.setProperty("my-attr", "${my-attr}");
|
||||
runner.setProperty("no-valid-attr", "${no-valid-attr}");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
|
||||
attributes.put("my-attr", "hello");
|
||||
attributes.put("status.code", "201");
|
||||
|
||||
runner.enqueue("hello".getBytes(), attributes);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
|
||||
assertEquals(1, contextMap.getCompletionCount());
|
||||
}
|
||||
|
||||
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
|
||||
@ -121,14 +148,14 @@ public class TestHandleHttpResponse {
|
||||
private final AtomicInteger completedCount = new AtomicInteger(0);
|
||||
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
|
||||
private final boolean shouldThrowException;
|
||||
private final String shouldThrowExceptionClass;
|
||||
private volatile int statusCode = -1;
|
||||
|
||||
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
|
||||
|
||||
public MockHttpContextMap(final String expectedIdentifier, final boolean shouldThrowException) {
|
||||
public MockHttpContextMap(final String expectedIdentifier, final String shouldThrowExceptionClass) {
|
||||
this.id = expectedIdentifier;
|
||||
this.shouldThrowException = shouldThrowException;
|
||||
this.shouldThrowExceptionClass = shouldThrowExceptionClass;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -144,8 +171,10 @@ public class TestHandleHttpResponse {
|
||||
|
||||
try {
|
||||
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
|
||||
if(shouldThrowException) {
|
||||
if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("FlowFileAccessException")) {
|
||||
Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception"));
|
||||
} else if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("ProcessException")) {
|
||||
Mockito.when(response.getOutputStream()).thenThrow(new ProcessException("exception"));
|
||||
} else {
|
||||
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user