NIFI-13635 Expose processor bulletins as a part of FlowInfo

Signed-off-by: Ferenc Erdei <erdei.ferenc90@gmail.com>
This closes #9161.
This commit is contained in:
Peter Kedvessy 2024-08-06 23:37:56 +02:00 committed by Ferenc Erdei
parent 31e5afc9b8
commit 83b701a25e
No known key found for this signature in database
GPG Key ID: 023D856C60E92F96
10 changed files with 250 additions and 13 deletions

View File

@ -58,6 +58,7 @@ public class C2ClientConfig {
private final String c2RequestCompression;
private final String c2AssetDirectory;
private final long bootstrapAcknowledgeTimeout;
private final int c2FlowInfoProcessorBulletinLimit;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@ -88,6 +89,7 @@ public class C2ClientConfig {
this.c2RequestCompression = builder.c2RequestCompression;
this.c2AssetDirectory = builder.c2AssetDirectory;
this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
this.c2FlowInfoProcessorBulletinLimit = builder.c2FlowInfoProcessorBulletinLimit;
}
public String getC2Url() {
@ -202,6 +204,10 @@ public class C2ClientConfig {
return bootstrapAcknowledgeTimeout;
}
public int getC2FlowInfoProcessorBulletinLimit() {
return c2FlowInfoProcessorBulletinLimit;
}
/**
* Builder for client configuration.
*/
@ -238,6 +244,7 @@ public class C2ClientConfig {
private String c2RequestCompression;
private String c2AssetDirectory;
private long bootstrapAcknowledgeTimeout;
private int c2FlowInfoProcessorBulletinLimit;
public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
@ -389,6 +396,11 @@ public class C2ClientConfig {
return this;
}
public Builder c2FlowInfoProcessorBulletinLimit(int c2FlowInfoProcessorBulletinLimit) {
this.c2FlowInfoProcessorBulletinLimit = c2FlowInfoProcessorBulletinLimit;
return this;
}
public C2ClientConfig build() {
return new C2ClientConfig(this);
}

View File

@ -35,6 +35,7 @@ import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -47,6 +48,7 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
@ -89,7 +91,7 @@ public class C2HeartbeatFactory {
heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest()));
heartbeat.setDeviceInfo(generateDeviceInfo());
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus()));
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins()));
heartbeat.setCreated(System.currentTimeMillis());
ResourceInfo resourceInfo = new ResourceInfo();
@ -99,9 +101,10 @@ public class C2HeartbeatFactory {
return heartbeat;
}
private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus) {
private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins) {
FlowInfo flowInfo = new FlowInfo();
flowInfo.setQueues(queueStatus);
flowInfo.setProcessorBulletins(processorBulletins);
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
return flowInfo;
}

View File

@ -16,20 +16,24 @@
*/
package org.apache.nifi.c2.client.service.model;
import java.util.List;
import java.util.Map;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
public class RuntimeInfoWrapper {
final AgentRepositories repos;
final RuntimeManifest manifest;
final Map<String, FlowQueueStatus> queueStatus;
final List<ProcessorBulletin> processorBulletins;
public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus) {
public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins) {
this.repos = repos;
this.manifest = manifest;
this.queueStatus = queueStatus;
this.processorBulletins = processorBulletins;
}
public AgentRepositories getAgentRepositories() {
@ -43,4 +47,8 @@ public class RuntimeInfoWrapper {
public Map<String, FlowQueueStatus> getQueueStatus() {
return queueStatus;
}
public List<ProcessorBulletin> getProcessorBulletins() {
return processorBulletins;
}
}

View File

@ -24,9 +24,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@ -37,6 +39,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.component.api.Bundle;
@ -117,12 +120,14 @@ public class C2HeartbeatFactoryTest {
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@ -134,12 +139,14 @@ public class C2HeartbeatFactoryTest {
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@ -156,7 +163,7 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>()));
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
@ -172,7 +179,7 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>()));
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());

View File

@ -58,7 +58,7 @@ public class DescribeManifestOperationHandlerTest {
void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setIdentifier("manifestId");
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null);
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null);
C2Heartbeat heartbeat = new C2Heartbeat();
AgentInfo agentInfo = new AgentInfo();

View File

@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
public class FlowInfo implements Serializable {
@ -29,6 +30,7 @@ public class FlowInfo implements Serializable {
private FlowUri flowUri;
private Map<String, ComponentStatus> components;
private Map<String, FlowQueueStatus> queues;
private List<ProcessorBulletin> processorBulletins;
@Schema(description = "A unique identifier of the flow currently deployed on the agent")
public String getFlowId() {
@ -66,4 +68,13 @@ public class FlowInfo implements Serializable {
this.queues = queues;
}
@Schema(description = "Bulletins of each processors")
public List<ProcessorBulletin> getProcessorBulletins() {
return processorBulletins;
}
public void setProcessorBulletins(List<ProcessorBulletin> processorBulletins) {
this.processorBulletins = processorBulletins;
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.Date;
public class ProcessorBulletin implements Serializable {
private static final long serialVersionUID = 1L;
private Date timestamp;
private long id;
private String nodeAddress;
private String level;
private String category;
private String message;
private String groupId;
private String groupName;
private String groupPath;
private String sourceId;
private String sourceName;
private String flowFileUuid;
@Schema(description = "When this bulletin was generated")
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
@Schema(description = "The id of the bulletin")
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
@Schema(description = "If clustered, the address of the node from which the bulletin originated")
public String getNodeAddress() {
return nodeAddress;
}
public void setNodeAddress(String nodeAddress) {
this.nodeAddress = nodeAddress;
}
@Schema(description = "The level of the bulletin")
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
@Schema(description = "The category of this bulletin")
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
@Schema(description = "The bulletin message")
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Schema(description = "The group id of the source component")
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
@Schema(description = "The group name of the source component")
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@Schema(description = "The group path of the source component")
public String getGroupPath() {
return groupPath;
}
public void setGroupPath(String groupPath) {
this.groupPath = groupPath;
}
@Schema(description = "The id of the source component")
public String getSourceId() {
return sourceId;
}
public void setSourceId(String sourceId) {
this.sourceId = sourceId;
}
@Schema(description = "The name of the source component")
public String getSourceName() {
return sourceName;
}
public void setSourceName(String sourceName) {
this.sourceName = sourceName;
}
@Schema(description = "The id of the flow file")
public String getFlowFileUuid() {
return flowFileUuid;
}
public void setFlowFileUuid(String flowFileUuid) {
this.flowFileUuid = flowFileUuid;
}
}

View File

@ -92,6 +92,7 @@ public enum MiNiFiProperties {
C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID),
C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID),
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID),
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", "1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ),
NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),

