NIFI-13762 Expose processor metrics as a part of FlowInfo

Signed-off-by: Ferenc Kis <briansolo1985@gmail.com>

This closes #9281.
This commit is contained in:
Peter Kedvessy 2024-09-19 12:16:11 +02:00 committed by Ferenc Kis
parent 516edf5d87
commit c8bdbb19db
No known key found for this signature in database
GPG Key ID: 5E1CCAC15A5958F2
9 changed files with 237 additions and 12 deletions

View File

@ -59,6 +59,7 @@ public class C2ClientConfig {
private final String c2AssetDirectory;
private final long bootstrapAcknowledgeTimeout;
private final int c2FlowInfoProcessorBulletinLimit;
private final boolean c2FlowInfoProcessorStatusEnabled;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@ -90,6 +91,7 @@ public class C2ClientConfig {
this.c2AssetDirectory = builder.c2AssetDirectory;
this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
this.c2FlowInfoProcessorBulletinLimit = builder.c2FlowInfoProcessorBulletinLimit;
this.c2FlowInfoProcessorStatusEnabled = builder.c2FlowInfoProcessorStatusEnabled;
}
public String getC2Url() {
@ -208,6 +210,9 @@ public class C2ClientConfig {
return c2FlowInfoProcessorBulletinLimit;
}
public boolean isC2FlowInfoProcessorStatusEnabled() {
return c2FlowInfoProcessorStatusEnabled;
}
/**
* Builder for client configuration.
*/
@ -245,6 +250,7 @@ public class C2ClientConfig {
private String c2AssetDirectory;
private long bootstrapAcknowledgeTimeout;
private int c2FlowInfoProcessorBulletinLimit;
private boolean c2FlowInfoProcessorStatusEnabled;
public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
@ -401,6 +407,11 @@ public class C2ClientConfig {
return this;
}
public Builder c2FlowInfoProcessorStatusEnabled(boolean c2FlowInfoProcessorStatusEnabled) {
this.c2FlowInfoProcessorStatusEnabled = c2FlowInfoProcessorStatusEnabled;
return this;
}
public C2ClientConfig build() {
return new C2ClientConfig(this);
}

View File

@ -49,6 +49,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
@ -91,7 +92,7 @@ public class C2HeartbeatFactory {
heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest()));
heartbeat.setDeviceInfo(generateDeviceInfo());
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins()));
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins(), runtimeInfoWrapper.getProcessorStatus()));
heartbeat.setCreated(System.currentTimeMillis());
ResourceInfo resourceInfo = new ResourceInfo();
@ -101,10 +102,11 @@ public class C2HeartbeatFactory {
return heartbeat;
}
private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins) {
private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins, List<ProcessorStatus> processorStatus) {
FlowInfo flowInfo = new FlowInfo();
flowInfo.setQueues(queueStatus);
flowInfo.setProcessorBulletins(processorBulletins);
flowInfo.setProcessorStatuses(processorStatus);
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
return flowInfo;
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
public class RuntimeInfoWrapper {
@ -28,12 +29,15 @@ public class RuntimeInfoWrapper {
final RuntimeManifest manifest;
final Map<String, FlowQueueStatus> queueStatus;
final List<ProcessorBulletin> processorBulletins;
final List<ProcessorStatus> processorStatus;
public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins) {
public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins,
List<ProcessorStatus> processorStatus) {
this.repos = repos;
this.manifest = manifest;
this.queueStatus = queueStatus;
this.processorBulletins = processorBulletins;
this.processorStatus = processorStatus;
}
public AgentRepositories getAgentRepositories() {
@ -51,4 +55,8 @@ public class RuntimeInfoWrapper {
public List<ProcessorBulletin> getProcessorBulletins() {
return processorBulletins;
}
public List<ProcessorStatus> getProcessorStatus() {
return processorStatus;
}
}

View File

@ -40,6 +40,7 @@ import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.component.api.Bundle;
@ -121,13 +122,15 @@ public class C2HeartbeatFactoryTest {
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
List<ProcessorStatus> processorStatus = new ArrayList<>();
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus));
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(processorStatus, heartbeat.getFlowInfo().getProcessorStatuses());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@ -140,13 +143,15 @@ public class C2HeartbeatFactoryTest {
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
List<ProcessorStatus> processorStatus = new ArrayList<>();
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus));
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(processorStatus, heartbeat.getFlowInfo().getProcessorStatuses());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@ -163,7 +168,7 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>()));
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
@ -179,7 +184,7 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>()));
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());

