kubernetes based discovery druid extension to run Druid on K8S without Zookeeper (#10544)

* honor zk enablement config in more places in druid code

* kubernetes based discovery module

* fix spotbugs check

* fix intellij checks error

* fix doc link to kubernetes.md from extension

* make spellchecker happy

* update license.yaml

* fix dependency check errors

* update extension coverage

* UTs for BaseNodeRoleWatcher

* fix forbidden-api check

* update k8s module coverage ignores

* add Bouncy Castle License being same as MIT License for license checking purposes

* further update licenses.yaml

* label/annotation pre-existence assumption

* address review comment
This commit is contained in:
Himanshu 2020-12-14 21:10:31 -08:00 committed by GitHub
parent c2e26d2e1c
commit ac1882bf74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 3910 additions and 179 deletions

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.annotations;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Annotation for suppressing spotbugs checks when necessary.
*/
@Retention(RetentionPolicy.CLASS)
public @interface SuppressFBWarnings
{
/**
* The set of FindBugs warnings that are to be suppressed in
* annotated element. The value can be a bug category, kind or pattern.
*
*/
String[] value() default {};
/**
* Optional documentation of the reason why the warning is suppressed
*/
String justification() default "";
}

View File

@ -277,6 +277,7 @@ def build_compatible_license_names():
compatible_licenses['The MIT License'] = 'MIT License'
compatible_licenses['MIT License'] = 'MIT License'
compatible_licenses['The MIT License (MIT)'] = 'MIT License'
compatible_licenses['Bouncy Castle Licence'] = 'MIT License'
compatible_licenses['-'] = '-'
return compatible_licenses

View File

@ -243,6 +243,8 @@
<argument>org.apache.druid.extensions:druid-pac4j</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-ranger-security</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-kubernetes-extensions</argument>
<argument>${druid.distribution.pulldeps.opts}</argument>
</arguments>
</configuration>

View File

@ -0,0 +1,59 @@
---
id: druid-kubernetes
title: "Kubernetes"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it has not been tested yet on a wide variety of long running Druid clusters.
Apache Druid Extension to enable using Kubernetes API Server for node discovery and leader election. This extension allows Druid cluster deployment on Kubernetes without Zookeeper. It allows running multiple Druid clusters within same Kubernetes Cluster, See `clusterIdentifier` config below.
## Configuration
To use this extension please make sure to [include](../../development/extensions.md#loading-extensions) `druid-kubernetes-extensions` as an extension.
This extension works together with HTTP based segment and task management in Druid. Consequently, following configurations must be set on all Druid nodes.
`druid.zk.service.enabled=false`
`druid.serverview.type=http`
`druid.coordinator.loadqueuepeon.type=http`
`druid.indexer.runner.type=httpRemote`
`druid.discovery.type=k8s`
For Node Discovery, Each Druid process running inside a pod "announces" itself by adding few "labels" and "annotations" in the pod spec. So, to add those...
- Druid process needs to be aware of pod name and namespace which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed, see configuration below. But in the end, each pod needs to have pod name and namespace added as environment variables.
- Label/Annotation path in the pod spec must exist, which is easily satisfied if there is at least one label/annotation in the pod spec already. This limitation may be removed in future.
Additionally, this extension has following configuration.
### Properties
|Property|Possible Values|Description|Default|required|
|--------|---------------|-----------|-------|--------|
|`druid.discovery.k8s.clusterIdentifier`|`string that matches [a-z0-9][a-z0-9-]*[a-z0-9]`|Unique identifier for this Druid cluster in Kubernetes e.g. us-west-prod-druid.|None|Yes|
|`druid.discovery.k8s.podNameEnvKey`|`Pod Env Variable`|Pod Env variable whose value is that pod's name.|POD_NAME|No|
|`druid.discovery.k8s.podNamespaceEnvKey`|`Pod Env Variable`|Pod Env variable whose value is that pod's kubernetes namespace.|POD_NAMESPACE|No|
|`druid.discovery.k8s.coordinatorLeaderElectionConfigMapNamespace`|`k8s namespace`|Leader election algorithm requires creating a ConfigMap resource in a namespace. This MUST only be provided if different coordinator pods run in different namespaces, such setup is discouraged however.|coordinator pod's namespace|No|
|`druid.discovery.k8s.overlordLeaderElectionConfigMapNamespace`|`k8s namespace`|Leader election algorithm requires creating a ConfigMap resource in a namespace. This MUST only be provided if different overlord pods run in different namespaces, such setup is discouraged however.|overlord pod's namespace|No|
|`druid.discovery.k8s.leaseDuration`|`Duration`|Lease duration used by Leader Election algorithm. Candidates wait for this time before taking over previous Leader.|PT60S|No|
|`druid.discovery.k8s.renewDeadline`|`Duration`|Lease renewal period used by Leader.|PT17S|No|
|`druid.discovery.k8s.retryPeriod`|`Duration`|Retry wait used by Leader Election algorithm on failed operations.|PT5S|No|

View File

@ -30,3 +30,5 @@ $docker pull apache/druid:0.16.0-incubating
```
[druid-operator](https://github.com/druid-io/druid-operator) can be used to manage a Druid cluster on [Kubernetes](https://kubernetes.io/) .
Druid clusters deployed on Kubernetes can function without Zookeeper using [druidkubernetes-extensions](../development/extensions-core/kubernetes.md) .

View File

@ -0,0 +1,152 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-kubernetes-extensions</artifactId>
<name>druid-kubernetes-extensions</name>
<description>druid-kubernetes-extensions</description>
<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>0.21.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<kubernetes.client.version>10.0.0</kubernetes.client.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>${kubernetes.client.version}</version>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java-extended</artifactId>
<version>${kubernetes.client.version}</version>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java-api</artifactId>
<version>${kubernetes.client.version}</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<!-- others -->
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.5</version>
<configuration>
<excludes>
<!-- Initialization code -->
<exclude>org/apache/druid/k8s/discovery/K8sDiscoveryModule*</exclude>
<!-- K8S Api Glue, not unit testable -->
<exclude>org/apache/druid/k8s/discovery/DefaultK8sApiClient*</exclude>
<exclude>org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Watch;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
/**
* Concrete {@link K8sApiClient} impl using k8s-client java lib.
*/
public class DefaultK8sApiClient implements K8sApiClient
{
private static final Logger LOGGER = new Logger(DefaultK8sApiClient.class);
private final ApiClient realK8sClient;
private final CoreV1Api coreV1Api;
private final ObjectMapper jsonMapper;
@Inject
public DefaultK8sApiClient(ApiClient realK8sClient, @Json ObjectMapper jsonMapper)
{
this.realK8sClient = realK8sClient;
this.coreV1Api = new CoreV1Api(realK8sClient);
this.jsonMapper = jsonMapper;
}
@Override
public void patchPod(String podName, String podNamespace, String jsonPatchStr)
{
try {
coreV1Api.patchNamespacedPod(podName, podNamespace, new V1Patch(jsonPatchStr), "true", null, null, null);
}
catch (ApiException ex) {
throw new RE(ex, "Failed to patch pod[%s/%s], code[%d], error[%s].", podNamespace, podName, ex.getCode(), ex.getResponseBody());
}
}
@Override
public DiscoveryDruidNodeList listPods(
String podNamespace,
String labelSelector,
NodeRole nodeRole
)
{
try {
V1PodList podList = coreV1Api.listNamespacedPod(podNamespace, null, null, null, null, labelSelector, 0, null, null, null);
Preconditions.checkState(podList != null, "WTH: NULL podList");
Map<String, DiscoveryDruidNode> allNodes = new HashMap();
for (V1Pod podDef : podList.getItems()) {
DiscoveryDruidNode node = getDiscoveryDruidNodeFromPodDef(nodeRole, podDef);
allNodes.put(node.getDruidNode().getHostAndPortToUse(), node);
}
return new DiscoveryDruidNodeList(podList.getMetadata().getResourceVersion(), allNodes);
}
catch (ApiException ex) {
throw new RE(ex, "Expection in listing pods, code[%d] and error[%s].", ex.getCode(), ex.getResponseBody());
}
}
private DiscoveryDruidNode getDiscoveryDruidNodeFromPodDef(NodeRole nodeRole, V1Pod podDef)
{
String jsonStr = podDef.getMetadata().getAnnotations().get(K8sDruidNodeAnnouncer.getInfoAnnotation(nodeRole));
try {
return jsonMapper.readValue(jsonStr, DiscoveryDruidNode.class);
}
catch (JsonProcessingException ex) {
throw new RE(ex, "Failed to deserialize DiscoveryDruidNode[%s]", jsonStr);
}
}
@Override
public WatchResult watchPods(String namespace, String labelSelector, String lastKnownResourceVersion, NodeRole nodeRole)
{
try {
Watch<V1Pod> watch =
Watch.createWatch(
realK8sClient,
coreV1Api.listNamespacedPodCall(namespace, null, true, null, null,
labelSelector, null, lastKnownResourceVersion, 0, true, null
),
new TypeReference<Watch.Response<V1Pod>>()
{
}.getType()
);
return new WatchResult()
{
private Watch.Response<DiscoveryDruidNodeAndResourceVersion> obj;
@Override
public boolean hasNext() throws SocketTimeoutException
{
try {
while (watch.hasNext()) {
Watch.Response<V1Pod> item = watch.next();
if (item != null && item.type != null) {
obj = new Watch.Response<DiscoveryDruidNodeAndResourceVersion>(
item.type,
new DiscoveryDruidNodeAndResourceVersion(
item.object.getMetadata().getResourceVersion(),
getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
)
);
return true;
} else {
LOGGER.error("WTH! item or item.type is NULL");
}
}
}
catch (RuntimeException ex) {
if (ex.getCause() instanceof SocketTimeoutException) {
throw (SocketTimeoutException) ex.getCause();
} else {
throw ex;
}
}
return false;
}
@Override
public Watch.Response<DiscoveryDruidNodeAndResourceVersion> next()
{
return obj;
}
@Override
public void close()
{
try {
watch.close();
}
catch (IOException ex) {
throw new RE(ex, "Exception while closing watch.");
}
}
};
}
catch (ApiException ex) {
if (ex.getCode() == 410) {
// k8s no longer has history that we need
return null;
}
throw new RE(ex, "Expection in watching pods, code[%d] and error[%s].", ex.getCode(), ex.getResponseBody());
}
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.inject.Inject;
import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.kubernetes.client.extended.leaderelection.LeaderElector;
import io.kubernetes.client.extended.leaderelection.Lock;
import io.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import org.apache.druid.java.util.common.RE;
import java.time.Duration;
/**
* Concrete {@link K8sLeaderElectorFactory} impl using k8s-client java lib.
*/
public class DefaultK8sLeaderElectorFactory implements K8sLeaderElectorFactory
{
private final ApiClient realK8sClient;
private final K8sDiscoveryConfig discoveryConfig;
@Inject
public DefaultK8sLeaderElectorFactory(ApiClient realK8sClient, K8sDiscoveryConfig discoveryConfig)
{
this.realK8sClient = realK8sClient;
this.discoveryConfig = discoveryConfig;
}
@Override
public K8sLeaderElector create(String candidateId, String namespace, String lockResourceName)
{
Lock lock = createLock(candidateId, namespace, lockResourceName, realK8sClient);
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(
lock,
Duration.ofMillis(discoveryConfig.getLeaseDuration().getMillis()),
Duration.ofMillis(discoveryConfig.getRenewDeadline().getMillis()),
Duration.ofMillis(discoveryConfig.getRetryPeriod().getMillis())
);
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
return new K8sLeaderElector()
{
@Override
public String getCurrentLeader()
{
try {
return lock.get().getHolderIdentity();
}
catch (ApiException ex) {
throw new RE(ex, "Failed to get current leader for [%s]", lockResourceName);
}
}
@Override
public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
{
leaderElector.run(startLeadingHook, stopLeadingHook);
}
};
}
private Lock createLock(String candidateId, String namespace, String lockResourceName, ApiClient k8sApiClient)
{
return new ConfigMapLock(
namespace,
lockResourceName,
candidateId,
k8sApiClient
);
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import org.apache.druid.discovery.DiscoveryDruidNode;
public class DiscoveryDruidNodeAndResourceVersion
{
private final String resourceVersion;
private final DiscoveryDruidNode node;
public DiscoveryDruidNodeAndResourceVersion(String resourceVersion, DiscoveryDruidNode node)
{
this.resourceVersion = resourceVersion;
this.node = node;
}
public String getResourceVersion()
{
return resourceVersion;
}
public DiscoveryDruidNode getNode()
{
return node;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.common.base.Preconditions;
import org.apache.druid.discovery.DiscoveryDruidNode;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
public class DiscoveryDruidNodeList
{
private final String resourceVersion;
private final Map<String, DiscoveryDruidNode> druidNodes;
public DiscoveryDruidNodeList(
String resourceVersion,
@Nullable Map<String, DiscoveryDruidNode> druidNodes
)
{
this.resourceVersion = Preconditions.checkNotNull(resourceVersion, "NULL resource version!");
this.druidNodes = druidNodes == null ? Collections.emptyMap() : druidNodes;
}
public String getResourceVersion()
{
return resourceVersion;
}
public Map<String, DiscoveryDruidNode> getDruidNodes()
{
return druidNodes;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import org.apache.druid.discovery.NodeRole;
/**
* Interface to abstract pod read/update with K8S API Server to allow unit tests with mock impl.
*/
public interface K8sApiClient
{
void patchPod(String podName, String namespace, String jsonPatchStr);
DiscoveryDruidNodeList listPods(String namespace, String labelSelector, NodeRole nodeRole);
/**
* @return NULL if history not available or else return the {@link WatchResult} object
*/
WatchResult watchPods(String namespace, String labelSelector, String lastKnownResourceVersion, NodeRole nodeRole);
}

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.logger.Logger;
import org.joda.time.Duration;
import javax.annotation.Nonnull;
import java.util.Objects;
import java.util.regex.Pattern;
public class K8sDiscoveryConfig
{
private static final Logger LOGGER = new Logger(K8sDiscoveryConfig.class);
public static final Pattern K8S_RESOURCE_NAME_REGEX = Pattern.compile("[a-z0-9][a-z0-9-]*[a-z0-9]");
@JsonProperty
@Nonnull
private final String clusterIdentifier;
@JsonProperty
private final String podNameEnvKey;
@JsonProperty
private final String podNamespaceEnvKey;
@JsonProperty
private final String coordinatorLeaderElectionConfigMapNamespace;
@JsonProperty
private final String overlordLeaderElectionConfigMapNamespace;
@JsonProperty
private final Duration leaseDuration;
@JsonProperty
private final Duration renewDeadline;
@JsonProperty
private final Duration retryPeriod;
@JsonCreator
public K8sDiscoveryConfig(
@JsonProperty("clusterIdentifier") String clusterIdentifier,
@JsonProperty("podNameEnvKey") String podNameEnvKey,
@JsonProperty("podNamespaceEnvKey") String podNamespaceEnvKey,
@JsonProperty("coordinatorLeaderElectionConfigMapNamespace") String coordinatorLeaderElectionConfigMapNamespace,
@JsonProperty("overlordLeaderElectionConfigMapNamespace") String overlordLeaderElectionConfigMapNamespace,
@JsonProperty("leaseDuration") Duration leaseDuration,
@JsonProperty("renewDeadline") Duration renewDeadline,
@JsonProperty("retryPeriod") Duration retryPeriod
)
{
Preconditions.checkArgument(clusterIdentifier != null && !clusterIdentifier.isEmpty(), "null/empty clusterIdentifier");
Preconditions.checkArgument(
K8S_RESOURCE_NAME_REGEX.matcher(clusterIdentifier).matches(),
"clusterIdentifier[%s] is used in k8s resource name and must match regex[%s]",
clusterIdentifier,
K8S_RESOURCE_NAME_REGEX.pattern()
);
this.clusterIdentifier = clusterIdentifier;
this.podNameEnvKey = podNameEnvKey == null ? "POD_NAME" : podNameEnvKey;
this.podNamespaceEnvKey = podNamespaceEnvKey == null ? "POD_NAMESPACE" : podNamespaceEnvKey;
if (coordinatorLeaderElectionConfigMapNamespace == null) {
LOGGER.warn("IF coordinator pods run in multiple namespaces, then you MUST provide coordinatorLeaderElectionConfigMapNamespace");
}
this.coordinatorLeaderElectionConfigMapNamespace = coordinatorLeaderElectionConfigMapNamespace;
if (overlordLeaderElectionConfigMapNamespace == null) {
LOGGER.warn("IF overlord pods run in multiple namespaces, then you MUST provide overlordLeaderElectionConfigMapNamespace");
}
this.overlordLeaderElectionConfigMapNamespace = overlordLeaderElectionConfigMapNamespace;
this.leaseDuration = leaseDuration == null ? Duration.millis(60000) : leaseDuration;
this.renewDeadline = renewDeadline == null ? Duration.millis(17000) : renewDeadline;
this.retryPeriod = retryPeriod == null ? Duration.millis(5000) : retryPeriod;
}
@JsonProperty
public String getClusterIdentifier()
{
return clusterIdentifier;
}
@JsonProperty
public String getPodNameEnvKey()
{
return podNameEnvKey;
}
@JsonProperty
public String getPodNamespaceEnvKey()
{
return podNamespaceEnvKey;
}
@JsonProperty
public String getCoordinatorLeaderElectionConfigMapNamespace()
{
return coordinatorLeaderElectionConfigMapNamespace;
}
@JsonProperty
public String getOverlordLeaderElectionConfigMapNamespace()
{
return overlordLeaderElectionConfigMapNamespace;
}
@JsonProperty
public Duration getLeaseDuration()
{
return leaseDuration;
}
@JsonProperty
public Duration getRenewDeadline()
{
return renewDeadline;
}
@JsonProperty
public Duration getRetryPeriod()
{
return retryPeriod;
}
@Override
public String toString()
{
return "K8sDiscoveryConfig{" +
"clusterIdentifier='" + clusterIdentifier + '\'' +
", podNameEnvKey='" + podNameEnvKey + '\'' +
", podNamespaceEnvKey='" + podNamespaceEnvKey + '\'' +
", coordinatorLeaderElectionConfigMapNamespace='" + coordinatorLeaderElectionConfigMapNamespace + '\'' +
", overlordLeaderElectionConfigMapNamespace='" + overlordLeaderElectionConfigMapNamespace + '\'' +
", leaseDuration=" + leaseDuration +
", renewDeadline=" + renewDeadline +
", retryPeriod=" + retryPeriod +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
K8sDiscoveryConfig that = (K8sDiscoveryConfig) o;
return clusterIdentifier.equals(that.clusterIdentifier) &&
Objects.equals(podNameEnvKey, that.podNameEnvKey) &&
Objects.equals(podNamespaceEnvKey, that.podNamespaceEnvKey) &&
Objects.equals(
coordinatorLeaderElectionConfigMapNamespace,
that.coordinatorLeaderElectionConfigMapNamespace
) &&
Objects.equals(
overlordLeaderElectionConfigMapNamespace,
that.overlordLeaderElectionConfigMapNamespace
) &&
Objects.equals(leaseDuration, that.leaseDuration) &&
Objects.equals(renewDeadline, that.renewDeadline) &&
Objects.equals(retryPeriod, that.retryPeriod);
}
@Override
public int hashCode()
{
return Objects.hash(
clusterIdentifier,
podNameEnvKey,
podNamespaceEnvKey,
coordinatorLeaderElectionConfigMapNamespace,
overlordLeaderElectionConfigMapNamespace,
leaseDuration,
renewDeadline,
retryPeriod
);
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.server.DruidNode;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class K8sDiscoveryModule implements DruidModule
{
private static final String K8S_KEY = "k8s";
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.emptyList();
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.discovery.k8s", K8sDiscoveryConfig.class);
binder.bind(ApiClient.class)
.toProvider(
() -> {
try {
// Note: we can probably improve things here about figuring out how to find the K8S API server,
// HTTP client timeouts etc.
return Config.defaultClient();
}
catch (IOException ex) {
throw new RuntimeException("Failed to create K8s ApiClient instance", ex);
}
}
)
.in(LazySingleton.class);
binder.bind(K8sApiClient.class).to(DefaultK8sApiClient.class).in(LazySingleton.class);
binder.bind(K8sLeaderElectorFactory.class).to(DefaultK8sLeaderElectorFactory.class).in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DruidNodeDiscoveryProvider.class))
.addBinding(K8S_KEY)
.to(K8sDruidNodeDiscoveryProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DruidNodeAnnouncer.class))
.addBinding(K8S_KEY)
.to(K8sDruidNodeAnnouncer.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class))
.addBinding(K8S_KEY)
.toProvider(
new DruidLeaderSelectorProvider(true)
)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class))
.addBinding(K8S_KEY)
.toProvider(
new DruidLeaderSelectorProvider(false)
)
.in(LazySingleton.class);
}
private static class DruidLeaderSelectorProvider implements Provider<DruidLeaderSelector>
{
@Inject
@Self
private DruidNode druidNode;
@Inject
private PodInfo podInfo;
@Inject
private K8sDiscoveryConfig discoveryConfig;
@Inject
private Provider<ApiClient> k8sApiClientProvider;
private boolean isCoordinator;
DruidLeaderSelectorProvider(boolean isCoordinator)
{
this.isCoordinator = isCoordinator;
}
@Override
public DruidLeaderSelector get()
{
// Note: these can not be setup in the constructor because injected K8sDiscoveryConfig and PodInfo
// are not available at that time.
String lockResourceName;
String lockResourceNamespace;
if (isCoordinator) {
lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-coordinator";
lockResourceNamespace = discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace() == null ?
podInfo.getPodNamespace() : discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace();
} else {
lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-overlord";
lockResourceNamespace = discoveryConfig.getOverlordLeaderElectionConfigMapNamespace() == null ?
podInfo.getPodNamespace() : discoveryConfig.getOverlordLeaderElectionConfigMapNamespace();
}
return new K8sDruidLeaderSelector(
druidNode,
lockResourceName,
lockResourceNamespace,
discoveryConfig,
new DefaultK8sLeaderElectorFactory(k8sApiClientProvider.get(), discoveryConfig)
);
}
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.common.base.Preconditions;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.DruidNode;
import javax.annotation.Nullable;
public class K8sDruidLeaderSelector implements DruidLeaderSelector
{
private static final EmittingLogger LOGGER = new EmittingLogger(K8sDruidLeaderSelector.class);
private final LifecycleLock lifecycleLock = new LifecycleLock();
private DruidLeaderSelector.Listener listener = null;
private final LeaderElectorAsyncWrapper leaderLatch;
private volatile boolean leader = false;
@SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "incremented but in single thread")
private volatile int term = 0;
public K8sDruidLeaderSelector(@Self DruidNode self, String lockResourceName, String lockResourceNamespace, K8sDiscoveryConfig discoveryConfig, K8sLeaderElectorFactory k8sLeaderElectorFactory)
{
this.leaderLatch = new LeaderElectorAsyncWrapper(
self.getServiceScheme() + "://" + self.getHostAndPortToUse(),
lockResourceName,
lockResourceNamespace,
discoveryConfig,
k8sLeaderElectorFactory
);
}
private void startLeaderElector(LeaderElectorAsyncWrapper leaderElector)
{
leaderElector.run(
() -> {
try {
if (leader) {
LOGGER.warn("I'm being asked to become leader. But I am already the leader. Ignored event.");
return;
}
leader = true;
term++;
listener.becomeLeader();
}
catch (Throwable ex) {
LOGGER.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit();
CloseQuietly.close(leaderLatch);
leader = false;
//Exit and Kubernetes would simply create a new replacement pod.
System.exit(1);
}
},
() -> {
try {
if (!leader) {
LOGGER.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event.");
return;
}
leader = false;
listener.stopBeingLeader();
}
catch (Throwable ex) {
LOGGER.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit();
}
}
);
}
@Nullable
@Override
public String getCurrentLeader()
{
try {
return leaderLatch.getCurrentLeader();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isLeader()
{
return leader;
}
@Override
public int localTerm()
{
return term;
}
@Override
public void registerListener(DruidLeaderSelector.Listener listener)
{
Preconditions.checkArgument(listener != null, "listener is null.");
if (!lifecycleLock.canStart()) {
throw new ISE("can't start.");
}
try {
this.listener = listener;
startLeaderElector(leaderLatch);
lifecycleLock.started();
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
finally {
lifecycleLock.exitStart();
}
}
@Override
public void unregisterListener()
{
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
}
CloseQuietly.close(leaderLatch);
}
}

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Announcement creates following in the pod def...
*
* Labels -
* druidDiscoveryAnnouncement-<nodeRole.getJsonName()> = true
* druidDiscoveryAnnouncement-id = encodeHostPort(host:port)
* druidDiscoveryAnnouncement-cluster-identifier = <clusterIdentifier>
*
* Annotation -
* druidNodeInfo-<nodeRole.getJsonName()> = json_serialize(DiscoveryDruidNode)
*
* Note that, a node can have multiple roles e.g. coordinator can take up overlord's role as well.
*/
public class K8sDruidNodeAnnouncer implements DruidNodeAnnouncer
{
private static final Logger LOGGER = new Logger(K8sDruidNodeAnnouncer.class);
private static String POD_LABELS_PATH_PREFIX = "/metadata/labels";
private static String POD_ANNOTATIONS_PATH_PREFIX = "/metadata/annotations";
private static final String OP_ADD = "add";
private static final String OP_REMOVE = "remove";
public static final String ANNOUNCEMENT_DONE = "true";
private final ObjectMapper jsonMapper;
private final K8sDiscoveryConfig discoveryConfig;
private final PodInfo podInfo;
private final K8sApiClient k8sApiClient;
@Inject
public K8sDruidNodeAnnouncer(
PodInfo podInfo,
K8sDiscoveryConfig discoveryConfig,
K8sApiClient k8sApiClient,
@Json ObjectMapper jsonMapper
)
{
this.discoveryConfig = discoveryConfig;
this.podInfo = podInfo;
this.k8sApiClient = k8sApiClient;
this.jsonMapper = jsonMapper;
}
@Override
public void announce(DiscoveryDruidNode discoveryDruidNode)
{
LOGGER.info("Announcing DiscoveryDruidNode[%s]", discoveryDruidNode);
String roleAnnouncementLabel = getRoleAnnouncementLabel(discoveryDruidNode.getNodeRole());
String idAnnouncementLabel = getIdAnnouncementLabel();
String clusterIdentifierAnnouncementLabel = getClusterIdentifierAnnouncementLabel();
String infoAnnotation = getInfoAnnotation(discoveryDruidNode.getNodeRole());
try {
List<Map<String, Object>> patches = new ArrayList<>();
// Note: We assume here that at least one label and annotation exists on the pod already, so that
// paths where labels/annotations are created, pre-exist.
// See https://github.com/kubernetes-sigs/kustomize/issues/2986 , we can add workaround of getting pod spec,
// checking if label/annotation path exists and create if not, however that could lead to race conditions
// so assuming the existence for now.
patches.add(createPatchObj(OP_ADD, getPodDefLabelPath(roleAnnouncementLabel), ANNOUNCEMENT_DONE));
patches.add(createPatchObj(OP_ADD, getPodDefLabelPath(idAnnouncementLabel), encodeHostPort(discoveryDruidNode.getDruidNode().getHostAndPortToUse())));
patches.add(createPatchObj(OP_ADD, getPodDefLabelPath(clusterIdentifierAnnouncementLabel), discoveryConfig.getClusterIdentifier()));
patches.add(createPatchObj(OP_ADD, getPodDefAnnocationPath(infoAnnotation), jsonMapper.writeValueAsString(discoveryDruidNode)));
// Creating patch string outside of retry block to not retry json serialization failures
String jsonPatchStr = jsonMapper.writeValueAsString(patches);
LOGGER.info("Json Patch For Node Announcement: [%s]", jsonPatchStr);
RetryUtils.retry(
() -> {
k8sApiClient.patchPod(podInfo.getPodName(), podInfo.getPodNamespace(), jsonPatchStr);
return "na";
},
(throwable) -> true,
3
);
LOGGER.info("Announced DiscoveryDruidNode[%s]", discoveryDruidNode);
}
catch (Exception ex) {
throw new RE(ex, "Failed to announce DiscoveryDruidNode[%s]", discoveryDruidNode);
}
}
@Override
public void unannounce(DiscoveryDruidNode discoveryDruidNode)
{
LOGGER.info("Unannouncing DiscoveryDruidNode[%s]", discoveryDruidNode);
String roleAnnouncementLabel = getRoleAnnouncementLabel(discoveryDruidNode.getNodeRole());
String idAnnouncementLabel = getIdAnnouncementLabel();
String clusterIdentifierAnnouncementLabel = getClusterIdentifierAnnouncementLabel();
String infoAnnotation = getInfoAnnotation(discoveryDruidNode.getNodeRole());
try {
List<Map<String, Object>> patches = new ArrayList<>();
patches.add(createPatchObj(OP_REMOVE, getPodDefLabelPath(roleAnnouncementLabel), null));
patches.add(createPatchObj(OP_REMOVE, getPodDefLabelPath(idAnnouncementLabel), null));
patches.add(createPatchObj(OP_REMOVE, getPodDefLabelPath(clusterIdentifierAnnouncementLabel), null));
patches.add(createPatchObj(OP_REMOVE, getPodDefAnnocationPath(infoAnnotation), null));
// Creating patch string outside of retry block to not retry json serialization failures
String jsonPatchStr = jsonMapper.writeValueAsString(patches);
RetryUtils.retry(
() -> {
k8sApiClient.patchPod(podInfo.getPodName(), podInfo.getPodNamespace(), jsonPatchStr);
return "na";
},
(throwable) -> true,
3
);
LOGGER.info("Unannounced DiscoveryDruidNode[%s]", discoveryDruidNode);
}
catch (Exception ex) {
// Unannouncement happens when druid process is shutting down, there is no point throwing exception
// in shutdown sequence.
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
LOGGER.error(ex, "Failed to unannounce DiscoveryDruidNode[%s]", discoveryDruidNode);
}
}
private Map<String, Object> createPatchObj(String op, String path, Object value)
{
if (value == null) {
return ImmutableMap.of(
"op", op,
"path", path
);
} else {
return ImmutableMap.of(
"op", op,
"path", path,
"value", value
);
}
}
public static String getRoleAnnouncementLabel(NodeRole nodeRole)
{
return StringUtils.format("druidDiscoveryAnnouncement-%s", nodeRole.getJsonName());
}
private static String getIdAnnouncementLabel()
{
return "druidDiscoveryAnnouncement-id";
}
public static String getClusterIdentifierAnnouncementLabel()
{
return "druidDiscoveryAnnouncement-cluster-identifier";
}
public static String getInfoAnnotation(NodeRole nodeRole)
{
return StringUtils.format("druidNodeInfo-%s", nodeRole.getJsonName());
}
public static String getLabelSelectorForNodeRole(K8sDiscoveryConfig discoveryConfig, NodeRole nodeRole)
{
return StringUtils.format(
"%s=%s,%s=%s",
getClusterIdentifierAnnouncementLabel(),
discoveryConfig.getClusterIdentifier(),
K8sDruidNodeAnnouncer.getRoleAnnouncementLabel(nodeRole),
K8sDruidNodeAnnouncer.ANNOUNCEMENT_DONE
);
}
public static String getLabelSelectorForNode(K8sDiscoveryConfig discoveryConfig, NodeRole nodeRole, DruidNode node)
{
return StringUtils.format(
"%s=%s,%s=%s,%s=%s",
getClusterIdentifierAnnouncementLabel(),
discoveryConfig.getClusterIdentifier(),
K8sDruidNodeAnnouncer.getRoleAnnouncementLabel(nodeRole),
K8sDruidNodeAnnouncer.ANNOUNCEMENT_DONE,
K8sDruidNodeAnnouncer.getIdAnnouncementLabel(),
encodeHostPort(node.getHostAndPortToUse())
);
}
private String getPodDefLabelPath(String label)
{
return StringUtils.format("%s/%s", POD_LABELS_PATH_PREFIX, label);
}
private String getPodDefAnnocationPath(String annotation)
{
return StringUtils.format("%s/%s", POD_ANNOTATIONS_PATH_PREFIX, annotation);
}
private static String encodeHostPort(String hostPort)
{
//K8S requires that label values must match regex (([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?
//So, it is essential to replace ':' with '-'
// it is assumed that hostname does not have ':' in it except for separating host and port
Preconditions.checkState(
hostPort.indexOf(':') == hostPort.lastIndexOf(':'),
"hostname in host:port[%s] has ':' in it", hostPort
);
return hostPort.replace(':', '-');
}
private String replaceLast(String str, char oldChar, char newChar)
{
char[] chars = str.toCharArray();
for (int i = chars.length - 1; i >= 0; i--) {
if (chars[i] == oldChar) {
chars[i] = newChar;
break;
}
}
return String.valueOf(chars);
}
}

View File

@ -0,0 +1,363 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.kubernetes.client.util.Watch;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.BaseNodeRoleWatcher;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import java.io.Closeable;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
@ManageLifecycle
public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
private static final Logger LOGGER = new Logger(K8sDruidNodeDiscoveryProvider.class);
private final PodInfo podInfo;
private final K8sDiscoveryConfig discoveryConfig;
private final K8sApiClient k8sApiClient;
private ExecutorService listenerExecutor;
private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
private final LifecycleLock lifecycleLock = new LifecycleLock();
private final long watcherErrorRetryWaitMS;
@Inject
public K8sDruidNodeDiscoveryProvider(
PodInfo podInfo,
K8sDiscoveryConfig discoveryConfig,
K8sApiClient k8sApiClient
)
{
// at some point, if needed, watcherErrorRetryWaitMS here can be made configurable and maybe some randomization
// component as well.
this(podInfo, discoveryConfig, k8sApiClient, 10_000);
}
@VisibleForTesting
K8sDruidNodeDiscoveryProvider(
PodInfo podInfo,
K8sDiscoveryConfig discoveryConfig,
K8sApiClient k8sApiClient,
long watcherErrorRetryWaitMS
)
{
this.podInfo = podInfo;
this.discoveryConfig = discoveryConfig;
this.k8sApiClient = k8sApiClient;
this.watcherErrorRetryWaitMS = watcherErrorRetryWaitMS;
}
@Override
public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
{
return () -> !k8sApiClient.listPods(
podInfo.getPodNamespace(),
K8sDruidNodeAnnouncer.getLabelSelectorForNode(discoveryConfig, nodeRole, node),
nodeRole
).getDruidNodes().isEmpty();
}
@Override
public DruidNodeDiscovery getForNodeRole(NodeRole nodeType)
{
return getForNodeRole(nodeType, true);
}
@VisibleForTesting
NodeRoleWatcher getForNodeRole(NodeRole nodeType, boolean startAfterCreation)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return nodeTypeWatchers.computeIfAbsent(
nodeType,
nType -> {
LOGGER.info("Creating NodeRoleWatcher for nodeRole [%s].", nType);
NodeRoleWatcher nodeRoleWatcher = new NodeRoleWatcher(
listenerExecutor,
nType,
podInfo,
discoveryConfig,
k8sApiClient,
watcherErrorRetryWaitMS
);
if (startAfterCreation) {
nodeRoleWatcher.start();
}
LOGGER.info("Created NodeRoleWatcher for nodeRole [%s].", nType);
return nodeRoleWatcher;
}
);
}
@LifecycleStart
public void start()
{
if (!lifecycleLock.canStart()) {
throw new ISE("can't start.");
}
try {
LOGGER.info("starting");
// This is single-threaded to ensure that all listener calls are executed precisely in the oder of add/remove
// event occurences.
listenerExecutor = Execs.singleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
LOGGER.info("started");
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
@LifecycleStop
public void stop()
{
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
}
LOGGER.info("stopping");
for (NodeRoleWatcher watcher : nodeTypeWatchers.values()) {
watcher.stop();
}
listenerExecutor.shutdownNow();
LOGGER.info("stopped");
}
@VisibleForTesting
static class NodeRoleWatcher implements DruidNodeDiscovery
{
private static final Logger LOGGER = new Logger(NodeRoleWatcher.class);
private final PodInfo podInfo;
private final K8sDiscoveryConfig discoveryConfig;
private final K8sApiClient k8sApiClient;
private ExecutorService watchExecutor;
private final LifecycleLock lifecycleLock = new LifecycleLock();
private final AtomicReference<Closeable> watchRef = new AtomicReference<>();
private static final Closeable STOP_MARKER = () -> {};
private final NodeRole nodeRole;
private final BaseNodeRoleWatcher baseNodeRoleWatcher;
private final long watcherErrorRetryWaitMS;
NodeRoleWatcher(
ExecutorService listenerExecutor,
NodeRole nodeRole,
PodInfo podInfo,
K8sDiscoveryConfig discoveryConfig,
K8sApiClient k8sApiClient,
long watcherErrorRetryWaitMS
)
{
this.podInfo = podInfo;
this.discoveryConfig = discoveryConfig;
this.k8sApiClient = k8sApiClient;
this.nodeRole = nodeRole;
this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole);
this.watcherErrorRetryWaitMS = watcherErrorRetryWaitMS;
}
private void watch()
{
String labelSelector = K8sDruidNodeAnnouncer.getLabelSelectorForNodeRole(discoveryConfig, nodeRole);
boolean cacheInitialized = false;
while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
DiscoveryDruidNodeList list = k8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, nodeRole);
baseNodeRoleWatcher.resetNodes(list.getDruidNodes());
if (!cacheInitialized) {
baseNodeRoleWatcher.cacheInitialized();
cacheInitialized = true;
}
keepWatching(
podInfo.getPodNamespace(),
labelSelector,
list.getResourceVersion()
);
}
catch (Throwable ex) {
LOGGER.error(ex, "Expection while watching for NodeRole [%s].", nodeRole);
// Wait a little before trying again.
sleep(watcherErrorRetryWaitMS);
}
}
LOGGER.info("Exited Watch for NodeRole [%s].", nodeRole);
}
private void keepWatching(String namespace, String labelSelector, String resourceVersion)
{
String nextResourceVersion = resourceVersion;
while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
WatchResult iter =
k8sApiClient.watchPods(podInfo.getPodNamespace(), labelSelector, nextResourceVersion, nodeRole);
if (iter == null) {
// history not available, we need to start from scratch
return;
}
try {
while (iter.hasNext()) {
Watch.Response<DiscoveryDruidNodeAndResourceVersion> item = iter.next();
if (item != null && item.type != null) {
switch (item.type) {
case WatchResult.ADDED:
baseNodeRoleWatcher.childAdded(item.object.getNode());
break;
case WatchResult.DELETED:
baseNodeRoleWatcher.childRemoved(item.object.getNode());
break;
default:
}
// This should be updated after the action has been dealt with successfully
nextResourceVersion = item.object.getResourceVersion();
} else {
LOGGER.error("WTH! item or item.type is NULL");
}
}
}
finally {
iter.close();
}
}
catch (SocketTimeoutException ex) {
// socket read timeout can happen normally due to k8s not having anything new to push leading to socket
// read timeout, so no error log
sleep(watcherErrorRetryWaitMS);
}
catch (Throwable ex) {
LOGGER.error(ex, "Error while watching node type [%s]", this.nodeRole);
sleep(watcherErrorRetryWaitMS);
}
}
}
private void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void start()
{
if (!lifecycleLock.canStart()) {
throw new ISE("can't start.");
}
try {
LOGGER.info("Starting NodeRoleWatcher for [%s]...", nodeRole);
this.watchExecutor = Execs.singleThreaded(this.getClass().getName() + nodeRole.getJsonName());
watchExecutor.submit(this::watch);
lifecycleLock.started();
LOGGER.info("Started NodeRoleWatcher for [%s].", nodeRole);
}
finally {
lifecycleLock.exitStart();
}
}
public void stop()
{
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
}
try {
LOGGER.info("Stopping NodeRoleWatcher for [%s]...", nodeRole);
CloseQuietly.close(watchRef.getAndSet(STOP_MARKER));
watchExecutor.shutdownNow();
if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) {
LOGGER.warn("Failed to stop watchExecutor for NodeRoleWatcher[%s]", nodeRole);
}
LOGGER.info("Stopped NodeRoleWatcher for [%s].", nodeRole);
}
catch (Exception ex) {
LOGGER.error(ex, "Failed to stop NodeRoleWatcher for [%s].", nodeRole);
}
}
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
return baseNodeRoleWatcher.getAllNodes();
}
@Override
public void registerListener(Listener listener)
{
baseNodeRoleWatcher.registerListener(listener);
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import io.kubernetes.client.extended.leaderelection.LeaderElector;
/**
* Interface to abstract creation/use of {@link LeaderElector} from k8s-client java lib to allow unit tests with
* mock impl.
*/
public interface K8sLeaderElectorFactory
{
K8sLeaderElector create(
String candidateId,
String namespace,
String lockResourceName
);
}
interface K8sLeaderElector
{
String getCurrentLeader();
void run(Runnable startLeadingHook, Runnable stopLeadingHook);
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.common.base.Preconditions;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class LeaderElectorAsyncWrapper implements Closeable
{
private static final Logger LOGGER = new Logger(LeaderElectorAsyncWrapper.class);
private ExecutorService executor;
private final AtomicReference<Future> futureRef = new AtomicReference<>();
private final K8sLeaderElector k8sLeaderElector;
private final LifecycleLock lifecycleLock = new LifecycleLock();
public LeaderElectorAsyncWrapper(
String candidateId,
String lockResourceName,
String lockResourceNamespace,
K8sDiscoveryConfig discoveryConfig,
K8sLeaderElectorFactory k8sLeaderElectorFactory
)
{
Preconditions.checkArgument(
K8sDiscoveryConfig.K8S_RESOURCE_NAME_REGEX.matcher(lockResourceName).matches(),
"lockResourceName[%s] must match regex[%s]",
lockResourceName,
K8sDiscoveryConfig.K8S_RESOURCE_NAME_REGEX.pattern()
);
LOGGER.info(
"Creating LeaderElector with candidateId[%s], lockResourceName[%s], k8sNamespace[%s].",
candidateId,
lockResourceName,
lockResourceNamespace
);
k8sLeaderElector = k8sLeaderElectorFactory.create(candidateId, lockResourceNamespace, lockResourceName);
}
public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
{
if (!lifecycleLock.canStart()) {
throw new ISE("can't start.");
}
try {
executor = Execs.singleThreaded(this.getClass().getSimpleName());
futureRef.set(executor.submit(
() -> {
while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
k8sLeaderElector.run(startLeadingHook, stopLeadingHook);
}
catch (Throwable ex) {
LOGGER.error(ex, "Exception in K8s LeaderElector.run()");
}
}
}
));
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
@Override
public void close()
{
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
}
try {
futureRef.get().cancel(true);
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.warn("Failed to terminate [%s] executor.", this.getClass().getSimpleName());
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
public String getCurrentLeader()
{
return k8sLeaderElector.getCurrentLeader();
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
@LazySingleton
public class PodInfo
{
private final String podName;
private final String podNamespace;
@Inject
public PodInfo(K8sDiscoveryConfig discoveryConfig)
{
this.podName = System.getenv(discoveryConfig.getPodNameEnvKey());
Preconditions.checkState(podName != null && !podName.isEmpty(), "Failed to find podName");
this.podNamespace = System.getenv(discoveryConfig.getPodNamespaceEnvKey());
Preconditions.checkState(podNamespace != null && !podNamespace.isEmpty(), "Failed to find podNamespace");
}
@VisibleForTesting
public PodInfo(String podName, String podNamespace)
{
this.podName = podName;
this.podNamespace = podNamespace;
}
public String getPodName()
{
return podName;
}
public String getPodNamespace()
{
return podNamespace;
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import io.kubernetes.client.util.Watch;
import java.net.SocketTimeoutException;
public interface WatchResult
{
String ADDED = "ADDED";
String DELETED = "DELETED";
boolean hasNext() throws SocketTimeoutException;
Watch.Response<DiscoveryDruidNodeAndResourceVersion> next();
void close();
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.druid.k8s.discovery.K8sDiscoveryModule

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kubernetes.client.util.Config;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.DruidNode;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;
/**
* This is not a UT, but very helpful when making changes to ensure things work with real K8S Api Server.
* It is ignored in the build but checked in the reporitory for running manually by devs.
*/
@Ignore("Needs K8S API Server")
public class K8sAnnouncerAndDiscoveryIntTest
{
private final DiscoveryDruidNode testNode = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final PodInfo podInfo = new PodInfo("busybox", "default");
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);
@Test(timeout = 30000L)
public void testAnnouncementAndDiscoveryWorkflow() throws Exception
{
K8sApiClient k8sApiClient = new DefaultK8sApiClient(Config.defaultClient(), new DefaultObjectMapper());
K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider(
podInfo,
discoveryConfig,
k8sApiClient
);
discoveryProvider.start();
BooleanSupplier nodeInquirer = discoveryProvider.getForNode(testNode.getDruidNode(), NodeRole.ROUTER);
Assert.assertFalse(nodeInquirer.getAsBoolean());
DruidNodeDiscovery discovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER);
CountDownLatch nodeViewInitialized = new CountDownLatch(1);
CountDownLatch nodeAppeared = new CountDownLatch(1);
CountDownLatch nodeDisappeared = new CountDownLatch(1);
discovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
Iterator<DiscoveryDruidNode> iter = nodes.iterator();
if (iter.hasNext() && testNode.getDruidNode().getHostAndPort().equals(iter.next().getDruidNode().getHostAndPort())) {
nodeAppeared.countDown();
}
}
@Override
public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
Iterator<DiscoveryDruidNode> iter = nodes.iterator();
if (iter.hasNext() && testNode.getDruidNode().getHostAndPort().equals(iter.next().getDruidNode().getHostAndPort())) {
nodeDisappeared.countDown();
}
}
@Override
public void nodeViewInitialized()
{
nodeViewInitialized.countDown();
}
}
);
nodeViewInitialized.await();
K8sDruidNodeAnnouncer announcer = new K8sDruidNodeAnnouncer(podInfo, discoveryConfig, k8sApiClient, jsonMapper);
announcer.announce(testNode);
nodeAppeared.await();
Assert.assertTrue(nodeInquirer.getAsBoolean());
announcer.unannounce(testNode);
nodeDisappeared.await();
Assert.assertFalse(nodeInquirer.getAsBoolean());
discoveryProvider.stop();
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
public class K8sDiscoveryConfigTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testDefaultValuesSerde() throws Exception
{
testSerde(
"{\"clusterIdentifier\": \"test-cluster\"}\n",
new K8sDiscoveryConfig("test-cluster", null, null, null, null, null, null, null)
);
}
@Test
public void testCustomizedValuesSerde() throws Exception
{
testSerde(
"{\n"
+ " \"clusterIdentifier\": \"test-cluster\",\n"
+ " \"podNameEnvKey\": \"PODNAMETEST\",\n"
+ " \"podNamespaceEnvKey\": \"PODNAMESPACETEST\",\n"
+ " \"coordinatorLeaderElectionConfigMapNamespace\": \"coordinatorns\",\n"
+ " \"overlordLeaderElectionConfigMapNamespace\": \"overlordns\",\n"
+ " \"leaseDuration\": \"PT3S\",\n"
+ " \"renewDeadline\": \"PT2S\",\n"
+ " \"retryPeriod\": \"PT1S\"\n"
+ "}\n",
new K8sDiscoveryConfig(
"test-cluster",
"PODNAMETEST",
"PODNAMESPACETEST",
"coordinatorns",
"overlordns",
Duration.millis(3000),
Duration.millis(2000),
Duration.millis(1000)
)
);
}
private void testSerde(String jsonStr, K8sDiscoveryConfig expected) throws Exception
{
K8sDiscoveryConfig actual = jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(jsonStr, K8sDiscoveryConfig.class)
),
K8sDiscoveryConfig.class
);
Assert.assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.DruidNode;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is not a UT, but very helpful when making changes to ensure things work with real K8S Api Server.
* It is ignored in the build but checked in the reporitory for running manually by devs.
*/
@Ignore("Needs K8S API Server")
public class K8sDruidLeaderElectionIntTest
{
private final DiscoveryDruidNode testNode1 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host1", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final DiscoveryDruidNode testNode2 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host2", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, "default", "default",
Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000));
private final ApiClient k8sApiClient;
private final String lockResourceName = "druid-leader-election";
public K8sDruidLeaderElectionIntTest() throws Exception
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
k8sApiClient = Config.defaultClient();
}
// Note: This one is supposed to crash.
@Test(timeout = 60000L)
public void test_becomeLeader_exception() throws Exception
{
K8sDruidLeaderSelector leaderSelector = new K8sDruidLeaderSelector(testNode1.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));
CountDownLatch becomeLeaderLatch = new CountDownLatch(1);
CountDownLatch stopBeingLeaderLatch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
leaderSelector.registerListener(new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
becomeLeaderLatch.countDown();
// This leads to a System.exit() and pod restart is expected to happen.
throw new RuntimeException("Leader crashed");
}
@Override
public void stopBeingLeader()
{
try {
becomeLeaderLatch.await();
stopBeingLeaderLatch.countDown();
}
catch (InterruptedException ex) {
failed.set(true);
}
}
});
becomeLeaderLatch.await();
stopBeingLeaderLatch.await();
Assert.assertFalse(failed.get());
}
@Test(timeout = 60000L)
public void test_leaderCandidate_stopped() throws Exception
{
K8sDruidLeaderSelector leaderSelector = new K8sDruidLeaderSelector(testNode1.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));
CountDownLatch becomeLeaderLatch = new CountDownLatch(1);
CountDownLatch stopBeingLeaderLatch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
leaderSelector.registerListener(new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
becomeLeaderLatch.countDown();
}
@Override
public void stopBeingLeader()
{
try {
becomeLeaderLatch.await();
stopBeingLeaderLatch.countDown();
}
catch (InterruptedException ex) {
failed.set(true);
}
}
});
becomeLeaderLatch.await();
leaderSelector.unregisterListener();
stopBeingLeaderLatch.await();
Assert.assertFalse(failed.get());
leaderSelector = new K8sDruidLeaderSelector(testNode2.getDruidNode(), lockResourceName, discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(), discoveryConfig, new DefaultK8sLeaderElectorFactory(k8sApiClient, discoveryConfig));
CountDownLatch becomeLeaderLatch2 = new CountDownLatch(1);
leaderSelector.registerListener(new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
becomeLeaderLatch2.countDown();
}
@Override
public void stopBeingLeader()
{
}
});
becomeLeaderLatch2.await();
}
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.server.DruidNode;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class K8sDruidLeaderSelectorTest
{
private final DiscoveryDruidNode testNode1 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host1", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null,
"default", "default", Duration.millis(10_000), Duration.millis(7_000), Duration.millis(3_000));
private final String lockResourceName = "druid-leader-election";
@Test(timeout = 5_000)
public void testLeaderElection_HappyPath() throws Exception
{
K8sDruidLeaderSelector leaderSelector = new K8sDruidLeaderSelector(
testNode1.getDruidNode(),
lockResourceName,
discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(),
discoveryConfig,
new K8sLeaderElectorFactory()
{
@Override
public K8sLeaderElector create(String candidateId, String namespace, String lockResourceName)
{
return new K8sLeaderElector()
{
@Override
public String getCurrentLeader()
{
return testNode1.getDruidNode().getHostAndPortToUse();
}
@Override
public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
{
startLeadingHook.run();
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ex) {
stopLeadingHook.run();
}
}
};
}
}
);
Assert.assertEquals(testNode1.getDruidNode().getHostAndPortToUse(), leaderSelector.getCurrentLeader());
CountDownLatch becomeLeaderLatch = new CountDownLatch(1);
CountDownLatch stopBeingLeaderLatch = new CountDownLatch(1);
leaderSelector.registerListener(
new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
becomeLeaderLatch.countDown();
}
@Override
public void stopBeingLeader()
{
stopBeingLeaderLatch.countDown();
}
}
);
becomeLeaderLatch.await();
leaderSelector.unregisterListener();
stopBeingLeaderLatch.await();
}
@Test(timeout = 5_000)
public void testLeaderElection_LeaderElectorExits() throws Exception
{
K8sDruidLeaderSelector leaderSelector = new K8sDruidLeaderSelector(
testNode1.getDruidNode(),
lockResourceName,
discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace(),
discoveryConfig,
new K8sLeaderElectorFactory()
{
@Override
public K8sLeaderElector create(String candidateId, String namespace, String lockResourceName)
{
return new K8sLeaderElector()
{
private boolean isFirstTime = true;
@Override
public String getCurrentLeader()
{
return testNode1.getDruidNode().getHostAndPortToUse();
}
@Override
public void run(Runnable startLeadingHook, Runnable stopLeadingHook)
{
startLeadingHook.run();
if (isFirstTime) {
isFirstTime = false;
stopLeadingHook.run();
} else {
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ex) {
stopLeadingHook.run();
}
}
}
};
}
}
);
Assert.assertEquals(testNode1.getDruidNode().getHostAndPortToUse(), leaderSelector.getCurrentLeader());
CountDownLatch becomeLeaderLatch = new CountDownLatch(2);
CountDownLatch stopBeingLeaderLatch = new CountDownLatch(2);
leaderSelector.registerListener(
new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
becomeLeaderLatch.countDown();
}
@Override
public void stopBeingLeader()
{
stopBeingLeaderLatch.countDown();
}
}
);
becomeLeaderLatch.await();
leaderSelector.unregisterListener();
stopBeingLeaderLatch.await();
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.DruidNode;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class K8sDruidNodeAnnouncerTest
{
private final DiscoveryDruidNode testNode = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final PodInfo podInfo = new PodInfo("testpod", "testns");
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);
@Test
public void testAnnounce() throws Exception
{
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
Capture<String> podNameArg = Capture.newInstance();
Capture<String> namespaceArg = Capture.newInstance();
Capture<String> patchArg = Capture.newInstance();
mockK8sApiClient.patchPod(EasyMock.capture(podNameArg), EasyMock.capture(namespaceArg), EasyMock.capture(patchArg));
EasyMock.replay(mockK8sApiClient);
K8sDruidNodeAnnouncer announcer = new K8sDruidNodeAnnouncer(podInfo, discoveryConfig, mockK8sApiClient, jsonMapper);
announcer.announce(testNode);
Assert.assertEquals(podInfo.getPodName(), podNameArg.getValue());
Assert.assertEquals(podInfo.getPodNamespace(), namespaceArg.getValue());
List<Map<String, Object>> actualPatchList = jsonMapper.readValue(
patchArg.getValue(),
new TypeReference<List<Map<String, Object>>>()
{
}
);
List<Map<String, Object>> expectedPatchList = Lists.newArrayList(
ImmutableMap.of(
"op", "add",
"path", "/metadata/labels/druidDiscoveryAnnouncement-router",
"value", "true"
),
ImmutableMap.of(
"op", "add",
"path", "/metadata/labels/druidDiscoveryAnnouncement-id",
"value", "test-host-80"
),
ImmutableMap.of(
"op", "add",
"path", "/metadata/labels/druidDiscoveryAnnouncement-cluster-identifier",
"value", discoveryConfig.getClusterIdentifier()
),
ImmutableMap.of(
"op", "add",
"path", "/metadata/annotations/druidNodeInfo-router",
"value", jsonMapper.writeValueAsString(testNode)
)
);
Assert.assertEquals(expectedPatchList, actualPatchList);
}
@Test
public void testUnannounce() throws Exception
{
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
Capture<String> podNameArg = Capture.newInstance();
Capture<String> namespaceArg = Capture.newInstance();
Capture<String> patchArg = Capture.newInstance();
mockK8sApiClient.patchPod(EasyMock.capture(podNameArg), EasyMock.capture(namespaceArg), EasyMock.capture(patchArg));
EasyMock.replay(mockK8sApiClient);
K8sDruidNodeAnnouncer announcer = new K8sDruidNodeAnnouncer(podInfo, discoveryConfig, mockK8sApiClient, jsonMapper);
announcer.unannounce(testNode);
Assert.assertEquals(podInfo.getPodName(), podNameArg.getValue());
Assert.assertEquals(podInfo.getPodNamespace(), namespaceArg.getValue());
List<Map<String, String>> actualPatchList = jsonMapper.readValue(
patchArg.getValue(),
new TypeReference<List<Map<String, String>>>()
{
}
);
List<Map<String, String>> expectedPatchList = Lists.newArrayList(
ImmutableMap.of(
"op", "remove",
"path", "/metadata/labels/druidDiscoveryAnnouncement-router"
),
ImmutableMap.of(
"op", "remove",
"path", "/metadata/labels/druidDiscoveryAnnouncement-id"
),
ImmutableMap.of(
"op", "remove",
"path", "/metadata/labels/druidDiscoveryAnnouncement-cluster-identifier"
),
ImmutableMap.of(
"op", "remove",
"path", "/metadata/annotations/druidNodeInfo-router"
)
);
Assert.assertEquals(expectedPatchList, actualPatchList);
}
}

View File

@ -0,0 +1,343 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.kubernetes.client.util.Watch;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class K8sDruidNodeDiscoveryProviderTest
{
private static final Logger LOGGER = new Logger(K8sDruidNodeDiscoveryProviderTest.class);
private final DiscoveryDruidNode testNode1 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host1", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final DiscoveryDruidNode testNode2 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host2", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final DiscoveryDruidNode testNode3 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host3", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final DiscoveryDruidNode testNode4 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host4", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final DiscoveryDruidNode testNode5 = new DiscoveryDruidNode(
new DruidNode("druid/router", "test-host5", true, 80, null, true, false),
NodeRole.ROUTER,
null
);
private final PodInfo podInfo = new PodInfo("testpod", "testns");
private final K8sDiscoveryConfig discoveryConfig = new K8sDiscoveryConfig("druid-cluster", null, null, null, null, null, null, null);
@Test(timeout = 60_000)
public void testGetForNodeRole() throws Exception
{
String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
new DiscoveryDruidNodeList(
"v1",
ImmutableMap.of(
testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
testNode2.getDruidNode().getHostAndPortToUse(), testNode2
)
)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(null);
EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
new DiscoveryDruidNodeList(
"v2",
ImmutableMap.of(
testNode2.getDruidNode().getHostAndPortToUse(), testNode2,
testNode3.getDruidNode().getHostAndPortToUse(), testNode3
)
)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v2", NodeRole.ROUTER)).andReturn(
new MockWatchResult(Collections.emptyList(), true, false)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v2", NodeRole.ROUTER)).andReturn(
new MockWatchResult(
ImmutableList.of(
new Watch.Response<>(WatchResult.ADDED, new DiscoveryDruidNodeAndResourceVersion("v3", testNode4)),
new Watch.Response<>(WatchResult.DELETED, new DiscoveryDruidNodeAndResourceVersion("v4", testNode2))
),
false,
true
)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v4", NodeRole.ROUTER)).andReturn(
new MockWatchResult(
ImmutableList.of(
new Watch.Response<>(WatchResult.ADDED, new DiscoveryDruidNodeAndResourceVersion("v5", testNode5)),
new Watch.Response<>(WatchResult.DELETED, new DiscoveryDruidNodeAndResourceVersion("v6", testNode3))
),
false,
false
)
);
EasyMock.replay(mockK8sApiClient);
K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider(
podInfo,
discoveryConfig,
mockK8sApiClient,
1
);
discoveryProvider.start();
K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
MockListener testListener = new MockListener(
ImmutableList.of(
MockListener.Event.added(testNode1),
MockListener.Event.added(testNode2),
MockListener.Event.inited(),
MockListener.Event.added(testNode3),
MockListener.Event.deleted(testNode1),
MockListener.Event.added(testNode4),
MockListener.Event.deleted(testNode2),
MockListener.Event.added(testNode5),
MockListener.Event.deleted(testNode3)
)
);
nodeDiscovery.registerListener(testListener);
nodeDiscovery.start();
testListener.assertSuccess();
discoveryProvider.stop();
}
private static class MockListener implements DruidNodeDiscovery.Listener
{
List<Event> events;
private boolean failed = false;
private String failReason;
public MockListener(List<Event> events)
{
this.events = Lists.newArrayList(events);
}
@Override
public void nodeViewInitialized()
{
assertNextEvent(Event.inited());
}
@Override
public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
List<DiscoveryDruidNode> l = Lists.newArrayList(nodes);
Collections.sort(l, (n1, n2) -> n1.getDruidNode().getHostAndPortToUse().compareTo(n2.getDruidNode().getHostAndPortToUse()));
for (DiscoveryDruidNode node : l) {
assertNextEvent(Event.added(node));
}
}
@Override
public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
List<DiscoveryDruidNode> l = Lists.newArrayList(nodes);
Collections.sort(l, (n1, n2) -> n1.getDruidNode().getHostAndPortToUse().compareTo(n2.getDruidNode().getHostAndPortToUse()));
for (DiscoveryDruidNode node : l) {
assertNextEvent(Event.deleted(node));
}
}
private void assertNextEvent(Event actual)
{
if (!failed && !events.isEmpty()) {
Event expected = events.remove(0);
failed = !actual.equals(expected);
if (failed) {
failReason = StringUtils.format("Failed Equals [%s] and [%s]", expected, actual);
}
}
}
public void assertSuccess() throws Exception
{
while (!events.isEmpty()) {
Assert.assertFalse(failReason, failed);
LOGGER.info("Waiting for events to finish.");
Thread.sleep(1000);
}
Assert.assertFalse(failReason, failed);
}
static class Event
{
String type;
DiscoveryDruidNode node;
private Event(String type, DiscoveryDruidNode node)
{
this.type = type;
this.node = node;
}
static Event inited()
{
return new Event("inited", null);
}
static Event added(DiscoveryDruidNode node)
{
return new Event("added", node);
}
static Event deleted(DiscoveryDruidNode node)
{
return new Event("deleted", node);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Event event = (Event) o;
return type.equals(event.type) &&
Objects.equals(node, event.node);
}
@Override
public int hashCode()
{
return Objects.hash(type, node);
}
@Override
public String toString()
{
return "Event{" +
"type='" + type + '\'' +
", node=" + node +
'}';
}
}
}
private static class MockWatchResult implements WatchResult
{
private List<Watch.Response<DiscoveryDruidNodeAndResourceVersion>> results;
private volatile boolean timeoutOnStart;
private volatile boolean timeooutOnEmptyResults;
private volatile boolean closeCalled = false;
public MockWatchResult(
List<Watch.Response<DiscoveryDruidNodeAndResourceVersion>> results,
boolean timeoutOnStart,
boolean timeooutOnEmptyResults
)
{
this.results = Lists.newArrayList(results);
this.timeoutOnStart = timeoutOnStart;
this.timeooutOnEmptyResults = timeooutOnEmptyResults;
}
@Override
public boolean hasNext() throws SocketTimeoutException
{
if (timeoutOnStart) {
throw new SocketTimeoutException("testing timeout on start!!!");
}
if (results.isEmpty()) {
if (timeooutOnEmptyResults) {
throw new SocketTimeoutException("testing timeout on end!!!");
} else {
try {
Thread.sleep(Long.MAX_VALUE);
return false; // just making compiler happy, will never reach this.
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
} else {
return true;
}
}
@Override
public Watch.Response<DiscoveryDruidNodeAndResourceVersion> next()
{
return results.remove(0);
}
@Override
public void close()
{
closeCalled = true;
}
public void assertSuccess()
{
Assert.assertTrue("close() not called", closeCalled);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
public class NoopServiceEmitter extends ServiceEmitter
{
public NoopServiceEmitter()
{
super("", "", null);
}
@Override
public void emit(Event event)
{
}
}

View File

@ -827,6 +827,256 @@ libraries:
---
name: kubernetes official java client
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 10.0.0
libraries:
- io.kubernetes: client-java
---
name: kubernetes official java client api
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 10.0.0
libraries:
- io.kubernetes: client-java-api
---
name: kubernetes official java client extended
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 10.0.0
libraries:
- io.kubernetes: client-java-extended
---
name: io.prometheus simpleclient_common
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.9.0
libraries:
- io.prometheus: simpleclient_common
---
name: org.apache.commons commons-collections4
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 4.4
libraries:
- org.apache.commons: commons-collections4
---
name: io.sundr builder-annotations
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.22.0
libraries:
- io.sundr: builder-annotations
---
name: com.squareup.okio okio
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 1.17.2
libraries:
- com.squareup.okio: okio
---
name: io.gsonfire gson-fire
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 1.8.4
libraries:
- io.gsonfire: gson-fire
---
name: io.swagger swagger-annotations
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 1.6.2
libraries:
- io.swagger: swagger-annotations
---
name: io.prometheus simpleclient_httpserver
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.9.0
libraries:
- io.prometheus: simpleclient_httpserver
---
name: org.bitbucket.b_c jose4j
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.7.2
libraries:
- org.bitbucket.b_c: jose4j
---
name: org.joda joda-convert
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 2.2.1
libraries:
- org.joda: joda-convert
---
name: com.squareup.okhttp3 okhttp
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 3.14.9
libraries:
- com.squareup.okhttp3: okhttp
---
name: io.prometheus simpleclient
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.9.0
libraries:
- io.prometheus: simpleclient
---
name: io.kubernetes client-java-proto
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 10.0.0
libraries:
- io.kubernetes: client-java-proto
---
name: org.yaml snakeyaml
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 1.27
libraries:
- org.yaml: snakeyaml
---
name: com.flipkart.zjsonpatch zjsonpatch
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.4.11
libraries:
- com.flipkart.zjsonpatch: zjsonpatch
---
name: org.bouncycastle bcprov-jdk15on
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: MIT License
version: 1.66
libraries:
- org.bouncycastle: bcprov-jdk15on
---
name: io.sundr resourcecify-annotations
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.22.0
libraries:
- io.sundr: resourcecify-annotations
---
name: io.sundr sundr-codegen
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.22.0
libraries:
- io.sundr: sundr-codegen
---
name: org.bouncycastle bcprov-ext-jdk15on
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: MIT License
version: 1.66
libraries:
- org.bouncycastle: bcprov-ext-jdk15on
---
name: io.sundr sundr-core
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 0.22.0
libraries:
- io.sundr: sundr-core
---
name: com.squareup.okhttp3 logging-interceptor
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 3.14.9
libraries:
- com.squareup.okhttp3: logging-interceptor
---
name: org.bouncycastle bcpkix-jdk15on
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: MIT License
version: 1.66
libraries:
- org.bouncycastle: bcpkix-jdk15on
---
name: com.github.vladimir-bukhtoyarov bucket4j-core
license_category: binary
module: extensions/druid-kubernetes-extensions
license_name: Apache License version 2.0
version: 4.10.0
libraries:
- com.github.vladimir-bukhtoyarov: bucket4j-core
---
name: Netty
license_category: binary
module: java-core

View File

@ -146,6 +146,7 @@
<module>cloud/aws-common</module>
<module>cloud/gcp-common</module>
<!-- Core extensions -->
<module>extensions-core/kubernetes-extensions</module>
<module>extensions-core/avro-extensions</module>
<module>extensions-core/azure-extensions</module>
<module>extensions-core/datasketches</module>

View File

@ -21,7 +21,6 @@ package org.apache.druid.curator.discovery;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
@ -32,6 +31,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.discovery.BaseNodeRoleWatcher;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
@ -52,16 +52,10 @@ import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
/**
@ -172,24 +166,13 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final NodeRole nodeRole;
private final ObjectMapper jsonMapper;
/**
* hostAndPort -> DiscoveryDruidNode
*/
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
private final BaseNodeRoleWatcher baseNodeRoleWatcher;
private final PathChildrenCache cache;
private final ExecutorService cacheExecutor;
private final ExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
NodeRoleWatcher(
ExecutorService listenerExecutor,
CuratorFramework curatorFramework,
@ -198,10 +181,10 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
NodeRole nodeRole
)
{
this.listenerExecutor = listenerExecutor;
this.curatorFramework = curatorFramework;
this.nodeRole = nodeRole;
this.jsonMapper = jsonMapper;
this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(listenerExecutor, nodeRole);
// This is required to be single threaded from docs in PathChildrenCache.
this.cacheExecutor = Execs.singleThreaded(
@ -236,39 +219,13 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
boolean nodeViewInitialized;
try {
nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
nodeViewInitialized = false;
}
if (!nodeViewInitialized) {
log.info(
"Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
nodeRole.getJsonName()
);
}
return unmodifiableNodes;
return baseNodeRoleWatcher.getAllNodes();
}
@Override
public void registerListener(DruidNodeDiscovery.Listener listener)
{
synchronized (lock) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
safeSchedule(
() -> {
listener.nodesAdded(unmodifiableNodes);
listener.nodeViewInitialized();
},
"Exception occured in nodesAdded([%s]) in listener [%s].", unmodifiableNodes, listener
);
}
nodeListeners.add(listener);
}
baseNodeRoleWatcher.registerListener(listener);
}
void handleChildEvent(PathChildrenCacheEvent event)
@ -285,7 +242,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
break;
}
case INITIALIZED: {
cacheInitialized();
baseNodeRoleWatcher.cacheInitialized();
break;
}
default: {
@ -300,7 +257,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
@GuardedBy("lock")
void childAdded(PathChildrenCacheEvent event) throws IOException
private void childAdded(PathChildrenCacheEvent event) throws IOException
{
final byte[] data = getZkDataForNode(event.getData());
if (data == null) {
@ -311,48 +268,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return;
}
DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
if (!nodeRole.equals(druidNode.getNodeRole())) {
log.error(
"Node[%s] of role[%s] addition ignored due to mismatched role (expected role[%s]).",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
nodeRole.getJsonName()
);
return;
}
log.info("Node[%s] of role[%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
addNode(druidNode);
}
@GuardedBy("lock")
private void addNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
for (Listener listener : nodeListeners) {
safeSchedule(
() -> listener.nodesAdded(newNode),
"Exception occured in nodeAdded(node=[%s]) in listener [%s].",
druidNode.getDruidNode().getHostAndPortToUse(),
listener
);
}
}
} else {
log.error(
"Node[%s] of role[%s] discovered but existed already [%s].",
druidNode.getDruidNode().getUriToUse(),
nodeRole.getJsonName(),
prev
);
}
baseNodeRoleWatcher.childAdded(jsonMapper.readValue(data, DiscoveryDruidNode.class));
}
@GuardedBy("lock")
@ -364,50 +280,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return;
}
DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
if (!nodeRole.equals(druidNode.getNodeRole())) {
log.error(
"Node[%s] of role[%s] removal ignored due to mismatched role (expected role[%s]).",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
nodeRole.getJsonName()
);
return;
}
log.info("Node[%s] of role[%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
removeNode(druidNode);
}
@GuardedBy("lock")
private void removeNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
if (prev == null) {
log.error(
"Noticed disappearance of unknown druid node [%s] of role[%s].",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName()
);
return;
}
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
for (Listener listener : nodeListeners) {
safeSchedule(
() -> listener.nodesRemoved(nodeRemoved),
"Exception occured in nodeRemoved(node[%s] of role[%s]) in listener [%s].",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
listener
);
}
}
baseNodeRoleWatcher.childRemoved(jsonMapper.readValue(data, DiscoveryDruidNode.class));
}
/**
@ -425,45 +298,6 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return null;
}
}
@GuardedBy("lock")
private void cacheInitialized()
{
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be
// counted down.
if (cacheInitialized.getCount() == 0) {
log.error("cache is already initialized. ignoring cache initialization event.");
return;
}
log.info("Node watcher of role[%s] is now initialized.", nodeRole.getJsonName());
for (Listener listener : nodeListeners) {
safeSchedule(
() -> {
listener.nodesAdded(unmodifiableNodes);
listener.nodeViewInitialized();
},
"Exception occured in nodesAdded([%s]) in listener [%s].",
unmodifiableNodes,
listener
);
}
cacheInitialized.countDown();
}
private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
{
listenerExecutor.submit(() -> {
try {
runnable.run();
}
catch (Exception ex) {
log.error(errMsgFormat, args);
}
});
}
}
private static class NodeDiscoverer implements Closeable

View File

@ -0,0 +1,301 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.logger.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Common code used by various implementations of DruidNodeDiscovery.
*
* User code is supposed to arrange for following methods to be called,
* {@link #childAdded(DiscoveryDruidNode)}
* {@link #childRemoved(DiscoveryDruidNode)}
* {@link #cacheInitialized()}
* {@link #resetNodes(Map)}
*
* Then {@link #registerListener(DruidNodeDiscovery.Listener)} and {@link #getAllNodes()} can be delegated to the
* implementation here.
*/
public class BaseNodeRoleWatcher
{
private static final Logger LOGGER = new Logger(BaseNodeRoleWatcher.class);
private final NodeRole nodeRole;
/**
* hostAndPort -> DiscoveryDruidNode
*/
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
private final ExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
public BaseNodeRoleWatcher(
ExecutorService listenerExecutor,
NodeRole nodeRole
)
{
this.listenerExecutor = listenerExecutor;
this.nodeRole = nodeRole;
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
boolean nodeViewInitialized;
try {
nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
nodeViewInitialized = false;
}
if (!nodeViewInitialized) {
LOGGER.info(
"Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
nodeRole.getJsonName()
);
}
return unmodifiableNodes;
}
public void registerListener(DruidNodeDiscovery.Listener listener)
{
synchronized (lock) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
// It is important to take a snapshot here as list of nodes might change by the time listeners process
// the changes.
List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
listener.nodeViewInitialized();
},
"Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener
);
}
nodeListeners.add(listener);
}
}
public void childAdded(DiscoveryDruidNode druidNode)
{
synchronized (lock) {
if (!nodeRole.equals(druidNode.getNodeRole())) {
LOGGER.error(
"Node[%s] of role[%s] addition ignored due to mismatched role (expected role[%s]).",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
nodeRole.getJsonName()
);
return;
}
LOGGER.info("Node[%s] of role[%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
addNode(druidNode);
}
}
@GuardedBy("lock")
private void addNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
safeSchedule(
() -> listener.nodesAdded(newNode),
"Exception occured in nodeAdded(node=[%s]) in listener [%s].",
druidNode.getDruidNode().getHostAndPortToUse(),
listener
);
}
}
} else {
LOGGER.error(
"Node[%s] of role[%s] discovered but existed already [%s].",
druidNode.getDruidNode().getUriToUse(),
nodeRole.getJsonName(),
prev
);
}
}
public void childRemoved(DiscoveryDruidNode druidNode)
{
synchronized (lock) {
if (!nodeRole.equals(druidNode.getNodeRole())) {
LOGGER.error(
"Node[%s] of role[%s] removal ignored due to mismatched role (expected role[%s]).",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
nodeRole.getJsonName()
);
return;
}
LOGGER.info("Node[%s] of role[%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
removeNode(druidNode);
}
}
@GuardedBy("lock")
private void removeNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
if (prev == null) {
LOGGER.error(
"Noticed disappearance of unknown druid node [%s] of role[%s].",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName()
);
return;
}
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
safeSchedule(
() -> listener.nodesRemoved(nodeRemoved),
"Exception occured in nodeRemoved(node[%s] of role[%s]) in listener [%s].",
druidNode.getDruidNode().getUriToUse(),
druidNode.getNodeRole().getJsonName(),
listener
);
}
}
}
public void cacheInitialized()
{
synchronized (lock) {
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be
// counted down.
if (cacheInitialized.getCount() == 0) {
LOGGER.error("cache is already initialized. ignoring cache initialization event.");
return;
}
LOGGER.info("Node watcher of role[%s] is now initialized.", nodeRole.getJsonName());
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
// It is important to take a snapshot here as list of nodes might change by the time listeners process
// the changes.
List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
listener.nodeViewInitialized();
},
"Exception occured in nodesAdded([%s]) in listener [%s].",
currNodes,
listener
);
}
cacheInitialized.countDown();
}
}
public void resetNodes(Map<String, DiscoveryDruidNode> fullNodes)
{
synchronized (lock) {
List<DiscoveryDruidNode> nodesAdded = new ArrayList<>();
List<DiscoveryDruidNode> nodesDeleted = new ArrayList<>();
for (Map.Entry<String, DiscoveryDruidNode> e : fullNodes.entrySet()) {
if (!nodes.containsKey(e.getKey())) {
nodesAdded.add(e.getValue());
}
}
for (Map.Entry<String, DiscoveryDruidNode> e : nodes.entrySet()) {
if (!fullNodes.containsKey(e.getKey())) {
nodesDeleted.add(e.getValue());
}
}
for (DiscoveryDruidNode node : nodesDeleted) {
nodes.remove(node.getDruidNode().getHostAndPortToUse());
}
for (DiscoveryDruidNode node : nodesAdded) {
nodes.put(node.getDruidNode().getHostAndPortToUse(), node);
}
// No need to wait on CountDownLatch, because we are holding the lock under which it could only be counted down.
if (cacheInitialized.getCount() == 0) {
for (DruidNodeDiscovery.Listener listener : nodeListeners) {
safeSchedule(
() -> {
if (!nodesAdded.isEmpty()) {
listener.nodesAdded(nodesAdded);
}
if (!nodesDeleted.isEmpty()) {
listener.nodesRemoved(nodesDeleted);
}
},
"Exception occured in resetNodes in listener [%s].",
listener
);
}
}
}
}
private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
{
listenerExecutor.submit(() -> {
try {
runnable.run();
}
catch (Exception ex) {
LOGGER.error(errMsgFormat, args);
}
});
}
}

View File

@ -39,6 +39,7 @@ public interface DruidLeaderSelector
* Get ID of current Leader. Returns NULL if it can't find the leader.
* Note that it is possible for leadership to change right after this call returns, so caller would get wrong
* leader.
* This is expected to work even if a listener is not registered.
*/
@Nullable
String getCurrentLeader();

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.DruidNode;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class BaseNodeRoleWatcherTest
{
@Test(timeout = 60_000L)
public void testGeneralUseSimulation()
{
BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
Execs.directExecutor(),
NodeRole.BROKER
);
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker3");
DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
new DruidNode("s3", "h3", false, 8080, null, true, false),
NodeRole.COORDINATOR,
ImmutableMap.of()
);
TestListener listener1 = new TestListener();
TestListener listener2 = new TestListener();
TestListener listener3 = new TestListener();
nodeRoleWatcher.registerListener(listener1);
nodeRoleWatcher.childAdded(broker1);
nodeRoleWatcher.childAdded(broker2);
nodeRoleWatcher.childAdded(notBroker);
nodeRoleWatcher.childAdded(broker3);
nodeRoleWatcher.registerListener(listener2);
nodeRoleWatcher.childRemoved(broker2);
assertListener(listener1, false, Collections.emptyList(), Collections.emptyList());
assertListener(listener2, false, Collections.emptyList(), Collections.emptyList());
nodeRoleWatcher.cacheInitialized();
nodeRoleWatcher.registerListener(listener3);
List<DiscoveryDruidNode> presentNodes = new ArrayList<>(nodeRoleWatcher.getAllNodes());
Assert.assertEquals(2, presentNodes.size());
Assert.assertTrue(presentNodes.contains(broker1));
Assert.assertTrue(presentNodes.contains(broker3));
assertListener(listener1, true, presentNodes, Collections.emptyList());
assertListener(listener2, true, presentNodes, Collections.emptyList());
assertListener(listener3, true, presentNodes, Collections.emptyList());
nodeRoleWatcher.childRemoved(notBroker);
nodeRoleWatcher.childRemoved(broker2);
nodeRoleWatcher.childAdded(broker2);
nodeRoleWatcher.childRemoved(broker3);
nodeRoleWatcher.childAdded(broker1);
Assert.assertEquals(ImmutableSet.of(broker2, broker1), new HashSet<>(nodeRoleWatcher.getAllNodes()));
List<DiscoveryDruidNode> nodesAdded = new ArrayList<>(presentNodes);
nodesAdded.add(broker2);
List<DiscoveryDruidNode> nodesRemoved = new ArrayList<>();
nodesRemoved.add(broker3);
assertListener(listener1, true, nodesAdded, nodesRemoved);
assertListener(listener2, true, nodesAdded, nodesRemoved);
assertListener(listener3, true, nodesAdded, nodesRemoved);
LinkedHashMap<String, DiscoveryDruidNode> resetNodes = new LinkedHashMap<>();
resetNodes.put(broker2.getDruidNode().getHostAndPortToUse(), broker2);
resetNodes.put(broker3.getDruidNode().getHostAndPortToUse(), broker3);
nodeRoleWatcher.resetNodes(resetNodes);
Assert.assertEquals(ImmutableSet.of(broker2, broker3), new HashSet<>(nodeRoleWatcher.getAllNodes()));
nodesAdded.add(broker3);
nodesRemoved.add(broker1);
assertListener(listener1, true, nodesAdded, nodesRemoved);
assertListener(listener2, true, nodesAdded, nodesRemoved);
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String host)
{
return new DiscoveryDruidNode(
new DruidNode("s", host, false, 8080, null, true, false),
role,
ImmutableMap.of()
);
}
private void assertListener(TestListener listener, boolean nodeViewInitialized, List<DiscoveryDruidNode> nodesAdded, List<DiscoveryDruidNode> nodesRemoved)
{
Assert.assertEquals(nodeViewInitialized, listener.nodeViewInitialized.get());
Assert.assertEquals(nodesAdded, listener.nodesAddedList);
Assert.assertEquals(nodesRemoved, listener.nodesRemovedList);
}
public static class TestListener implements DruidNodeDiscovery.Listener
{
private final AtomicBoolean nodeViewInitialized = new AtomicBoolean(false);
private final List<DiscoveryDruidNode> nodesAddedList = new ArrayList<>();
private final List<DiscoveryDruidNode> nodesRemovedList = new ArrayList<>();
@Override
public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
nodesAddedList.addAll(nodes);
}
@Override
public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
{
nodesRemovedList.addAll(nodes);
}
@Override
public void nodeViewInitialized()
{
if (!nodeViewInitialized.compareAndSet(false, true)) {
throw new RuntimeException("NodeViewInitialized called again!");
}
}
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
@ -33,6 +34,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
@ -71,6 +73,7 @@ import org.apache.druid.timeline.PruneLoadSpec;
import org.eclipse.jetty.server.Server;
import java.util.List;
import java.util.Properties;
@Command(
name = "broker",
@ -80,11 +83,19 @@ public class CliBroker extends ServerRunnable
{
private static final Logger log = new Logger(CliBroker.class);
private boolean isZkEnabled = true;
public CliBroker()
{
super(log);
}
@Inject
public void configure(Properties properties)
{
isZkEnabled = ZkEnablementConfig.isEnabled(properties);
}
@Override
protected List<? extends Module> getModules()
{
@ -137,7 +148,9 @@ public class CliBroker extends ServerRunnable
Jerseys.addResource(binder, HistoricalResource.class);
Jerseys.addResource(binder, SegmentListerResource.class);
LifecycleModule.register(binder, ZkCoordinator.class);
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
bindNodeRoleAndAnnouncer(
binder,

View File

@ -159,7 +159,10 @@ public class CliIndexer extends ServerRunnable
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
Jerseys.addResource(binder, HistoricalResource.class);
LifecycleModule.register(binder, ZkCoordinator.class);
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
bindNodeRoleAndAnnouncer(
binder,

View File

@ -40,6 +40,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.CacheModule;
@ -156,6 +157,7 @@ public class CliPeon extends GuiceRunnable
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String serverType = "indexer-executor";
private boolean isZkEnabled = true;
/**
* If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for
@ -166,7 +168,6 @@ public class CliPeon extends GuiceRunnable
private static final Logger log = new Logger(CliPeon.class);
@Inject
private Properties properties;
public CliPeon()
@ -174,6 +175,13 @@ public class CliPeon extends GuiceRunnable
super(log);
}
@Inject
public void configure(Properties properties)
{
this.properties = properties;
isZkEnabled = ZkEnablementConfig.isEnabled(properties);
}
@Override
protected List<? extends Module> getModules()
{
@ -235,7 +243,10 @@ public class CliPeon extends GuiceRunnable
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
Jerseys.addResource(binder, HistoricalResource.class);
LifecycleModule.register(binder, ZkCoordinator.class);
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
}
}

View File

@ -239,6 +239,7 @@ deserialize
deserialized
downtimes
druid
druidkubernetes-extensions
e.g.
encodings
endian
@ -278,6 +279,7 @@ joinable
kerberos
keystore
keytab
kubernetes
laning
lifecycle
localhost
@ -700,6 +702,12 @@ OAuth
Okta
OpenID
pac4j
- ../docs/development/extensions-core/kubernetes.md
Env
POD_NAME
POD_NAMESPACE
ConfigMap
PT17S
- ../docs/development/extensions-core/google.md
GCS
StaticGoogleBlobStoreFirehose