NIFI-7868: Track number of flowfiles/bytes sent/received/fetched in provenance reporter so that events that are emitted with 'immediate' flag true are still counted (#4564)

This commit is contained in:
markap14 2020-09-30 19:58:20 -04:00 committed by GitHub
parent 62f4d09883
commit b2489f7644
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 162 additions and 36 deletions

View File

@ -675,4 +675,34 @@ public interface ProvenanceReporter {
* @param details any relevant details about the CREATE event
*/
void create(FlowFile flowFile, String details);
/**
* @return the number of FlowFiles for which there was a RECEIVE event
*/
int getFlowFilesReceived();
/**
* @return the sum of the sizes of all FlowFiles for which there was a RECEIVE event
*/
long getBytesReceived();
/**
* @return the number of FlowFiles for which there was a FETCH event
*/
int getFlowFilesFetched();
/**
* @return the sum of the sizes of all FlowFiles for which there was a FETCH event
*/
long getBytesFetched();
/**
* @return the number of FlowFiles for which there was a SEND event
*/
int getFlowFilesSent();
/**
* @return the sum of the sizes of all FlowFiles for which there was a SEND event
*/
long getBytesSent();
}

View File

@ -503,6 +503,36 @@ public class MockProvenanceReporter implements ProvenanceReporter {
}
}
@Override
public int getFlowFilesReceived() {
return 0;
}
@Override
public long getBytesReceived() {
return 0;
}
@Override
public int getFlowFilesFetched() {
return 0;
}
@Override
public long getBytesFetched() {
return 0;
}
@Override
public int getFlowFilesSent() {
return 0;
}
@Override
public long getBytesSent() {
return 0;
}
ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventType(eventType);

View File

@ -485,31 +485,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void updateEventRepository(final Checkpoint checkpoint) {
int flowFilesReceived = 0;
int flowFilesSent = 0;
long bytesReceived = 0L;
long bytesSent = 0L;
for (final ProvenanceEventRecord event : checkpoint.reportedEvents) {
if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
continue;
}
switch (event.getEventType()) {
case SEND:
flowFilesSent++;
bytesSent += event.getFileSize();
break;
case RECEIVE:
case FETCH:
flowFilesReceived++;
bytesReceived += event.getFileSize();
break;
default:
break;
}
}
try {
// update event repository
final Connectable connectable = context.getConnectable();
@ -522,10 +497,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFileEvent.setFlowFilesIn(checkpoint.flowFilesIn);
flowFileEvent.setFlowFilesOut(checkpoint.flowFilesOut);
flowFileEvent.setFlowFilesRemoved(checkpoint.removedCount);
flowFileEvent.setFlowFilesReceived(flowFilesReceived);
flowFileEvent.setBytesReceived(bytesReceived);
flowFileEvent.setFlowFilesSent(flowFilesSent);
flowFileEvent.setBytesSent(bytesSent);
flowFileEvent.setFlowFilesReceived(checkpoint.flowFilesReceived);
flowFileEvent.setBytesReceived(checkpoint.bytesReceived);
flowFileEvent.setFlowFilesSent(checkpoint.flowFilesSent);
flowFileEvent.setBytesSent(checkpoint.bytesSent);
final long now = System.currentTimeMillis();
long lineageMillis = 0L;
@ -3387,6 +3362,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private long bytesWritten = 0L;
private int flowFilesIn = 0, flowFilesOut = 0;
private long contentSizeIn = 0L, contentSizeOut = 0L;
private int flowFilesReceived = 0, flowFilesSent = 0;
private long bytesReceived = 0L, bytesSent = 0L;
private void checkpoint(final StandardProcessSession session, final List<ProvenanceEventRecord> autoTerminatedEvents) {
this.processingTime += System.nanoTime() - session.processingStartTime;
@ -3416,6 +3393,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.flowFilesOut += session.flowFilesOut;
this.contentSizeIn += session.contentSizeIn;
this.contentSizeOut += session.contentSizeOut;
this.flowFilesReceived += session.provenanceReporter.getFlowFilesReceived() + session.provenanceReporter.getFlowFilesFetched();
this.bytesReceived += session.provenanceReporter.getBytesReceived() + session.provenanceReporter.getBytesFetched();
this.flowFilesSent += session.provenanceReporter.getFlowFilesSent();
this.bytesSent += session.provenanceReporter.getBytesSent();
}
private <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<? super V, ? super V, ? extends V> merger) {

View File

@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
public class StandardProvenanceReporter implements ProvenanceReporter {
public static final String NIFI_NAMESPACE = "nifi";
private final Logger logger = LoggerFactory.getLogger(StandardProvenanceReporter.class);
private final String processorId;
private final String processorType;
@ -42,6 +41,12 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
private final ProvenanceEventRepository repository;
private final ProvenanceEventEnricher eventEnricher;
private final StandardProcessSession session;
private long bytesSent = 0L;
private long bytesReceived = 0L;
private int flowFilesSent = 0;
private int flowFilesReceived = 0;
private int flowFilesFetched = 0;
private long bytesFetched = 0L;
public StandardProvenanceReporter(final StandardProcessSession session, final String processorId, final String processorType,
final ProvenanceEventRepository repository, final ProvenanceEventEnricher enricher) {
@ -138,6 +143,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
events.add(record);
bytesReceived += flowFile.getSize();
flowFilesReceived++;
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
@ -166,7 +174,11 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
.setEventDuration(transmissionMillis)
.setDetails(details)
.build();
events.add(record);
bytesFetched += flowFile.getSize();
flowFilesFetched++;
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
@ -218,6 +230,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
} else {
events.add(enriched);
}
bytesSent += flowFile.getSize();
flowFilesSent++;
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
@ -519,4 +534,34 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
builder.setComponentType(processorType);
return builder;
}
@Override
public int getFlowFilesSent() {
return flowFilesSent;
}
@Override
public long getBytesSent() {
return bytesSent;
}
@Override
public int getFlowFilesReceived() {
return flowFilesReceived;
}
@Override
public long getBytesReceived() {
return bytesReceived;
}
@Override
public int getFlowFilesFetched() {
return flowFilesFetched;
}
@Override
public long getBytesFetched() {
return bytesFetched;
}
}