View File

@ -58,7 +58,7 @@ public class DescribeManifestOperationHandlerTest {
void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setIdentifier("manifestId");
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null);
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null, null);
C2Heartbeat heartbeat = new C2Heartbeat();
AgentInfo agentInfo = new AgentInfo();

View File

@ -31,6 +31,7 @@ public class FlowInfo implements Serializable {
private Map<String, ComponentStatus> components;
private Map<String, FlowQueueStatus> queues;
private List<ProcessorBulletin> processorBulletins;
private List<ProcessorStatus> processorStatuses;
@Schema(description = "A unique identifier of the flow currently deployed on the agent")
public String getFlowId() {
@ -77,4 +78,12 @@ public class FlowInfo implements Serializable {
this.processorBulletins = processorBulletins;
}
@Schema(description = "Status and metrics for each processors")
public List<ProcessorStatus> getProcessorStatuses() {
return processorStatuses;
}
public void setProcessorStatuses(List<ProcessorStatus> processorStatuses) {
this.processorStatuses = processorStatuses;
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
public class ProcessorStatus implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
private String groupId;
private long bytesRead;
private long bytesWritten;
private long flowFilesIn;
private long flowFilesOut;
private long bytesIn;
private long bytesOut;
private int invocations;
private long processingNanos;
private int activeThreadCount;
private int terminatedThreadCount;
@Schema(description = "The id of the processor")
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Schema(description = "The group id of the processor")
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
@Schema(description = "The number of bytes read by the processor")
public long getBytesRead() {
return bytesRead;
}
public void setBytesRead(long bytesRead) {
this.bytesRead = bytesRead;
}
@Schema(description = "The number of bytes written by the processor")
public long getBytesWritten() {
return bytesWritten;
}
public void setBytesWritten(long bytesWritten) {
this.bytesWritten = bytesWritten;
}
@Schema(description = "The number of accepted flow files")
public long getFlowFilesIn() {
return flowFilesIn;
}
public void setFlowFilesIn(long flowFilesIn) {
this.flowFilesIn = flowFilesIn;
}
@Schema(description = "The number of transferred flow files")
public long getFlowFilesOut() {
return flowFilesOut;
}
public void setFlowFilesOut(long flowFilesOut) {
this.flowFilesOut = flowFilesOut;
}
@Schema(description = "The size of accepted flow files")
public long getBytesIn() {
return bytesIn;
}
public void setBytesIn(long bytesIn) {
this.bytesIn = bytesIn;
}
@Schema(description = "The size of transferred flow files")
public long getBytesOut() {
return bytesOut;
}
public void setBytesOut(long bytesOut) {
this.bytesOut = bytesOut;
}
@Schema(description = "The number of invocations")
public int getInvocations() {
return invocations;
}
public void setInvocations(int invocations) {
this.invocations = invocations;
}
@Schema(description = "The number of nanoseconds that the processor has spent running")
public long getProcessingNanos() {
return processingNanos;
}
public void setProcessingNanos(long processingNanos) {
this.processingNanos = processingNanos;
}
@Schema(description = "The number of active threads currently executing")
public int getActiveThreadCount() {
return activeThreadCount;
}
public void setActiveThreadCount(int activeThreadCount) {
this.activeThreadCount = activeThreadCount;
}
@Schema(description = "The number of threads currently terminated")
public int getTerminatedThreadCount() {
return terminatedThreadCount;
}
public void setTerminatedThreadCount(int terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
}

View File

@ -93,6 +93,7 @@ public enum MiNiFiProperties {
C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID),
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID),
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", "1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ),
C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED("c2.flow.info.processor.status.enabled", "true", false, true, BOOLEAN_VALIDATOR),
NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),

