mirror of https://github.com/apache/nifi.git
NIFI-572: Do not set UUID as being modified when creating a clone
This commit is contained in:
parent
a43eecf1bd
commit
fea59e3249
|
@ -121,8 +121,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
|
|
||||||
private int removedCount = 0; // number of flowfiles removed in this session
|
private int removedCount = 0; // number of flowfiles removed in this session
|
||||||
private long removedBytes = 0L; // size of all flowfiles removed in this session
|
private long removedBytes = 0L; // size of all flowfiles removed in this session
|
||||||
private LongHolder bytesRead = new LongHolder(0L);
|
private final LongHolder bytesRead = new LongHolder(0L);
|
||||||
private LongHolder bytesWritten = new LongHolder(0L);
|
private final LongHolder bytesWritten = new LongHolder(0L);
|
||||||
private int flowFilesIn = 0, flowFilesOut = 0;
|
private int flowFilesIn = 0, flowFilesOut = 0;
|
||||||
private long contentSizeIn = 0L, contentSizeOut = 0L;
|
private long contentSizeIn = 0L, contentSizeOut = 0L;
|
||||||
private int writeRecursionLevel = 0;
|
private int writeRecursionLevel = 0;
|
||||||
|
@ -139,11 +139,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
// maps a FlowFile to all Provenance Events that were generated for that FlowFile.
|
// maps a FlowFile to all Provenance Events that were generated for that FlowFile.
|
||||||
// we do this so that if we generate a Fork event, for example, and then remove the event in the same
|
// we do this so that if we generate a Fork event, for example, and then remove the event in the same
|
||||||
// Session, we will not send that event to the Provenance Repository
|
// Session, we will not send that event to the Provenance Repository
|
||||||
private Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>();
|
private final Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>();
|
||||||
|
|
||||||
// when Forks are generated for a single parent, we add the Fork event to this map, with the Key being the parent
|
// when Forks are generated for a single parent, we add the Fork event to this map, with the Key being the parent
|
||||||
// so that we are able to aggregate many into a single Fork Event.
|
// so that we are able to aggregate many into a single Fork Event.
|
||||||
private Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
|
private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
|
||||||
|
|
||||||
private Checkpoint checkpoint = new Checkpoint();
|
private Checkpoint checkpoint = new Checkpoint();
|
||||||
|
|
||||||
|
@ -266,7 +266,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
if (claim != null) {
|
if (claim != null) {
|
||||||
context.getContentRepository().incrementClaimaintCount(claim);
|
context.getContentRepository().incrementClaimaintCount(claim);
|
||||||
}
|
}
|
||||||
newRecord.setWorking(clone, CoreAttributes.UUID.key(), newUuid);
|
newRecord.setWorking(clone, Collections.<String, String>emptyMap());
|
||||||
|
|
||||||
newRecord.setDestination(destination.getFlowFileQueue());
|
newRecord.setDestination(destination.getFlowFileQueue());
|
||||||
newRecord.setTransferRelationship(record.getTransferRelationship());
|
newRecord.setTransferRelationship(record.getTransferRelationship());
|
||||||
|
@ -1282,7 +1282,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
context.getContentRepository().incrementClaimaintCount(claim);
|
context.getContentRepository().incrementClaimaintCount(claim);
|
||||||
}
|
}
|
||||||
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
|
||||||
record.setWorking(clone, CoreAttributes.UUID.key(), newUuid);
|
record.setWorking(clone, Collections.<String, String>emptyMap());
|
||||||
records.put(clone, record);
|
records.put(clone, record);
|
||||||
|
|
||||||
if (offset == 0L && size == example.getSize()) {
|
if (offset == 0L && size == example.getSize()) {
|
||||||
|
@ -1874,7 +1874,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
|
|
||||||
try {
|
try {
|
||||||
currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
|
currentWriteClaimStream = context.getContentRepository().write(currentWriteClaim);
|
||||||
} catch (IOException e) {
|
} catch (final IOException e) {
|
||||||
resetWriteClaims();
|
resetWriteClaims();
|
||||||
throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
|
throw new FlowFileAccessException("Unable to obtain stream for writing to Content Repostiory: " + e, e);
|
||||||
}
|
}
|
||||||
|
@ -1994,7 +1994,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
||||||
|
|
||||||
// Get the current Content Claim from the record and see if we already have
|
// Get the current Content Claim from the record and see if we already have
|
||||||
// an OutputStream that we can append to.
|
// an OutputStream that we can append to.
|
||||||
ContentClaim oldClaim = record.getCurrentClaim();
|
final ContentClaim oldClaim = record.getCurrentClaim();
|
||||||
ByteCountingOutputStream outStream = appendableStreams.get(oldClaim);
|
ByteCountingOutputStream outStream = appendableStreams.get(oldClaim);
|
||||||
long originalByteWrittenCount = 0;
|
long originalByteWrittenCount = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue