mirror of https://github.com/apache/nifi.git
NIFI-5150: Fixed bug that caused StandardProcessSession.append() to copy too much data when called on an incoming flowfile
This closes #2676. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
5cfa29e48f
commit
868808f4b4
|
@ -2677,7 +2677,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
if (outStream == null) {
|
||||
claimCache.flush(oldClaim);
|
||||
|
||||
try (final InputStream oldClaimIn = context.getContentRepository().read(oldClaim)) {
|
||||
try (final InputStream oldClaimIn = read(source)) {
|
||||
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
|
||||
claimLog.debug("Creating ContentClaim {} for 'append' for {}", newClaim, source);
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.io.FilterOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
@ -73,6 +74,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
|||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.processor.FlowFileFilter;
|
||||
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||
import org.apache.nifi.processor.exception.MissingFlowFileException;
|
||||
|
@ -1176,6 +1178,36 @@ public class TestStandardProcessSession {
|
|||
assertEquals("Hello, World", new String(buff));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendToFlowFileWhereResourceClaimHasMultipleContentClaims() throws IOException {
|
||||
final Relationship relationship = new Relationship.Builder().name("A").build();
|
||||
|
||||
FlowFile ffa = session.create();
|
||||
ffa = session.write(ffa, (out) -> out.write('A'));
|
||||
session.transfer(ffa, relationship);
|
||||
|
||||
FlowFile ffb = session.create();
|
||||
ffb = session.write(ffb, (out) -> out.write('B'));
|
||||
session.transfer(ffb, relationship);
|
||||
session.commit();
|
||||
|
||||
final ProcessSession newSession = new StandardProcessSession(context, () -> false);
|
||||
FlowFile toUpdate = newSession.get();
|
||||
newSession.append(toUpdate, out -> out.write('C'));
|
||||
|
||||
// Read the content back and ensure that it is correct
|
||||
final byte[] buff;
|
||||
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||
newSession.read(toUpdate, in -> StreamUtils.copy(in, baos));
|
||||
buff = baos.toByteArray();
|
||||
}
|
||||
|
||||
final String output = new String(buff, StandardCharsets.UTF_8);
|
||||
assertEquals("AC", output);
|
||||
newSession.transfer(toUpdate);
|
||||
newSession.commit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendDoesNotDecrementContentClaimIfNotNeeded() {
|
||||
FlowFile flowFile = session.create();
|
||||
|
|
Loading…
Reference in New Issue