View File

@ -31,6 +31,7 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENT
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_ASSET_DIRECTORY;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_CONFIG_DIRECTORY;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FULL_HEARTBEAT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_MAX_IDLE_CONNECTIONS;
@ -95,6 +96,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.controller.FlowController;
@ -180,7 +182,9 @@ public class C2NifiClientService {
this.c2OperationManager = new C2OperationManager(
client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler);
Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(clientConfig.getC2FlowInfoProcessorBulletinLimit());
Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(
clientConfig.getC2FlowInfoProcessorBulletinLimit(),
clientConfig.isC2FlowInfoProcessorStatusEnabled());
this.c2HeartbeatManager = new C2HeartbeatManager(
client, heartbeatFactory, heartbeatLock, runtimeInfoWrapperSupplier, c2OperationManager);
}
@ -216,6 +220,8 @@ public class C2NifiClientService {
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
.c2FlowInfoProcessorBulletinLimit(parseInt(properties
.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())))
.c2FlowInfoProcessorStatusEnabled(parseBoolean(properties
.getProperty(C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getKey(), C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getDefaultValue())))
.build();
}
@ -242,7 +248,8 @@ public class C2NifiClientService {
new StandardFlowEnrichService(niFiProperties), flowPropertyEncryptor,
StandardFlowSerDeService.defaultInstance(), niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(
parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())));
parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())),
parseBoolean(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getKey(), C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getDefaultValue())));
return new C2OperationHandlerProvider(List.of(
new UpdateConfigurationOperationHandler(client, flowIdHolder, updateConfigurationStrategy, emptyOperandPropertiesProvider),
@ -283,10 +290,15 @@ public class C2NifiClientService {
}
}
private synchronized RuntimeInfoWrapper generateRuntimeInfo(int processorBulletinLimit) {
private synchronized RuntimeInfoWrapper generateRuntimeInfo(int processorBulletinLimit, boolean processorStatusEnabled) {
AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest());
agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations());
return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus(), getBulletins(processorBulletinLimit));
return new RuntimeInfoWrapper(
getAgentRepositories(),
agentManifest,
getQueueStatus(),
getBulletins(processorBulletinLimit),
getProcessorStatus(processorStatusEnabled));
}
private AgentRepositories getAgentRepositories() {
@ -367,4 +379,33 @@ public class C2NifiClientService {
}
return new ArrayList<>();
}
private List<ProcessorStatus> getProcessorStatus(boolean processorStatusEnabled) {
if (processorStatusEnabled) {
return flowController.getEventAccess()
.getGroupStatus(ROOT_GROUP_ID)
.getProcessorStatus()
.stream()
.map(this::convertProcessorStatus)
.toList();
}
return null;
}
private ProcessorStatus convertProcessorStatus(org.apache.nifi.controller.status.ProcessorStatus processorStatus) {
ProcessorStatus result = new ProcessorStatus();
result.setId(processorStatus.getId());
result.setGroupId(processorStatus.getGroupId());
result.setBytesRead(processorStatus.getBytesRead());
result.setBytesWritten(processorStatus.getBytesWritten());
result.setFlowFilesIn(processorStatus.getFlowFilesReceived());
result.setFlowFilesOut(processorStatus.getFlowFilesSent());
result.setBytesIn(processorStatus.getBytesReceived());
result.setBytesOut(processorStatus.getBytesSent());
result.setInvocations(processorStatus.getInvocations());
result.setProcessingNanos(processorStatus.getProcessingNanos());
result.setActiveThreadCount(processorStatus.getActiveThreadCount());
result.setTerminatedThreadCount(processorStatus.getTerminatedThreadCount());
return result;
}
}