NIFI-841 correct patch paths and fixed formatting issues

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
joewitt 2015-08-17 00:05:03 -04:00 committed by Mark Payne
parent 35439db347
commit c3731703f3
1 changed files with 260 additions and 248 deletions

View File

@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory;
/**
* <p>
* Provides a ProcessSession that ensures all accesses, changes and transfers occur in an atomic manner for all FlowFiles including their contents and attributes</p>
* Provides a ProcessSession that ensures all accesses, changes and transfers
* occur in an atomic manner for all FlowFiles including their contents and
* attributes</p>
* <p>
* NOT THREAD SAFE</p>
* <p/>
@ -93,7 +95,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10;
private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
public static final String DEFAULT_FLOWFILE_PATH = "./";
@ -154,32 +156,32 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
String description = connectable.toString();
switch (connectable.getConnectableType()) {
case PROCESSOR:
final ProcessorNode procNode = (ProcessorNode) connectable;
componentType = procNode.getProcessor().getClass().getSimpleName();
description = procNode.getProcessor().toString();
break;
case INPUT_PORT:
componentType = "Input Port";
break;
case OUTPUT_PORT:
componentType = "Output Port";
break;
case REMOTE_INPUT_PORT:
componentType = "Remote Input Port";
break;
case REMOTE_OUTPUT_PORT:
componentType = "Remote Output Port";
break;
case FUNNEL:
componentType = "Funnel";
break;
default:
throw new AssertionError("Connectable type is " + connectable.getConnectableType());
case PROCESSOR:
final ProcessorNode procNode = (ProcessorNode) connectable;
componentType = procNode.getProcessor().getClass().getSimpleName();
description = procNode.getProcessor().toString();
break;
case INPUT_PORT:
componentType = "Input Port";
break;
case OUTPUT_PORT:
componentType = "Output Port";
break;
case REMOTE_INPUT_PORT:
componentType = "Remote Input Port";
break;
case REMOTE_OUTPUT_PORT:
componentType = "Remote Output Port";
break;
case FUNNEL:
componentType = "Funnel";
break;
default:
throw new AssertionError("Connectable type is " + connectable.getConnectableType());
}
this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
context.getProvenanceRepository(), this);
context.getProvenanceRepository(), this);
this.sessionId = idGenerator.getAndIncrement();
this.connectableDescription = description;
@ -292,148 +294,147 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@SuppressWarnings({"unchecked", "rawtypes"})
private void commit(final Checkpoint checkpoint) {
try {
final long commitStartNanos = System.nanoTime();
try {
final long commitStartNanos = System.nanoTime();
resetWriteClaims(false);
resetReadClaim();
resetWriteClaims(false);
resetReadClaim();
final long updateProvenanceStart = System.nanoTime();
updateProvenanceRepo(checkpoint);
final long updateProvenanceStart = System.nanoTime();
updateProvenanceRepo(checkpoint);
final long claimRemovalStart = System.nanoTime();
final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
final long claimRemovalStart = System.nanoTime();
final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
// Figure out which content claims can be released.
// At this point, we will decrement the Claimant Count for the claims via the Content Repository.
// We do not actually destroy the content because otherwise, we could remove the
// Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
// the content claim points to the Original Claim -- which has already been removed!
for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
final FlowFile flowFile = entry.getKey();
final StandardRepositoryRecord record = entry.getValue();
// At this point, we will decrement the Claimant Count for the claims via the Content Repository.
// We do not actually destroy the content because otherwise, we could remove the
// Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
// the content claim points to the Original Claim -- which has already been removed!
for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
final FlowFile flowFile = entry.getKey();
final StandardRepositoryRecord record = entry.getValue();
if (record.isMarkedForDelete()) {
if (record.isMarkedForDelete()) {
// if the working claim is not the same as the original claim, we can immediately destroy the working claim
// because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
removeContent(record.getWorkingClaim());
// because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
removeContent(record.getWorkingClaim());
if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
// if working & original claim are same, don't remove twice; we only want to remove the original
// if it's different from the working. Otherwise, we remove two claimant counts. This causes
// an issue if we only updated the FlowFile attributes.
removeContent(record.getOriginalClaim());
}
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
//records which have been updated - remove original if exists
removeContent(record.getOriginalClaim());
}
}
// if it's different from the working. Otherwise, we remove two claimant counts. This causes
// an issue if we only updated the FlowFile attributes.
removeContent(record.getOriginalClaim());
}
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
//records which have been updated - remove original if exists
removeContent(record.getOriginalClaim());
}
}
final long claimRemovalFinishNanos = System.nanoTime();
final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
final long claimRemovalFinishNanos = System.nanoTime();
final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
// Update the FlowFile Repository
try {
final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
} catch (final IOException ioe) {
rollback();
throw new ProcessException("FlowFile Repository failed to update", ioe);
}
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
// Update the FlowFile Repository
try {
final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
} catch (final IOException ioe) {
rollback();
throw new ProcessException("FlowFile Repository failed to update", ioe);
}
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
updateEventRepository(checkpoint);
updateEventRepository(checkpoint);
final long updateEventRepositoryFinishNanos = System.nanoTime();
final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
final long updateEventRepositoryFinishNanos = System.nanoTime();
final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
// transfer the flowfiles to the connections' queues.
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
for (final StandardRepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
continue; //these don't need to be transferred
}
// transfer the flowfiles to the connections' queues.
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
for (final StandardRepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
continue; //these don't need to be transferred
}
// record.getCurrent() will return null if this record was created in this session --
// in this case, we just ignore it, and it will be cleaned up by clearing the records map.
if (record.getCurrent() != null) {
Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
if (collection == null) {
collection = new ArrayList<>();
recordMap.put(record.getDestination(), collection);
}
collection.add(record.getCurrent());
}
}
// in this case, we just ignore it, and it will be cleaned up by clearing the records map.
if (record.getCurrent() != null) {
Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
if (collection == null) {
collection = new ArrayList<>();
recordMap.put(record.getDestination(), collection);
}
collection.add(record.getCurrent());
}
}
for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
entry.getKey().putAll(entry.getValue());
}
for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
entry.getKey().putAll(entry.getValue());
}
final long enqueueFlowFileFinishNanos = System.nanoTime();
final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
final long enqueueFlowFileFinishNanos = System.nanoTime();
final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
// Delete any files from disk that need to be removed.
for (final Path path : checkpoint.deleteOnCommit) {
try {
Files.deleteIfExists(path);
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
}
}
checkpoint.deleteOnCommit.clear();
// Delete any files from disk that need to be removed.
for (final Path path : checkpoint.deleteOnCommit) {
try {
Files.deleteIfExists(path);
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
}
}
checkpoint.deleteOnCommit.clear();
if (LOG.isInfoEnabled()) {
final String sessionSummary = summarizeEvents(checkpoint);
if (!sessionSummary.isEmpty()) {
LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
}
}
if (LOG.isInfoEnabled()) {
final String sessionSummary = summarizeEvents(checkpoint);
if (!sessionSummary.isEmpty()) {
LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
}
}
for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true);
}
for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true);
}
for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true);
}
for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true);
}
acknowledgeRecords();
resetState();
acknowledgeRecords();
resetState();
if (LOG.isDebugEnabled()) {
final StringBuilder timingInfo = new StringBuilder();
timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
if (LOG.isDebugEnabled()) {
final StringBuilder timingInfo = new StringBuilder();
timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
final long commitNanos = System.nanoTime() - commitStartNanos;
formatNanos(commitNanos, timingInfo);
timingInfo.append("; FlowFile Repository Update took ");
formatNanos(flowFileRepoUpdateNanos, timingInfo);
timingInfo.append("; Claim Removal took ");
formatNanos(claimRemovalNanos, timingInfo);
timingInfo.append("; FlowFile Event Update took ");
formatNanos(updateEventRepositoryNanos, timingInfo);
timingInfo.append("; Enqueuing FlowFiles took ");
formatNanos(enqueueFlowFileNanos, timingInfo);
timingInfo.append("; Updating Provenance Event Repository took ");
formatNanos(updateProvenanceNanos, timingInfo);
final long commitNanos = System.nanoTime() - commitStartNanos;
formatNanos(commitNanos, timingInfo);
timingInfo.append("; FlowFile Repository Update took ");
formatNanos(flowFileRepoUpdateNanos, timingInfo);
timingInfo.append("; Claim Removal took ");
formatNanos(claimRemovalNanos, timingInfo);
timingInfo.append("; FlowFile Event Update took ");
formatNanos(updateEventRepositoryNanos, timingInfo);
timingInfo.append("; Enqueuing FlowFiles took ");
formatNanos(enqueueFlowFileNanos, timingInfo);
timingInfo.append("; Updating Provenance Event Repository took ");
formatNanos(updateProvenanceNanos, timingInfo);
LOG.debug(timingInfo.toString());
}
} catch (final Exception e) {
try {
rollback();
} catch (final Exception e1) {
e.addSuppressed(e1);
}
throw e;
}
LOG.debug(timingInfo.toString());
}
} catch (final Exception e) {
try {
rollback();
} catch (final Exception e1) {
e.addSuppressed(e1);
}
throw e;
}
}
private void updateEventRepository(final Checkpoint checkpoint) {
@ -448,16 +449,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
switch (event.getEventType()) {
case SEND:
flowFilesSent++;
bytesSent += event.getFileSize();
break;
case RECEIVE:
flowFilesReceived++;
bytesReceived += event.getFileSize();
break;
default:
break;
case SEND:
flowFilesSent++;
bytesSent += event.getFileSize();
break;
case RECEIVE:
flowFilesReceived++;
bytesReceived += event.getFileSize();
break;
default:
break;
}
}
@ -610,9 +611,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean creationEventRegistered = false;
if (registeredTypes != null) {
if (registeredTypes.contains(ProvenanceEventType.CREATE)
|| registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
|| registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
creationEventRegistered = true;
}
}
@ -701,11 +702,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
builder.setCurrentContentClaim(null, null, null, null, 0L);
} else {
builder.setCurrentContentClaim(
originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
);
originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
);
builder.setPreviousContentClaim(
originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
);
originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
);
}
}
@ -741,7 +742,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private StandardProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
@ -781,8 +782,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
/**
* Checks if the given event is a spurious FORK, meaning that the FORK has a single child and that child was removed in this session. This happens when a Processor calls #create(FlowFile) and then
* removes the created FlowFile.
* Checks if the given event is a spurious FORK, meaning that the FORK has a
* single child and that child was removed in this session. This happens
* when a Processor calls #create(FlowFile) and then removes the created
* FlowFile.
*
* @param event event
* @return true if spurious fork
@ -799,8 +802,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
/**
* Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile was routed to a relationship with only 1 connection and that Connection is the Connection from
* which the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
* Checks if the given event is a spurious ROUTE, meaning that the ROUTE
* indicates that a FlowFile was routed to a relationship with only 1
* connection and that Connection is the Connection from which the FlowFile
* was pulled. I.e., the FlowFile was really routed nowhere.
*
* @param event event
* @param records records
@ -1010,7 +1015,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StringBuilder sb = new StringBuilder(512);
if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
|| numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
|| numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
if (numCreated > 0) {
sb.append("created ").append(numCreated).append(" FlowFiles, ");
}
@ -1258,8 +1263,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(attrs)
.build();
.addAttributes(attrs)
.build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, attrs);
records.put(fFile, record);
@ -1621,7 +1626,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
processorType, context.getProvenanceRepository(), this);
processorType, context.getProvenanceRepository(), this);
final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
for (final FlowFileRecord flowFile : flowFiles) {
@ -1748,9 +1753,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@ -1821,7 +1826,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try {
try (final OutputStream rawOut = contentRepo.write(newClaim);
final OutputStream out = new BufferedOutputStream(rawOut)) {
final OutputStream out = new BufferedOutputStream(rawOut)) {
if (header != null && header.length > 0) {
out.write(header);
@ -1928,7 +1933,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim);
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
recursionSet.add(source);
@ -1951,7 +1956,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim);
try (final OutputStream stream = context.getContentRepository().write(newClaim);
final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
recursionSet.add(source);
writeRecursionLevel++;
@ -2092,10 +2097,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
/**
* Checks if the ContentClaim associated with this record should be removed, since the record is about to be updated to point to a new content claim. If so, removes the working claim.
* Checks if the ContentClaim associated with this record should be removed,
* since the record is about to be updated to point to a new content claim.
* If so, removes the working claim.
*
* This happens if & only if the content of this FlowFile has been modified since it was last committed to the FlowFile repository, because this indicates that the content is no longer needed and
* should be cleaned up.
* This happens if & only if the content of this FlowFile has been modified
* since it was last committed to the FlowFile repository, because this
* indicates that the content is no longer needed and should be cleaned up.
*
* @param record record
*/
@ -2125,22 +2133,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void resetWriteClaims() {
resetWriteClaims(true);
resetWriteClaims(true);
}
private void resetWriteClaims(final boolean suppressExceptions) {
try {
if (currentWriteClaimStream != null) {
try {
currentWriteClaimStream.flush();
} finally {
currentWriteClaimStream.close();
}
try {
currentWriteClaimStream.flush();
} finally {
currentWriteClaimStream.close();
}
}
} catch (final IOException e) {
if (!suppressExceptions) {
throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
}
if (!suppressExceptions) {
throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
}
}
currentWriteClaimStream = null;
@ -2150,15 +2158,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final ByteCountingOutputStream out : appendableStreams.values()) {
try {
try {
out.flush();
} finally {
out.close();
}
try {
out.flush();
} finally {
out.close();
}
} catch (final IOException e) {
if (!suppressExceptions) {
throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
}
if (!suppressExceptions) {
throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
}
}
}
appendableStreams.clear();
@ -2176,7 +2184,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
/**
* @return Indicates whether or not multiple FlowFiles should be merged into a single ContentClaim
* @return Indicates whether or not multiple FlowFiles should be merged into
* a single ContentClaim
*/
private boolean isMergeContent() {
if (writeRecursionLevel > 0) {
@ -2204,11 +2213,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim);
try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
recursionSet.add(source);
@ -2247,10 +2256,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim);
try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
final OutputStream os = context.getContentRepository().write(newClaim);
final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
final OutputStream os = context.getContentRepository().write(newClaim);
final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
recursionSet.add(source);
@ -2363,9 +2372,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
removeTemporaryClaim(record);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
.contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
.addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
.build();
.contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
.addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
.build();
record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
if (!keepSourceFile) {
deleteOnCommit.add(source);
@ -2520,7 +2529,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
* Checks if a FlowFile is known in this session.
*
* @param flowFile the FlowFile to check
* @return <code>true</code> if the FlowFile is known in this session, <code>false</code> otherwise.
* @return <code>true</code> if the FlowFile is known in this session,
* <code>false</code> otherwise.
*/
boolean isFlowFileKnown(final FlowFile flowFile) {
return records.containsKey(flowFile);
@ -2542,8 +2552,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final String key = entry.getKey();
final String value = entry.getValue();
if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
|| CoreAttributes.DISCARD_REASON.key().equals(key)
|| CoreAttributes.UUID.key().equals(key)) {
|| CoreAttributes.DISCARD_REASON.key().equals(key)
|| CoreAttributes.UUID.key().equals(key)) {
continue;
}
newAttributes.put(key, value);
@ -2591,10 +2601,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers)
.lineageStartDate(lineageStartDate)
.build();
.addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers)
.lineageStartDate(lineageStartDate)
.build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, newAttributes);
@ -2606,7 +2616,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
/**
* Returns the attributes that are common to every FlowFile given. The key and value must match exactly.
* Returns the attributes that are common to every FlowFile given. The key
* and value must match exactly.
*
* @param flowFileList a list of FlowFiles
*
@ -2628,18 +2639,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
outer:
for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
final String key = mapEntry.getKey();
final String value = mapEntry.getValue();
for (final FlowFile flowFile : flowFileList) {
final Map<String, String> currMap = flowFile.getAttributes();
final String curVal = currMap.get(key);
if (curVal == null || !curVal.equals(value)) {
continue outer;
}
for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
final String key = mapEntry.getKey();
final String value = mapEntry.getValue();
for (final FlowFile flowFile : flowFileList) {
final Map<String, String> currMap = flowFile.getAttributes();
final String curVal = currMap.get(key);
if (curVal == null || !curVal.equals(value)) {
continue outer;
}
result.put(key, value);
}
result.put(key, value);
}
return result;
}
@ -2661,7 +2672,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
/**
* Callback interface used to poll a FlowFileQueue, in order to perform functional programming-type of polling a queue
* Callback interface used to poll a FlowFileQueue, in order to perform
* functional programming-type of polling a queue
*/
private static interface QueuePoller {