Merge pull request #122 from metamx/config-master

Slightly better configuration when talking to indexing service
This commit is contained in:
gianm 2013-04-15 14:58:44 -07:00
commit 7a21ef9fd6
3 changed files with 57 additions and 29 deletions

View File

@ -376,6 +376,10 @@ public class InfoResource
@QueryParam("interval") final String interval
)
{
// TODO: will likely be all rewritten once Guice introduced
if (indexingServiceClient == null) {
return Response.status(Response.Status.OK).entity(ImmutableMap.of("error", "no indexing service found")).build();
}
if (kill != null && Boolean.valueOf(kill)) {
indexingServiceClient.killSegments(dataSourceName, new Interval(interval));
} else {

View File

@ -433,11 +433,17 @@ public class DruidMaster
final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList();
masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod()));
if (config.isMergeSegments() && indexingServiceClient != null) {
if (indexingServiceClient != null) {
masterRunnables.add(
Pair.of(
new MasterSegmentMergerRunnable(configManager.watch(MergerWhitelist.CONFIG_KEY, MergerWhitelist.class)),
new MasterIndexingServiceRunnable(
makeIndexingServiceHelpers(
configManager.watch(
MergerWhitelist.CONFIG_KEY,
MergerWhitelist.class
)
)
),
config.getMasterSegmentMergerPeriod()
)
);
@ -502,6 +508,41 @@ public class DruidMaster
}
}
private List<DruidMasterHelper> makeIndexingServiceHelpers(final AtomicReference<MergerWhitelist> whitelistRef)
{
List<DruidMasterHelper> helpers = Lists.newArrayList();
helpers.add(new DruidMasterSegmentInfoLoader(DruidMaster.this));
if (config.isConvertSegments()) {
helpers.add(new DruidMasterVersionConverter(indexingServiceClient, whitelistRef));
}
if (config.isMergeSegments()) {
helpers.add(new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef));
helpers.add(
new DruidMasterHelper()
{
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MasterStats stats = params.getMasterStats();
log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
params.getEmitter().emit(
new ServiceMetricEvent.Builder().build(
"master/merge/count", stats.getGlobalStats().get("mergedCount")
)
);
return params;
}
}
);
}
return ImmutableList.copyOf(helpers);
}
public static class DruidMasterVersionConverter implements DruidMasterHelper
{
private final IndexingServiceClient indexingServiceClient;
@ -728,34 +769,11 @@ public class DruidMaster
}
}
private class MasterSegmentMergerRunnable extends MasterRunnable
private class MasterIndexingServiceRunnable extends MasterRunnable
{
private MasterSegmentMergerRunnable(final AtomicReference<MergerWhitelist> whitelistRef)
private MasterIndexingServiceRunnable(List<DruidMasterHelper> helpers)
{
super(
ImmutableList.of(
new DruidMasterSegmentInfoLoader(DruidMaster.this),
new DruidMasterVersionConverter(indexingServiceClient, whitelistRef),
new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef),
new DruidMasterHelper()
{
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
MasterStats stats = params.getMasterStats();
log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
params.getEmitter().emit(
new ServiceMetricEvent.Builder().build(
"master/merge/count", stats.getGlobalStats().get("mergedCount")
)
);
return params;
}
}
)
);
super(helpers);
}
}
}

View File

@ -63,6 +63,12 @@ public abstract class DruidMasterConfig
return true;
}
@Config("druid.master.conversion.on")
public boolean isConvertSegments()
{
return true;
}
@Config("druid.master.merger.service")
public String getMergerServiceName()
{