Support custom coordinator duties (#11601)

* impl

* fix checkstyle

* fix checkstyle

* fix checkstyle

* add test

* add test

* add test

* add integration tests

* add integration tests

* add more docs

* address comments

* address comments

* address comments

* add test

* fix checkstyle

* fix test
This commit is contained in:
Maytas Monsereenusorn 2021-08-19 11:54:11 +07:00 committed by GitHub
parent 91cd573472
commit ce4dd48bb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 875 additions and 6 deletions

View File

@ -472,6 +472,10 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) kafka index, transactional kafka index integration test with Indexer"
env: TESTNG_GROUPS='-Dgroups=kafka-index,kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- <<: *integration_kafka_index
name: "(Compile=openjdk8, Run=openjdk8) custom coordinator duties integration test"
env: TESTNG_GROUPS='-Dgroups=custom-coordinator-duties' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
- &integration_kafka_index_slow
name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow"
stage: Tests - phase 2
@ -605,13 +609,13 @@ jobs:
stage: Tests - phase 2
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
@ -689,7 +693,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"

View File

@ -324,6 +324,38 @@ public class MyTransformModule implements DruidModule {
}
```
### Adding your own custom pluggable Coordinator Duty
The coordinator periodically runs jobs, known as `CoordinatorDuty`, which include loading new segments, segment balancing, etc.
Druid users can add custom pluggable coordinator duties, which are not part of Core Druid, without modifying any Core Druid classes.
Users can do this by writing their own custom coordinator duty implementing the interface `CoordinatorCustomDuty` and setting the `JsonTypeName`.
Next, users will need to register their custom coordinator as subtypes in their Module's `DruidModule#getJacksonModules()`.
Once these steps are done, users will be able to load their custom coordinator duties using the following properties:
```
druid.coordinator.dutyGroups=[<GROUP_NAME_1>, <GROUP_NAME_2>, ...]
druid.coordinator.<GROUP_NAME_1>.duties=[<DUTY_NAME_MATCHING_JSON_TYPE_NAME_1>, <DUTY_NAME_MATCHING_JSON_TYPE_NAME_2>, ...]
druid.coordinator.<GROUP_NAME_1>.period=<GROUP_NAME_1_RUN_PERIOD>
druid.coordinator.<GROUP_NAME_1>.duty.<DUTY_NAME_MATCHING_JSON_TYPE_NAME_1>.<SOME_CONFIG_1_KEY>=<SOME_CONFIG_1_VALUE>
druid.coordinator.<GROUP_NAME_1>.duty.<DUTY_NAME_MATCHING_JSON_TYPE_NAME_1>.<SOME_CONFIG_2_KEY>=<SOME_CONFIG_2_VALUE>
```
In the new system for pluggable Coordinator duties, similar to what coordinator already does today, the duties can be grouped together.
The duties will be grouped into multiple groups as per the elements in list `druid.coordinator.dutyGroups`.
All duties in the same group will have the same run period configured by `druid.coordinator.<GROUP_NAME>.period`.
Currently, there is a single thread running the duties sequentially for each group.
For example, see `KillSupervisorsCustomDuty` for a custom coordinator duty implementation and the `custom-coordinator-duties`
integration test group, which loads `KillSupervisorsCustomDuty` using the configs set in `integration-tests/docker/environment-configs/common-custom-coordinator-duties`.
The relevant configs in `integration-tests/docker/environment-configs/common-custom-coordinator-duties` are as follows:
(The configs create a custom coordinator duty group called `cleanupMetadata` which runs a custom coordinator duty called `killSupervisors` every 10 seconds.
The custom coordinator duty `killSupervisors` also has a config called `retainDuration` which is set to 0 minutes.)
```
druid.coordinator.dutyGroups=["cleanupMetadata"]
druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M
druid.coordinator.cleanupMetadata.period=PT10S
```
### Bundle your extension with all the other Druid extensions
When you do `mvn install`, Druid extensions will be packaged within the Druid tarball and `extensions` directory, which are both underneath `distribution/target/`.

View File

