diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 4b8d43b66a..758a9ab7bd 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; @@ -80,6 +81,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager { public static final int MINIMUM_SWAP_COUNT = 10000; private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); + private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part"); + public static final int SWAP_ENCODING_VERSION = 6; public static final String EVENT_CATEGORY = "Swap FlowFiles"; @@ -441,14 +444,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } if (!swapFile.delete()) { - final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"; - logger.warn(errMsg); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg); + warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"); } + } catch (final EOFException eof) { + error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile); + + if ( !swapFile.delete() ) { + warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually"); + } + } catch (final FileNotFoundException fnfe) { + error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile); } catch (final Exception e) { - final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e; - logger.error(errMsg); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); + error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e); if (swapFile != null) { queue.add(swapFile); @@ -463,8 +470,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } + private void error(final String error, final Throwable t) { + error(error); + if ( logger.isDebugEnabled() ) { + logger.error("", t); + } + } + + private void error(final String error) { + logger.error(error); + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error); + } + } + + private void warn(final String warning) { + logger.warn(warning); + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning); + } + } + + private class SwapOutTask implements Runnable { - private final BlockingQueue connectionQueue; public SwapOutTask(final BlockingQueue connectionQueue) { @@ -486,20 +514,27 @@ public class FileSystemSwapManager implements FlowFileSwapManager { while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) { final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap"); + final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part"); final String swapLocation = swapFile.getAbsolutePath(); final List toSwap = flowFileQueue.pollSwappableRecords(); int recordsSwapped; - try (final FileOutputStream fos = new FileOutputStream(swapFile)) { - recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); - flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation); - fos.getFD().sync(); + try { + try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) { + recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); + fos.getFD().sync(); + } + + if ( swapTempFile.renameTo(swapFile) ) { + flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation); + } else { + error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile); + recordsSwapped = 0; + } } catch (final IOException ioe) { recordsSwapped = 0; flowFileQueue.putSwappedRecords(toSwap); - final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe; - logger.error(errMsg); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); + error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe); } if (recordsSwapped > 0) { @@ -514,7 +549,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { swapQueue.getQueue().add(swapFile); } else { - swapFile.delete(); + swapTempFile.delete(); } } } @@ -533,7 +568,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { @Override public boolean accept(final File dir, final String name) { - return SWAP_FILE_PATTERN.matcher(name).matches(); + return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches(); } }); @@ -553,6 +588,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager { long maxRecoveredId = 0L; for (final File swapFile : swapFiles) { + if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) { + if ( swapFile.delete() ) { + logger.info("Removed incomplete/temporary Swap File " + swapFile); + } else { + warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually"); + } + + continue; + } + // read record to disk via the swap file try (final InputStream fis = new FileInputStream(swapFile); final InputStream bufferedIn = new BufferedInputStream(fis); @@ -570,8 +615,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final String connectionId = in.readUTF(); final FlowFileQueue queue = queueMap.get(connectionId); if (queue == null) { - logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist"); + error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist"); continue; } @@ -594,12 +638,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { maxRecoveredId = maxId; } } catch (final IOException ioe) { - final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe; - logger.error(errMsg); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); - if (logger.isDebugEnabled()) { - logger.error("", ioe); - } + error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe); } }