From cc929b2a07482658f4d7542cf3833f00b4d4b0ea Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 15 Apr 2013 11:09:04 -0700 Subject: [PATCH] slightly better configuration when talking to indexing service --- .../com/metamx/druid/http/InfoResource.java | 4 + .../com/metamx/druid/master/DruidMaster.java | 76 ++++++++++++------- .../druid/master/DruidMasterConfig.java | 6 ++ 3 files changed, 57 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index acd94438d84..3bdef858caa 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -376,6 +376,10 @@ public class InfoResource @QueryParam("interval") final String interval ) { + // TODO: will likely be all rewritten once Guice introduced + if (indexingServiceClient == null) { + return Response.status(Response.Status.OK).entity(ImmutableMap.of("error", "no indexing service found")).build(); + } if (kill != null && Boolean.valueOf(kill)) { indexingServiceClient.killSegments(dataSourceName, new Interval(interval)); } else { diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index dff4d93d10c..2eeabd9db0f 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -433,11 +433,17 @@ public class DruidMaster final List> masterRunnables = Lists.newArrayList(); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); - if (config.isMergeSegments() && indexingServiceClient != null) { - + if (indexingServiceClient != null) { masterRunnables.add( Pair.of( - new MasterSegmentMergerRunnable(configManager.watch(MergerWhitelist.CONFIG_KEY, MergerWhitelist.class)), + new MasterIndexingServiceRunnable( + makeIndexingServiceHelpers( + configManager.watch( + MergerWhitelist.CONFIG_KEY, + MergerWhitelist.class + ) + ) + ), config.getMasterSegmentMergerPeriod() ) ); @@ -502,6 +508,41 @@ public class DruidMaster } } + private List makeIndexingServiceHelpers(final AtomicReference whitelistRef) + { + List helpers = Lists.newArrayList(); + + helpers.add(new DruidMasterSegmentInfoLoader(DruidMaster.this)); + + if (config.isConvertSegments()) { + helpers.add(new DruidMasterVersionConverter(indexingServiceClient, whitelistRef)); + } + if (config.isMergeSegments()) { + helpers.add(new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef)); + helpers.add( + new DruidMasterHelper() + { + @Override + public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) + { + MasterStats stats = params.getMasterStats(); + log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get()); + + params.getEmitter().emit( + new ServiceMetricEvent.Builder().build( + "master/merge/count", stats.getGlobalStats().get("mergedCount") + ) + ); + + return params; + } + } + ); + } + + return ImmutableList.copyOf(helpers); + } + public static class DruidMasterVersionConverter implements DruidMasterHelper { private final IndexingServiceClient indexingServiceClient; @@ -728,34 +769,11 @@ public class DruidMaster } } - private class MasterSegmentMergerRunnable extends MasterRunnable + private class MasterIndexingServiceRunnable extends MasterRunnable { - private MasterSegmentMergerRunnable(final AtomicReference whitelistRef) + private MasterIndexingServiceRunnable(List helpers) { - super( - ImmutableList.of( - new DruidMasterSegmentInfoLoader(DruidMaster.this), - new DruidMasterVersionConverter(indexingServiceClient, whitelistRef), - new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef), - new DruidMasterHelper() - { - @Override - public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) - { - MasterStats stats = params.getMasterStats(); - log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get()); - - params.getEmitter().emit( - new ServiceMetricEvent.Builder().build( - "master/merge/count", stats.getGlobalStats().get("mergedCount") - ) - ); - - return params; - } - } - ) - ); + super(helpers); } } } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index 755c668dc0e..88f2a348ed0 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -63,6 +63,12 @@ public abstract class DruidMasterConfig return true; } + @Config("druid.master.conversion.on") + public boolean isConvertSegments() + { + return true; + } + @Config("druid.master.merger.service") public String getMergerServiceName() {