From 0872448ff09dd20010645feb4004fc5ed212d1c3 Mon Sep 17 00:00:00 2001 From: Bingkun Guo Date: Fri, 18 Mar 2016 11:38:06 -0500 Subject: [PATCH] make Coordinator IndexingService helpers pluggable Fixes #2682 IndexingService helpers are added according to the settings in runtime.properties. Rather than having all the config.isXXX checks there, it makes sense to have a pluggable approach for allowing the dynamic configuration to bring in implementations for helpers without having to have hard-coded sets of available helpers. Plus, it will also make it possible for extensions to plug helpers in. With https://github.com/druid-io/druid-api/pull/76, we could conditionally bind a helper to Coordinator's runlist. The condition is driven by the value set in the runtime.properties. --- .../CoordinatorIndexingServiceHelper.java | 36 ++++++++++ .../server/coordinator/DruidCoordinator.java | 71 ++++--------------- .../helper/DruidCoordinatorSegmentKiller.java | 19 ++--- .../helper/DruidCoordinatorSegmentMerger.java | 16 ++++- .../DruidCoordinatorVersionConverter.java | 8 ++- .../DruidCoordinatorSegmentMergerTest.java | 32 +++++---- .../coordinator/DruidCoordinatorTest.java | 3 +- .../DruidCoordinatorSegmentKillerTest.java | 16 ++++- .../java/io/druid/cli/CliCoordinator.java | 36 ++++++++++ 9 files changed, 150 insertions(+), 87 deletions(-) create mode 100644 server/src/main/java/io/druid/guice/annotations/CoordinatorIndexingServiceHelper.java diff --git a/server/src/main/java/io/druid/guice/annotations/CoordinatorIndexingServiceHelper.java b/server/src/main/java/io/druid/guice/annotations/CoordinatorIndexingServiceHelper.java new file mode 100644 index 00000000000..9f61959e01a --- /dev/null +++ b/server/src/main/java/io/druid/guice/annotations/CoordinatorIndexingServiceHelper.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface CoordinatorIndexingServiceHelper +{ +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 217c6f9bfd5..3b2d20b9e0a 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -40,7 +40,6 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.ImmutableDruidDataSource; @@ -52,10 +51,10 @@ import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.CoordinatorIndexingServiceHelper; import io.druid.guice.annotations.Self; import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataSegmentManager; -import io.druid.segment.IndexIO; import io.druid.server.DruidNode; import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; import io.druid.server.coordinator.helper.DruidCoordinatorCleanupOvershadowed; @@ -64,9 +63,6 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorLogger; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader; -import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller; -import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; -import io.druid.server.coordinator.helper.DruidCoordinatorVersionConverter; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; import io.druid.server.initialization.ZkPathsConfig; @@ -128,6 +124,7 @@ public class DruidCoordinator private final AtomicReference leaderLatch; private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; + private final Set indexingServiceHelpers; private volatile boolean started = false; private volatile int leaderCounter = 0; private volatile boolean leader = false; @@ -148,7 +145,8 @@ public class DruidCoordinator IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, - @Self DruidNode self + @Self DruidNode self, + @CoordinatorIndexingServiceHelper Set indexingServiceHelpers ) { this( @@ -165,7 +163,8 @@ public class DruidCoordinator taskMaster, serviceAnnouncer, self, - Maps.newConcurrentMap() + Maps.newConcurrentMap(), + indexingServiceHelpers ); } @@ -183,7 +182,8 @@ public class DruidCoordinator LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, DruidNode self, - ConcurrentMap loadQueuePeonMap + ConcurrentMap loadQueuePeonMap, + Set indexingServiceHelpers ) { this.config = config; @@ -199,6 +199,7 @@ public class DruidCoordinator this.taskMaster = taskMaster; this.serviceAnnouncer = serviceAnnouncer; this.self = self; + this.indexingServiceHelpers = indexingServiceHelpers; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); @@ -404,7 +405,7 @@ public class DruidCoordinator public void execute() { try { - if (curator.checkExists().forPath(toServedSegPath) != null && + if (curator.checkExists().forPath(toServedSegPath) != null && curator.checkExists().forPath(toLoadQueueSegPath) == null && !dropPeon.getSegmentsToDrop().contains(segment)) { dropPeon.dropSegment(segment, callback); @@ -560,12 +561,7 @@ public class DruidCoordinator coordinatorRunnables.add( Pair.of( new CoordinatorIndexingServiceRunnable( - makeIndexingServiceHelpers( - configManager.watch( - DatasourceWhitelist.CONFIG_KEY, - DatasourceWhitelist.class - ) - ), + makeIndexingServiceHelpers(), startingLeaderCounter ), config.getCoordinatorIndexingPeriod() @@ -643,52 +639,13 @@ public class DruidCoordinator } } - private List makeIndexingServiceHelpers( - final AtomicReference whitelistRef - ) + private List makeIndexingServiceHelpers() { List helpers = Lists.newArrayList(); - helpers.add(new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this)); + helpers.addAll(indexingServiceHelpers); - if (config.isConvertSegments()) { - helpers.add(new DruidCoordinatorVersionConverter(indexingServiceClient, whitelistRef)); - } - if (config.isMergeSegments()) { - helpers.add(new DruidCoordinatorSegmentMerger(indexingServiceClient, whitelistRef)); - helpers.add( - new DruidCoordinatorHelper() - { - @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) - { - CoordinatorStats stats = params.getCoordinatorStats(); - log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get()); - - params.getEmitter().emit( - new ServiceMetricEvent.Builder().build( - "coordinator/merge/count", stats.getGlobalStats().get("mergedCount") - ) - ); - - return params; - } - } - ); - } - - if (config.isKillSegments()) { - helpers.add( - new DruidCoordinatorSegmentKiller( - metadataSegmentManager, - indexingServiceClient, - config.getCoordinatorKillDurationToRetain(), - config.getCoordinatorKillPeriod(), - config.getCoordinatorKillMaxSegments() - ) - ); - } - + log.info("Done making indexing service helpers [%s]", helpers); return ImmutableList.copyOf(helpers); } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java index aa9333552fc..a04c16aab47 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java @@ -21,12 +21,13 @@ package io.druid.server.coordinator.helper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.client.indexing.IndexingServiceClient; import io.druid.common.utils.JodaUtils; import io.druid.metadata.MetadataSegmentManager; +import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; -import org.joda.time.Duration; import org.joda.time.Interval; import java.util.List; @@ -47,21 +48,23 @@ public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper private final MetadataSegmentManager segmentManager; private final IndexingServiceClient indexingServiceClient; + @Inject public DruidCoordinatorSegmentKiller( MetadataSegmentManager segmentManager, IndexingServiceClient indexingServiceClient, - Duration retainDuration, - Duration period, - int maxSegmentsToKill + DruidCoordinatorConfig config ) { - this.period = period.getMillis(); - Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0"); + this.period = config.getCoordinatorKillPeriod().getMillis(); + Preconditions.checkArgument( + this.period > config.getCoordinatorIndexingPeriod().getMillis(), + "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod" + ); - this.retainDuration = retainDuration.getMillis(); + this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0"); - this.maxSegmentsToKill = maxSegmentsToKill; + this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments(); Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); log.info( diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java index ccf4cc10cfc..d9fdbe89415 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java @@ -28,11 +28,14 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multiset; import com.google.common.collect.Ordering; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.common.config.JacksonConfigManager; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DatasourceWhitelist; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -57,13 +60,14 @@ public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper private final IndexingServiceClient indexingServiceClient; private final AtomicReference whiteListRef; + @Inject public DruidCoordinatorSegmentMerger( IndexingServiceClient indexingServiceClient, - AtomicReference whitelistRef + JacksonConfigManager configManager ) { this.indexingServiceClient = indexingServiceClient; - this.whiteListRef = whitelistRef; + this.whiteListRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class); } @Override @@ -126,6 +130,14 @@ public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper } } + log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get()); + + params.getEmitter().emit( + new ServiceMetricEvent.Builder().build( + "coordinator/merge/count", stats.getGlobalStats().get("mergedCount") + ) + ); + return params.buildFromExisting() .withCoordinatorStats(stats) .build(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java index 3318eb2e1c7..d37d0e660f5 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java @@ -19,8 +19,10 @@ package io.druid.server.coordinator.helper; +import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.common.config.JacksonConfigManager; import io.druid.segment.IndexIO; import io.druid.server.coordinator.DatasourceWhitelist; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -32,17 +34,17 @@ public class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper { private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorVersionConverter.class); - private final IndexingServiceClient indexingServiceClient; private final AtomicReference whitelistRef; + @Inject public DruidCoordinatorVersionConverter( IndexingServiceClient indexingServiceClient, - AtomicReference whitelistRef + JacksonConfigManager configManager ) { this.indexingServiceClient = indexingServiceClient; - this.whitelistRef = whitelistRef; + this.whitelistRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class); } @Override diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java index da6be0d2baa..fc541c58e02 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorSegmentMergerTest.java @@ -22,10 +22,13 @@ package io.druid.server.coordinator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.common.config.JacksonConfigManager; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; +import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -446,6 +449,11 @@ public class DruidCoordinatorSegmentMergerTest */ private static List> merge(final Collection segments) { + final JacksonConfigManager configManager = EasyMock.createMock(JacksonConfigManager.class); + EasyMock.expect(configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class)) + .andReturn(new AtomicReference(null)).anyTimes(); + EasyMock.replay(configManager); + final List> retVal = Lists.newArrayList(); final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null) { @@ -456,19 +464,17 @@ public class DruidCoordinatorSegmentMergerTest } }; - final AtomicReference whitelistRef = new AtomicReference(null); - final DruidCoordinatorSegmentMerger merger = new DruidCoordinatorSegmentMerger(indexingServiceClient, whitelistRef); - final DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() - .withAvailableSegments(ImmutableSet.copyOf(segments)) - .withDynamicConfigs( - new CoordinatorDynamicConfig.Builder().withMergeBytesLimit( - mergeBytesLimit - ).withMergeSegmentsLimit( - mergeSegmentsLimit - ) - .build() - ) - .build(); + final DruidCoordinatorSegmentMerger merger = new DruidCoordinatorSegmentMerger( + indexingServiceClient, + configManager + ); + final DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams.newBuilder() + .withAvailableSegments(ImmutableSet.copyOf(segments)) + .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMergeBytesLimit( + mergeBytesLimit).withMergeSegmentsLimit(mergeSegmentsLimit).build()) + .withEmitter(EasyMock.createMock(ServiceEmitter.class)) + .build(); merger.run(params); return retVal; } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index f884c0131a3..bdd73576507 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -186,7 +186,8 @@ public class DruidCoordinatorTest extends CuratorTestBase } }, druidNode, - loadManagementPeons + loadManagementPeons, + null ); } diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java index 819446dcaea..cbefc9836bb 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java @@ -22,6 +22,7 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.ImmutableList; import io.druid.client.indexing.IndexingServiceClient; import io.druid.metadata.MetadataSegmentManager; +import io.druid.server.coordinator.TestDruidCoordinatorConfig; import org.easymock.EasyMock; import org.joda.time.Duration; import org.joda.time.Interval; @@ -100,9 +101,18 @@ public class DruidCoordinatorSegmentKillerTest DruidCoordinatorSegmentKiller coordinatorSegmentKiller = new DruidCoordinatorSegmentKiller( segmentManager, indexingServiceClient, - Duration.parse("PT86400S"), - Duration.parse("PT86400S"), - 1000 + new TestDruidCoordinatorConfig( + null, + null, + Duration.parse("PT76400S"), + new Duration(1), + Duration.parse("PT86400S"), + Duration.parse("PT86400S"), + 1000, + null, + false, + false + ) ); Assert.assertEquals( diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index b44c4349d88..b66bd744ed4 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -20,8 +20,10 @@ package io.druid.cli; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; @@ -31,12 +33,14 @@ import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.CoordinatorServerView; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.guice.ConditionalMultibind; import io.druid.guice.ConfigProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.CoordinatorIndexingServiceHelper; import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataRuleManagerConfig; import io.druid.metadata.MetadataRuleManagerProvider; @@ -49,6 +53,10 @@ import io.druid.server.audit.AuditManagerProvider; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; +import io.druid.server.coordinator.helper.DruidCoordinatorHelper; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; +import io.druid.server.coordinator.helper.DruidCoordinatorVersionConverter; import io.druid.server.http.CoordinatorDynamicConfigsResource; import io.druid.server.http.CoordinatorRedirectInfo; import io.druid.server.http.CoordinatorResource; @@ -70,6 +78,7 @@ import org.apache.curator.framework.CuratorFramework; import org.eclipse.jetty.server.Server; import java.util.List; +import java.util.Properties; import java.util.concurrent.Executors; /** @@ -82,11 +91,19 @@ public class CliCoordinator extends ServerRunnable { private static final Logger log = new Logger(CliCoordinator.class); + private Properties properties; + public CliCoordinator() { super(log); } + @Inject + public void configure(Properties properties) + { + this.properties = properties; + } + @Override protected List getModules() { @@ -155,6 +172,25 @@ public class CliCoordinator extends ServerRunnable LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, DatasourcesResource.class); + ConditionalMultibind.create( + properties, + binder, + DruidCoordinatorHelper.class, + CoordinatorIndexingServiceHelper.class + ).addConditionBinding( + "druid.coordinator.merge.on", + Predicates.equalTo("true"), + DruidCoordinatorSegmentMerger.class + ).addConditionBinding( + "druid.coordinator.conversion.on", + Predicates.equalTo("true"), + DruidCoordinatorVersionConverter.class + ).addConditionBinding( + "druid.coordinator.kill.on", + Predicates.equalTo("true"), + DruidCoordinatorSegmentKiller.class + ); + } @Provides