@ -0,0 +1,103 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Cluster for the custom-coordinator-duties integration test group: the default
# test cluster, except the coordinator also loads the env file
# environment-configs/common-custom-coordinator-duties to enable custom
# coordinator duties.
version: "2.2"
services:
  druid-zookeeper-kafka:
    extends:
      file: docker-compose.base.yml
      service: druid-zookeeper-kafka

  druid-metadata-storage:
    extends:
      file: docker-compose.base.yml
      service: druid-metadata-storage
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-zookeeper-kafka

  druid-coordinator:
    extends:
      file: docker-compose.base.yml
      service: druid-coordinator
    # Extra env file enabling the "cleanupMetadata" custom coordinator duty group.
    env_file:
      - ./environment-configs/common-custom-coordinator-duties
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-metadata-storage
      - druid-zookeeper-kafka

  druid-overlord:
    extends:
      file: docker-compose.base.yml
      service: druid-overlord
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-coordinator
      - druid-metadata-storage
      - druid-zookeeper-kafka

  druid-historical:
    extends:
      file: docker-compose.base.yml
      service: druid-historical
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-zookeeper-kafka

  druid-middlemanager:
    extends:
      file: docker-compose.base.yml
      service: druid-middlemanager
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-zookeeper-kafka
      - druid-overlord

  druid-broker:
    extends:
      file: docker-compose.base.yml
      service: druid-broker
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-coordinator
      - druid-zookeeper-kafka
      - druid-middlemanager
      - druid-historical

  druid-router:
    extends:
      file: docker-compose.base.yml
      service: druid-router
    environment:
      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
    depends_on:
      - druid-zookeeper-kafka
      - druid-coordinator
      - druid-broker
      - druid-overlord

networks:
  druid-it-net:
    name: druid-it-net
    ipam:
      config:
        - subnet: 172.172.172.0/24

View File

@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
LANG=C.UTF-8
LANGUAGE=C.UTF-8
LC_ALL=C.UTF-8
# JAVA OPTS
COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml -XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
DRUID_DEP_LIB_DIR=/shared/hadoop_xml:/shared/docker/lib/*:/usr/local/druid/lib/mysql-connector-java.jar
# Druid configs
druid_extensions_loadList=[]
druid_extensions_directory=/shared/docker/extensions
druid_auth_authenticator_basic_authorizerName=basic
druid_auth_authenticator_basic_initialAdminPassword=priest
druid_auth_authenticator_basic_initialInternalClientPassword=warlock
druid_auth_authenticator_basic_type=basic
druid_auth_authenticatorChain=["basic"]
druid_auth_authorizer_basic_type=basic
druid_auth_authorizers=["basic"]
druid_client_https_certAlias=druid
druid_client_https_keyManagerPassword=druid123
druid_client_https_keyStorePassword=druid123
druid_client_https_keyStorePath=/tls/server.jks
druid_client_https_protocol=TLSv1.2
druid_client_https_trustStoreAlgorithm=PKIX
druid_client_https_trustStorePassword=druid123
druid_client_https_trustStorePath=/tls/truststore.jks
druid_enableTlsPort=true
druid_escalator_authorizerName=basic
druid_escalator_internalClientPassword=warlock
druid_escalator_internalClientUsername=druid_system
druid_escalator_type=basic
druid_lookup_numLookupLoadingThreads=1
druid_server_http_numThreads=20
# Allow OPTIONS method for ITBasicAuthConfigurationTest.testSystemSchemaAccess
druid_server_http_allowedHttpMethods=["OPTIONS"]
druid_server_https_certAlias=druid
druid_server_https_keyManagerPassword=druid123
druid_server_https_keyStorePassword=druid123
druid_server_https_keyStorePath=/tls/server.jks
druid_server_https_keyStoreType=jks
druid_server_https_requireClientCertificate=true
druid_server_https_trustStoreAlgorithm=PKIX
druid_server_https_trustStorePassword=druid123
druid_server_https_trustStorePath=/tls/truststore.jks
druid_server_https_validateHostnames=true
druid_zk_service_host=druid-zookeeper-kafka
druid_auth_basic_common_maxSyncRetries=20
druid_indexer_logs_directory=/shared/tasklogs
druid_sql_enable=true
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
druid_request_logging_type=slf4j
# Testing the legacy config from https://github.com/apache/druid/pull/10267
# Can remove this when the flag is no longer needed
druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true
# Testing the kill supervisor custom coordinator duty
# NOTE: keys use the druid_ underscore env-var convention (like the rest of this
# file) so the container entrypoint translates them into druid.* properties;
# dotted keys are not valid environment variable names and would be ignored.
druid_coordinator_dutyGroups=["cleanupMetadata"]
druid_coordinator_cleanupMetadata_duties=["killSupervisors"]
druid_coordinator_cleanupMetadata_duty_killSupervisors_retainDuration=PT0M
druid_coordinator_cleanupMetadata_period=PT10S

View File

@ -61,6 +61,10 @@ getComposeArgs()
# default + additional historical modified for query error test
# See CliHistoricalForQueryRetryTest.
echo "-f ${DOCKERDIR}/docker-compose.query-error-test.yml"
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "custom-coordinator-duties" ]
then
# default + custom for Coordinator to enable custom coordinator duties
echo "-f ${DOCKERDIR}/docker-compose.custom-coordinator-duties.yml"
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]
then
# the 'high availability' test cluster with multiple coordinators and overlords

View File

@ -157,4 +157,6 @@ public class TestNGGroup
public static final String HIGH_AVAILABILTY = "high-availability";
public static final String SHUFFLE_DEEP_STORE = "shuffle-deep-store";
public static final String CUSTOM_COORDINATOR_DUTIES = "custom-coordinator-duties";
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.parallelized;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
/**
 * Integration test for the pluggable custom coordinator duty mechanism. Runs the Kafka
 * (non-transactional) indexing flow from {@link AbstractKafkaIndexingServiceTest} and relies on the
 * "killSupervisors" custom coordinator duty (enabled by the custom-coordinator-duties test group
 * configs) to automatically clean up terminated supervisors.
 */
