NIFI-1913 Properly removing flowfile in HandleHttpRequest when the Context Map fails to register the request

This closes #462
This commit is contained in:
jpercivall 2016-05-23 12:24:43 -04:00
parent 5df67c5dc2
commit e1b654655a
2 changed files with 79 additions and 2 deletions

View File

@ -611,6 +611,7 @@ public class HandleHttpRequest extends AbstractProcessor {
new Object[]{request.getRemoteAddr(), e});
}
session.remove(flowFile);
return;
}

View File

@ -101,14 +101,82 @@ public class TestHandleHttpRequest {
}
}
@Test(timeout=10000)
public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
contextMap.setRegisterSuccessfully(false);
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
final int[] responseCode = new int[1];
responseCode[0] = 0;
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
HttpURLConnection connection = null;
try {
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(3000);
connection.setReadTimeout(3000);
StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
} catch (final Throwable t) {
t.printStackTrace();
if(connection != null ) {
try {
responseCode[0] = connection.getResponseCode();
} catch (IOException e) {
responseCode[0] = -1;
}
} else {
responseCode[0] = -2;
}
}
}
});
httpThread.start();
while (responseCode[0] == 0) {
// process the request.
runner.run(1, false, false);
}
runner.assertTransferCount(HandleHttpRequest.REL_SUCCESS, 0);
assertEquals(503, responseCode[0]);
} finally {
// shut down the server
runner.run(1, true);
}
}
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
private boolean registerSuccessfully = true;
private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>();
@Override
public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
responseMap.put(identifier, response);
return true;
if(registerSuccessfully) {
responseMap.put(identifier, response);
}
return registerSuccessfully;
}
@Override
@ -124,5 +192,13 @@ public class TestHandleHttpRequest {
public int size() {
return responseMap.size();
}
public boolean isRegisterSuccessfully() {
return registerSuccessfully;
}
public void setRegisterSuccessfully(boolean registerSuccessfully) {
this.registerSuccessfully = registerSuccessfully;
}
}
}