diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 0f6a51da47..70ab08b2e0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -38,6 +38,7 @@ import org.apache.nifi.util.FlowFileUnpackagerV2; import org.apache.nifi.util.FlowFileUnpackagerV3; import org.eclipse.jetty.server.Request; +import javax.servlet.AsyncContext; import javax.servlet.MultipartConfigElement; import javax.servlet.ServletConfig; import javax.servlet.ServletContext; @@ -242,7 +243,7 @@ public class ListenHTTPServlet extends HttpServlet { private void handleException(final HttpServletRequest request, final HttpServletResponse response, final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException { session.rollback(); - logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, foundIssuer, t}); + logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", request.getRemoteHost(), foundSubject, foundIssuer, t); response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString()); } @@ -410,15 +411,25 @@ public class ListenHTTPServlet extends HttpServlet { response.getOutputStream().write(ackUri.getBytes("UTF-8")); if (logger.isDebugEnabled()) { logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}", - new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid}); + flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid); } } else { - response.setStatus(this.returnCode); logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; transferring to 'success'", - new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer}); + request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer); session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS); - session.commitAsync(); + + final AsyncContext asyncContext = request.startAsync(); + session.commitAsync(() -> { + response.setStatus(this.returnCode); + asyncContext.complete(); + }, t -> { + logger.error("Failed to commit session. Returning error response to Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]", + request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, t); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + asyncContext.complete(); + } + ); } }