@Test(groups = TestNGGroup.CUSTOM_COORDINATOR_DUTIES)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCustomCoordinatorDuties extends AbstractKafkaIndexingServiceTest
{
  // Prefix used by the base class when naming resources created for this test.
  @Override
  public String getTestNamePrefix()
  {
    return "kafka_non_transactional_parallelized";
  }

  @BeforeClass
  public void beforeClass() throws Exception
  {
    doBeforeClass();
  }

  /**
   * This test verifies the kill supervisor custom coordinator duty which is enabled using the
   * custom coordinator pluggable configurations.
   */
  @Test
  public void testKafkaTerminatedSupervisorAutoCleanup() throws Exception
  {
    doTestTerminatedSupervisorAutoCleanup(false);
  }
}

View File

@ -68,6 +68,8 @@ import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
import org.apache.druid.server.coordinator.duty.LogUsedSegments;
@ -152,6 +154,7 @@ public class DruidCoordinator
private final DruidNode self;
private final Set<CoordinatorDuty> indexingServiceDuties;
private final Set<CoordinatorDuty> metadataStoreManagementDuties;
private final CoordinatorCustomDutyGroups customDutyGroups;
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
@ -187,6 +190,7 @@ public class DruidCoordinator
@Self DruidNode self,
@CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> metadataStoreManagementDuties,
@CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties,
CoordinatorCustomDutyGroups customDutyGroups,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@Coordinator DruidLeaderSelector coordLeaderSelector,
@ -211,6 +215,7 @@ public class DruidCoordinator
new ConcurrentHashMap<>(),
indexingServiceDuties,
metadataStoreManagementDuties,
customDutyGroups,
factory,
lookupCoordinatorManager,
coordLeaderSelector,
@ -236,6 +241,7 @@ public class DruidCoordinator
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<CoordinatorDuty> indexingServiceDuties,
Set<CoordinatorDuty> metadataStoreManagementDuties,
CoordinatorCustomDutyGroups customDutyGroups,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
@ -262,6 +268,7 @@ public class DruidCoordinator
this.self = self;
this.indexingServiceDuties = indexingServiceDuties;
this.metadataStoreManagementDuties = metadataStoreManagementDuties;
this.customDutyGroups = customDutyGroups;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@ -679,6 +686,20 @@ public class DruidCoordinator
)
);
for (CoordinatorCustomDutyGroup customDutyGroup : customDutyGroups.getCoordinatorCustomDutyGroups()) {
dutiesRunnables.add(
Pair.of(
new DutiesRunnable(customDutyGroup.getCustomDutyList(), startingLeaderCounter, customDutyGroup.getName()),
customDutyGroup.getPeriod()
)
);
log.info(
"Done making custom coordinator duties %s for group %s",
customDutyGroup.getCustomDutyList().stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()),
customDutyGroup.getName()
);
}
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
// CompactSegmentsDuty can take a non-trivial amount of time to complete.
// Hence, we schedule at a fixed rate to make sure the other tasks still run at approximately every
@ -785,11 +806,11 @@ public class DruidCoordinator
protected class DutiesRunnable implements Runnable
{
private final long startTimeNanos = System.nanoTime();
private final List<CoordinatorDuty> duties;
private final List<? extends CoordinatorDuty> duties;
private final int startingLeaderCounter;
private final String dutiesRunnableAlias;
protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter, String alias)
protected DutiesRunnable(List<? extends CoordinatorDuty> duties, final int startingLeaderCounter, String alias)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.initialization.DruidModule;
/**
 * This {@link ExtensionPoint} allows for coordinator duty to be pluggable
 * so that users can register their own duties without modifying Core Druid classes.
 *
 * Users can write their own custom coordinator duty implementing this interface and setting the {@link JsonTypeName}.
 * Next, users will need to register their custom coordinator as subtypes in their
 * Module's {@link DruidModule#getJacksonModules()}. Once these steps are done, users will be able to load their
 * custom coordinator duty using the following properties:
 * druid.coordinator.dutyGroups=[&lt;GROUP_NAME&gt;]
 * druid.coordinator.&lt;GROUP_NAME&gt;.duties=[&lt;DUTY_NAME_MATCHING_JSON_TYPE_NAME&gt;]
 * druid.coordinator.&lt;GROUP_NAME&gt;.duty.&lt;DUTY_NAME_MATCHING_JSON_TYPE_NAME&gt;.&lt;SOME_CONFIG_1&gt;=100
 * druid.coordinator.&lt;GROUP_NAME&gt;.duty.&lt;DUTY_NAME_MATCHING_JSON_TYPE_NAME&gt;.&lt;SOME_CONFIG_2&gt;=200
 * druid.coordinator.&lt;GROUP_NAME&gt;.period="P1D"
 *
 * The duties can be grouped into multiple groups as per the elements in list druid.coordinator.dutyGroups.
 * All duties in the same group will have the same run period configured by druid.coordinator.&lt;GROUP_NAME&gt;.period.
 * There will be a single thread running the duties sequentially for each group.
 *
 * Note that custom duty does not implement CoordinatorDuty directly as existing Core Druid Coordinator Duties
 * don't have associated JSON type and should not be manually grouped/enabled/disabled by the users.
 * (The only exception is the metadata cleanup duties which we may refactor to use the custom duty system in the future)
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class),
})
@ExtensionPoint
public interface CoordinatorCustomDuty extends CoordinatorDuty
{
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import org.joda.time.Duration;
import java.util.List;
/**
 * Value object describing one named group of {@link CoordinatorCustomDuty} instances that share a
 * run schedule: the group name it was configured under, the period at which the group's duties run,
 * and the duties belonging to the group.
 */
