NIFI-1650: Ensure that we seek to the appropriate offset within the Content Claim when downloading content of a FlowFile

This commit is contained in:
Mark Payne 2016-03-19 11:18:02 -04:00
parent 9cde92da16
commit d3578a7c03
1 changed files with 11 additions and 2 deletions

View File

@ -158,6 +158,8 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.ReflectionUtils;
@ -3381,7 +3383,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
requireNonNull(requestor); requireNonNull(requestor);
requireNonNull(requestUri); requireNonNull(requestUri);
final InputStream stream; InputStream stream;
final ResourceClaim resourceClaim; final ResourceClaim resourceClaim;
final ContentClaim contentClaim = flowFile.getContentClaim(); final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim == null) { if (contentClaim == null) {
@ -3390,6 +3392,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else { } else {
resourceClaim = flowFile.getContentClaim().getResourceClaim(); resourceClaim = flowFile.getContentClaim().getResourceClaim();
stream = contentRepository.read(flowFile.getContentClaim()); stream = contentRepository.read(flowFile.getContentClaim());
final long contentClaimOffset = flowFile.getContentClaimOffset();
if (contentClaimOffset > 0L) {
StreamUtils.skip(stream, contentClaimOffset);
}
stream = new LimitingInputStream(stream, flowFile.getSize());
} }
// Register a Provenance Event to indicate that we replayed the data. // Register a Provenance Event to indicate that we replayed the data.
@ -3406,7 +3414,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.setDetails("Download of Content requested by " + requestor + " for " + flowFile); .setDetails("Download of Content requested by " + requestor + " for " + flowFile);
if (contentClaim != null) { if (contentClaim != null) {
sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize()); sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize());
} }
final ProvenanceEventRecord sendEvent = sendEventBuilder.build(); final ProvenanceEventRecord sendEvent = sendEventBuilder.build();