NIFI-13556 Fixed logic for identifying components from uploaded NAR

- Return copied set of Extensions from ExtensionManager to avoid concurrent modification

This closes #9089

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Bryan Bende 2024-07-17 12:15:15 -04:00 committed by exceptionfactory
parent 0e7e511aec
commit 8e5cf99cd7
No known key found for this signature in database
4 changed files with 49 additions and 12 deletions

View File

@ -42,21 +42,22 @@ class ComponentNodeDefinitionPredicate implements Predicate<ComponentNode> {
}
private <T extends ComponentNode> boolean isComponentFromType(final T componentNode, final ExtensionDefinition extensionDefinition) {
final String componentType = componentNode.getComponentType();
final String componentClassName = componentNode.getCanonicalClassName();
final BundleCoordinate componentCoordinate = componentNode.getBundleCoordinate();
final String extensionDefinitionType = extensionDefinition.getImplementationClassName();
final String extensionDefinitionClassName = extensionDefinition.getImplementationClassName();
final BundleCoordinate extensionDefinitionCoordinate = extensionDefinition.getBundle().getBundleDetails().getCoordinate();
if (PythonBundle.isPythonCoordinate(componentCoordinate)) {
final String componentType = componentNode.getComponentType();
final String pythonComponentType = "python." + componentType;
return pythonComponentType.equals(extensionDefinitionType) && componentCoordinate.equals(extensionDefinitionCoordinate);
return pythonComponentType.equals(extensionDefinitionClassName) && componentCoordinate.equals(extensionDefinitionCoordinate);
} else if (componentNode.isExtensionMissing()) {
return componentType.equals(extensionDefinitionType)
return componentClassName.equals(extensionDefinitionClassName)
&& componentCoordinate.getGroup().equals(extensionDefinitionCoordinate.getGroup())
&& componentCoordinate.getId().equals(extensionDefinitionCoordinate.getId());
} else {
return componentType.equals(extensionDefinitionType) && componentCoordinate.equals(extensionDefinitionCoordinate);
return componentClassName.equals(extensionDefinitionClassName) && componentCoordinate.equals(extensionDefinitionCoordinate);
}
}
}

View File