public class CoordinatorCustomDutyGroup
{
  private final String name;
  private final Duration period;
  private final List<CoordinatorCustomDuty> customDutyList;

  public CoordinatorCustomDutyGroup(
      String name,
      Duration period,
      List<CoordinatorCustomDuty> customDutyList
  )
  {
    this.customDutyList = customDutyList;
    this.period = period;
    this.name = name;
  }

  /** Configured name of this duty group. */
  public String getName()
  {
    return name;
  }

  /** Interval at which the duties in this group are run. */
  public Duration getPeriod()
  {
    return period;
  }

  /** The duties belonging to this group. */
  public List<CoordinatorCustomDuty> getCustomDutyList()
  {
    return customDutyList;
  }

  @Override
  public String toString()
  {
    StringBuilder sb = new StringBuilder("CoordinatorCustomDutyGroup{");
    sb.append("name='").append(name).append('\'');
    sb.append(", period=").append(period);
    sb.append(", customDutyList=").append(customDutyList);
    sb.append('}');
    return sb.toString();
  }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import java.util.Set;
/**
 * Container holding all configured {@link CoordinatorCustomDutyGroup}s; passed to the coordinator
 * so it can schedule each group's duties.
 */
public class CoordinatorCustomDutyGroups
{
  private final Set<CoordinatorCustomDutyGroup> coordinatorCustomDutyGroups;

