From 8bc7593d34a26801bd4957acaa0cc2633aa9c976 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 1 Feb 2023 12:39:07 -0600 Subject: [PATCH] NIFI-11126 Delete MultiPart files in ListenHTTP after processing Signed-off-by: Chris Sampson This closes #6915 --- .../standard/servlets/ListenHTTPServlet.java | 37 +++++++++++-------- .../processors/standard/TestListenHTTP.java | 9 +++++ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 1a2d3c6291..2793e1be12 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -62,11 +62,9 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.security.cert.X509Certificate; import java.util.Collection; -import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -246,7 +244,11 @@ public class ListenHTTPServlet extends HttpServlet { Set flowFileSet; if (StringUtils.isNotBlank(request.getContentType()) && request.getContentType().contains("multipart/form-data")) { - flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer); + try { + flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer); + } finally { + deleteMultiPartFiles(request); + } } else { flowFileSet = handleRequest(request, session, foundSubject, foundIssuer, destinationIsLegacyNiFi, contentType, in); } @@ -256,6 +258,16 @@ public class ListenHTTPServlet extends HttpServlet { } } + private void deleteMultiPartFiles(final HttpServletRequest request) { + try { + for (final Part part : request.getParts()) { + part.delete(); + } + } catch (final Exception e) { + logger.warn("Delete MultiPart temporary files failed", e); + } + } + private void handleException(final HttpServletRequest request, final HttpServletResponse response, final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException { session.rollback(); @@ -270,20 +282,18 @@ public class ListenHTTPServlet extends HttpServlet { private Set handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer) throws IOException, IllegalStateException, ServletException { - if (isRecordProcessing()) { - logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI()); - } Set flowFileSet = new HashSet<>(); String tempDir = System.getProperty("java.io.tmpdir"); request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize)); - Collection requestParts = Collections.unmodifiableCollection(request.getParts()); - final Iterator parts = requestParts.iterator(); int i = 0; - while (parts.hasNext()) { - Part part = parts.next(); + final Collection requestParts = request.getParts(); + for (final Part part : requestParts) { FlowFile flowFile = session.create(); - try (OutputStream flowFileOutputStream = session.write(flowFile)) { - StreamUtils.copy(part.getInputStream(), flowFileOutputStream); + try ( + OutputStream flowFileOutputStream = session.write(flowFile); + InputStream partInputStream = part.getInputStream() + ) { + StreamUtils.copy(partInputStream, flowFileOutputStream); } flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile); flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size()); @@ -333,9 +343,6 @@ public class ListenHTTPServlet extends HttpServlet { hasMoreData.set(false); } } else { - if (isRecordProcessing()) { - logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI()); - } attributes.putAll(unpackager.unpackageFlowFile(in, bos)); if (destinationIsLegacyNiFi) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java index b5616d470d..f795891b89 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.security.GeneralSecurityException; import java.time.Duration; @@ -696,6 +697,7 @@ public class TestListenHTTP { runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK)); + runner.setProperty(ListenHTTP.MULTIPART_READ_BUFFER_SIZE, "10 bytes"); final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class); final boolean isSecure = (sslContextService != null); @@ -769,6 +771,13 @@ public class TestListenHTTP { mff.assertAttributeExists("http.multipart.fragments.sequence.number"); mff.assertAttributeEquals("http.multipart.fragments.total.number", "5"); mff.assertAttributeExists("http.headers.multipart.content-disposition"); + + final Path tempDirectoryPath = Paths.get(System.getProperty("java.io.tmpdir")); + final long multiPartTempFiles = Files.find(tempDirectoryPath, 1, + (filePath, fileAttributes) -> filePath.getFileName().toString().startsWith("MultiPart") + ).count(); + final String multiPartMessage = String.format("MultiPart files found in temporary directory [%s]", tempDirectoryPath); + assertEquals(0, multiPartTempFiles, multiPartMessage); } private byte[] generateRandomBinaryData() {