View File

@ -44,6 +44,7 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_PATH_H
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_READ_TIMEOUT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL_ACK;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_TYPE;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KEYSTORE_LOCATION;
@ -59,11 +60,14 @@ import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.c2.client.C2ClientConfig;
@ -90,6 +94,7 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.controller.FlowController;
@ -113,6 +118,8 @@ import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.nar.ExtensionManagerHolder;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@ -173,8 +180,9 @@ public class C2NifiClientService {
this.c2OperationManager = new C2OperationManager(
client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler);
Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(clientConfig.getC2FlowInfoProcessorBulletinLimit());
this.c2HeartbeatManager = new C2HeartbeatManager(
client, heartbeatFactory, heartbeatLock, this::generateRuntimeInfo, c2OperationManager);
client, heartbeatFactory, heartbeatLock, runtimeInfoWrapperSupplier, c2OperationManager);
}
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
@ -206,6 +214,8 @@ public class C2NifiClientService {
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), C2_REST_PATH_HEARTBEAT.getDefaultValue()))
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(), C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
.c2FlowInfoProcessorBulletinLimit(parseInt(properties
.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())))
.build();
}
@ -231,10 +241,12 @@ public class C2NifiClientService {
UpdateConfigurationStrategy updateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(flowController, flowService,
new StandardFlowEnrichService(niFiProperties), flowPropertyEncryptor,
StandardFlowSerDeService.defaultInstance(), niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> generateRuntimeInfo(
parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())));
return new C2OperationHandlerProvider(List.of(
new UpdateConfigurationOperationHandler(client, flowIdHolder, updateConfigurationStrategy, emptyOperandPropertiesProvider),
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider),
new DescribeManifestOperationHandler(heartbeatFactory, runtimeInfoWrapperSupplier, emptyOperandPropertiesProvider),
TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider,
transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText),
UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider,
@ -271,10 +283,10 @@ public class C2NifiClientService {
}
}
private synchronized RuntimeInfoWrapper generateRuntimeInfo() {
private synchronized RuntimeInfoWrapper generateRuntimeInfo(int processorBulletinLimit) {
AgentManifest agentManifest = new AgentManifest(runtimeManifestService.getManifest());
agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations());
return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus());
return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, getQueueStatus(), getBulletins(processorBulletinLimit));
}
private AgentRepositories getAgentRepositories() {
@ -321,4 +333,38 @@ public class C2NifiClientService {
})
.collect(toMap(Pair::getKey, Pair::getValue));
}
private List<ProcessorBulletin> getBulletins(int processorBulletinLimit) {
if (processorBulletinLimit > 0) {
String groupId = flowController.getEventAccess()
.getGroupStatus(ROOT_GROUP_ID)
.getId();
BulletinQuery query = new BulletinQuery.Builder()
.sourceType(ComponentType.PROCESSOR)
.groupIdMatches(groupId)
.limit(processorBulletinLimit)
.build();
return flowController.getBulletinRepository()
.findBulletins(query)
.stream()
.map(bulletin -> {
ProcessorBulletin processorBulletin = new ProcessorBulletin();
processorBulletin.setCategory(bulletin.getCategory());
processorBulletin.setFlowFileUuid(bulletin.getFlowFileUuid());
processorBulletin.setGroupId(bulletin.getGroupId());
processorBulletin.setGroupName(bulletin.getGroupName());
processorBulletin.setGroupPath(bulletin.getGroupPath());
processorBulletin.setId(bulletin.getId());
processorBulletin.setLevel(bulletin.getLevel());
processorBulletin.setMessage(bulletin.getMessage());
processorBulletin.setNodeAddress(bulletin.getNodeAddress());
processorBulletin.setSourceId(bulletin.getSourceId());
processorBulletin.setSourceName(bulletin.getSourceName());
processorBulletin.setTimestamp(bulletin.getTimestamp());
return processorBulletin;
}).toList();
}
return new ArrayList<>();
}
}

View File

@ -113,7 +113,8 @@ nifi.minifi.sensitive.props.algorithm=
#c2.security.keystore.password=
#c2.security.keystore.type=JKS
#c2.request.compression=none
# Number of processor bulletins exposed as a part of flow info.
#c2.flow.info.processor.bulletin.limit=0
### MiNiFi Notifiers For Flow Updates
# If C2 is enabled these should be disabled