  public CoordinatorCustomDutyGroups(
      Set<CoordinatorCustomDutyGroup> coordinatorCustomDutyGroups
  )
  {
    this.coordinatorCustomDutyGroups = coordinatorCustomDutyGroups;
  }

  /** All custom duty groups configured on this coordinator. */
  public Set<CoordinatorCustomDutyGroup> getCoordinatorCustomDutyGroups()
  {
    return coordinatorCustomDutyGroups;
  }

  @Override
  public String toString()
  {
    return new StringBuilder("CoordinatorCustomDutyGroups{")
        .append("coordinatorCustomDutyGroups=")
        .append(coordinatorCustomDutyGroups)
        .append('}')
        .toString();
  }
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.joda.time.Duration;
/**
 * CoordinatorDuty for automatic deletion of terminated supervisors from the supervisor table in metadata storage.
 * This class has the same purpose as {@link KillSupervisors} but uses a different configuration style as
 * detailed in {@link CoordinatorCustomDuty}. This class's primary purpose is to serve as an example demonstrating
 * the usage of the {@link CoordinatorCustomDuty} {@link org.apache.druid.guice.annotations.ExtensionPoint}
 *
 * Production use case should still use {@link KillSupervisors}. In the future, we might migrate all metadata
 * management coordinator duties to {@link CoordinatorCustomDuty} but until then this class will remain undocumented
 * and should not be used in production.
 */
public class KillSupervisorsCustomDuty implements CoordinatorCustomDuty
{
  private static final Logger log = new Logger(KillSupervisorsCustomDuty.class);

  // Retain window: supervisor records terminated before (now - retainDuration) are deleted.
  private final Duration retainDuration;
  private final MetadataSupervisorManager metadataSupervisorManager;

  /**
   * @param retainDuration            required, non-negative duration; bound from the
   *                                  "retainDuration" key of this duty's config
   * @param metadataSupervisorManager injected manager used to delete terminated supervisor records
   */
  @JsonCreator
  public KillSupervisorsCustomDuty(
      @JsonProperty("retainDuration") Duration retainDuration,
      @JacksonInject MetadataSupervisorManager metadataSupervisorManager
  )
  {
    this.metadataSupervisorManager = metadataSupervisorManager;
    this.retainDuration = retainDuration;
    // Fail fast on misconfiguration: retainDuration must be present and >= 0 ms.
    Preconditions.checkArgument(this.retainDuration != null && this.retainDuration.getMillis() >= 0, "(Custom Duty) Coordinator supervisor kill retainDuration must be >= 0");
    log.info(
        "Supervisor Kill Task scheduling enabled with retainDuration [%s]",
        this.retainDuration
    );
  }

  /**
   * Deletes terminated supervisors older than the retain window and emits the removed count as the
   * "metadata/kill/supervisor/count" metric. Failures are logged and swallowed so one failed run
   * does not abort the duty group; {@code params} is returned unchanged either way.
   */
  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    // Anything terminated before this wall-clock timestamp is eligible for deletion.
    long timestamp = System.currentTimeMillis() - retainDuration.getMillis();
    try {
      int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
      ServiceEmitter emitter = params.getEmitter();
      emitter.emit(
          new ServiceMetricEvent.Builder().build(
              "metadata/kill/supervisor/count",
              supervisorRemoved
          )
      );
      log.info("Finished running KillSupervisors duty. Removed %,d supervisor specs", supervisorRemoved);
    }
    catch (Exception e) {
      log.error(e, "Failed to kill terminated supervisor metadata");
    }
    return params;
  }
}

View File

@ -52,6 +52,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -258,6 +259,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
loadManagementPeons,
null,
null,
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
@ -558,6 +560,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
loadManagementPeons,
null,
null,
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),

View File

