NIFI-12634 Ignored Blank Prefix Values in Kubernetes Components (#8268)

- Updated KubernetesConfigMapStateProvider and KubernetesLeaderElectionManager to ignore blank prefix values as provided in default configuration files
This commit is contained in:
David Handermann 2024-01-19 07:30:45 -06:00 committed by GitHub
parent 2acc1038c9
commit 806d8d6165
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 56 additions and 19 deletions

View File

@ -80,7 +80,8 @@ public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManage
* Kubernetes Leader Election Manager constructor with NiFi Properties * Kubernetes Leader Election Manager constructor with NiFi Properties
*/ */
public KubernetesLeaderElectionManager(final NiFiProperties nifiProperties) { public KubernetesLeaderElectionManager(final NiFiProperties nifiProperties) {
this.roleIdPrefix = nifiProperties.getProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX); final String leasePrefix = nifiProperties.getProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX);
this.roleIdPrefix = leasePrefix == null || leasePrefix.isBlank() ? null : leasePrefix;
executorService = createExecutorService(); executorService = createExecutorService();
leaderElectionCommandProvider = createLeaderElectionCommandProvider(); leaderElectionCommandProvider = createLeaderElectionCommandProvider();
} }

View File

@ -53,6 +53,8 @@ class KubernetesLeaderElectionManagerTest {
private static final String PREFIX = "label"; private static final String PREFIX = "label";
private static final String EMPTY_PREFIX = "";
@Mock @Mock
LeaderElectionStateChangeListener changeListener; LeaderElectionStateChangeListener changeListener;
@ -68,16 +70,11 @@ class KubernetesLeaderElectionManagerTest {
ManagedLeaderElectionCommandProvider leaderElectionCommandProvider; ManagedLeaderElectionCommandProvider leaderElectionCommandProvider;
KubernetesLeaderElectionManager manager; KubernetesLeaderElectionManager manager;
KubernetesLeaderElectionManager managerWithProperties;
@BeforeEach @BeforeEach
void setManager() { void setManager() {
leaderElectionCommandProvider = new ManagedLeaderElectionCommandProvider(); leaderElectionCommandProvider = new ManagedLeaderElectionCommandProvider();
manager = new MockKubernetesLeaderElectionManager(new NiFiProperties()); manager = new MockKubernetesLeaderElectionManager(new NiFiProperties());
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX, PREFIX);
managerWithProperties = new MockKubernetesLeaderElectionManager(new NiFiProperties(properties));
} }
@Test @Test
@ -195,15 +192,37 @@ class KubernetesLeaderElectionManagerTest {
@Test @Test
void testRoleIdWithPrefix() { void testRoleIdWithPrefix() {
managerWithProperties.start(); final Properties properties = new Properties();
properties.setProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX, PREFIX);
final MockKubernetesLeaderElectionManager electionManager = new MockKubernetesLeaderElectionManager(new NiFiProperties(properties));
electionManager.start();
setSubmitStartLeading(); setSubmitStartLeading();
managerWithProperties.register(ROLE, changeListener, PARTICIPANT_ID); electionManager.register(ROLE, changeListener, PARTICIPANT_ID);
captureRunCommand(); captureRunCommand();
assertEquals(PREFIX + "-" + LEADER_ELECTION_ROLE.getRoleId(), leaderElectionCommandProvider.name); final String expected = String.format("%s-%s", PREFIX, LEADER_ELECTION_ROLE.getRoleId());
assertEquals(expected, leaderElectionCommandProvider.name);
}
@Test
void testRoleIdWithEmptyPrefix() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX, EMPTY_PREFIX);
final MockKubernetesLeaderElectionManager electionManager = new MockKubernetesLeaderElectionManager(new NiFiProperties(properties));
electionManager.start();
setSubmitStartLeading();
electionManager.register(ROLE, changeListener, PARTICIPANT_ID);
captureRunCommand();
assertEquals(LEADER_ELECTION_ROLE.getRoleId(), leaderElectionCommandProvider.name);
} }
private void setSubmitStartLeading() { private void setSubmitStartLeading() {

View File

@ -48,6 +48,10 @@
<artifactId>kubernetes-server-mock</artifactId> <artifactId>kubernetes-server-mock</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId> <artifactId>nifi-expression-language</artifactId>

View File

@ -41,6 +41,7 @@ import java.util.regex.Pattern;
import org.apache.nifi.components.AbstractConfigurableComponent; import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.components.state.StateProvider;
@ -48,7 +49,6 @@ import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider; import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider; import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
/** /**
* State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints * State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
@ -57,7 +57,7 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
static final PropertyDescriptor CONFIG_MAP_NAME_PREFIX = new PropertyDescriptor.Builder() static final PropertyDescriptor CONFIG_MAP_NAME_PREFIX = new PropertyDescriptor.Builder()
.name("ConfigMap Name Prefix") .name("ConfigMap Name Prefix")
.description("Optional prefix that the Provider will prepend to Kubernetes ConfigMap names. The resulting ConfigMap name will contain nifi-component and the component identifier.") .description("Optional prefix that the Provider will prepend to Kubernetes ConfigMap names. The resulting ConfigMap name will contain nifi-component and the component identifier.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(Validator.VALID)
.required(false) .required(false)
.build(); .build();
@ -126,8 +126,9 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
this.namespace = new ServiceAccountNamespaceProvider().getNamespace(); this.namespace = new ServiceAccountNamespaceProvider().getNamespace();
final PropertyValue configMapNamePrefixProperty = context.getProperty(CONFIG_MAP_NAME_PREFIX); final PropertyValue configMapNamePrefixProperty = context.getProperty(CONFIG_MAP_NAME_PREFIX);
final String configMapNamePrefix = configMapNamePrefixProperty.isSet() ? configMapNamePrefixProperty.getValue() + PREFIX_SEPARATOR : EMPTY_PREFIX; final String prefixPropertyValue = configMapNamePrefixProperty.getValue();
final String configMapNamePrefix = prefixPropertyValue == null || prefixPropertyValue.isBlank() ? EMPTY_PREFIX : prefixPropertyValue + PREFIX_SEPARATOR;
configMapNameFormat = String.format(CONFIG_MAP_NAME_FORMAT, configMapNamePrefix); configMapNameFormat = String.format(CONFIG_MAP_NAME_FORMAT, configMapNamePrefix);
configMapNamePattern = Pattern.compile(String.format(CONFIG_MAP_NAME_PATTERN_FORMAT, configMapNamePrefix)); configMapNamePattern = Pattern.compile(String.format(CONFIG_MAP_NAME_PATTERN_FORMAT, configMapNamePrefix));
} }

View File

@ -24,11 +24,14 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServerExtension; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServerExtension;
import io.fabric8.mockwebserver.dsl.HttpMethod; import io.fabric8.mockwebserver.dsl.HttpMethod;
import okhttp3.mockwebserver.RecordedRequest; import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext; import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -72,6 +75,8 @@ class KubernetesConfigMapStateProviderTest {
private static final String CONFIG_MAP_NAME_PREFIX_VALUE = "label"; private static final String CONFIG_MAP_NAME_PREFIX_VALUE = "label";
private static final String EMPTY = "";
@Mock @Mock
StateProviderInitializationContext context; StateProviderInitializationContext context;
@ -97,12 +102,19 @@ class KubernetesConfigMapStateProviderTest {
} }
@Test @Test
void testInitializeShutdown() { void testInitializeValidateShutdown() {
setContext(); setContextWithConfigMapNamePrefix(EMPTY);
provider.initialize(context);
provider.initialize(context);
assertEquals(IDENTIFIER, provider.getIdentifier()); assertEquals(IDENTIFIER, provider.getIdentifier());
final MockProcessContext processContext = new MockProcessContext(provider);
processContext.setProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX, EMPTY);
final MockValidationContext validationContext = new MockValidationContext(processContext, null);
final Collection<ValidationResult> results = provider.validate(validationContext);
assertTrue(results.isEmpty());
provider.shutdown(); provider.shutdown();
} }
@ -299,7 +311,7 @@ class KubernetesConfigMapStateProviderTest {
@Test @Test
void testSetStateGetStateWithPrefix() throws IOException { void testSetStateGetStateWithPrefix() throws IOException {
setContextWithProperties(); setContextWithConfigMapNamePrefix(CONFIG_MAP_NAME_PREFIX_VALUE);
provider.initialize(context); provider.initialize(context);
final Map<String, String> state = Collections.singletonMap(STATE_PROPERTY, STATE_VALUE); final Map<String, String> state = Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);
@ -317,7 +329,7 @@ class KubernetesConfigMapStateProviderTest {
@Test @Test
void testSetStateGetStoredComponentIdsWithPrefix() throws IOException { void testSetStateGetStoredComponentIdsWithPrefix() throws IOException {
setContextWithProperties(); setContextWithConfigMapNamePrefix(CONFIG_MAP_NAME_PREFIX_VALUE);
provider.initialize(context); provider.initialize(context);
final Collection<String> initialStoredComponentIds = provider.getStoredComponentIds(); final Collection<String> initialStoredComponentIds = provider.getStoredComponentIds();
@ -340,10 +352,10 @@ class KubernetesConfigMapStateProviderTest {
.thenReturn(new StandardPropertyValue(null, null, ParameterLookup.EMPTY)); .thenReturn(new StandardPropertyValue(null, null, ParameterLookup.EMPTY));
} }
private void setContextWithProperties() { private void setContextWithConfigMapNamePrefix(final String configMapNamePrefix) {
setContext(); setContext();
when(context.getProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX)) when(context.getProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX))
.thenReturn(new StandardPropertyValue(CONFIG_MAP_NAME_PREFIX_VALUE, null, ParameterLookup.EMPTY)); .thenReturn(new StandardPropertyValue(configMapNamePrefix, null, ParameterLookup.EMPTY));
} }
private void assertStateEquals(final Map<String, String> expected, final StateMap stateMap) { private void assertStateEquals(final Map<String, String> expected, final StateMap stateMap) {