NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)

This closes #5469
This commit is contained in:
markap14 2021-10-20 11:07:17 -04:00 committed by GitHub
parent a380fb2964
commit 90ae271692
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 23 deletions

View File

@ -27,6 +27,7 @@ public class ConfigurationAnalysisDTO {
private String componentId;
private Map<String, String> properties;
private Map<String, String> referencedAttributes;
private boolean supportsVerification;
@ApiModelProperty("The ID of the component")
public String getComponentId() {
@ -54,4 +55,13 @@ public class ConfigurationAnalysisDTO {
public void setReferencedAttributes(final Map<String, String> referencedAttributes) {
this.referencedAttributes = referencedAttributes;
}
@ApiModelProperty("Whether or not the component supports verification")
public boolean isSupportsVerification() {
return supportsVerification;
}
public void setSupportsVerification(final boolean supportsVerification) {
this.supportsVerification = supportsVerification;
}
}

View File

@ -642,7 +642,7 @@ public interface NiFiServiceFacade {
* @param attributes a map of values that can be used for resolving FlowFile attributes for Expression Language
* @return verification results
*/
List<ConfigVerificationResultDTO> verifyProcessorConfiguration(String processorId, Map<String, String> properties, Map<String, String> attributes);
List<ConfigVerificationResultDTO> performProcessorConfigVerification(String processorId, Map<String, String> properties, Map<String, String> attributes);
/**
* Performs analysis of the given properties, determining which attributes are referenced by properties
@ -2058,7 +2058,7 @@ public interface NiFiServiceFacade {
* @param variables a map of values that can be used for resolving FlowFile attributes for Expression Language
* @return verification results
*/
List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(String controllerServiceId, Map<String, String> properties, Map<String, String> variables);
List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(String controllerServiceId, Map<String, String> properties, Map<String, String> variables);
/**
* Performs analysis of the given properties, determining which attributes are referenced by properties
@ -2159,7 +2159,7 @@ public interface NiFiServiceFacade {
* @param properties the configured properties to verify
* @return verification results
*/
List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(String reportingTaskId, Map<String, String> properties);
List<ConfigVerificationResultDTO> performReportingTaskConfigVerification(String reportingTaskId, Map<String, String> properties);
/**
* Performs analysis of the given properties, determining which attributes are referenced by properties

View File

@ -81,6 +81,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
@ -118,6 +119,7 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
@ -158,6 +160,7 @@ import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.VerifiableReportingTask;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
@ -772,7 +775,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public List<ConfigVerificationResultDTO> verifyProcessorConfiguration(final String processorId, final Map<String, String> properties, final Map<String, String> attributes) {
public List<ConfigVerificationResultDTO> performProcessorConfigVerification(final String processorId, final Map<String, String> properties, final Map<String, String> attributes) {
return processorDAO.verifyProcessorConfiguration(processorId, properties, attributes);
}
@ -793,12 +796,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
dto.setComponentId(componentNode.getIdentifier());
dto.setProperties(properties);
dto.setReferencedAttributes(referencedAttributes);
dto.setSupportsVerification(isVerificationSupported(componentNode));
final ConfigurationAnalysisEntity entity = new ConfigurationAnalysisEntity();
entity.setConfigurationAnalysis(dto);
return entity;
}
private boolean isVerificationSupported(final ComponentNode componentNode) {
if (componentNode instanceof ProcessorNode) {
return ((ProcessorNode) componentNode).getProcessor() instanceof VerifiableProcessor;
} else if (componentNode instanceof ControllerServiceNode) {
return ((ControllerServiceNode) componentNode).getControllerServiceImplementation() instanceof VerifiableControllerService;
} else if (componentNode instanceof ReportingTaskNode) {
return ((ReportingTaskNode) componentNode).getReportingTask() instanceof VerifiableReportingTask;
} else {
return false;
}
}
private Map<String, String> determineReferencedAttributes(final Map<String, String> properties, final ComponentNode componentNode, final ParameterContext parameterContext) {
final Map<String, String> mergedProperties = new LinkedHashMap<>();
componentNode.getRawPropertyValues().forEach((desc, value) -> mergedProperties.put(desc.getName(), value));
@ -2834,7 +2850,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
public List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
return controllerServiceDAO.verifyConfiguration(controllerServiceId, properties, variables);
}
@ -3227,7 +3243,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(final String reportingTaskId, final Map<String, String> properties) {
public List<ConfigVerificationResultDTO> performReportingTaskConfigVerification(final String reportingTaskId, final Map<String, String> properties) {
return reportingTaskDAO.verifyConfiguration(reportingTaskId, properties);
}

View File

@ -1086,7 +1086,7 @@ public class ControllerServiceResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
try {
final Map<String, String> attributes = requestDto.getAttributes() == null ? Collections.emptyMap() : requestDto.getAttributes();
final List<ConfigVerificationResultDTO> results = serviceFacade.verifyControllerServiceConfiguration(serviceId, requestDto.getProperties(), attributes);
final List<ConfigVerificationResultDTO> results = serviceFacade.performControllerServiceConfigVerification(serviceId, requestDto.getProperties(), attributes);
asyncRequest.markStepComplete(results);
} catch (final Exception e) {
logger.error("Failed to verify Controller Service configuration", e);

View File

@ -1066,7 +1066,7 @@ public class ProcessorResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
try {
final Map<String, String> attributes = requestDto.getAttributes() == null ? Collections.emptyMap() : requestDto.getAttributes();
final List<ConfigVerificationResultDTO> results = serviceFacade.verifyProcessorConfiguration(processorId, requestDto.getProperties(), attributes);
final List<ConfigVerificationResultDTO> results = serviceFacade.performProcessorConfigVerification(processorId, requestDto.getProperties(), attributes);
asyncRequest.markStepComplete(results);
} catch (final Exception e) {
logger.error("Failed to verify Processor configuration", e);

View File

@ -891,7 +891,7 @@ public class ReportingTaskResource extends ApplicationResource {
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
try {
final List<ConfigVerificationResultDTO> results = serviceFacade.verifyReportingTaskConfiguration(taskId, requestDto.getProperties());
final List<ConfigVerificationResultDTO> results = serviceFacade.performReportingTaskConfigVerification(taskId, requestDto.getProperties());
asyncRequest.markStepComplete(results);
} catch (final Exception e) {
logger.error("Failed to verify Reporting Task configuration", e);

View File

@ -56,8 +56,8 @@ public class AsyncRequestManager<R, T> implements RequestManager<R, T> {
this.requestExpirationMillis = requestExpirationMillis;
this.maxConcurrentRequests = maxConcurrentRequests;
this.threadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(maxConcurrentRequests),
this.threadPool = new ThreadPoolExecutor(1, maxConcurrentRequests, 5L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadFactory() {
private final AtomicLong counter = new AtomicLong(0L);

View File

@ -158,7 +158,10 @@ public class StandardAsynchronousWebRequest<R, T> implements AsynchronousWebRequ
this.cancelled = true;
percentComplete = 100;
fail("Request cancelled by user");
cancelCallback.run();
if (cancelCallback != null) {
cancelCallback.run();
}
}
@Override

View File

@ -308,6 +308,7 @@ public class ConsumerPool implements Closeable {
if (topicPattern == null) {
final Map<String, Long> messagesToConsumePerTopic = new HashMap<>();
long toConsume = 0L;
for (final String topicName : topics) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
@ -320,13 +321,7 @@ public class ConsumerPool implements Closeable {
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions, Duration.ofSeconds(30));
for (final TopicPartition topicPartition : endOffsets.keySet()) {
long endOffset = endOffsets.get(topicPartition);
// When no messages have been added to a topic, end offset is 0. However, after the first message is added,
// the end offset points to where the next message will be. I.e., it goes from 0 to 2. We want the offset
// of the last message, not the offset of where the next one will be. So we subtract one.
if (endOffset > 0) {
endOffset--;
}
final long endOffset = endOffsets.get(topicPartition);
final long beginningOffset = beginningOffsets.getOrDefault(topicPartition, 0L);
if (endOffset <= beginningOffset) {
@ -339,6 +334,7 @@ public class ConsumerPool implements Closeable {
final long currentOffset = Math.max(beginningOffset, committedOffset);
final long messagesToConsume = endOffset - currentOffset;
toConsume += messagesToConsume;
messagesToConsumePerTopic.merge(topicPartition.topic(), messagesToConsume, Long::sum);
}
@ -351,12 +347,31 @@ public class ConsumerPool implements Closeable {
.build());
logger.info("Successfully determined offsets for {} topics. Number of messages left to consume per topic: {}", messagesToConsumePerTopic.size(), messagesToConsumePerTopic);
if (readerFactory != null) {
if (toConsume > 0) {
final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
verificationResults.add(checkDataResult);
} else {
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Parse Records")
.outcome(Outcome.SKIPPED)
.explanation("There are no available Records to attempt parsing")
.build());
}
}
} else {
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Determine Topic Offsets")
.outcome(Outcome.SKIPPED)
.explanation("Cannot determine Topic Offsets because a Topic Wildcard was used instead of an explicit Topic Name")
.build());
if (readerFactory != null) {
final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
verificationResults.add(checkDataResult);
}
}
} catch (final Exception e) {
logger.error("Failed to determine Topic Offsets in order to verify configuration", e);
@ -366,11 +381,12 @@ public class ConsumerPool implements Closeable {
.outcome(Outcome.FAILED)
.explanation("Could not fetch Topic Offsets: " + e)
.build());
}
if (readerFactory != null) {
final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
verificationResults.add(checkDataResult);
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Parse Records")
.outcome(Outcome.SKIPPED)
.explanation("Could not determine offsets so will not attempt to fetch records")
.build());
}
return verificationResults;