mirror of https://github.com/apache/nifi.git
NIFI-11126 Delete MultiPart files in ListenHTTP after processing
Signed-off-by: Chris Sampson <chris.sampson82@gmail.com> This closes #6915
This commit is contained in:
parent
d46aa89788
commit
8bc7593d34
|
@ -62,11 +62,9 @@ import java.io.OutputStream;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
@ -246,7 +244,11 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
|
||||
Set<FlowFile> flowFileSet;
|
||||
if (StringUtils.isNotBlank(request.getContentType()) && request.getContentType().contains("multipart/form-data")) {
|
||||
flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer);
|
||||
try {
|
||||
flowFileSet = handleMultipartRequest(request, session, foundSubject, foundIssuer);
|
||||
} finally {
|
||||
deleteMultiPartFiles(request);
|
||||
}
|
||||
} else {
|
||||
flowFileSet = handleRequest(request, session, foundSubject, foundIssuer, destinationIsLegacyNiFi, contentType, in);
|
||||
}
|
||||
|
@ -256,6 +258,16 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
}
|
||||
}
|
||||
|
||||
private void deleteMultiPartFiles(final HttpServletRequest request) {
|
||||
try {
|
||||
for (final Part part : request.getParts()) {
|
||||
part.delete();
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Delete MultiPart temporary files failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleException(final HttpServletRequest request, final HttpServletResponse response,
|
||||
final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
|
||||
session.rollback();
|
||||
|
@ -270,20 +282,18 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
|
||||
private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
|
||||
throws IOException, IllegalStateException, ServletException {
|
||||
if (isRecordProcessing()) {
|
||||
logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI());
|
||||
}
|
||||
Set<FlowFile> flowFileSet = new HashSet<>();
|
||||
String tempDir = System.getProperty("java.io.tmpdir");
|
||||
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
|
||||
Collection<Part> requestParts = Collections.unmodifiableCollection(request.getParts());
|
||||
final Iterator<Part> parts = requestParts.iterator();
|
||||
int i = 0;
|
||||
while (parts.hasNext()) {
|
||||
Part part = parts.next();
|
||||
final Collection<Part> requestParts = request.getParts();
|
||||
for (final Part part : requestParts) {
|
||||
FlowFile flowFile = session.create();
|
||||
try (OutputStream flowFileOutputStream = session.write(flowFile)) {
|
||||
StreamUtils.copy(part.getInputStream(), flowFileOutputStream);
|
||||
try (
|
||||
OutputStream flowFileOutputStream = session.write(flowFile);
|
||||
InputStream partInputStream = part.getInputStream()
|
||||
) {
|
||||
StreamUtils.copy(partInputStream, flowFileOutputStream);
|
||||
}
|
||||
flowFile = saveRequestDetailsAsAttributes(request, session, foundSubject, foundIssuer, flowFile);
|
||||
flowFile = savePartDetailsAsAttributes(session, part, flowFile, i, requestParts.size());
|
||||
|
@ -333,9 +343,6 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
hasMoreData.set(false);
|
||||
}
|
||||
} else {
|
||||
if (isRecordProcessing()) {
|
||||
logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI());
|
||||
}
|
||||
attributes.putAll(unpackager.unpackageFlowFile(in, bos));
|
||||
|
||||
if (destinationIsLegacyNiFi) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
|
|||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.time.Duration;
|
||||
|
@ -696,6 +697,7 @@ public class TestListenHTTP {
|
|||
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
|
||||
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
|
||||
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
|
||||
runner.setProperty(ListenHTTP.MULTIPART_READ_BUFFER_SIZE, "10 bytes");
|
||||
|
||||
final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
|
||||
final boolean isSecure = (sslContextService != null);
|
||||
|
@ -769,6 +771,13 @@ public class TestListenHTTP {
|
|||
mff.assertAttributeExists("http.multipart.fragments.sequence.number");
|
||||
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
|
||||
mff.assertAttributeExists("http.headers.multipart.content-disposition");
|
||||
|
||||
final Path tempDirectoryPath = Paths.get(System.getProperty("java.io.tmpdir"));
|
||||
final long multiPartTempFiles = Files.find(tempDirectoryPath, 1,
|
||||
(filePath, fileAttributes) -> filePath.getFileName().toString().startsWith("MultiPart")
|
||||
).count();
|
||||
final String multiPartMessage = String.format("MultiPart files found in temporary directory [%s]", tempDirectoryPath);
|
||||
assertEquals(0, multiPartTempFiles, multiPartMessage);
|
||||
}
|
||||
|
||||
private byte[] generateRandomBinaryData() {
|
||||
|
|
Loading…
Reference in New Issue