From c3731703f397574399e14a7b1589bda9e9177fd9 Mon Sep 17 00:00:00 2001 From: joewitt Date: Mon, 17 Aug 2015 00:05:03 -0400 Subject: [PATCH] NIFI-841 correct patch paths and fixed formatting issues Signed-off-by: Mark Payne --- .../repository/StandardProcessSession.java | 508 +++++++++--------- 1 file changed, 260 insertions(+), 248 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index fe7f12531d..01fc3c87a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory; /** *

- * Provides a ProcessSession that ensures all accesses, changes and transfers occur in an atomic manner for all FlowFiles including their contents and attributes

+ * Provides a ProcessSession that ensures all accesses, changes and transfers + * occur in an atomic manner for all FlowFiles including their contents and + * attributes

*

* NOT THREAD SAFE

*

@@ -93,7 +95,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback public static final int VERBOSE_LOG_THRESHOLD = 10; private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize( - NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue(); + NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue(); private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim(); public static final String DEFAULT_FLOWFILE_PATH = "./"; @@ -154,32 +156,32 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE String description = connectable.toString(); switch (connectable.getConnectableType()) { - case PROCESSOR: - final ProcessorNode procNode = (ProcessorNode) connectable; - componentType = procNode.getProcessor().getClass().getSimpleName(); - description = procNode.getProcessor().toString(); - break; - case INPUT_PORT: - componentType = "Input Port"; - break; - case OUTPUT_PORT: - componentType = "Output Port"; - break; - case REMOTE_INPUT_PORT: - componentType = "Remote Input Port"; - break; - case REMOTE_OUTPUT_PORT: - componentType = "Remote Output Port"; - break; - case FUNNEL: - componentType = "Funnel"; - break; - default: - throw new AssertionError("Connectable type is " + connectable.getConnectableType()); + case PROCESSOR: + final ProcessorNode procNode = (ProcessorNode) connectable; + componentType = procNode.getProcessor().getClass().getSimpleName(); + description = procNode.getProcessor().toString(); + break; + case INPUT_PORT: + componentType = "Input Port"; + break; + case OUTPUT_PORT: + componentType = "Output Port"; + break; + case REMOTE_INPUT_PORT: + componentType = "Remote Input Port"; + break; + case REMOTE_OUTPUT_PORT: + componentType = "Remote Output Port"; + break; + case FUNNEL: + componentType = "Funnel"; + break; + default: + throw new AssertionError("Connectable type is " + connectable.getConnectableType()); } this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType, - context.getProvenanceRepository(), this); + context.getProvenanceRepository(), this); this.sessionId = idGenerator.getAndIncrement(); this.connectableDescription = description; @@ -292,148 +294,147 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @SuppressWarnings({"unchecked", "rawtypes"}) private void commit(final Checkpoint checkpoint) { - try { - final long commitStartNanos = System.nanoTime(); - - resetWriteClaims(false); - resetReadClaim(); - - final long updateProvenanceStart = System.nanoTime(); - updateProvenanceRepo(checkpoint); - - final long claimRemovalStart = System.nanoTime(); - final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart; - + try { + final long commitStartNanos = System.nanoTime(); + + resetWriteClaims(false); + resetReadClaim(); + + final long updateProvenanceStart = System.nanoTime(); + updateProvenanceRepo(checkpoint); + + final long claimRemovalStart = System.nanoTime(); + final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart; + // Figure out which content claims can be released. - // At this point, we will decrement the Claimant Count for the claims via the Content Repository. - // We do not actually destroy the content because otherwise, we could remove the - // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that - // the content claim points to the Original Claim -- which has already been removed! - for (final Map.Entry entry : checkpoint.records.entrySet()) { - final FlowFile flowFile = entry.getKey(); - final StandardRepositoryRecord record = entry.getValue(); - - if (record.isMarkedForDelete()) { + // At this point, we will decrement the Claimant Count for the claims via the Content Repository. + // We do not actually destroy the content because otherwise, we could remove the + // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that + // the content claim points to the Original Claim -- which has already been removed! + for (final Map.Entry entry : checkpoint.records.entrySet()) { + final FlowFile flowFile = entry.getKey(); + final StandardRepositoryRecord record = entry.getValue(); + + if (record.isMarkedForDelete()) { // if the working claim is not the same as the original claim, we can immediately destroy the working claim - // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync. - removeContent(record.getWorkingClaim()); - - if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) { + // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync. + removeContent(record.getWorkingClaim()); + + if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) { // if working & original claim are same, don't remove twice; we only want to remove the original - // if it's different from the working. Otherwise, we remove two claimant counts. This causes - // an issue if we only updated the FlowFile attributes. - removeContent(record.getOriginalClaim()); - } - final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); - final Connectable connectable = context.getConnectable(); - final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; - LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); - } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { - //records which have been updated - remove original if exists - removeContent(record.getOriginalClaim()); - } - } - - final long claimRemovalFinishNanos = System.nanoTime(); - final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart; - - // Update the FlowFile Repository - try { - final Collection repoRecords = checkpoint.records.values(); - context.getFlowFileRepository().updateRepository((Collection) repoRecords); - } catch (final IOException ioe) { - rollback(); - throw new ProcessException("FlowFile Repository failed to update", ioe); - } - final long flowFileRepoUpdateFinishNanos = System.nanoTime(); - final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos; - - updateEventRepository(checkpoint); - - final long updateEventRepositoryFinishNanos = System.nanoTime(); - final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos; - - // transfer the flowfiles to the connections' queues. - final Map> recordMap = new HashMap<>(); - for (final StandardRepositoryRecord record : checkpoint.records.values()) { - if (record.isMarkedForAbort() || record.isMarkedForDelete()) { - continue; //these don't need to be transferred - } + // if it's different from the working. Otherwise, we remove two claimant counts. This causes + // an issue if we only updated the FlowFile attributes. + removeContent(record.getOriginalClaim()); + } + final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); + final Connectable connectable = context.getConnectable(); + final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; + LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); + } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { + //records which have been updated - remove original if exists + removeContent(record.getOriginalClaim()); + } + } + + final long claimRemovalFinishNanos = System.nanoTime(); + final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart; + + // Update the FlowFile Repository + try { + final Collection repoRecords = checkpoint.records.values(); + context.getFlowFileRepository().updateRepository((Collection) repoRecords); + } catch (final IOException ioe) { + rollback(); + throw new ProcessException("FlowFile Repository failed to update", ioe); + } + final long flowFileRepoUpdateFinishNanos = System.nanoTime(); + final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos; + + updateEventRepository(checkpoint); + + final long updateEventRepositoryFinishNanos = System.nanoTime(); + final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos; + + // transfer the flowfiles to the connections' queues. + final Map> recordMap = new HashMap<>(); + for (final StandardRepositoryRecord record : checkpoint.records.values()) { + if (record.isMarkedForAbort() || record.isMarkedForDelete()) { + continue; //these don't need to be transferred + } // record.getCurrent() will return null if this record was created in this session -- - // in this case, we just ignore it, and it will be cleaned up by clearing the records map. - if (record.getCurrent() != null) { - Collection collection = recordMap.get(record.getDestination()); - if (collection == null) { - collection = new ArrayList<>(); - recordMap.put(record.getDestination(), collection); - } - collection.add(record.getCurrent()); - } - } - - for (final Map.Entry> entry : recordMap.entrySet()) { - entry.getKey().putAll(entry.getValue()); - } - - final long enqueueFlowFileFinishNanos = System.nanoTime(); - final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos; - - // Delete any files from disk that need to be removed. - for (final Path path : checkpoint.deleteOnCommit) { - try { - Files.deleteIfExists(path); - } catch (final IOException e) { - throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e); - } - } - checkpoint.deleteOnCommit.clear(); - - if (LOG.isInfoEnabled()) { - final String sessionSummary = summarizeEvents(checkpoint); - if (!sessionSummary.isEmpty()) { - LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary}); - } - } - - for (final Map.Entry entry : checkpoint.localCounters.entrySet()) { - adjustCounter(entry.getKey(), entry.getValue(), true); - } - - for (final Map.Entry entry : checkpoint.globalCounters.entrySet()) { - adjustCounter(entry.getKey(), entry.getValue(), true); - } - - acknowledgeRecords(); - resetState(); - - if (LOG.isDebugEnabled()) { - final StringBuilder timingInfo = new StringBuilder(); - timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took "); - - final long commitNanos = System.nanoTime() - commitStartNanos; - formatNanos(commitNanos, timingInfo); - timingInfo.append("; FlowFile Repository Update took "); - formatNanos(flowFileRepoUpdateNanos, timingInfo); - timingInfo.append("; Claim Removal took "); - formatNanos(claimRemovalNanos, timingInfo); - timingInfo.append("; FlowFile Event Update took "); - formatNanos(updateEventRepositoryNanos, timingInfo); - timingInfo.append("; Enqueuing FlowFiles took "); - formatNanos(enqueueFlowFileNanos, timingInfo); - timingInfo.append("; Updating Provenance Event Repository took "); - formatNanos(updateProvenanceNanos, timingInfo); - - LOG.debug(timingInfo.toString()); - } - } catch (final Exception e) { - try { - rollback(); - } catch (final Exception e1) { - e.addSuppressed(e1); - } - - throw e; - } + // in this case, we just ignore it, and it will be cleaned up by clearing the records map. + if (record.getCurrent() != null) { + Collection collection = recordMap.get(record.getDestination()); + if (collection == null) { + collection = new ArrayList<>(); + recordMap.put(record.getDestination(), collection); + } + collection.add(record.getCurrent()); + } + } + + for (final Map.Entry> entry : recordMap.entrySet()) { + entry.getKey().putAll(entry.getValue()); + } + + final long enqueueFlowFileFinishNanos = System.nanoTime(); + final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos; + + // Delete any files from disk that need to be removed. + for (final Path path : checkpoint.deleteOnCommit) { + try { + Files.deleteIfExists(path); + } catch (final IOException e) { + throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e); + } + } + checkpoint.deleteOnCommit.clear(); + + if (LOG.isInfoEnabled()) { + final String sessionSummary = summarizeEvents(checkpoint); + if (!sessionSummary.isEmpty()) { + LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary}); + } + } + + for (final Map.Entry entry : checkpoint.localCounters.entrySet()) { + adjustCounter(entry.getKey(), entry.getValue(), true); + } + + for (final Map.Entry entry : checkpoint.globalCounters.entrySet()) { + adjustCounter(entry.getKey(), entry.getValue(), true); + } + + acknowledgeRecords(); + resetState(); + + if (LOG.isDebugEnabled()) { + final StringBuilder timingInfo = new StringBuilder(); + timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took "); + + final long commitNanos = System.nanoTime() - commitStartNanos; + formatNanos(commitNanos, timingInfo); + timingInfo.append("; FlowFile Repository Update took "); + formatNanos(flowFileRepoUpdateNanos, timingInfo); + timingInfo.append("; Claim Removal took "); + formatNanos(claimRemovalNanos, timingInfo); + timingInfo.append("; FlowFile Event Update took "); + formatNanos(updateEventRepositoryNanos, timingInfo); + timingInfo.append("; Enqueuing FlowFiles took "); + formatNanos(enqueueFlowFileNanos, timingInfo); + timingInfo.append("; Updating Provenance Event Repository took "); + formatNanos(updateProvenanceNanos, timingInfo); + + LOG.debug(timingInfo.toString()); + } + } catch (final Exception e) { + try { + rollback(); + } catch (final Exception e1) { + e.addSuppressed(e1); + } + throw e; + } } private void updateEventRepository(final Checkpoint checkpoint) { @@ -448,16 +449,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } switch (event.getEventType()) { - case SEND: - flowFilesSent++; - bytesSent += event.getFileSize(); - break; - case RECEIVE: - flowFilesReceived++; - bytesReceived += event.getFileSize(); - break; - default: - break; + case SEND: + flowFilesSent++; + bytesSent += event.getFileSize(); + break; + case RECEIVE: + flowFilesReceived++; + bytesReceived += event.getFileSize(); + break; + default: + break; } } @@ -610,9 +611,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE boolean creationEventRegistered = false; if (registeredTypes != null) { if (registeredTypes.contains(ProvenanceEventType.CREATE) - || registeredTypes.contains(ProvenanceEventType.FORK) - || registeredTypes.contains(ProvenanceEventType.JOIN) - || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { + || registeredTypes.contains(ProvenanceEventType.FORK) + || registeredTypes.contains(ProvenanceEventType.JOIN) + || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { creationEventRegistered = true; } } @@ -701,11 +702,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE builder.setCurrentContentClaim(null, null, null, null, 0L); } else { builder.setCurrentContentClaim( - originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() - ); + originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() + ); builder.setPreviousContentClaim( - originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() - ); + originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize() + ); } } @@ -741,7 +742,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private StandardProvenanceEventRecord enrich( - final ProvenanceEventRecord rawEvent, final Map flowFileRecordMap, final Map records, final boolean updateAttributes) { + final ProvenanceEventRecord rawEvent, final Map flowFileRecordMap, final Map records, final boolean updateAttributes) { final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { @@ -781,8 +782,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * Checks if the given event is a spurious FORK, meaning that the FORK has a single child and that child was removed in this session. This happens when a Processor calls #create(FlowFile) and then - * removes the created FlowFile. + * Checks if the given event is a spurious FORK, meaning that the FORK has a + * single child and that child was removed in this session. This happens + * when a Processor calls #create(FlowFile) and then removes the created + * FlowFile. * * @param event event * @return true if spurious fork @@ -799,8 +802,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile was routed to a relationship with only 1 connection and that Connection is the Connection from - * which the FlowFile was pulled. I.e., the FlowFile was really routed nowhere. + * Checks if the given event is a spurious ROUTE, meaning that the ROUTE + * indicates that a FlowFile was routed to a relationship with only 1 + * connection and that Connection is the Connection from which the FlowFile + * was pulled. I.e., the FlowFile was really routed nowhere. * * @param event event * @param records records @@ -1010,7 +1015,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StringBuilder sb = new StringBuilder(512); if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD - || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { + || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { if (numCreated > 0) { sb.append("created ").append(numCreated).append(" FlowFiles, "); } @@ -1258,8 +1263,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(attrs) - .build(); + .addAttributes(attrs) + .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, attrs); records.put(fFile, record); @@ -1621,7 +1626,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), - processorType, context.getProvenanceRepository(), this); + processorType, context.getProvenanceRepository(), this); final Map recordIdMap = new HashMap<>(); for (final FlowFileRecord flowFile : flowFiles) { @@ -1748,9 +1753,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -1821,7 +1826,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { try (final OutputStream rawOut = contentRepo.write(newClaim); - final OutputStream out = new BufferedOutputStream(rawOut)) { + final OutputStream out = new BufferedOutputStream(rawOut)) { if (header != null && header.length > 0) { out.write(header); @@ -1928,7 +1933,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { + final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { recursionSet.add(source); @@ -1951,7 +1956,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final OutputStream stream = context.getContentRepository().write(newClaim); - final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) { + final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) { recursionSet.add(source); writeRecursionLevel++; @@ -2092,10 +2097,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * Checks if the ContentClaim associated with this record should be removed, since the record is about to be updated to point to a new content claim. If so, removes the working claim. + * Checks if the ContentClaim associated with this record should be removed, + * since the record is about to be updated to point to a new content claim. + * If so, removes the working claim. * - * This happens if & only if the content of this FlowFile has been modified since it was last committed to the FlowFile repository, because this indicates that the content is no longer needed and - * should be cleaned up. + * This happens if & only if the content of this FlowFile has been modified + * since it was last committed to the FlowFile repository, because this + * indicates that the content is no longer needed and should be cleaned up. * * @param record record */ @@ -2125,22 +2133,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private void resetWriteClaims() { - resetWriteClaims(true); + resetWriteClaims(true); } - + private void resetWriteClaims(final boolean suppressExceptions) { try { if (currentWriteClaimStream != null) { - try { - currentWriteClaimStream.flush(); - } finally { - currentWriteClaimStream.close(); - } + try { + currentWriteClaimStream.flush(); + } finally { + currentWriteClaimStream.close(); + } } } catch (final IOException e) { - if (!suppressExceptions) { - throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository"); - } + if (!suppressExceptions) { + throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository"); + } } currentWriteClaimStream = null; @@ -2150,15 +2158,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE for (final ByteCountingOutputStream out : appendableStreams.values()) { try { - try { - out.flush(); - } finally { - out.close(); - } + try { + out.flush(); + } finally { + out.close(); + } } catch (final IOException e) { - if (!suppressExceptions) { - throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository"); - } + if (!suppressExceptions) { + throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository"); + } } } appendableStreams.clear(); @@ -2176,7 +2184,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * @return Indicates whether or not multiple FlowFiles should be merged into a single ContentClaim + * @return Indicates whether or not multiple FlowFiles should be merged into + * a single ContentClaim */ private boolean isMergeContent() { if (writeRecursionLevel > 0) { @@ -2204,11 +2213,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); - final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); + final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); + final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { recursionSet.add(source); @@ -2247,10 +2256,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); - final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); - final OutputStream os = context.getContentRepository().write(newClaim); - final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) { + final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); + final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); + final OutputStream os = context.getContentRepository().write(newClaim); + final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) { recursionSet.add(source); @@ -2363,9 +2372,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeTemporaryClaim(record); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) - .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) - .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) - .build(); + .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) + .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) + .build(); record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); if (!keepSourceFile) { deleteOnCommit.add(source); @@ -2520,7 +2529,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * Checks if a FlowFile is known in this session. * * @param flowFile the FlowFile to check - * @return true if the FlowFile is known in this session, false otherwise. + * @return true if the FlowFile is known in this session, + * false otherwise. */ boolean isFlowFileKnown(final FlowFile flowFile) { return records.containsKey(flowFile); @@ -2542,8 +2552,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final String key = entry.getKey(); final String value = entry.getValue(); if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) - || CoreAttributes.DISCARD_REASON.key().equals(key) - || CoreAttributes.UUID.key().equals(key)) { + || CoreAttributes.DISCARD_REASON.key().equals(key) + || CoreAttributes.UUID.key().equals(key)) { continue; } newAttributes.put(key, value); @@ -2591,10 +2601,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(newAttributes) - .lineageIdentifiers(lineageIdentifiers) - .lineageStartDate(lineageStartDate) - .build(); + .addAttributes(newAttributes) + .lineageIdentifiers(lineageIdentifiers) + .lineageStartDate(lineageStartDate) + .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, newAttributes); @@ -2606,7 +2616,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * Returns the attributes that are common to every FlowFile given. The key and value must match exactly. + * Returns the attributes that are common to every FlowFile given. The key + * and value must match exactly. * * @param flowFileList a list of FlowFiles * @@ -2628,18 +2639,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Map firstMap = flowFileList.iterator().next().getAttributes(); outer: - for (final Map.Entry mapEntry : firstMap.entrySet()) { - final String key = mapEntry.getKey(); - final String value = mapEntry.getValue(); - for (final FlowFile flowFile : flowFileList) { - final Map currMap = flowFile.getAttributes(); - final String curVal = currMap.get(key); - if (curVal == null || !curVal.equals(value)) { - continue outer; - } + for (final Map.Entry mapEntry : firstMap.entrySet()) { + final String key = mapEntry.getKey(); + final String value = mapEntry.getValue(); + for (final FlowFile flowFile : flowFileList) { + final Map currMap = flowFile.getAttributes(); + final String curVal = currMap.get(key); + if (curVal == null || !curVal.equals(value)) { + continue outer; } - result.put(key, value); } + result.put(key, value); + } return result; } @@ -2661,7 +2672,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } /** - * Callback interface used to poll a FlowFileQueue, in order to perform functional programming-type of polling a queue + * Callback interface used to poll a FlowFileQueue, in order to perform + * functional programming-type of polling a queue */ private static interface QueuePoller {