@ -34,7 +34,8 @@ import static org.mockito.Mockito.when;
public class ComponentNodeDefinitionPredicateTest {
private static final String STANDARD_PROCESSOR_TYPE = "org.apache.nifi.MyProcessor";
private static final String STANDARD_PROCESSOR_COMPONENT_TYPE = "MyProcessor";
private static final String STANDARD_PROCESSOR_CLASS_NAME = "org.apache.nifi." + STANDARD_PROCESSOR_COMPONENT_TYPE;
private static final BundleCoordinate STANDARD_BUNDLE_COORDINATE_V1 = new BundleCoordinate("org.apache.nifi", "my-processors-nar", "1.0.0");
private static final BundleCoordinate STANDARD_BUNDLE_COORDINATE_V2 = new BundleCoordinate("org.apache.nifi", "my-processors-nar", "2.0.0");
@ -55,7 +56,7 @@ public class ComponentNodeDefinitionPredicateTest {
when(standardBundle.getBundleDetails()).thenReturn(standardBundleDetails);
standardProcessorDefinition = mock(ExtensionDefinition.class);
when(standardProcessorDefinition.getImplementationClassName()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(standardProcessorDefinition.getImplementationClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(standardProcessorDefinition.getBundle()).thenReturn(standardBundle);
final BundleDetails pythonBundleDetails = mock(BundleDetails.class);
@ -72,7 +73,7 @@ public class ComponentNodeDefinitionPredicateTest {
@Test
public void testWhenComponentNodeMatchesDefinition() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(componentNode.getCanonicalClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V1);
final Predicate<ComponentNode> predicate = new ComponentNodeDefinitionPredicate(Set.of(standardProcessorDefinition));
@ -82,7 +83,7 @@ public class ComponentNodeDefinitionPredicateTest {
@Test
public void testWhenComponentNodeTypeDoesNotMatchDefinition() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn("com.SomeOtherProcessor");
when(componentNode.getCanonicalClassName()).thenReturn("com.SomeOtherProcessor");
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V1);
final Predicate<ComponentNode> predicate = new ComponentNodeDefinitionPredicate(Set.of(standardProcessorDefinition));
@ -92,7 +93,7 @@ public class ComponentNodeDefinitionPredicateTest {
@Test
public void testWhenComponentNodeCoordinateDoesMatchDefinition() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(componentNode.getCanonicalClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V2);
final Predicate<ComponentNode> predicate = new ComponentNodeDefinitionPredicate(Set.of(standardProcessorDefinition));
@ -102,7 +103,7 @@ public class ComponentNodeDefinitionPredicateTest {
@Test
public void testWhenComponentNodeIsMissingAndCompatibleBundle() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(componentNode.getCanonicalClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V2);
when(componentNode.isExtensionMissing()).thenReturn(true);

View File

@ -830,7 +830,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
throw new IllegalArgumentException("Class cannot be null");
}
final Set<ExtensionDefinition> extensions = definitionMap.get(definition);
return (extensions == null) ? Collections.emptySet() : extensions;
return (extensions == null) ? Collections.emptySet() : new HashSet<>(extensions);
}
@Override

View File

@ -28,8 +28,11 @@ import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
import org.apache.nifi.web.api.entity.NarDetailsEntity;
import org.apache.nifi.web.api.entity.NarSummariesEntity;
import org.apache.nifi.web.api.entity.NarSummaryEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@ -43,8 +46,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class NarUploadStandaloneIT extends NiFiSystemIT {
private static final Logger logger = LoggerFactory.getLogger(NarUploadStandaloneIT.class);
private static final String NAR_PROVIDER_NARS_LOCATION = "target/nifi-nar-provider-nars";
private static final String PROCESSORS_NAR_ID = "nifi-nar-provider-processors-nar";
private static final String PROCESSOR_CLASS_NAME = "org.apache.nifi.nar.provider.GetClassLoaderInfo";
private static final String CONTROLLER_SERVICE_API_NAR_ID = "nifi-nar-provider-service-api-nar";
private static final String CONTROLLER_SERVICE_NAR_ID = "nifi-nar-provider-service-nar";
@ -116,6 +122,14 @@ public class NarUploadStandaloneIT extends NiFiSystemIT {
assertNotNull(serviceApiNarDetails.getDependentCoordinates());
assertEquals(2, serviceApiNarDetails.getDependentCoordinates().size());
// Create instance of the custom processor
final NarCoordinateDTO uploadedProcessorCoordinate = uploadedProcessorsNar.getCoordinate();
final ProcessorEntity customProcessor = getClientUtil().createProcessor(PROCESSOR_CLASS_NAME, uploadedProcessorCoordinate.getGroup(),
uploadedProcessorCoordinate.getArtifact(), uploadedProcessorCoordinate.getVersion());
assertNotNull(customProcessor.getComponent());
assertNotNull(customProcessor.getComponent().getExtensionMissing());
assertFalse(customProcessor.getComponent().getExtensionMissing());
// Verify service API NAR can't be replaced while other NARs depend on it
assertThrows(NiFiClientException.class, () -> narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_API_NAR_ID));
@ -137,6 +151,27 @@ public class NarUploadStandaloneIT extends NiFiSystemIT {
// Verify no NARs exist
narUploadUtil.verifyNarSummaries(0);
// Verify custom processor is ghosted
final String customProcessorId = customProcessor.getId();
waitFor(() -> {
final ProcessorEntity customProcessorAfterDelete = getNifiClient().getProcessorClient().getProcessor(customProcessorId);
logger.info("Waiting for processor {} to be considered missing", customProcessorId);
return customProcessorAfterDelete.getComponent().getExtensionMissing();
});
// Restore NARs
narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_API_NAR_ID);
narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_NAR_ID);
final NarSummaryDTO restoredProcessorsNar = narUploadUtil.uploadNar(narsLocation, PROCESSORS_NAR_ID);
waitFor(narUploadUtil.getWaitForNarStateSupplier(restoredProcessorsNar.getIdentifier(), NarState.INSTALLED));
// Verify processor is un-ghosted
waitFor(() -> {
final ProcessorEntity customProcessorAfterDelete = getNifiClient().getProcessorClient().getProcessor(customProcessorId);
logger.info("Waiting for processor {} to be considered not missing", customProcessorId);
return !customProcessorAfterDelete.getComponent().getExtensionMissing();
});
}
private boolean matchingBundles(final BundleDTO bundleDTO, final NarCoordinateDTO narCoordinateDTO) {