NIFI-841: Ensure that IOExceptions on session commit are handled properly

Signed-off-by: joewitt <joewitt@apache.org>
Signed-off-by: Mark Payne <markap14@hotmail.com>
Mark Payne 2015-08-11 16:19:47 -04:00
parent a264c49d80
commit 35439db347
1 changed file with 165 additions and 138 deletions
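
The change wraps the body of StandardProcessSession.commit(Checkpoint) in a try/catch that rolls the session back if any step of the commit fails, and splits resetWriteClaims() into a variant with a suppressExceptions flag so that IOExceptions raised while flushing and closing content streams are surfaced during commit rather than silently swallowed.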


@@ -292,138 +292,148 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher
@SuppressWarnings({"unchecked", "rawtypes"})
private void commit(final Checkpoint checkpoint) {
    try {
        final long commitStartNanos = System.nanoTime();

        resetWriteClaims(false);
        resetReadClaim();

        final long updateProvenanceStart = System.nanoTime();
        updateProvenanceRepo(checkpoint);

        final long claimRemovalStart = System.nanoTime();
        final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;

        // Figure out which content claims can be released.
        // At this point, we will decrement the Claimant Count for the claims via the Content Repository.
        // We do not actually destroy the content because otherwise, we could remove the
        // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
        // the content claim points to the Original Claim -- which has already been removed!
        for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
            final FlowFile flowFile = entry.getKey();
            final StandardRepositoryRecord record = entry.getValue();

            if (record.isMarkedForDelete()) {
                // if the working claim is not the same as the original claim, we can immediately destroy the working claim
                // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
                removeContent(record.getWorkingClaim());

                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
                    // if working & original claim are same, don't remove twice; we only want to remove the original
                    // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                    // an issue if we only updated the FlowFile attributes.
                    removeContent(record.getOriginalClaim());
                }

                final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                final Connectable connectable = context.getConnectable();
                final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
                LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
            } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
                // records which have been updated - remove original if exists
                removeContent(record.getOriginalClaim());
            }
        }

        final long claimRemovalFinishNanos = System.nanoTime();
        final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;

        // Update the FlowFile Repository
        try {
            final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
            context.getFlowFileRepository().updateRepository((Collection) repoRecords);
        } catch (final IOException ioe) {
            rollback();
            throw new ProcessException("FlowFile Repository failed to update", ioe);
        }
        final long flowFileRepoUpdateFinishNanos = System.nanoTime();
        final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;

        updateEventRepository(checkpoint);
        final long updateEventRepositoryFinishNanos = System.nanoTime();
        final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;

        // transfer the flowfiles to the connections' queues.
        final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
        for (final StandardRepositoryRecord record : checkpoint.records.values()) {
            if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
                continue; // these don't need to be transferred
            }

            // record.getCurrent() will return null if this record was created in this session --
            // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
            if (record.getCurrent() != null) {
                Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
                if (collection == null) {
                    collection = new ArrayList<>();
                    recordMap.put(record.getDestination(), collection);
                }
                collection.add(record.getCurrent());
            }
        }

        for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
            entry.getKey().putAll(entry.getValue());
        }
        final long enqueueFlowFileFinishNanos = System.nanoTime();
        final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;

        // Delete any files from disk that need to be removed.
        for (final Path path : checkpoint.deleteOnCommit) {
            try {
                Files.deleteIfExists(path);
            } catch (final IOException e) {
                throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
            }
        }
        checkpoint.deleteOnCommit.clear();

        if (LOG.isInfoEnabled()) {
            final String sessionSummary = summarizeEvents(checkpoint);
            if (!sessionSummary.isEmpty()) {
                LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
            }
        }

        for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
            adjustCounter(entry.getKey(), entry.getValue(), true);
        }
        for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
            adjustCounter(entry.getKey(), entry.getValue(), true);
        }

        acknowledgeRecords();
        resetState();

        if (LOG.isDebugEnabled()) {
            final StringBuilder timingInfo = new StringBuilder();
            timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");

            final long commitNanos = System.nanoTime() - commitStartNanos;
            formatNanos(commitNanos, timingInfo);
            timingInfo.append("; FlowFile Repository Update took ");
            formatNanos(flowFileRepoUpdateNanos, timingInfo);
            timingInfo.append("; Claim Removal took ");
            formatNanos(claimRemovalNanos, timingInfo);
            timingInfo.append("; FlowFile Event Update took ");
            formatNanos(updateEventRepositoryNanos, timingInfo);
            timingInfo.append("; Enqueuing FlowFiles took ");
            formatNanos(enqueueFlowFileNanos, timingInfo);
            timingInfo.append("; Updating Provenance Event Repository took ");
            formatNanos(updateProvenanceNanos, timingInfo);

            LOG.debug(timingInfo.toString());
        }
    } catch (final Exception e) {
        try {
            rollback();
        } catch (final Exception e1) {
            e.addSuppressed(e1);
        }

        throw e;
    }
}
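
The heart of the fix is visible at the bottom of commit(): the entire commit sequence now runs inside a try block, and on any failure the session attempts to roll back, attaching a failure of the rollback itself to the original exception via addSuppressed() so the root cause is not masked. A minimal, self-contained sketch of that pattern -- the Session interface and method names below are illustrative stand-ins, not NiFi's API:

    // Sketch of the commit-or-rollback pattern introduced by this commit.
    // "Session", "commitWork", and "commitSafely" are hypothetical names.
    interface Session {
        void commitWork() throws Exception;
        void rollback();
    }

    static void commitSafely(final Session session) throws Exception {
        try {
            session.commitWork();
        } catch (final Exception e) {
            try {
                session.rollback();
            } catch (final Exception rollbackFailure) {
                // Keep the commit failure as the primary exception and
                // record the rollback failure alongside it.
                e.addSuppressed(rollbackFailure);
            }
            throw e; // always rethrow the original failure
        }
    }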
@@ -2115,13 +2125,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher
}
private void resetWriteClaims() {
    resetWriteClaims(true);
}

private void resetWriteClaims(final boolean suppressExceptions) {
    try {
        if (currentWriteClaimStream != null) {
            try {
                currentWriteClaimStream.flush();
            } finally {
                currentWriteClaimStream.close();
            }
        }
    } catch (final IOException e) {
        if (!suppressExceptions) {
            throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
        }
    }

    currentWriteClaimStream = null;
    currentWriteClaim = null;
    currentWriteClaimFlowFileCount = 0;
@@ -2129,9 +2150,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher
    for (final ByteCountingOutputStream out : appendableStreams.values()) {
        try {
            try {
                out.flush();
            } finally {
                out.close();
            }
        } catch (final IOException e) {
            if (!suppressExceptions) {
                throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
            }
        }
    }
    appendableStreams.clear();
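
Two details of the new resetWriteClaims(boolean) are worth noting. First, flush() now runs inside a try whose finally closes the stream, so a failed flush can no longer leave the stream open. Second, the catch narrows from Exception to IOException and only swallows the error when suppressExceptions is true (the no-arg overload used on cleanup paths); commit() calls resetWriteClaims(false), so a failed flush now aborts the commit. A small standalone sketch of the same pattern -- the method name and message text here are illustrative, assuming any java.io.OutputStream:

    // Flush and close a stream; close() runs even if flush() throws.
    // When suppressExceptions is true (cleanup/rollback), IO failures are
    // swallowed; when false (commit), they are rethrown to the caller.
    static void flushAndClose(final java.io.OutputStream out, final boolean suppressExceptions) {
        if (out == null) {
            return;
        }
        try {
            try {
                out.flush();
            } finally {
                out.close();
            }
        } catch (final java.io.IOException e) {
            if (!suppressExceptions) {
                throw new RuntimeException("Unable to flush output to the Content Repository", e);
            }
        }
    }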