diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java index 070cc02d16..03b3f2491a 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java @@ -59,6 +59,7 @@ public class C2ClientConfig { private final String c2AssetDirectory; private final long bootstrapAcknowledgeTimeout; private final int c2FlowInfoProcessorBulletinLimit; + private final boolean c2FlowInfoProcessorStatusEnabled; private C2ClientConfig(final Builder builder) { this.c2Url = builder.c2Url; @@ -90,6 +91,7 @@ public class C2ClientConfig { this.c2AssetDirectory = builder.c2AssetDirectory; this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout; this.c2FlowInfoProcessorBulletinLimit = builder.c2FlowInfoProcessorBulletinLimit; + this.c2FlowInfoProcessorStatusEnabled = builder.c2FlowInfoProcessorStatusEnabled; } public String getC2Url() { @@ -208,6 +210,9 @@ public class C2ClientConfig { return c2FlowInfoProcessorBulletinLimit; } + public boolean isC2FlowInfoProcessorStatusEnabled() { + return c2FlowInfoProcessorStatusEnabled; + } /** * Builder for client configuration. */ @@ -245,6 +250,7 @@ public class C2ClientConfig { private String c2AssetDirectory; private long bootstrapAcknowledgeTimeout; private int c2FlowInfoProcessorBulletinLimit; + private boolean c2FlowInfoProcessorStatusEnabled; public Builder c2Url(String c2Url) { this.c2Url = c2Url; @@ -401,6 +407,11 @@ public class C2ClientConfig { return this; } + public Builder c2FlowInfoProcessorStatusEnabled(boolean c2FlowInfoProcessorStatusEnabled) { + this.c2FlowInfoProcessorStatusEnabled = c2FlowInfoProcessorStatusEnabled; + return this; + } + public C2ClientConfig build() { return new C2ClientConfig(this); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java index 653d9ba48f..240a3b84d7 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java @@ -49,6 +49,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentResourceConsumption; import org.apache.nifi.c2.protocol.api.AgentStatus; import org.apache.nifi.c2.protocol.api.ProcessorBulletin; +import org.apache.nifi.c2.protocol.api.ProcessorStatus; import org.apache.nifi.c2.protocol.api.ResourceInfo; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.DeviceInfo; @@ -91,7 +92,7 @@ public class C2HeartbeatFactory { heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest())); heartbeat.setDeviceInfo(generateDeviceInfo()); - heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins())); + heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins(), runtimeInfoWrapper.getProcessorStatus())); heartbeat.setCreated(System.currentTimeMillis()); ResourceInfo resourceInfo = new ResourceInfo(); @@ -101,10 +102,11 @@ public class C2HeartbeatFactory { return heartbeat; } - private FlowInfo getFlowInfo(Map queueStatus, List processorBulletins) { + private FlowInfo getFlowInfo(Map queueStatus, List processorBulletins, List processorStatus) { FlowInfo flowInfo = new FlowInfo(); flowInfo.setQueues(queueStatus); flowInfo.setProcessorBulletins(processorBulletins); + flowInfo.setProcessorStatuses(processorStatus); Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId); return flowInfo; } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java index b7653ea424..7cf064408c 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.ProcessorBulletin; +import org.apache.nifi.c2.protocol.api.ProcessorStatus; import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; public class RuntimeInfoWrapper { @@ -28,12 +29,15 @@ public class RuntimeInfoWrapper { final RuntimeManifest manifest; final Map queueStatus; final List processorBulletins; + final List processorStatus; - public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map queueStatus, List processorBulletins) { + public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map queueStatus, List processorBulletins, + List processorStatus) { this.repos = repos; this.manifest = manifest; this.queueStatus = queueStatus; this.processorBulletins = processorBulletins; + this.processorStatus = processorStatus; } public AgentRepositories getAgentRepositories() { @@ -51,4 +55,8 @@ public class RuntimeInfoWrapper { public List getProcessorBulletins() { return processorBulletins; } + + public List getProcessorStatus() { + return processorStatus; + } } diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java index ceaa42d4fa..fdc4056929 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java @@ -40,6 +40,7 @@ import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.OperationType; import org.apache.nifi.c2.protocol.api.ProcessorBulletin; +import org.apache.nifi.c2.protocol.api.ProcessorStatus; import org.apache.nifi.c2.protocol.api.SupportedOperation; import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; import org.apache.nifi.c2.protocol.component.api.Bundle; @@ -121,13 +122,15 @@ public class C2HeartbeatFactoryTest { RuntimeManifest manifest = createManifest(); Map queueStatus = new HashMap<>(); List processorBulletins = new ArrayList<>(); + List processorStatus = new ArrayList<>(); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins)); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus)); assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest()); assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins()); + assertEquals(processorStatus, heartbeat.getFlowInfo().getProcessorStatuses()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @@ -140,13 +143,15 @@ public class C2HeartbeatFactoryTest { RuntimeManifest manifest = createManifest(); Map queueStatus = new HashMap<>(); List processorBulletins = new ArrayList<>(); + List processorStatus = new ArrayList<>(); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins)); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus)); assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); assertNull(heartbeat.getAgentInfo().getAgentManifest()); assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins()); + assertEquals(processorStatus, heartbeat.getFlowInfo().getProcessorStatuses()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @@ -163,7 +168,7 @@ public class C2HeartbeatFactoryTest { when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH); when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>())); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>())); assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); @@ -179,7 +184,7 @@ public class C2HeartbeatFactoryTest { when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH); when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>())); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>())); assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash()); assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java index 24bcc0700a..aeb7d55917 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java @@ -58,7 +58,7 @@ public class DescribeManifestOperationHandlerTest { void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() { RuntimeManifest manifest = new RuntimeManifest(); manifest.setIdentifier("manifestId"); - RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null); + RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null, null); C2Heartbeat heartbeat = new C2Heartbeat(); AgentInfo agentInfo = new AgentInfo(); diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java index f2f683c1a3..0060ce98d7 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java @@ -31,6 +31,7 @@ public class FlowInfo implements Serializable { private Map components; private Map queues; private List processorBulletins; + private List processorStatuses; @Schema(description = "A unique identifier of the flow currently deployed on the agent") public String getFlowId() { @@ -77,4 +78,12 @@ public class FlowInfo implements Serializable { this.processorBulletins = processorBulletins; } + @Schema(description = "Status and metrics for each processors") + public List getProcessorStatuses() { + return processorStatuses; + } + + public void setProcessorStatuses(List processorStatuses) { + this.processorStatuses = processorStatuses; + } } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java new file mode 100644 index 0000000000..a172200138 --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorStatus.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +import io.swagger.v3.oas.annotations.media.Schema; + +import java.io.Serializable; + +public class ProcessorStatus implements Serializable { + + private static final long serialVersionUID = 1L; + + private String id; + private String groupId; + private long bytesRead; + private long bytesWritten; + private long flowFilesIn; + private long flowFilesOut; + private long bytesIn; + private long bytesOut; + private int invocations; + private long processingNanos; + private int activeThreadCount; + private int terminatedThreadCount; + + @Schema(description = "The id of the processor") + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Schema(description = "The group id of the processor") + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + @Schema(description = "The number of bytes read by the processor") + public long getBytesRead() { + return bytesRead; + } + + public void setBytesRead(long bytesRead) { + this.bytesRead = bytesRead; + } + + @Schema(description = "The number of bytes written by the processor") + public long getBytesWritten() { + return bytesWritten; + } + + public void setBytesWritten(long bytesWritten) { + this.bytesWritten = bytesWritten; + } + + @Schema(description = "The number of accepted flow files") + public long getFlowFilesIn() { + return flowFilesIn; + } + + public void setFlowFilesIn(long flowFilesIn) { + this.flowFilesIn = flowFilesIn; + } + + @Schema(description = "The number of transferred flow files") + public long getFlowFilesOut() { + return flowFilesOut; + } + + public void setFlowFilesOut(long flowFilesOut) { + this.flowFilesOut = flowFilesOut; + } + + @Schema(description = "The size of accepted flow files") + public long getBytesIn() { + return bytesIn; + } + + public void setBytesIn(long bytesIn) { + this.bytesIn = bytesIn; + } + + @Schema(description = "The size of transferred flow files") + public long getBytesOut() { + return bytesOut; + } + + public void setBytesOut(long bytesOut) { + this.bytesOut = bytesOut; + } + + @Schema(description = "The number of invocations") + public int getInvocations() { + return invocations; + } + + public void setInvocations(int invocations) { + this.invocations = invocations; + } + + @Schema(description = "The number of nanoseconds that the processor has spent running") + public long getProcessingNanos() { + return processingNanos; + } + + public void setProcessingNanos(long processingNanos) { + this.processingNanos = processingNanos; + } + + @Schema(description = "The number of active threads currently executing") + public int getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(int activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + @Schema(description = "The number of threads currently terminated") + public int getTerminatedThreadCount() { + return terminatedThreadCount; + } + + public void setTerminatedThreadCount(int terminatedThreadCount) { + this.terminatedThreadCount = terminatedThreadCount; + } +} diff --git a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java index c40d2be198..1c34cb5d98 100644 --- a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java +++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java @@ -93,6 +93,7 @@ public enum MiNiFiProperties { C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID), C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", "1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ), + C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED("c2.flow.info.processor.status.enabled", "true", false, true, BOOLEAN_VALIDATOR), NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID), NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR), diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index 02366147d5..d8c055a23d 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -31,6 +31,7 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENT import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_ASSET_DIRECTORY; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_CONFIG_DIRECTORY; +import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FULL_HEARTBEAT; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_KEEP_ALIVE_DURATION; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_MAX_IDLE_CONNECTIONS; @@ -95,6 +96,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.ProcessorBulletin; +import org.apache.nifi.c2.protocol.api.ProcessorStatus; import org.apache.nifi.c2.serializer.C2JacksonSerializer; import org.apache.nifi.c2.serializer.C2Serializer; import org.apache.nifi.controller.FlowController; @@ -180,7 +182,9 @@ public class C2NifiClientService { this.c2OperationManager = new C2OperationManager( client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler); - Supplier runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(clientConfig.getC2FlowInfoProcessorBulletinLimit()); + Supplier runtimeInfoWrapperSupplier = () -> generateRuntimeInfo( + clientConfig.getC2FlowInfoProcessorBulletinLimit(), + clientConfig.isC2FlowInfoProcessorStatusEnabled()); this.c2HeartbeatManager = new C2HeartbeatManager( client, heartbeatFactory, heartbeatLock, runtimeInfoWrapperSupplier, c2OperationManager); } @@ -216,6 +220,8 @@ public class C2NifiClientService { .bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT)) .c2FlowInfoProcessorBulletinLimit(parseInt(properties .getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue()))) + .c2FlowInfoProcessorStatusEnabled(parseBoolean(properties + .getProperty(C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getKey(), C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getDefaultValue()))) .build(); } @@ -242,7 +248,8 @@ public class C2NifiClientService { new StandardFlowEnrichService(niFiProperties), flowPropertyEncryptor, StandardFlowSerDeService.defaultInstance(), niFiProperties.getProperty(FLOW_CONFIGURATION_FILE)); Supplier runtimeInfoWrapperSupplier = () -> generateRuntimeInfo( - parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue()))); + parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())), + parseBoolean(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getKey(), C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getDefaultValue()))); return new C2OperationHandlerProvider(List.of( new UpdateConfigurationOperationHandler(client, flowIdHolder, updateConfigurationStrategy, emptyOperandPropertiesProvider), @@ -283,10 +290,15 @@ public class C2NifiClientService { } } - private synchronized RuntimeInfoWrapper generateRuntimeInfo(int processorBulletinLimit) { + private synchronized RuntimeInfoWrapper generateRuntimeInfo(int processorBulletinLimit, boolean processorStatusEnabled) { AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest()); agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations()); - return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus(), getBulletins(processorBulletinLimit)); + return new RuntimeInfoWrapper( + getAgentRepositories(), + agentManifest, + getQueueStatus(), + getBulletins(processorBulletinLimit), + getProcessorStatus(processorStatusEnabled)); } private AgentRepositories getAgentRepositories() { @@ -367,4 +379,33 @@ public class C2NifiClientService { } return new ArrayList<>(); } + + private List getProcessorStatus(boolean processorStatusEnabled) { + if (processorStatusEnabled) { + return flowController.getEventAccess() + .getGroupStatus(ROOT_GROUP_ID) + .getProcessorStatus() + .stream() + .map(this::convertProcessorStatus) + .toList(); + } + return null; + } + + private ProcessorStatus convertProcessorStatus(org.apache.nifi.controller.status.ProcessorStatus processorStatus) { + ProcessorStatus result = new ProcessorStatus(); + result.setId(processorStatus.getId()); + result.setGroupId(processorStatus.getGroupId()); + result.setBytesRead(processorStatus.getBytesRead()); + result.setBytesWritten(processorStatus.getBytesWritten()); + result.setFlowFilesIn(processorStatus.getFlowFilesReceived()); + result.setFlowFilesOut(processorStatus.getFlowFilesSent()); + result.setBytesIn(processorStatus.getBytesReceived()); + result.setBytesOut(processorStatus.getBytesSent()); + result.setInvocations(processorStatus.getInvocations()); + result.setProcessingNanos(processorStatus.getProcessingNanos()); + result.setActiveThreadCount(processorStatus.getActiveThreadCount()); + result.setTerminatedThreadCount(processorStatus.getTerminatedThreadCount()); + return result; + } } \ No newline at end of file