Merge branch 'NIFI-33' into develop

This commit is contained in:
Mark Payne 2014-12-30 09:08:19 -05:00
commit bfe39c0b82
1 changed files with 63 additions and 24 deletions

View File

@ -22,6 +22,7 @@ import java.io.DataOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
@ -80,6 +81,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
public static final int MINIMUM_SWAP_COUNT = 10000; public static final int MINIMUM_SWAP_COUNT = 10000;
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
public static final int SWAP_ENCODING_VERSION = 6; public static final int SWAP_ENCODING_VERSION = 6;
public static final String EVENT_CATEGORY = "Swap FlowFiles"; public static final String EVENT_CATEGORY = "Swap FlowFiles";
@ -441,14 +444,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
if (!swapFile.delete()) { if (!swapFile.delete()) {
final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"; warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
logger.warn(errMsg);
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
} }
} catch (final EOFException eof) {
error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
if ( !swapFile.delete() ) {
warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
}
} catch (final FileNotFoundException fnfe) {
error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
} catch (final Exception e) { } catch (final Exception e) {
final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e; error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
logger.error(errMsg);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
if (swapFile != null) { if (swapFile != null) {
queue.add(swapFile); queue.add(swapFile);
@ -463,8 +470,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
private void error(final String error, final Throwable t) {
error(error);
if ( logger.isDebugEnabled() ) {
logger.error("", t);
}
}
private void error(final String error) {
logger.error(error);
if ( eventReporter != null ) {
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
}
}
private void warn(final String warning) {
logger.warn(warning);
if ( eventReporter != null ) {
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
}
}
private class SwapOutTask implements Runnable { private class SwapOutTask implements Runnable {
private final BlockingQueue<FlowFileQueue> connectionQueue; private final BlockingQueue<FlowFileQueue> connectionQueue;
public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) { public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
@ -486,20 +514,27 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) { while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap"); final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
final String swapLocation = swapFile.getAbsolutePath(); final String swapLocation = swapFile.getAbsolutePath();
final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords(); final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
int recordsSwapped; int recordsSwapped;
try (final FileOutputStream fos = new FileOutputStream(swapFile)) { try {
recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation); recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
fos.getFD().sync(); fos.getFD().sync();
}
if ( swapTempFile.renameTo(swapFile) ) {
flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
} else {
error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
recordsSwapped = 0;
}
} catch (final IOException ioe) { } catch (final IOException ioe) {
recordsSwapped = 0; recordsSwapped = 0;
flowFileQueue.putSwappedRecords(toSwap); flowFileQueue.putSwappedRecords(toSwap);
final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe; error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe);
logger.error(errMsg);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
} }
if (recordsSwapped > 0) { if (recordsSwapped > 0) {
@ -514,7 +549,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
swapQueue.getQueue().add(swapFile); swapQueue.getQueue().add(swapFile);
} else { } else {
swapFile.delete(); swapTempFile.delete();
} }
} }
} }
@ -533,7 +568,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
@Override @Override
public boolean accept(final File dir, final String name) { public boolean accept(final File dir, final String name) {
return SWAP_FILE_PATTERN.matcher(name).matches(); return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
} }
}); });
@ -553,6 +588,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
long maxRecoveredId = 0L; long maxRecoveredId = 0L;
for (final File swapFile : swapFiles) { for (final File swapFile : swapFiles) {
if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) {
if ( swapFile.delete() ) {
logger.info("Removed incomplete/temporary Swap File " + swapFile);
} else {
warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
}
continue;
}
// read record to disk via the swap file // read record to disk via the swap file
try (final InputStream fis = new FileInputStream(swapFile); try (final InputStream fis = new FileInputStream(swapFile);
final InputStream bufferedIn = new BufferedInputStream(fis); final InputStream bufferedIn = new BufferedInputStream(fis);
@ -570,8 +615,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final String connectionId = in.readUTF(); final String connectionId = in.readUTF();
final FlowFileQueue queue = queueMap.get(connectionId); final FlowFileQueue queue = queueMap.get(connectionId);
if (queue == null) { if (queue == null) {
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId); error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
continue; continue;
} }
@ -594,12 +638,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
maxRecoveredId = maxId; maxRecoveredId = maxId;
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe; error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe);
logger.error(errMsg);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
if (logger.isDebugEnabled()) {
logger.error("", ioe);
}
} }
} }