NIFI-8710: Pass details instead of source ID for Provenance per the interface doc

This closes #5224

Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
This commit is contained in:
Matthew Burgess 2021-07-16 16:37:00 -04:00 committed by Nandor Soma Abonyi
parent 595b1b4dd3
commit 59eebb9906
No known key found for this signature in database
GPG Key ID: AFFFD8C3A1A88ED7
3 changed files with 63 additions and 10 deletions

View File

@ -115,8 +115,8 @@ public class MockProvenanceReporter implements ProvenanceReporter {
} }
@Override @Override
public void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier) { public void receive(FlowFile flowFile, String transitUri, String details) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L); receive(flowFile, transitUri, details, -1L);
} }
@Override @Override
@ -125,8 +125,8 @@ public class MockProvenanceReporter implements ProvenanceReporter {
} }
@Override @Override
public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final long transmissionMillis) { public void receive(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, transmissionMillis); receive(flowFile, transitUri, null, details, transmissionMillis);
} }
@Override @Override

View File

@ -151,8 +151,8 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter {
} }
@Override @Override
public void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier) { public void receive(FlowFile flowFile, String transitUri, String details) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L); receive(flowFile, transitUri, details, -1L);
} }
@Override @Override
@ -161,8 +161,8 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter {
} }
@Override @Override
public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final long transmissionMillis) { public void receive(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, transmissionMillis); receive(flowFile, transitUri, null, details, transmissionMillis);
} }
@Override @Override
@ -170,8 +170,12 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter {
verifyFlowFileKnown(flowFile); verifyFlowFileKnown(flowFile);
try { try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) final ProvenanceEventBuilder builder = build(flowFile, ProvenanceEventType.RECEIVE);
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); builder.setTransitUri(transitUri);
builder.setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier);
builder.setEventDuration(transmissionMillis);
builder.setDetails(details);
final ProvenanceEventRecord record = builder.build();
events.add(record); events.add(record);
bytesReceived += flowFile.getSize(); bytesReceived += flowFile.getSize();

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class StandardProvenanceReporterTest {
@Test
public void testDetailsRetainedWithDelegate() {
final ProvenanceEventRepository mockRepo = Mockito.mock(ProvenanceEventRepository.class);
final StandardProvenanceReporter reporter = new StandardProvenanceReporter(null, "1234", "TestProc", mockRepo, null);
Mockito.when(mockRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
final FlowFile flowFile = new StandardFlowFileRecord.Builder().id(10L).addAttribute("uuid", "10").build();
reporter.receive(flowFile, "test://noop", "These are details", -1);
final Set<ProvenanceEventRecord> records = reporter.getEvents();
assertNotNull(records);
assertEquals(1, records.size());
final ProvenanceEventRecord record = records.iterator().next();
assertNotNull(record);
assertEquals("These are details", record.getDetails());
}
}