mirror of https://github.com/apache/nifi.git
NIFI-33: Ensure that we sync to disk, then rename file when swapping out; delete partial files on restart; destroy corrupt files on EOFException/FileNotFoundException when swapping in
This commit is contained in:
parent
1cc3ce5755
commit
ece5ce1409
|
@ -22,6 +22,7 @@ import java.io.DataOutputStream;
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -80,6 +81,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
|
|
||||||
public static final int MINIMUM_SWAP_COUNT = 10000;
|
public static final int MINIMUM_SWAP_COUNT = 10000;
|
||||||
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
|
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
|
||||||
|
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
|
||||||
|
|
||||||
public static final int SWAP_ENCODING_VERSION = 6;
|
public static final int SWAP_ENCODING_VERSION = 6;
|
||||||
public static final String EVENT_CATEGORY = "Swap FlowFiles";
|
public static final String EVENT_CATEGORY = "Swap FlowFiles";
|
||||||
|
|
||||||
|
@ -441,14 +444,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!swapFile.delete()) {
|
if (!swapFile.delete()) {
|
||||||
final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
|
warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
|
||||||
logger.warn(errMsg);
|
|
||||||
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
|
|
||||||
}
|
}
|
||||||
|
} catch (final EOFException eof) {
|
||||||
|
error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
|
||||||
|
|
||||||
|
if ( !swapFile.delete() ) {
|
||||||
|
warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
|
||||||
|
}
|
||||||
|
} catch (final FileNotFoundException fnfe) {
|
||||||
|
error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
|
error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
|
||||||
logger.error(errMsg);
|
|
||||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
|
||||||
|
|
||||||
if (swapFile != null) {
|
if (swapFile != null) {
|
||||||
queue.add(swapFile);
|
queue.add(swapFile);
|
||||||
|
@ -463,8 +470,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class SwapOutTask implements Runnable {
|
private void error(final String error, final Throwable t) {
|
||||||
|
error(error);
|
||||||
|
if ( logger.isDebugEnabled() ) {
|
||||||
|
logger.error("", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void error(final String error) {
|
||||||
|
logger.error(error);
|
||||||
|
if ( eventReporter != null ) {
|
||||||
|
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void warn(final String warning) {
|
||||||
|
logger.warn(warning);
|
||||||
|
if ( eventReporter != null ) {
|
||||||
|
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class SwapOutTask implements Runnable {
|
||||||
private final BlockingQueue<FlowFileQueue> connectionQueue;
|
private final BlockingQueue<FlowFileQueue> connectionQueue;
|
||||||
|
|
||||||
public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
|
public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
|
||||||
|
@ -486,20 +514,27 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
|
|
||||||
while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
|
while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
|
||||||
final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
|
final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
|
||||||
|
final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
|
||||||
final String swapLocation = swapFile.getAbsolutePath();
|
final String swapLocation = swapFile.getAbsolutePath();
|
||||||
final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
|
final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
|
||||||
|
|
||||||
int recordsSwapped;
|
int recordsSwapped;
|
||||||
try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
|
try {
|
||||||
recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
|
try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
|
||||||
flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
|
recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
|
||||||
fos.getFD().sync();
|
fos.getFD().sync();
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( swapTempFile.renameTo(swapFile) ) {
|
||||||
|
flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
|
||||||
|
} else {
|
||||||
|
error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
|
||||||
|
recordsSwapped = 0;
|
||||||
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
recordsSwapped = 0;
|
recordsSwapped = 0;
|
||||||
flowFileQueue.putSwappedRecords(toSwap);
|
flowFileQueue.putSwappedRecords(toSwap);
|
||||||
final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
|
error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe);
|
||||||
logger.error(errMsg);
|
|
||||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recordsSwapped > 0) {
|
if (recordsSwapped > 0) {
|
||||||
|
@ -533,7 +568,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
|
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(final File dir, final String name) {
|
public boolean accept(final File dir, final String name) {
|
||||||
return SWAP_FILE_PATTERN.matcher(name).matches();
|
return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -553,6 +588,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
long maxRecoveredId = 0L;
|
long maxRecoveredId = 0L;
|
||||||
|
|
||||||
for (final File swapFile : swapFiles) {
|
for (final File swapFile : swapFiles) {
|
||||||
|
if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) {
|
||||||
|
if ( swapFile.delete() ) {
|
||||||
|
logger.info("Removed incomplete/temporary Swap File " + swapFile);
|
||||||
|
} else {
|
||||||
|
warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// read record to disk via the swap file
|
// read record to disk via the swap file
|
||||||
try (final InputStream fis = new FileInputStream(swapFile);
|
try (final InputStream fis = new FileInputStream(swapFile);
|
||||||
final InputStream bufferedIn = new BufferedInputStream(fis);
|
final InputStream bufferedIn = new BufferedInputStream(fis);
|
||||||
|
@ -570,8 +615,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
final String connectionId = in.readUTF();
|
final String connectionId = in.readUTF();
|
||||||
final FlowFileQueue queue = queueMap.get(connectionId);
|
final FlowFileQueue queue = queueMap.get(connectionId);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
|
error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
|
||||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -594,12 +638,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||||
maxRecoveredId = maxId;
|
maxRecoveredId = maxId;
|
||||||
}
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
|
error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe);
|
||||||
logger.error(errMsg);
|
|
||||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.error("", ioe);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue