From 35439db347a05376e29c65363cbe47a122ce840c Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Tue, 11 Aug 2015 16:19:47 -0400
Subject: [PATCH] NIFI-841: Ensure that IOExceptions on session commit are
 handled properly

Signed-off-by: joewitt
Signed-off-by: Mark Payne
---
 .../repository/StandardProcessSession.java | 303 ++++++++++--------
 1 file changed, 165 insertions(+), 138 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2e33c227c2..fe7f12531d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -292,138 +292,148 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private void commit(final Checkpoint checkpoint) {
-        final long commitStartNanos = System.nanoTime();
-
-        resetWriteClaims();
-        resetReadClaim();
-
-        final long updateProvenanceStart = System.nanoTime();
-        updateProvenanceRepo(checkpoint);
-
-        final long claimRemovalStart = System.nanoTime();
-        final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
-
-        // Figure out which content claims can be released.
-        // At this point, we will decrement the Claimant Count for the claims via the Content Repository.
-        // We do not actually destroy the content because otherwise, we could remove the
-        // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
-        // the content claim points to the Original Claim -- which has already been removed!
-        for (final Map.Entry<FlowFile, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
-            final FlowFile flowFile = entry.getKey();
-            final StandardRepositoryRecord record = entry.getValue();
-
-            if (record.isMarkedForDelete()) {
-                // if the working claim is not the same as the original claim, we can immediately destroy the working claim
-                // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
-                removeContent(record.getWorkingClaim());
-
-                if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
-                    // if working & original claim are same, don't remove twice; we only want to remove the original
-                    // if it's different from the working. Otherwise, we remove two claimant counts. This causes
-                    // an issue if we only updated the FlowFile attributes.
-                    removeContent(record.getOriginalClaim());
-                }
-                final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
-                final Connectable connectable = context.getConnectable();
-                final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-                LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
-            } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
-                //records which have been updated - remove original if exists
-                removeContent(record.getOriginalClaim());
-            }
-        }
-
-        final long claimRemovalFinishNanos = System.nanoTime();
-        final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
-
-        // Update the FlowFile Repository
-        try {
-            final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
-            context.getFlowFileRepository().updateRepository((Collection) repoRecords);
-        } catch (final IOException ioe) {
-            rollback();
-            throw new ProcessException("FlowFile Repository failed to update", ioe);
-        }
-        final long flowFileRepoUpdateFinishNanos = System.nanoTime();
-        final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
-
-        updateEventRepository(checkpoint);
-
-        final long updateEventRepositoryFinishNanos = System.nanoTime();
-        final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
-
-        // transfer the flowfiles to the connections' queues.
-        final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
-        for (final StandardRepositoryRecord record : checkpoint.records.values()) {
-            if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
-                continue; //these don't need to be transferred
-            }
-            // record.getCurrent() will return null if this record was created in this session --
-            // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
-            if (record.getCurrent() != null) {
-                Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
-                if (collection == null) {
-                    collection = new ArrayList<>();
-                    recordMap.put(record.getDestination(), collection);
-                }
-                collection.add(record.getCurrent());
-            }
-        }
-
-        for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
-            entry.getKey().putAll(entry.getValue());
-        }
-
-        final long enqueueFlowFileFinishNanos = System.nanoTime();
-        final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
-
-        // Delete any files from disk that need to be removed.
-        for (final Path path : checkpoint.deleteOnCommit) {
-            try {
-                Files.deleteIfExists(path);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
-            }
-        }
-        checkpoint.deleteOnCommit.clear();
-
-        if (LOG.isInfoEnabled()) {
-            final String sessionSummary = summarizeEvents(checkpoint);
-            if (!sessionSummary.isEmpty()) {
-                LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
-            }
-        }
-
-        for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
-            adjustCounter(entry.getKey(), entry.getValue(), true);
-        }
-
-        for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
-            adjustCounter(entry.getKey(), entry.getValue(), true);
-        }
-
-        acknowledgeRecords();
-        resetState();
-
-        if (LOG.isDebugEnabled()) {
-            final StringBuilder timingInfo = new StringBuilder();
-            timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
-
-            final long commitNanos = System.nanoTime() - commitStartNanos;
-            formatNanos(commitNanos, timingInfo);
-            timingInfo.append("; FlowFile Repository Update took ");
-            formatNanos(flowFileRepoUpdateNanos, timingInfo);
-            timingInfo.append("; Claim Removal took ");
-            formatNanos(claimRemovalNanos, timingInfo);
-            timingInfo.append("; FlowFile Event Update took ");
-            formatNanos(updateEventRepositoryNanos, timingInfo);
-            timingInfo.append("; Enqueuing FlowFiles took ");
-            formatNanos(enqueueFlowFileNanos, timingInfo);
-            timingInfo.append("; Updating Provenance Event Repository took ");
-            formatNanos(updateProvenanceNanos, timingInfo);
-
-            LOG.debug(timingInfo.toString());
-        }
+        try {
+            final long commitStartNanos = System.nanoTime();
+
+            resetWriteClaims(false);
+            resetReadClaim();
+
+            final long updateProvenanceStart = System.nanoTime();
+            updateProvenanceRepo(checkpoint);
+
+            final long claimRemovalStart = System.nanoTime();
+            final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
+
+            // Figure out which content claims can be released.
+            // At this point, we will decrement the Claimant Count for the claims via the Content Repository.
+            // We do not actually destroy the content because otherwise, we could remove the
+            // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
+            // the content claim points to the Original Claim -- which has already been removed!
+            for (final Map.Entry<FlowFile, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
+                final FlowFile flowFile = entry.getKey();
+                final StandardRepositoryRecord record = entry.getValue();
+
+                if (record.isMarkedForDelete()) {
+                    // if the working claim is not the same as the original claim, we can immediately destroy the working claim
+                    // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
+                    removeContent(record.getWorkingClaim());
+
+                    if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
+                        // if working & original claim are same, don't remove twice; we only want to remove the original
+                        // if it's different from the working. Otherwise, we remove two claimant counts. This causes
+                        // an issue if we only updated the FlowFile attributes.
+                        removeContent(record.getOriginalClaim());
+                    }
+                    final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+                    final Connectable connectable = context.getConnectable();
+                    final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
+                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+                } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
+                    //records which have been updated - remove original if exists
+                    removeContent(record.getOriginalClaim());
+                }
+            }
+
+            final long claimRemovalFinishNanos = System.nanoTime();
+            final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
+
+            // Update the FlowFile Repository
+            try {
+                final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
+                context.getFlowFileRepository().updateRepository((Collection) repoRecords);
+            } catch (final IOException ioe) {
+                rollback();
+                throw new ProcessException("FlowFile Repository failed to update", ioe);
+            }
+            final long flowFileRepoUpdateFinishNanos = System.nanoTime();
+            final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
+
+            updateEventRepository(checkpoint);
+
+            final long updateEventRepositoryFinishNanos = System.nanoTime();
+            final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
+
+            // transfer the flowfiles to the connections' queues.
+            final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
+            for (final StandardRepositoryRecord record : checkpoint.records.values()) {
+                if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
+                    continue; //these don't need to be transferred
+                }
+                // record.getCurrent() will return null if this record was created in this session --
+                // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
+                if (record.getCurrent() != null) {
+                    Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
+                    if (collection == null) {
+                        collection = new ArrayList<>();
+                        recordMap.put(record.getDestination(), collection);
+                    }
+                    collection.add(record.getCurrent());
+                }
+            }
+
+            for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
+                entry.getKey().putAll(entry.getValue());
+            }
+
+            final long enqueueFlowFileFinishNanos = System.nanoTime();
+            final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
+
+            // Delete any files from disk that need to be removed.
+            for (final Path path : checkpoint.deleteOnCommit) {
+                try {
+                    Files.deleteIfExists(path);
+                } catch (final IOException e) {
+                    throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
+                }
+            }
+            checkpoint.deleteOnCommit.clear();
+
+            if (LOG.isInfoEnabled()) {
+                final String sessionSummary = summarizeEvents(checkpoint);
+                if (!sessionSummary.isEmpty()) {
+                    LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
+                }
+            }
+
+            for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
+                adjustCounter(entry.getKey(), entry.getValue(), true);
+            }
+
+            for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
+                adjustCounter(entry.getKey(), entry.getValue(), true);
+            }
+
+            acknowledgeRecords();
+            resetState();
+
+            if (LOG.isDebugEnabled()) {
+                final StringBuilder timingInfo = new StringBuilder();
+                timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
+
+                final long commitNanos = System.nanoTime() - commitStartNanos;
+                formatNanos(commitNanos, timingInfo);
+                timingInfo.append("; FlowFile Repository Update took ");
+                formatNanos(flowFileRepoUpdateNanos, timingInfo);
+                timingInfo.append("; Claim Removal took ");
+                formatNanos(claimRemovalNanos, timingInfo);
+                timingInfo.append("; FlowFile Event Update took ");
+                formatNanos(updateEventRepositoryNanos, timingInfo);
+                timingInfo.append("; Enqueuing FlowFiles took ");
+                formatNanos(enqueueFlowFileNanos, timingInfo);
+                timingInfo.append("; Updating Provenance Event Repository took ");
+                formatNanos(updateProvenanceNanos, timingInfo);
+
+                LOG.debug(timingInfo.toString());
+            }
+        } catch (final Exception e) {
+            try {
+                rollback();
+            } catch (final Exception e1) {
+                e.addSuppressed(e1);
+            }
+
+            throw e;
+        }
     }
 
     private void updateEventRepository(final Checkpoint checkpoint) {
@@ -2115,13 +2125,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void resetWriteClaims() {
+        resetWriteClaims(true);
+    }
+
+    private void resetWriteClaims(final boolean suppressExceptions) {
         try {
             if (currentWriteClaimStream != null) {
-                currentWriteClaimStream.flush();
-                currentWriteClaimStream.close();
+                try {
+                    currentWriteClaimStream.flush();
+                } finally {
+                    currentWriteClaimStream.close();
+                }
             }
-        } catch (final Exception e) {
+        } catch (final IOException e) {
+            if (!suppressExceptions) {
+                throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository", e);
+            }
         }
+
         currentWriteClaimStream = null;
         currentWriteClaim = null;
         currentWriteClaimFlowFileCount = 0;
@@ -2129,9 +2150,15 @@
 
         for (final ByteCountingOutputStream out : appendableStreams.values()) {
             try {
-                out.flush();
-                out.close();
-            } catch (final Exception e) {
+                try {
+                    out.flush();
+                } finally {
+                    out.close();
+                }
+            } catch (final IOException e) {
+                if (!suppressExceptions) {
+                    throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository", e);
+                }
             }
         }
         appendableStreams.clear();
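
Editor's note: the patch combines two exception-safety idioms that are worth seeing in isolation. First, flush() and close() are separated by try/finally so the stream is closed even when the flush fails, with a suppressExceptions flag letting the caller decide whether that failure is fatal (it is during commit, but not during rollback cleanup). Second, commit() is wrapped so that any failure triggers a rollback, and a failure of the rollback itself is attached to the original exception via addSuppressed() rather than replacing it. The following is a minimal, self-contained Java sketch of both idioms; the Session class and its members are hypothetical stand-ins for illustration, not NiFi APIs.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class ExceptionSafetyDemo {

    // Hypothetical stand-in for StandardProcessSession; illustrative only.
    static class Session {
        private OutputStream writeClaimStream = new ByteArrayOutputStream();

        // Idiom 1: close() must run even if flush() throws, so close() sits in
        // a finally block. The caller chooses whether an IOException is fatal.
        void resetWriteClaims(final boolean suppressExceptions) {
            try {
                if (writeClaimStream != null) {
                    try {
                        writeClaimStream.flush();
                    } finally {
                        writeClaimStream.close();
                    }
                }
            } catch (final IOException e) {
                if (!suppressExceptions) {
                    throw new RuntimeException("Unable to flush output stream", e);
                }
            }
            writeClaimStream = null;
        }

        // Idiom 2: if the commit fails, roll back; if the rollback *also*
        // fails, attach that failure to the original exception with
        // addSuppressed() so neither stack trace is lost, then rethrow.
        void commit() {
            try {
                resetWriteClaims(false); // during commit, a flush failure must abort
                // ... update repositories, enqueue FlowFiles, etc. ...
            } catch (final Exception e) {
                try {
                    rollback();
                } catch (final Exception e1) {
                    e.addSuppressed(e1);
                }
                throw e;
            }
        }

        void rollback() {
            resetWriteClaims(true); // best effort: cleanup failures are suppressed
        }
    }

    public static void main(String[] args) {
        new Session().commit();
        System.out.println("committed");
    }
}

Note the asymmetry the flag encodes: commit() calls resetWriteClaims(false) because a FlowFile whose content never reached the Content Repository must not be committed, while rollback paths suppress the same failure so that cleanup problems cannot mask the exception that caused the rollback in the first place.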