@ -52,6 +52,9 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@ -223,6 +226,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
loadManagementPeons,
null,
new HashSet<>(),
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
@ -746,6 +750,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
null,
null,
null,
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
null,
null,
null,
@ -781,6 +786,129 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertFalse(firstExec == thirdExec);
}
/**
 * Verifies that custom coordinator duty groups are scheduled and executed once the coordinator
 * becomes leader: two groups, each with one duty that counts down a latch, must both run.
 * The test passes when both latches reach zero (bounded by the 3s timeout).
 */
@Test(timeout = 3000)
public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
{
  // Some necessary setup to start the Coordinator
  // Nice mock: unstubbed config-manager calls return defaults instead of failing.
  JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
          EasyMock.anyObject(Class.class),
          EasyMock.anyObject()
      )
  ).andReturn(new AtomicReference(CoordinatorDynamicConfig.builder().build())).anyTimes();
  EasyMock.expect(
      configManager.watch(
          EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
          EasyMock.anyObject(Class.class),
          EasyMock.anyObject()
      )
  ).andReturn(new AtomicReference(CoordinatorCompactionConfig.empty())).anyTimes();
  EasyMock.replay(configManager);
  // Pretend the metadata store is already being polled so duties are allowed to run.
  EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
  // One datasource with a single used segment so the coordinator has a non-empty snapshot.
  DruidDataSource dataSource = new DruidDataSource("dataSource1", Collections.emptyMap());
  DataSegment dataSegment = new DataSegment(
      "dataSource1",
      Intervals.of("2010-01-01/P1D"),
      "v1",
      null,
      null,
      null,
      null,
      0x9,
      0
  );
  dataSource.addSegment(dataSegment);
  DataSourcesSnapshot dataSourcesSnapshot =
      new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource()));
  EasyMock
      .expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments())
      .andReturn(dataSourcesSnapshot)
      .anyTimes();
  EasyMock.replay(segmentsMetadataManager);
  EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
  EasyMock.replay(serverInventoryView);
  // Create CoordinatorCustomDutyGroups
  // We will have two groups and each group has one duty
  CountDownLatch latch1 = new CountDownLatch(1);
  CoordinatorCustomDuty duty1 = new CoordinatorCustomDuty() {
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
    {
      latch1.countDown();
      return params;
    }
  };
  CoordinatorCustomDutyGroup group1 = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(duty1));
  CountDownLatch latch2 = new CountDownLatch(1);
  CoordinatorCustomDuty duty2 = new CoordinatorCustomDuty() {
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
    {
      latch2.countDown();
      return params;
    }
  };
  CoordinatorCustomDutyGroup group2 = new CoordinatorCustomDutyGroup("group2", Duration.standardSeconds(1), ImmutableList.of(duty2));
  CoordinatorCustomDutyGroups groups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group1, group2));
  // Build a real coordinator wired with the two custom duty groups under test.
  coordinator = new DruidCoordinator(
      druidCoordinatorConfig,
      new ZkPathsConfig()
      {
        @Override
        public String getBase()
        {
          return "druid";
        }
      },
      configManager,
      segmentsMetadataManager,
      serverInventoryView,
      metadataRuleManager,
      () -> curator,
      serviceEmitter,
      scheduledExecutorFactory,
      null,
      null,
      new NoopServiceAnnouncer()
      {
        @Override
        public void announce(DruidNode node)
        {
          // count down when this coordinator becomes the leader
          leaderAnnouncerLatch.countDown();
        }

        @Override
        public void unannounce(DruidNode node)
        {
          leaderUnannouncerLatch.countDown();
        }
      },
      druidNode,
      loadManagementPeons,
      null,
      new HashSet<>(),
      groups,
      new CostBalancerStrategyFactory(),
      EasyMock.createNiceMock(LookupCoordinatorManager.class),
      new TestDruidLeaderSelector(),
      null,
      ZkEnablementConfig.ENABLED
  );
  coordinator.start();
  // Wait until group 1 duty ran for latch1 to countdown
  latch1.await();
  // Wait until group 2 duty ran for latch2 to countdown
  latch2.await();
}
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
 * Unit tests for {@link KillSupervisorsCustomDuty}: constructor validation of
 * {@code retainDuration} (null / negative / valid) and a happy-path {@code run()} that
 * must delete old terminated-supervisor metadata and emit a metric.
 */
