NIFI-3079: Ensure that in all cases we increment session's bytesRead count when finished reading from Content Repo

This closes #3079.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-11-22 09:41:34 -05:00 committed by Bryan Bende
parent ead9205458
commit aa99cc6822
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 6 additions and 4 deletions

View File

@ -2091,6 +2091,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw cnfe; throw cnfe;
} finally { } finally {
recursionSet.remove(source); recursionSet.remove(source);
bytesRead += countingStream.getBytesRead();
// if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
if (!cnfeThrown && ffais.getContentNotFoundException() != null) { if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
@ -2162,6 +2163,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override @Override
public void close() throws IOException { public void close() throws IOException {
StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
ffais.close(); ffais.close();
openInputStreams.remove(source); openInputStreams.remove(source);
} }
@ -2556,7 +2559,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true); try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true);
final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream os = context.getContentRepository().write(newClaim); final OutputStream os = context.getContentRepository().write(newClaim);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os); final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) { final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
@ -2577,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
cnfeThrown = true; cnfeThrown = true;
throw cnfe; throw cnfe;
} finally { } finally {
writtenToFlowFile = countingOut.getBytesWritten(); this.bytesWritten += countingOut.getBytesWritten();
this.bytesRead += countingIn.getBytesRead();
recursionSet.remove(source); recursionSet.remove(source);
// if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
@ -2598,8 +2602,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} catch (final Throwable t) { } catch (final Throwable t) {
destroyContent(newClaim); destroyContent(newClaim);
throw t; throw t;
} finally {
bytesWritten += writtenToFlowFile;
} }
removeTemporaryClaim(record); removeTemporaryClaim(record);