diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 0c38331536..d2bfa04710 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -329,7 +329,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { for (final String id : findOldFlowFileIds(context)) { final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id); if (wrapper != null) { - getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[] {id}); + getLogger().warn("failed to received acknowledgment for HOLD with ID {} sent by {}; rolling back session", new Object[] {id, wrapper.getClientIP()}); wrapper.session.rollback(); } } @@ -342,11 +342,13 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { private final Set flowFiles; private final long entryTime; private final ProcessSession session; + private final String clientIP; - public FlowFileEntryTimeWrapper(final ProcessSession session, final Set flowFiles, final long entryTime) { + public FlowFileEntryTimeWrapper(final ProcessSession session, final Set flowFiles, final long entryTime, final String clientIP) { this.flowFiles = flowFiles; this.entryTime = entryTime; this.session = session; + this.clientIP = clientIP; } public Set getFlowFiles() { @@ -360,5 +362,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public ProcessSession getSession() { return session; } + + public String getClientIP() { + return clientIP; + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 5f67e311f6..b38a872295 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -282,7 +282,7 @@ public class ListenHTTPServlet extends HttpServlet { uuid = UUID.randomUUID().toString(); } - final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis()); + final FlowFileEntryTimeWrapper wrapper = new FlowFileEntryTimeWrapper(session, flowFileSet, System.currentTimeMillis(), request.getRemoteHost()); FlowFileEntryTimeWrapper previousWrapper; do { previousWrapper = flowFileMap.putIfAbsent(uuid, wrapper); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java index 020535b15e..9193f8e9b3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.remote.io.socket.NetworkUtils; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; @@ -30,7 +31,6 @@ import org.junit.Test; import java.io.DataOutputStream; import java.io.IOException; import java.net.HttpURLConnection; -import java.net.ServerSocket; import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -58,7 +58,7 @@ public class TestListenHTTP { public void setup() throws IOException { proc = new ListenHTTP(); runner = TestRunners.newTestRunner(proc); - availablePort = findAvailablePort(); + availablePort = NetworkUtils.availablePort();; runner.setVariable(PORT_VARIABLE, Integer.toString(availablePort)); runner.setVariable(BASEPATH_VARIABLE,HTTP_BASE_PATH); @@ -146,17 +146,10 @@ public class TestListenHTTP { Thread.sleep(100); } - runner.assertTransferCount(ListenTCP.REL_SUCCESS, messages.size()); + runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, messages.size()); } - private int findAvailablePort() throws IOException { - try (ServerSocket socket = new ServerSocket(0)) { - socket.setReuseAddress(true); - return socket.getLocalPort(); - } - } - private SSLContextService configureProcessorSslContextService() throws InitializationException { final SSLContextService sslContextService = new StandardSSLContextService(); runner.addControllerService("ssl-context", sslContextService); @@ -168,7 +161,7 @@ public class TestListenHTTP { runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); runner.enableControllerService(sslContextService); - runner.setProperty(ListenTCP.SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(ListenHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); return sslContextService; }