mirror of https://github.com/apache/nifi.git
NIFI-396 reverting accidentially modified whitespace in TestStandardProcessSession
This commit is contained in:
parent
cd183be441
commit
3f36236473
|
@ -426,6 +426,7 @@ public class TestStandardProcessSession {
|
|||
assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
|
||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||
|
@ -465,59 +466,59 @@ public class TestStandardProcessSession {
|
|||
@Test
|
||||
public void testUpdateAttributesThenJoin() throws IOException {
|
||||
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
|
||||
.id(1L)
|
||||
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.build();
|
||||
|
||||
.id(1L)
|
||||
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.build();
|
||||
|
||||
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
|
||||
.id(2L)
|
||||
.addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.build();
|
||||
|
||||
.id(2L)
|
||||
.addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.build();
|
||||
|
||||
flowFileQueue.put(flowFileRecord1);
|
||||
flowFileQueue.put(flowFileRecord2);
|
||||
|
||||
|
||||
FlowFile ff1 = session.get();
|
||||
FlowFile ff2 = session.get();
|
||||
|
||||
ff1 = session.putAttribute(ff1, "index", "1");
|
||||
ff2 = session.putAttribute(ff2, "index", "2");
|
||||
|
||||
|
||||
final List<FlowFile> parents = new ArrayList<>(2);
|
||||
parents.add(ff1);
|
||||
parents.add(ff2);
|
||||
|
||||
|
||||
final FlowFile child = session.create(parents);
|
||||
|
||||
|
||||
final Relationship rel = new Relationship.Builder().name("A").build();
|
||||
|
||||
|
||||
session.transfer(ff1, rel);
|
||||
session.transfer(ff2, rel);
|
||||
session.transfer(child, rel);
|
||||
|
||||
|
||||
session.commit();
|
||||
|
||||
|
||||
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
|
||||
|
||||
// We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
|
||||
assertEquals(3, events.size());
|
||||
|
||||
|
||||
int joinCount = 0;
|
||||
int ff1UpdateCount = 0;
|
||||
int ff2UpdateCount = 0;
|
||||
|
||||
for (final ProvenanceEventRecord event : events) {
|
||||
|
||||
for ( final ProvenanceEventRecord event : events ) {
|
||||
switch (event.getEventType()) {
|
||||
case JOIN:
|
||||
assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
|
||||
joinCount++;
|
||||
break;
|
||||
case ATTRIBUTES_MODIFIED:
|
||||
if (event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) {
|
||||
if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
|
||||
ff1UpdateCount++;
|
||||
} else if (event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) {
|
||||
} else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
|
||||
ff2UpdateCount++;
|
||||
} else {
|
||||
Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
|
||||
|
@ -527,14 +528,14 @@ public class TestStandardProcessSession {
|
|||
Assert.fail("Unexpected event type: " + event);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
assertEquals(1, joinCount);
|
||||
assertEquals(1, ff1UpdateCount);
|
||||
assertEquals(1, ff2UpdateCount);
|
||||
|
||||
|
||||
assertEquals(1, joinCount);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testForkOneToOneReported() throws IOException {
|
||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||
|
@ -844,34 +845,34 @@ public class TestStandardProcessSession {
|
|||
@Test
|
||||
public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
|
||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.contentClaim(new ContentClaim() {
|
||||
@Override
|
||||
public int compareTo(ContentClaim arg0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return "0";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContainer() {
|
||||
return "container";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSection() {
|
||||
return "section";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLossTolerant() {
|
||||
return true;
|
||||
}
|
||||
}).build();
|
||||
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.contentClaim(new ContentClaim() {
|
||||
@Override
|
||||
public int compareTo(ContentClaim arg0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return "0";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContainer() {
|
||||
return "container";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSection() {
|
||||
return "section";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLossTolerant() {
|
||||
return true;
|
||||
}
|
||||
}).build();
|
||||
flowFileQueue.put(flowFileRecord);
|
||||
|
||||
FlowFile ff1 = session.get();
|
||||
|
@ -884,35 +885,35 @@ public class TestStandardProcessSession {
|
|||
session.commit();
|
||||
|
||||
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
|
||||
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.contentClaim(new ContentClaim() {
|
||||
@Override
|
||||
public int compareTo(ContentClaim arg0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return "0";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContainer() {
|
||||
return "container";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSection() {
|
||||
return "section";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLossTolerant() {
|
||||
return true;
|
||||
}
|
||||
})
|
||||
.contentClaimOffset(1000L).size(1L).build();
|
||||
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.contentClaim(new ContentClaim() {
|
||||
@Override
|
||||
public int compareTo(ContentClaim arg0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return "0";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContainer() {
|
||||
return "container";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSection() {
|
||||
return "section";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLossTolerant() {
|
||||
return true;
|
||||
}
|
||||
})
|
||||
.contentClaimOffset(1000L).size(1L).build();
|
||||
flowFileQueue.put(flowFileRecord2);
|
||||
|
||||
// attempt to read the data.
|
||||
|
@ -973,20 +974,21 @@ public class TestStandardProcessSession {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCreateEmitted() throws IOException {
|
||||
FlowFile newFlowFile = session.create();
|
||||
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
|
||||
session.commit();
|
||||
|
||||
|
||||
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
|
||||
assertFalse(events.isEmpty());
|
||||
assertEquals(1, events.size());
|
||||
|
||||
|
||||
final ProvenanceEventRecord event = events.get(0);
|
||||
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testContentModifiedNotEmittedForCreate() throws IOException {
|
||||
FlowFile newFlowFile = session.create();
|
||||
|
@ -997,23 +999,23 @@ public class TestStandardProcessSession {
|
|||
});
|
||||
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
|
||||
session.commit();
|
||||
|
||||
|
||||
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
|
||||
assertFalse(events.isEmpty());
|
||||
assertEquals(1, events.size());
|
||||
|
||||
|
||||
final ProvenanceEventRecord event = events.get(0);
|
||||
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
|
||||
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
|
||||
.id(1L)
|
||||
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
|
||||
.build();
|
||||
.id(1L)
|
||||
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
|
||||
.build();
|
||||
this.flowFileQueue.put(flowFile);
|
||||
|
||||
|
||||
FlowFile existingFlowFile = session.get();
|
||||
existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
|
@ -1023,36 +1025,38 @@ public class TestStandardProcessSession {
|
|||
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
|
||||
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
|
||||
session.commit();
|
||||
|
||||
|
||||
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
|
||||
assertFalse(events.isEmpty());
|
||||
assertEquals(1, events.size());
|
||||
|
||||
|
||||
final ProvenanceEventRecord event = events.get(0);
|
||||
assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAttributesModifiedEmitted() throws IOException {
|
||||
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
|
||||
.id(1L)
|
||||
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
|
||||
.build();
|
||||
.id(1L)
|
||||
.addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
|
||||
.build();
|
||||
this.flowFileQueue.put(flowFile);
|
||||
|
||||
|
||||
FlowFile existingFlowFile = session.get();
|
||||
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
|
||||
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
|
||||
session.commit();
|
||||
|
||||
|
||||
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
|
||||
assertFalse(events.isEmpty());
|
||||
assertEquals(1, events.size());
|
||||
|
||||
|
||||
final ProvenanceEventRecord event = events.get(0);
|
||||
assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private static class MockFlowFileRepository implements FlowFileRepository {
|
||||
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
@ -1119,7 +1123,7 @@ public class TestStandardProcessSession {
|
|||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
|
||||
|
||||
public Set<ContentClaim> getExistingClaims() {
|
||||
final Set<ContentClaim> claims = new HashSet<>();
|
||||
|
||||
|
@ -1142,7 +1146,7 @@ public class TestStandardProcessSession {
|
|||
if (Files.exists(parent) == false) {
|
||||
Files.createDirectories(parent);
|
||||
}
|
||||
Files.createFile(getPath(claim));
|
||||
Files.createFile(getPath(claim));
|
||||
return claim;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue