NIFI-396 created tests to demonstrate the situations where the ProcessSession throws an Exception and where it doesn't after it returns from the callback

This commit is contained in:
Bobby Owolabi 2015-03-18 23:30:57 -04:00
parent dea9e22475
commit 7272d0df58
1 changed files with 108 additions and 0 deletions

View File

@ -56,6 +56,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
@ -65,6 +66,8 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -235,6 +238,105 @@ public class TestStandardProcessSession {
assertEquals(0, contentRepo.getExistingClaims().size());
}
@Test(expected = FlowFileAccessException.class)
public void testAppendAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null);
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream outputStream) throws IOException {
outputStreamHolder.set(outputStream);
}
});
try (final OutputStream outputStream = outputStreamHolder.get()) {
outputStream.write(5);
}
}
@Test(expected = FlowFileAccessException.class)
public void testReadAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream inputStream) throws IOException {
inputStreamHolder.set(inputStream);
}
});
try (final InputStream inputStream = inputStreamHolder.get()) {
inputStream.read();
}
}
@Test
public void testStreamAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null);
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream input, final OutputStream output) throws IOException {
inputStreamHolder.set(input);
outputStreamHolder.set(output);
}
});
try (final InputStream inputStream = inputStreamHolder.get()) {
inputStream.read();
Assert.fail("Expected Exception to be thrown when read is attempted after session closes stream");
} catch (final Exception ex) {}
try (final OutputStream outputStream = outputStreamHolder.get()) {
outputStream.write(5);
Assert.fail("Expected Exception to be thrown when write is attempted after session closes stream");
} catch (final Exception ex) {}
}
@Test(expected = FlowFileAccessException.class)
public void testWriteAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ObjectHolder<OutputStream> outputStreamHolder = new ObjectHolder<>(null);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
outputStreamHolder.set(out);
}
});
try (final OutputStream outputStream = outputStreamHolder.get()) {
outputStream.write(5);
}
}
@Test
public void testCreateThenRollbackRemovesContent() throws IOException {
@ -998,6 +1100,12 @@ public class TestStandardProcessSession {
public ContentClaim create(boolean lossTolerant) throws IOException {
final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false);
claimantCounts.put(claim, new AtomicInteger(1));
final Path path = getPath(claim);
final Path parent = path.getParent();
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
Files.createFile(getPath(claim));
return claim;
}