NIFI-902: Ensure that if we get an IOException during rollover of WAL, we are able to recover

This commit is contained in:
Mark Payne 2015-08-27 16:25:24 -04:00
parent 76b5b38cc4
commit 5de37f63d9
3 changed files with 51 additions and 11 deletions

View File

@ -722,11 +722,20 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
try { try {
final DataOutputStream out = dataOut; final DataOutputStream out = dataOut;
if (out != null) { if (out != null) {
try {
out.close(); out.close();
} catch (final IOException ioe) {
dataOut = null;
fileOut = null;
blackList();
throw ioe;
}
} }
final Path editPath = getNewEditPath(); final Path editPath = getNewEditPath();
final FileOutputStream fos = new FileOutputStream(editPath.toFile()); final FileOutputStream fos = new FileOutputStream(editPath.toFile());
try {
final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos)); final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName()); outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
outStream.writeInt(writeAheadLogVersion); outStream.writeInt(writeAheadLogVersion);
@ -735,6 +744,19 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
outStream.flush(); outStream.flush();
dataOut = outStream; dataOut = outStream;
fileOut = fos; fileOut = fos;
} catch (final IOException ioe) {
logger.error("Failed to create new journal for {} due to {}", new Object[] {this, ioe.toString()}, ioe);
try {
fos.close();
} catch (final IOException innerIOE) {
}
dataOut = null;
fileOut = null;
blackList();
throw ioe;
}
currentJournalFilename = editPath.toFile().getName(); currentJournalFilename = editPath.toFile().getName();

View File

@ -159,7 +159,18 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
} finally { } finally {
try { try {
if (batch) { if (batch) {
try {
rawSession.commit(); rawSession.commit();
} catch (final Exception e) {
final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
procLog.error("Failed to commit session {} due to {}; rolling back", new Object[] { rawSession, e.toString() }, e);
try {
rawSession.rollback(true);
} catch (final Exception e1) {
procLog.error("Failed to roll back session {} due to {}", new Object[] { rawSession, e.toString() }, e);
}
}
} }
final long processingNanos = System.nanoTime() - startNanos; final long processingNanos = System.nanoTime() - startNanos;

View File

@ -32,11 +32,14 @@ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This task runs through all Connectable Components and goes through its incoming queues, polling for FlowFiles and accepting none. This causes the desired side effect of expiring old FlowFiles. * This task runs through all Connectable Components and goes through its incoming queues, polling for FlowFiles and accepting none. This causes the desired side effect of expiring old FlowFiles.
*/ */
public class ExpireFlowFiles implements Runnable { public class ExpireFlowFiles implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ExpireFlowFiles.class);
private final FlowController flowController; private final FlowController flowController;
private final ProcessContextFactory contextFactory; private final ProcessContextFactory contextFactory;
@ -49,7 +52,11 @@ public class ExpireFlowFiles implements Runnable {
@Override @Override
public void run() { public void run() {
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId()); final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
try {
expireFlowFiles(rootGroup); expireFlowFiles(rootGroup);
} catch (final Exception e) {
logger.error("Failed to expire FlowFiles due to {}", e.toString(), e);
}
} }
private StandardProcessSession createSession(final Connectable connectable) { private StandardProcessSession createSession(final Connectable connectable) {