NIFI-3897: If swapping data into queue and an unexpected exception/error is thrown, do not lose track of the swap file

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1960.
This commit is contained in:
Mark Payne 2017-06-29 10:49:34 -04:00 committed by Pierre Villard
parent cdc154f7c8
commit 7f4cfd51ea
2 changed files with 91 additions and 2 deletions

View File

@ -459,21 +459,25 @@ public class StandardFlowFileQueue implements FlowFileQueue {
// keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
// first. // first.
if (!swapLocations.isEmpty()) { if (!swapLocations.isEmpty()) {
final String swapLocation = swapLocations.remove(0); final String swapLocation = swapLocations.get(0);
boolean partialContents = false; boolean partialContents = false;
SwapContents swapContents = null; SwapContents swapContents = null;
try { try {
swapContents = swapManager.swapIn(swapLocation, this); swapContents = swapManager.swapIn(swapLocation, this);
swapLocations.remove(0);
} catch (final IncompleteSwapFileException isfe) { } catch (final IncompleteSwapFileException isfe) {
logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation); logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation);
logger.error("", isfe); logger.error("", isfe);
swapContents = isfe.getPartialContents(); swapContents = isfe.getPartialContents();
partialContents = true; partialContents = true;
swapLocations.remove(0);
} catch (final FileNotFoundException fnfe) { } catch (final FileNotFoundException fnfe) {
logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation); logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
if (eventReporter != null) { if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found"); eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
} }
swapLocations.remove(0);
return; return;
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation); logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
@ -482,7 +486,17 @@ public class StandardFlowFileQueue implements FlowFileQueue {
eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information."); swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
} }
// We do not remove the Swap File from swapLocations because the IOException may be recoverable later. For instance, the file may be on a network
// drive and we may have connectivity problems, etc.
return; return;
} catch (final Throwable t) {
logger.error("Failed to swap in FlowFiles from Swap File {}", swapLocation, t);
// We do not remove the Swap File from swapLocations because this is an unexpected failure that may be retry-able. For example, if there were
// an OOME, etc. then we don't want to he queue to still reflect that the data is around but never swap it in. By leaving the Swap File
// in swapLocations, we will continue to retry.
throw t;
} }
final QueueSize swapSize = swapContents.getSummary().getQueueSize(); final QueueSize swapSize = swapContents.getSummary().getQueueSize();
@ -516,7 +530,7 @@ public class StandardFlowFileQueue implements FlowFileQueue {
if (size.get().swappedCount > swapQueue.size()) { if (size.get().swappedCount > swapQueue.size()) {
// we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
// an external process to swap FlowFiles back in. // the files to be swapped back in first
return; return;
} }

View File

@ -63,6 +63,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -583,6 +584,62 @@ public class TestStandardFlowFileQueue {
} }
@Test
public void testOOMEFollowedBySuccessfulSwapIn() {
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; i < 50000; i++) {
flowFiles.add(new TestFlowFile());
}
queue.putAll(flowFiles);
swapManager.failSwapInAfterN = 2;
swapManager.setSwapInFailure(new OutOfMemoryError("Intentional OOME for unit test"));
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
for (int i = 0; i < 30000; i++) {
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNotNull(polled);
}
// verify that unexpected ERROR's are handled in such a way that we keep retrying
for (int i = 0; i < 3; i++) {
try {
queue.poll(expiredRecords);
Assert.fail("Expected OOME to be thrown");
} catch (final OutOfMemoryError oome) {
// expected
}
}
// verify that unexpected Runtime Exceptions are handled in such a way that we keep retrying
swapManager.setSwapInFailure(new NullPointerException("Intentional OOME for unit test"));
for (int i = 0; i < 3; i++) {
try {
queue.poll(expiredRecords);
Assert.fail("Expected NPE to be thrown");
} catch (final NullPointerException npe) {
// expected
}
}
swapManager.failSwapInAfterN = -1;
for (int i = 0; i < 20000; i++) {
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNotNull(polled);
}
queue.acknowledge(flowFiles);
assertNull(queue.poll(expiredRecords));
assertEquals(0, queue.getActiveQueueSize().getObjectCount());
assertEquals(0, queue.size().getObjectCount());
assertTrue(swapManager.swappedOut.isEmpty());
}
private class TestSwapManager implements FlowFileSwapManager { private class TestSwapManager implements FlowFileSwapManager {
private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
int swapOutCalledCount = 0; int swapOutCalledCount = 0;
@ -590,6 +647,13 @@ public class TestStandardFlowFileQueue {
private int incompleteSwapFileRecordsToInclude = -1; private int incompleteSwapFileRecordsToInclude = -1;
private int failSwapInAfterN = -1;
private Throwable failSwapInFailure = null;
private void setSwapInFailure(final Throwable t) {
this.failSwapInFailure = t;
}
@Override @Override
public void initialize(final SwapManagerInitializationContext initializationContext) { public void initialize(final SwapManagerInitializationContext initializationContext) {
@ -622,6 +686,17 @@ public class TestStandardFlowFileQueue {
final SwapContents partialContents = new StandardSwapContents(summary, partial); final SwapContents partialContents = new StandardSwapContents(summary, partial);
throw new IncompleteSwapFileException(swapLocation, partialContents); throw new IncompleteSwapFileException(swapLocation, partialContents);
} }
if (swapInCalledCount > failSwapInAfterN && failSwapInAfterN > -1) {
if (failSwapInFailure instanceof RuntimeException) {
throw (RuntimeException) failSwapInFailure;
}
if (failSwapInFailure instanceof Error) {
throw (Error) failSwapInFailure;
}
throw new RuntimeException(failSwapInFailure);
}
} }
@Override @Override