NIFI-387: If possible don't use ContentRepository.importFrom but just copy stream directly in StandardProcessSession

This commit is contained in:
Mark Payne 2015-02-26 09:04:05 -05:00
parent 50744bfdc6
commit 739f0c25e2
1 changed files with 26 additions and 20 deletions

View File

@ -2351,35 +2351,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public FlowFile importFrom(final InputStream source, final FlowFile destination) {
validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
final ContentClaim newClaim;
ContentClaim newClaim = null;
long claimOffset = 0L;
final boolean appendToClaim = isMergeContent();
if (appendToClaim) {
enforceCurrentWriteClaimState();
newClaim = currentWriteClaim;
claimOffset = currentWriteClaimSize;
} else {
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
}
final long newSize;
final boolean appendToClaim = isMergeContent();
try {
final boolean append = isMergeContent();
newSize = context.getContentRepository().importFrom(source, newClaim, append);
bytesWritten.increment(newSize);
currentWriteClaimSize += newSize;
if (appendToClaim) {
enforceCurrentWriteClaimState();
newClaim = currentWriteClaim;
claimOffset = currentWriteClaimSize;
final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream);
bytesWritten.increment(bytesCopied);
currentWriteClaimSize += bytesCopied;
newSize = bytesCopied;
} else {
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim);
bytesWritten.increment(newSize);
currentWriteClaimSize += newSize;
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
}
} catch (final Throwable t) {
if (appendToClaim) {
resetWriteClaims();
}
destroyContent(newClaim);
if ( newClaim != null ) {
destroyContent(newClaim);
}
throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
}