mirror of https://github.com/apache/nifi.git
parent
9db1def6c6
commit
e2bbea1c53
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -155,9 +154,9 @@ public class HandleHttpResponse extends AbstractProcessor {
|
||||||
try {
|
try {
|
||||||
session.exportTo(flowFile, response.getOutputStream());
|
session.exportTo(flowFile, response.getOutputStream());
|
||||||
response.flushBuffer();
|
response.flushBuffer();
|
||||||
} catch (final IOException ioe) {
|
} catch (final Exception e) {
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, ioe});
|
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.http.HttpContextMap;
|
import org.apache.nifi.http.HttpContextMap;
|
||||||
|
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -53,7 +54,7 @@ public class TestHandleHttpResponse {
|
||||||
public void testEnsureCompleted() throws InitializationException {
|
public void testEnsureCompleted() throws InitializationException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
|
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
|
||||||
|
|
||||||
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id");
|
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", false);
|
||||||
runner.addControllerService("http-context-map", contextMap);
|
runner.addControllerService("http-context-map", contextMap);
|
||||||
runner.enableControllerService(contextMap);
|
runner.enableControllerService(contextMap);
|
||||||
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
|
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
|
||||||
|
@ -80,18 +81,44 @@ public class TestHandleHttpResponse {
|
||||||
assertTrue(contextMap.headersWithNoValue.isEmpty());
|
assertTrue(contextMap.headersWithNoValue.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithExceptionThrown() throws InitializationException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
|
||||||
|
|
||||||
|
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", true);
|
||||||
|
runner.addControllerService("http-context-map", contextMap);
|
||||||
|
runner.enableControllerService(contextMap);
|
||||||
|
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
|
||||||
|
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
|
||||||
|
runner.setProperty("my-attr", "${my-attr}");
|
||||||
|
runner.setProperty("no-valid-attr", "${no-valid-attr}");
|
||||||
|
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
|
||||||
|
attributes.put("my-attr", "hello");
|
||||||
|
attributes.put("status.code", "201");
|
||||||
|
|
||||||
|
runner.enqueue("hello".getBytes(), attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
|
|
||||||
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
|
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
|
||||||
|
|
||||||
private final String id;
|
private final String id;
|
||||||
private final AtomicInteger completedCount = new AtomicInteger(0);
|
private final AtomicInteger completedCount = new AtomicInteger(0);
|
||||||
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
|
||||||
|
private final boolean shouldThrowException;
|
||||||
private volatile int statusCode = -1;
|
private volatile int statusCode = -1;
|
||||||
|
|
||||||
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
|
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
public MockHttpContextMap(final String expectedIdentifier) {
|
public MockHttpContextMap(final String expectedIdentifier, final boolean shouldThrowException) {
|
||||||
this.id = expectedIdentifier;
|
this.id = expectedIdentifier;
|
||||||
|
this.shouldThrowException = shouldThrowException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -107,6 +134,9 @@ public class TestHandleHttpResponse {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
|
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
|
||||||
|
if(shouldThrowException) {
|
||||||
|
Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception"));
|
||||||
|
} else {
|
||||||
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
|
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
|
||||||
@Override
|
@Override
|
||||||
public boolean isReady() {
|
public boolean isReady() {
|
||||||
|
@ -132,6 +162,7 @@ public class TestHandleHttpResponse {
|
||||||
baos.write(b, off, len);
|
baos.write(b, off, len);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Mockito.doAnswer(new Answer<Object>() {
|
Mockito.doAnswer(new Answer<Object>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue