NIFI-12590 Added Prefix Properties for Kubernetes Leases and ConfigMaps

This closes #8240

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Juldrixx 2024-01-12 17:06:07 +01:00 committed by exceptionfactory
parent 050f81f686
commit 787c45dd61
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
12 changed files with 148 additions and 12 deletions

View File

@ -326,6 +326,9 @@ public class NiFiProperties extends ApplicationProperties {
public static final String PYTHON_CONTROLLER_DEBUGPY_HOST = "nifi.python.controller.debugpy.host";
public static final String PYTHON_CONTROLLER_DEBUGPY_LOGS_DIR = "nifi.python.controller.debugpy.logs.directory";
// kubernetes properties
public static final String CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX = "nifi.cluster.leader.election.kubernetes.lease.prefix";
public static final String DEFAULT_PYTHON_WORKING_DIRECTORY = "./work/python";
// automatic diagnostic defaults

View File

@ -82,6 +82,9 @@ prop_replace 'nifi.analytics.connection.model.implementation' "${NIFI_ANALYTIC
prop_replace 'nifi.analytics.connection.model.score.name' "${NIFI_ANALYTICS_MODEL_SCORE_NAME:-rSquared}"
prop_replace 'nifi.analytics.connection.model.score.threshold' "${NIFI_ANALYTICS_MODEL_SCORE_THRESHOLD:-.90}"
# Set kubernetes properties
prop_replace 'nifi.cluster.leader.election.kubernetes.lease.prefix' "${NIFI_CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX:-}"
# Add NAR provider properties
# nifi-registry NAR provider
if [ -n "${NIFI_NAR_LIBRARY_PROVIDER_NIFI_REGISTRY_URL}" ]; then

View File

@ -29,3 +29,5 @@ edit_property() {
edit_property 'Connect String' "${NIFI_ZK_CONNECT_STRING}"
edit_property "Root Node" "${NIFI_ZK_ROOT_NODE}"
edit_property 'ConfigMap Name Prefix' "${NIFI_KUBERNETES_CONFIGMAP_NAME_PREFIX}"

View File

@ -30,6 +30,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-leader-election-shared</artifactId>

View File

@ -25,6 +25,7 @@ import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
import org.apache.nifi.kubernetes.leader.election.command.StandardLeaderElectionCommandProvider;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,10 +74,13 @@ public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManage
private final LeaderElectionCommandProvider leaderElectionCommandProvider;
private final String roleIdPrefix;
/**
* Kubernetes Leader Election Manager default constructor
* Kubernetes Leader Election Manager constructor with NiFi Properties
*/
public KubernetesLeaderElectionManager() {
public KubernetesLeaderElectionManager(final NiFiProperties nifiProperties) {
this.roleIdPrefix = nifiProperties.getProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX);
executorService = createExecutorService();
leaderElectionCommandProvider = createLeaderElectionCommandProvider();
}
@ -285,7 +289,7 @@ public class KubernetesLeaderElectionManager extends TrackedLeaderElectionManage
if (roleId == null) {
throw new IllegalArgumentException(String.format("Role Name [%s] not supported", roleName));
}
return roleId;
return roleIdPrefix == null ? roleId : String.format("%s-%s", roleIdPrefix, roleId);
}
private static class ParticipantRegistration {

View File

@ -19,6 +19,7 @@ package org.apache.nifi.kubernetes.leader.election;
import org.apache.nifi.controller.leader.election.LeaderElectionRole;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommandProvider;
import org.apache.nifi.util.NiFiProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -28,6 +29,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
@ -49,6 +51,8 @@ class KubernetesLeaderElectionManagerTest {
private static final String PARTICIPANT_ID = "Node-0";
private static final String PREFIX = "label";
@Mock
LeaderElectionStateChangeListener changeListener;
@ -64,11 +68,16 @@ class KubernetesLeaderElectionManagerTest {
ManagedLeaderElectionCommandProvider leaderElectionCommandProvider;
KubernetesLeaderElectionManager manager;
KubernetesLeaderElectionManager managerWithProperties;
@BeforeEach
void setManager() {
leaderElectionCommandProvider = new ManagedLeaderElectionCommandProvider();
manager = new MockKubernetesLeaderElectionManager();
manager = new MockKubernetesLeaderElectionManager(new NiFiProperties());
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.CLUSTER_LEADER_ELECTION_KUBERNETES_LEASE_PREFIX, PREFIX);
managerWithProperties = new MockKubernetesLeaderElectionManager(new NiFiProperties(properties));
}
@Test
@ -184,6 +193,19 @@ class KubernetesLeaderElectionManagerTest {
assertFalse(unregisteredActiveParticipant);
}
@Test
void testRoleIdWithPrefix() {
managerWithProperties.start();
setSubmitStartLeading();
managerWithProperties.register(ROLE, changeListener, PARTICIPANT_ID);
captureRunCommand();
assertEquals(PREFIX + "-" + LEADER_ELECTION_ROLE.getRoleId(), leaderElectionCommandProvider.name);
}
private void setSubmitStartLeading() {
doReturn(future).when(executorService).submit(isA(Runnable.class));
leaderElectionCommandProvider.runStartLeading = true;
@ -223,6 +245,10 @@ class KubernetesLeaderElectionManagerTest {
}
private class MockKubernetesLeaderElectionManager extends KubernetesLeaderElectionManager {
public MockKubernetesLeaderElectionManager(NiFiProperties nifiProperties) {
super(nifiProperties);
}
@Override
protected ExecutorService createExecutorService() {
return executorService;

View File

@ -48,5 +48,10 @@
<artifactId>kubernetes-server-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -38,8 +38,9 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
@ -47,19 +48,34 @@ import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.kubernetes.client.ServiceAccountNamespaceProvider;
import org.apache.nifi.kubernetes.client.StandardKubernetesClientProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
/**
* State Provider implementation based on Kubernetes ConfigMaps with Base64 encoded keys to meet Kubernetes constraints
*/
public class KubernetesConfigMapStateProvider extends AbstractConfigurableComponent implements StateProvider {
static final PropertyDescriptor CONFIG_MAP_NAME_PREFIX = new PropertyDescriptor.Builder()
.name("ConfigMap Name Prefix")
.description("Optional prefix that the Provider will prepend to Kubernetes ConfigMap names. The resulting ConfigMap name will contain nifi-component and the component identifier.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(false)
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(CONFIG_MAP_NAME_PREFIX);
private static final int MAX_UPDATE_ATTEMPTS = 5;
private static final Scope[] SUPPORTED_SCOPES = { Scope.CLUSTER };
private static final Charset KEY_CHARACTER_SET = StandardCharsets.UTF_8;
private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
private static final String CONFIG_MAP_NAME_FORMAT = "%snifi-component-%%s";
private static final Pattern CONFIG_MAP_NAME_PATTERN = Pattern.compile("^nifi-component-(.+)$");
private static final String CONFIG_MAP_NAME_PATTERN_FORMAT = "^%snifi-component-(.+)$";
private static final String PREFIX_SEPARATOR = "-";
private static final String EMPTY_PREFIX = "";
private static final int COMPONENT_ID_GROUP = 1;
@ -70,6 +86,10 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
private final AtomicBoolean enabled = new AtomicBoolean();
private String configMapNameFormat;
private Pattern configMapNamePattern;
private KubernetesClient kubernetesClient;
private String namespace;
@ -78,6 +98,11 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
private ComponentLog logger;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
/**
* Get configured component identifier
*
@ -99,6 +124,12 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
this.logger = context.getLogger();
this.kubernetesClient = getKubernetesClient();
this.namespace = new ServiceAccountNamespaceProvider().getNamespace();
final PropertyValue configMapNamePrefixProperty = context.getProperty(CONFIG_MAP_NAME_PREFIX);
final String configMapNamePrefix = configMapNamePrefixProperty.isSet() ? configMapNamePrefixProperty.getValue() + PREFIX_SEPARATOR : EMPTY_PREFIX;
configMapNameFormat = String.format(CONFIG_MAP_NAME_FORMAT, configMapNamePrefix);
configMapNamePattern = Pattern.compile(String.format(CONFIG_MAP_NAME_PATTERN_FORMAT, configMapNamePrefix));
}
/**
@ -331,10 +362,10 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
return configMapList.getItems().stream()
.map(ConfigMap::getMetadata)
.map(ObjectMeta::getName)
.map(CONFIG_MAP_NAME_PATTERN::matcher)
.map(configMapNamePattern::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(COMPONENT_ID_GROUP))
.collect(Collectors.toUnmodifiableList());
.toList();
}
/**
@ -363,7 +394,7 @@ public class KubernetesConfigMapStateProvider extends AbstractConfigurableCompon
}
private String getConfigMapName(final String componentId) {
return String.format(CONFIG_MAP_NAME_FORMAT, componentId);
return String.format(configMapNameFormat, componentId);
}
private Optional<String> getVersion(final ConfigMap configMap) {

View File

@ -28,11 +28,13 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parameter.ParameterLookup;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import java.io.IOException;
import java.util.Collection;
@ -68,6 +70,8 @@ class KubernetesConfigMapStateProviderTest {
private static final String STATE_VALUE = "now";
private static final String CONFIG_MAP_NAME_PREFIX_VALUE = "label";
@Mock
StateProviderInitializationContext context;
@ -293,9 +297,53 @@ class KubernetesConfigMapStateProviderTest {
assertFalse(replaced2);
}
@Test
void testSetStateGetStateWithPrefix() throws IOException {
setContextWithProperties();
provider.initialize(context);
final Map<String, String> state = Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);
provider.setState(state, COMPONENT_ID);
final StateMap stateMap = provider.getState(COMPONENT_ID);
assertNotNull(stateMap);
final Map<String, String> stateRetrieved = stateMap.toMap();
assertEquals(state, stateRetrieved);
assertConfigMapFound();
}
@Test
void testSetStateGetStoredComponentIdsWithPrefix() throws IOException {
setContextWithProperties();
provider.initialize(context);
final Collection<String> initialStoredComponentIds = provider.getStoredComponentIds();
assertTrue(initialStoredComponentIds.isEmpty());
final Map<String, String> state = Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);
provider.setState(state, COMPONENT_ID);
final Collection<String> storedComponentIds = provider.getStoredComponentIds();
final Iterator<String> componentIds = storedComponentIds.iterator();
assertTrue(componentIds.hasNext());
assertEquals(COMPONENT_ID, componentIds.next());
}
private void setContext() {
when(context.getIdentifier()).thenReturn(IDENTIFIER);
when(context.getLogger()).thenReturn(logger);
when(context.getProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX))
.thenReturn(new StandardPropertyValue(null, null, ParameterLookup.EMPTY));
}
private void setContextWithProperties() {
setContext();
when(context.getProperty(KubernetesConfigMapStateProvider.CONFIG_MAP_NAME_PREFIX))
.thenReturn(new StandardPropertyValue(CONFIG_MAP_NAME_PREFIX_VALUE, null, ParameterLookup.EMPTY));
}
private void assertStateEquals(final Map<String, String> expected, final StateMap stateMap) {

View File

@ -270,6 +270,8 @@
<nifi.python.max.processes.per.extension.type>10</nifi.python.max.processes.per.extension.type>
<nifi.python.logs.dir>./logs</nifi.python.logs.dir>
<nifi.cluster.leader.election.kubernetes.lease.prefix />
<nifi.performance.tracking.percentage>0</nifi.performance.tracking.percentage>
</properties>
<build>

View File

@ -335,6 +335,9 @@ nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model
nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}
# kubernetes #
nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix}
# flow analysis properties
nifi.flow.analysis.background.task.schedule=${nifi.flow.analysis.background.task.schedule}

View File

@ -64,10 +64,15 @@
<property name="Access Control">Open</property>
</cluster-provider>
<!-- Kubernetes ConfigMap implementation of State Provider -->
<!--
Kubernetes ConfigMap implementation of State Provider. This Provider has the following optional properties:
ConfigMap Name Prefix - Optional prefix that the Provider will prepend to Kubernetes ConfigMap names. The resulting ConfigMap name will contain nifi-component and the component identifier.
-->
<cluster-provider>
<id>kubernetes-provider</id>
<class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
<property name="ConfigMap Name Prefix"></property>
</cluster-provider>
<!--