@RunWith(MockitoJUnitRunner.class)
public class KillSupervisorsCustomDutyTest
{
  // Collaborators are mocked; only interaction with them is verified, not real metadata access.
  @Mock
  private MetadataSupervisorManager mockMetadataSupervisorManager;

  @Mock
  private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;

  @Mock
  private ServiceEmitter mockServiceEmitter;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  // Duty under test; constructed per test case.
  private KillSupervisorsCustomDuty killSupervisors;

  @Test
  public void testConstructorFailIfRetainDurationNull()
  {
    // Null retainDuration must be rejected with the documented message.
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("(Custom Duty) Coordinator supervisor kill retainDuration must be >= 0");
    killSupervisors = new KillSupervisorsCustomDuty(null, mockMetadataSupervisorManager);
  }

  @Test
  public void testConstructorFailIfRetainDurationInvalid()
  {
    // Negative retainDuration (PT-1s) must be rejected as well.
    exception.expect(IllegalArgumentException.class);
    exception.expectMessage("(Custom Duty) Coordinator supervisor kill retainDuration must be >= 0");
    killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT-1s"), mockMetadataSupervisorManager);
  }

  @Test
  public void testConstructorSuccess()
  {
    // A non-negative retainDuration constructs successfully.
    killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"), mockMetadataSupervisorManager);
    Assert.assertNotNull(killSupervisors);
  }

  @Test
  public void testRun()
  {
    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
    killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"), mockMetadataSupervisorManager);
    killSupervisors.run(mockDruidCoordinatorRuntimeParams);
    // run() must both delete old supervisor metadata and emit the count metric.
    Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong());
    Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
  }
}

View File

@ -19,8 +19,11 @@
package org.apache.druid.cli;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
@ -43,6 +46,7 @@ import org.apache.druid.guice.ConditionalMultibind;
import org.apache.druid.guice.ConfigProvider;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
@ -52,6 +56,9 @@ import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServices;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -75,6 +82,9 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillCompactionConfig;
@ -104,10 +114,13 @@ import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.eclipse.jetty.server.Server;
import org.joda.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
@ -224,7 +237,7 @@ public class CliCoordinator extends ServerRunnable
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DataSourcesResource.class);
// Binding for Set of indexing service coordinator Ddty
// Binding for Set of indexing service coordinator Duty
final ConditionalMultibind<CoordinatorDuty> conditionalIndexingServiceDutyMultibind = ConditionalMultibind.create(
properties,
binder,
@ -300,6 +313,10 @@ public class CliCoordinator extends ServerRunnable
binder.bind(TaskMaster.class).toProvider(Providers.of(null));
binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
}
binder.bind(CoordinatorCustomDutyGroups.class)
.toProvider(new CoordinatorCustomDutyGroupsProvider())
.in(LazySingleton.class);
}
@Provides
@ -353,4 +370,79 @@ public class CliCoordinator extends ServerRunnable
{
return Boolean.parseBoolean(properties.getProperty(AS_OVERLORD_PROPERTY));
}
/**
 * Builds {@link CoordinatorCustomDutyGroups} from runtime properties.
 *
 * Expected property layout:
 * <ul>
 *   <li>{@code druid.coordinator.dutyGroups} — JSON list of group names (absent/empty => no groups)</li>
 *   <li>{@code druid.coordinator.<group>.duties} — JSON list of duty type names (required per group)</li>
 *   <li>{@code druid.coordinator.<group>.duty.<duty>.*} — per-duty config, deserialized via Jackson</li>
 *   <li>{@code druid.coordinator.<group>.period} — run period for the group (required per group)</li>
 * </ul>
 *
 * Any malformed configuration surfaces as a RuntimeException so the coordinator fails to start
 * rather than silently dropping duties.
 */
private static class CoordinatorCustomDutyGroupsProvider implements Provider<CoordinatorCustomDutyGroups>
{
  private Properties props;
  private JsonConfigurator configurator;
  private ObjectMapper jsonMapper;

  @Inject
  public void inject(Properties props, JsonConfigurator configurator, ObjectMapper jsonMapper)
  {
    this.props = props;
    this.configurator = configurator;
    this.jsonMapper = jsonMapper;
  }

