NIFI-55 - Ensures ListenHTTP logs the source of an expired hold

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1623.
This commit is contained in:
Andre F de Miranda 2017-03-04 14:38:31 +11:00 committed by Pierre Villard
parent b4e0a6e206
commit b0be99036d
3 changed files with 13 additions and 14 deletions

View File

@ -329,7 +329,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
for (final String id : findOldFlowFileIds(context)) { for (final String id : findOldFlowFileIds(context)) {
final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id); final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
if (wrapper != null) { if (wrapper != null) {
getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[] {id}); getLogger().warn("failed to received acknowledgment for HOLD with ID {} sent by {}; rolling back session", new Object[] {id, wrapper.getClientIP()});
wrapper.session.rollback(); wrapper.session.rollback();
} }
} }
@ -342,11 +342,13 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
private final Set<FlowFile> flowFiles; private final Set<FlowFile> flowFiles;
private final long entryTime; private final long entryTime;
private final ProcessSession session; private final ProcessSession session;
private final String clientIP;
public FlowFileEntryTimeWrapper(final ProcessSession session, final Set<FlowFile> flowFiles, final long entryTime) { public FlowFileEntryTimeWrapper(final ProcessSession session, final Set<FlowFile> flowFiles, final long entryTime, final String clientIP) {
this.flowFiles = flowFiles; this.flowFiles = flowFiles;
this.entryTime = entryTime; this.entryTime = entryTime;
this.session = session; this.session = session;
this.clientIP = clientIP;
} }
public Set<FlowFile> getFlowFiles() { public Set<FlowFile> getFlowFiles() {
@ -360,5 +362,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
public ProcessSession getSession() { public ProcessSession getSession() {
return session; return session;
} }
public String getClientIP() {
return clientIP;
}
} }
} }

View File

@ -282,7 +282,7 @@ public class ListenHTTPServlet extends HttpServlet {
uuid = UUID.randomUUID().toString(); uuid = UUID.randomUUID().toString();
} }
final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis()); final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost());
FlowFileEntryTimeWrapper previousWrapper; FlowFileEntryTimeWrapper previousWrapper;
do { do {
previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper);

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.ssl.StandardSSLContextService;
@ -30,7 +31,6 @@ import org.junit.Test;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -58,7 +58,7 @@ public class TestListenHTTP {
public void setup() throws IOException { public void setup() throws IOException {
proc = new ListenHTTP(); proc = new ListenHTTP();
runner = TestRunners.newTestRunner(proc); runner = TestRunners.newTestRunner(proc);
availablePort = findAvailablePort(); availablePort = NetworkUtils.availablePort();;
runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort)); runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort));
runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH); runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH);
@ -146,17 +146,10 @@ public class TestListenHTTP {
Thread.sleep(100); Thread.sleep(100);
} }
runner.assertTransferCount(ListenTCP.REL_SUCCESS, messages.size()); runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size());
} }
private int findAvailablePort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
return socket.getLocalPort();
}
}
private SSLContextService configureProcessorSslContextService() throws InitializationException { private SSLContextService configureProcessorSslContextService() throws InitializationException {
final SSLContextService sslContextService = new StandardSSLContextService(); final SSLContextService sslContextService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslContextService); runner.addControllerService("ssl-context", sslContextService);
@ -168,7 +161,7 @@ public class TestListenHTTP {
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService); runner.enableControllerService(sslContextService);
runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
return sslContextService; return sslContextService;
} }