Merge pull request #2712 from guobingkun/make_runnables_pluggable

make Coordinator IndexingService helpers pluggable
Parag Jain 2016-03-28 12:18:18 -05:00
commit 89a8277ae2
9 changed files with 150 additions and 87 deletions

View File

@@ -0,0 +1,36 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface CoordinatorIndexingServiceHelper
{
}
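The new @CoordinatorIndexingServiceHelper annotation is a plain Guice binding annotation; its only job is to qualify the Set<DruidCoordinatorHelper> that the coordinator injects. As a minimal, self-contained sketch of the mechanism (the Helper, MergeHelper, KillHelper, and Coordinator classes below are illustrative stand-ins, not Druid code), a binding annotation plus a Multibinder lets independent modules contribute members to one annotated set:

// Minimal sketch, not part of this PR: a binding annotation qualifying a multibound set.
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.multibindings.Multibinder;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Set;

public class BindingAnnotationSketch
{
  // Same shape as CoordinatorIndexingServiceHelper above.
  @BindingAnnotation
  @Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
  @Retention(RetentionPolicy.RUNTIME)
  public @interface IndexingHelper {}

  public interface Helper
  {
    String name();
  }

  public static class MergeHelper implements Helper
  {
    @Override public String name() { return "merge"; }
  }

  public static class KillHelper implements Helper
  {
    @Override public String name() { return "kill"; }
  }

  // Consumer of the annotated set; only the annotation ties it to the contributions below.
  public static class Coordinator
  {
    private final Set<Helper> helpers;

    @Inject
    public Coordinator(@IndexingHelper Set<Helper> helpers)
    {
      this.helpers = helpers;
    }

    public void run()
    {
      for (Helper helper : helpers) {
        System.out.println("running helper: " + helper.name());
      }
    }
  }

  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(
        new AbstractModule()
        {
          @Override
          protected void configure()
          {
            // Any module may call newSetBinder with the same type + annotation and
            // contribute more helpers; Guice merges them into one Set.
            Multibinder<Helper> helpers =
                Multibinder.newSetBinder(binder(), Helper.class, IndexingHelper.class);
            helpers.addBinding().to(MergeHelper.class);
            helpers.addBinding().to(KillHelper.class);
          }
        }
    );
    injector.getInstance(Coordinator.class).run();
  }
}

CliCoordinator further down uses the same set, but gates each contribution on a runtime property via ConditionalMultibind.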

View File

@@ -40,7 +40,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
@@ -52,10 +51,10 @@ import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import io.druid.guice.annotations.Self;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.segment.IndexIO;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.server.coordinator.helper.DruidCoordinatorCleanupOvershadowed;
@@ -64,9 +63,6 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.coordinator.helper.DruidCoordinatorVersionConverter;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig;
@@ -128,6 +124,7 @@ public class DruidCoordinator
private final AtomicReference<LeaderLatch> leaderLatch;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private final Set<DruidCoordinatorHelper> indexingServiceHelpers;
private volatile boolean started = false;
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
@@ -148,7 +145,8 @@ public class DruidCoordinator
IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self
@Self DruidNode self,
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers
)
{
this(
@@ -165,7 +163,8 @@
taskMaster,
serviceAnnouncer,
self,
Maps.<String, LoadQueuePeon>newConcurrentMap()
Maps.<String, LoadQueuePeon>newConcurrentMap(),
indexingServiceHelpers
);
}
@@ -183,7 +182,8 @@
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<DruidCoordinatorHelper> indexingServiceHelpers
)
{
this.config = config;
@@ -199,6 +199,7 @@
this.taskMaster = taskMaster;
this.serviceAnnouncer = serviceAnnouncer;
this.self = self;
this.indexingServiceHelpers = indexingServiceHelpers;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@@ -404,7 +405,7 @@
public void execute()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&
curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
@@ -560,12 +561,7 @@
coordinatorRunnables.add(
Pair.of(
new CoordinatorIndexingServiceRunnable(
makeIndexingServiceHelpers(
configManager.watch(
DatasourceWhitelist.CONFIG_KEY,
DatasourceWhitelist.class
)
),
makeIndexingServiceHelpers(),
startingLeaderCounter
),
config.getCoordinatorIndexingPeriod()
@@ -643,52 +639,13 @@
}
}
private List<DruidCoordinatorHelper> makeIndexingServiceHelpers(
final AtomicReference<DatasourceWhitelist> whitelistRef
)
private List<DruidCoordinatorHelper> makeIndexingServiceHelpers()
{
List<DruidCoordinatorHelper> helpers = Lists.newArrayList();
helpers.add(new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this));
helpers.addAll(indexingServiceHelpers);
if (config.isConvertSegments()) {
helpers.add(new DruidCoordinatorVersionConverter(indexingServiceClient, whitelistRef));
}
if (config.isMergeSegments()) {
helpers.add(new DruidCoordinatorSegmentMerger(indexingServiceClient, whitelistRef));
helpers.add(
new DruidCoordinatorHelper()
{
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
CoordinatorStats stats = params.getCoordinatorStats();
log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
params.getEmitter().emit(
new ServiceMetricEvent.Builder().build(
"coordinator/merge/count", stats.getGlobalStats().get("mergedCount")
)
);
return params;
}
}
);
}
if (config.isKillSegments()) {
helpers.add(
new DruidCoordinatorSegmentKiller(
metadataSegmentManager,
indexingServiceClient,
config.getCoordinatorKillDurationToRetain(),
config.getCoordinatorKillPeriod(),
config.getCoordinatorKillMaxSegments()
)
);
}
log.info("Done making indexing service helpers [%s]", helpers);
return ImmutableList.copyOf(helpers);
}
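With the whitelist wiring and the hard-coded merge/convert/kill construction gone from DruidCoordinator, makeIndexingServiceHelpers() simply prepends the segment-info loader to whatever was bound under @CoordinatorIndexingServiceHelper. As a hedged sketch of what this enables (the module and helper names below are hypothetical, and it assumes the standard Guice behavior where newSetBinder calls with the same type and annotation contribute to one set), an extension could register an extra helper without touching the coordinator:

// Hypothetical extension module; class names are illustrative, not part of Druid.
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import io.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;

public class MyCoordinatorHelperModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Contributions land in the same annotated Set<DruidCoordinatorHelper> that
    // DruidCoordinator injects and appends in makeIndexingServiceHelpers().
    Multibinder
        .newSetBinder(binder, DruidCoordinatorHelper.class, CoordinatorIndexingServiceHelper.class)
        .addBinding()
        .to(MyCustomHelper.class);
  }

  public static class MyCustomHelper implements DruidCoordinatorHelper
  {
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
    {
      // A no-op helper for illustration; a real one would inspect params and, for
      // example, submit indexing-service tasks.
      return params;
    }
  }
}

Contributed helpers run after the segment-info loader that DruidCoordinator still adds first, so they see the loaded segment inventory.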

View File

@@ -21,12 +21,13 @@ package io.druid.server.coordinator.helper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.utils.JodaUtils;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.joda.time.Duration;
import org.joda.time.Interval;
import java.util.List;
@@ -47,21 +48,23 @@ public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper
private final MetadataSegmentManager segmentManager;
private final IndexingServiceClient indexingServiceClient;
@Inject
public DruidCoordinatorSegmentKiller(
MetadataSegmentManager segmentManager,
IndexingServiceClient indexingServiceClient,
Duration retainDuration,
Duration period,
int maxSegmentsToKill
DruidCoordinatorConfig config
)
{
this.period = period.getMillis();
Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0");
this.period = config.getCoordinatorKillPeriod().getMillis();
Preconditions.checkArgument(
this.period > config.getCoordinatorIndexingPeriod().getMillis(),
"coordinator kill period must be greater than druid.coordinator.period.indexingPeriod"
);
this.retainDuration = retainDuration.getMillis();
this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0");
this.maxSegmentsToKill = maxSegmentsToKill;
this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
log.info(
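DruidCoordinatorSegmentKiller now takes the whole DruidCoordinatorConfig instead of individual duration/limit arguments, and it tightens validation: the kill period must exceed the indexing period rather than merely being positive. A distilled sketch of that check with illustrative values (the getter names in the comments match the config calls above; the durations are examples, not Druid defaults):

// Distilled sketch of the constructor's validation, with illustrative values.
import com.google.common.base.Preconditions;
import org.joda.time.Duration;

public class KillConfigCheck
{
  public static void main(String[] args)
  {
    Duration indexingPeriod = Duration.standardMinutes(30);  // getCoordinatorIndexingPeriod()
    Duration killPeriod = Duration.standardDays(1);          // getCoordinatorKillPeriod()
    Duration retainDuration = Duration.standardDays(90);     // getCoordinatorKillDurationToRetain()
    int maxSegmentsToKill = 100;                             // getCoordinatorKillMaxSegments()

    Preconditions.checkArgument(
        killPeriod.getMillis() > indexingPeriod.getMillis(),
        "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod"
    );
    Preconditions.checkArgument(retainDuration.getMillis() >= 0, "coordinator kill retainDuration must be >= 0");
    Preconditions.checkArgument(maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");

    System.out.println("kill settings accepted");
  }
}

The DruidCoordinatorSegmentKillerTest changes below exercise the same constructor through TestDruidCoordinatorConfig.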

View File

@@ -28,11 +28,14 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -57,13 +60,14 @@ public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper
private final IndexingServiceClient indexingServiceClient;
private final AtomicReference<DatasourceWhitelist> whiteListRef;
@Inject
public DruidCoordinatorSegmentMerger(
IndexingServiceClient indexingServiceClient,
AtomicReference<DatasourceWhitelist> whitelistRef
JacksonConfigManager configManager
)
{
this.indexingServiceClient = indexingServiceClient;
this.whiteListRef = whitelistRef;
this.whiteListRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class);
}
@Override
@@ -126,6 +130,14 @@ public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper
}
}
log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
params.getEmitter().emit(
new ServiceMetricEvent.Builder().build(
"coordinator/merge/count", stats.getGlobalStats().get("mergedCount")
)
);
return params.buildFromExisting()
.withCoordinatorStats(stats)
.build();

View File

@@ -19,8 +19,10 @@
package io.druid.server.coordinator.helper;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -32,17 +34,17 @@ public class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper
{
private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorVersionConverter.class);
private final IndexingServiceClient indexingServiceClient;
private final AtomicReference<DatasourceWhitelist> whitelistRef;
@Inject
public DruidCoordinatorVersionConverter(
IndexingServiceClient indexingServiceClient,
AtomicReference<DatasourceWhitelist> whitelistRef
JacksonConfigManager configManager
)
{
this.indexingServiceClient = indexingServiceClient;
this.whitelistRef = whitelistRef;
this.whitelistRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class);
}
@Override
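Both the segment merger above and this version converter previously received an AtomicReference<DatasourceWhitelist> that DruidCoordinator built for them; now each watches the whitelist itself through JacksonConfigManager, so Guice can construct them with no coordinator plumbing. A minimal sketch of that watch pattern in a hypothetical helper (only the watch() call and DatasourceWhitelist.CONFIG_KEY come from the diff; the surrounding class is illustrative):

// Hypothetical helper showing the JacksonConfigManager watch pattern used above.
import com.google.inject.Inject;
import io.druid.common.config.JacksonConfigManager;
import io.druid.server.coordinator.DatasourceWhitelist;
import java.util.concurrent.atomic.AtomicReference;

public class WhitelistWatchingHelper
{
  private final AtomicReference<DatasourceWhitelist> whitelistRef;

  @Inject
  public WhitelistWatchingHelper(JacksonConfigManager configManager)
  {
    // The returned reference tracks later whitelist updates; no coordinator wiring is needed.
    this.whitelistRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class);
  }

  public DatasourceWhitelist currentWhitelist()
  {
    // May be null until a whitelist has been configured.
    return whitelistRef.get();
  }
}

The DruidCoordinatorSegmentMergerTest changes below mock exactly this watch() call with EasyMock.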

View File

@@ -22,10 +22,13 @@ package io.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -446,6 +449,11 @@ public class DruidCoordinatorSegmentMergerTest
*/
private static List<List<DataSegment>> merge(final Collection<DataSegment> segments)
{
final JacksonConfigManager configManager = EasyMock.createMock(JacksonConfigManager.class);
EasyMock.expect(configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class))
.andReturn(new AtomicReference<DatasourceWhitelist>(null)).anyTimes();
EasyMock.replay(configManager);
final List<List<DataSegment>> retVal = Lists.newArrayList();
final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null)
{
@@ -456,19 +464,17 @@ public class DruidCoordinatorSegmentMergerTest
}
};
final AtomicReference<DatasourceWhitelist> whitelistRef = new AtomicReference<DatasourceWhitelist>(null);
final DruidCoordinatorSegmentMerger merger = new DruidCoordinatorSegmentMerger(indexingServiceClient, whitelistRef);
final DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
.withAvailableSegments(ImmutableSet.copyOf(segments))
.withDynamicConfigs(
new CoordinatorDynamicConfig.Builder().withMergeBytesLimit(
mergeBytesLimit
).withMergeSegmentsLimit(
mergeSegmentsLimit
)
.build()
)
.build();
final DruidCoordinatorSegmentMerger merger = new DruidCoordinatorSegmentMerger(
indexingServiceClient,
configManager
);
final DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withAvailableSegments(ImmutableSet.copyOf(segments))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMergeBytesLimit(
mergeBytesLimit).withMergeSegmentsLimit(mergeSegmentsLimit).build())
.withEmitter(EasyMock.createMock(ServiceEmitter.class))
.build();
merger.run(params);
return retVal;
}

View File

@@ -186,7 +186,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
}
},
druidNode,
loadManagementPeons
loadManagementPeons,
null
);
}

View File

@@ -22,6 +22,7 @@ package io.druid.server.coordinator.helper;
import com.google.common.collect.ImmutableList;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -100,9 +101,18 @@ public class DruidCoordinatorSegmentKillerTest
DruidCoordinatorSegmentKiller coordinatorSegmentKiller = new DruidCoordinatorSegmentKiller(
segmentManager,
indexingServiceClient,
Duration.parse("PT86400S"),
Duration.parse("PT86400S"),
1000
new TestDruidCoordinatorConfig(
null,
null,
Duration.parse("PT76400S"),
new Duration(1),
Duration.parse("PT86400S"),
Duration.parse("PT86400S"),
1000,
null,
false,
false
)
);
Assert.assertEquals(

View File

@@ -20,8 +20,10 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
@@ -31,12 +33,14 @@ import io.airlift.airline.Command;
import io.druid.audit.AuditManager;
import io.druid.client.CoordinatorServerView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.guice.ConditionalMultibind;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManagerConfig;
import io.druid.metadata.MetadataRuleManagerProvider;
@@ -49,6 +53,10 @@ import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.LoadQueueTaskMaster;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.coordinator.helper.DruidCoordinatorVersionConverter;
import io.druid.server.http.CoordinatorDynamicConfigsResource;
import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource;
@@ -70,6 +78,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
/**
@@ -82,11 +91,19 @@ public class CliCoordinator extends ServerRunnable
{
private static final Logger log = new Logger(CliCoordinator.class);
private Properties properties;
public CliCoordinator()
{
super(log);
}
@Inject
public void configure(Properties properties)
{
this.properties = properties;
}
@Override
protected List<? extends Module> getModules()
{
@@ -155,6 +172,25 @@ public class CliCoordinator extends ServerRunnable
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DatasourcesResource.class);
ConditionalMultibind.create(
properties,
binder,
DruidCoordinatorHelper.class,
CoordinatorIndexingServiceHelper.class
).addConditionBinding(
"druid.coordinator.merge.on",
Predicates.equalTo("true"),
DruidCoordinatorSegmentMerger.class
).addConditionBinding(
"druid.coordinator.conversion.on",
Predicates.equalTo("true"),
DruidCoordinatorVersionConverter.class
).addConditionBinding(
"druid.coordinator.kill.on",
Predicates.equalTo("true"),
DruidCoordinatorSegmentKiller.class
);
}
@Provides
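The ConditionalMultibind block in CliCoordinator replaces the isMergeSegments()/isConvertSegments()/isKillSegments() branches that used to live in DruidCoordinator: each built-in helper joins the @CoordinatorIndexingServiceHelper set only when its runtime property is "true". A hedged sketch of the same pattern from an extension's point of view (only ConditionalMultibind.create and addConditionBinding, as used above, are assumed; the property name and classes are hypothetical):

// Hypothetical extension module reusing the property-gated binding shown above.
import com.google.common.base.Predicates;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import io.druid.guice.ConditionalMultibind;
import io.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import java.util.Properties;

public class MyGatedHelperModule implements Module
{
  private Properties properties;

  @Inject
  public void setProperties(Properties properties)
  {
    // Same injection trick CliCoordinator uses to see runtime properties at bind time.
    this.properties = properties;
  }

  @Override
  public void configure(Binder binder)
  {
    // MyGatedHelper is bound into the coordinator's helper set only when the
    // (hypothetical) property druid.coordinator.myExtension.on is set to "true".
    ConditionalMultibind.create(
        properties,
        binder,
        DruidCoordinatorHelper.class,
        CoordinatorIndexingServiceHelper.class
    ).addConditionBinding(
        "druid.coordinator.myExtension.on",
        Predicates.equalTo("true"),
        MyGatedHelper.class
    );
  }

  public static class MyGatedHelper implements DruidCoordinatorHelper
  {
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
    {
      return params;  // no-op placeholder
    }
  }
}

Unlike the unconditional Multibinder registration sketched earlier, nothing is bound here unless the operator sets the property, which is how the old config switches are preserved as opt-in toggles.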