  @Override
  public CoordinatorCustomDutyGroups get()
  {
    try {
      Set<CoordinatorCustomDutyGroup> coordinatorCustomDutyGroups = new HashSet<>();
      if (Strings.isNullOrEmpty(props.getProperty("druid.coordinator.dutyGroups"))) {
        // No custom duty groups configured: return an empty container rather than null.
        return new CoordinatorCustomDutyGroups(coordinatorCustomDutyGroups);
      }
      List<String> coordinatorCustomDutyGroupNames = jsonMapper.readValue(props.getProperty(
          "druid.coordinator.dutyGroups"), new TypeReference<List<String>>() {});
      for (String coordinatorCustomDutyGroupName : coordinatorCustomDutyGroupNames) {
        // Each declared group must list at least one duty.
        String dutyListProperty = StringUtils.format("druid.coordinator.%s.duties", coordinatorCustomDutyGroupName);
        if (Strings.isNullOrEmpty(props.getProperty(dutyListProperty))) {
          throw new IAE("Coordinator custom duty group given without any duty for group %s", coordinatorCustomDutyGroupName);
        }
        List<String> dutyForGroup = jsonMapper.readValue(props.getProperty(dutyListProperty), new TypeReference<List<String>>() {});
        List<CoordinatorCustomDuty> coordinatorCustomDuties = new ArrayList<>();
        for (String dutyName : dutyForGroup) {
          final String dutyPropertyBase = StringUtils.format(
              "druid.coordinator.%s.duty.%s",
              coordinatorCustomDutyGroupName,
              dutyName
          );
          final JsonConfigProvider<CoordinatorCustomDuty> coordinatorCustomDutyProvider = JsonConfigProvider.of(
              dutyPropertyBase,
              CoordinatorCustomDuty.class
          );

          // The duty name doubles as the polymorphic Jackson 'type', so users must not set it themselves.
          // FIX: the original checked adjustedProps.containsKey(typeProperty) after constructing
          // `new Properties(props)` — but Hashtable.containsKey does not consult the defaults table,
          // so that guard could never fire. Check the source properties directly instead.
          String typeProperty = StringUtils.format("%s.type", dutyPropertyBase);
          if (props.containsKey(typeProperty)) {
            throw new IAE("'type' property [%s] is reserved.", typeProperty);
          }
          // Overlay the synthesized 'type' entry on top of the user's properties (as defaults).
          Properties adjustedProps = new Properties(props);
          adjustedProps.put(typeProperty, dutyName);

          coordinatorCustomDutyProvider.inject(adjustedProps, configurator);
          Supplier<CoordinatorCustomDuty> coordinatorCustomDutySupplier = coordinatorCustomDutyProvider.get();
          if (coordinatorCustomDutySupplier == null) {
            throw new ISE("Could not create CoordinatorCustomDuty with name: %s for group: %s", dutyName, coordinatorCustomDutyGroupName);
          }
          CoordinatorCustomDuty coordinatorCustomDuty = coordinatorCustomDutySupplier.get();
          if (coordinatorCustomDuty == null) {
            throw new ISE("Could not create CoordinatorCustomDuty with name: %s for group: %s", dutyName, coordinatorCustomDutyGroupName);
          }
          coordinatorCustomDuties.add(coordinatorCustomDuty);
        }
        // Each group must declare how often it runs.
        String groupPeriodPropKey = StringUtils.format("druid.coordinator.%s.period", coordinatorCustomDutyGroupName);
        if (Strings.isNullOrEmpty(props.getProperty(groupPeriodPropKey))) {
          throw new IAE("Run period for coordinator custom duty group must be set for group %s", coordinatorCustomDutyGroupName);
        }
        Duration groupPeriod = jsonMapper.readValue(props.getProperty(groupPeriodPropKey), Duration.class);
        coordinatorCustomDutyGroups.add(new CoordinatorCustomDutyGroup(coordinatorCustomDutyGroupName, groupPeriod, coordinatorCustomDuties));
      }
      return new CoordinatorCustomDutyGroups(coordinatorCustomDutyGroups);
    }
    catch (Exception e) {
      // Surface config/deserialization failures at startup instead of swallowing them.
      throw new RuntimeException(e);
    }
  }
}
}