View File

@ -37,6 +37,13 @@ public class ProvenanceCollector implements ProvenanceReporter {
private final String processorId;
private final String processorType;
private final Collection<ProvenanceEventRecord> events;
private long bytesSent = 0L;
private long bytesReceived = 0L;
private int flowFilesSent = 0;
private int flowFilesReceived = 0;
private int flowFilesFetched = 0;
private long bytesFetched = 0L;
public ProvenanceCollector(final StatelessProcessSession session, final Collection<ProvenanceEventRecord> events, final String processorId, final String processorType) {
this.session = session;
@ -134,6 +141,9 @@ public class ProvenanceCollector implements ProvenanceReporter {
.setDetails(details)
.build();
events.add(record);
flowFilesReceived++;
bytesReceived += flowFile.getSize();
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
@ -163,6 +173,9 @@ public class ProvenanceCollector implements ProvenanceReporter {
.setDetails(details)
.build();
events.add(record);
flowFilesFetched++;
bytesFetched += flowFile.getSize();
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
@ -205,12 +218,9 @@ public class ProvenanceCollector implements ProvenanceReporter {
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
if (force) {
//sharedSessionState.addProvenanceEvents(Collections.singleton(record));
events.add(record);
} else {
events.add(record);
}
events.add(record);
flowFilesSent++;
bytesSent += flowFile.getSize();
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
if (logger.isDebugEnabled()) {
@ -497,6 +507,36 @@ public class ProvenanceCollector implements ProvenanceReporter {
}
}
@Override
public int getFlowFilesReceived() {
return flowFilesReceived;
}
@Override
public long getBytesReceived() {
return bytesReceived;
}
@Override
public int getFlowFilesFetched() {
return flowFilesFetched;
}
@Override
public long getBytesFetched() {
return bytesFetched;
}
@Override
public int getFlowFilesSent() {
return flowFilesSent;
}
@Override
public long getBytesSent() {
return bytesSent;
}
ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventType(eventType);