merged changes with master

This commit is contained in:
Dhruv Parthasarathy 2013-08-08 18:11:33 -07:00
commit fc5e15f72b
21 changed files with 338 additions and 165 deletions

View File

@ -102,6 +102,7 @@ public class InfoResource
private final DatabaseRuleManager databaseRuleManager; private final DatabaseRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient; private final IndexingServiceClient indexingServiceClient;
@Inject @Inject
public InfoResource( public InfoResource(
DruidMaster master, DruidMaster master,
@ -129,6 +130,7 @@ public class InfoResource
.build(); .build();
} }
@GET @GET
@Path("/cluster") @Path("/cluster")
@Produces("application/json") @Produces("application/json")

View File

@ -283,7 +283,8 @@ public class MasterMain
databaseRuleManager, databaseRuleManager,
master, master,
jsonMapper, jsonMapper,
indexingServiceClient indexingServiceClient,
configManager
) )
); );

View File

@ -24,13 +24,10 @@ import com.metamx.druid.master.LoadPeonCallback;
import javax.inject.Inject; import javax.inject.Inject;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
*/ */
@ -110,11 +107,5 @@ public class MasterResource
return resp; return resp;
} }
@GET
@Path("/loadstatus")
@Produces("application/json")
public Map<String, Double> getLoadStatus()
{
return master.getLoadStatus();
}
} }

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.master.MasterSegmentSettings;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
/**
*/
@Path("/master/config")
public class MasterSegmentSettingsResource
{
private final JacksonConfigManager manager;
@Inject
public MasterSegmentSettingsResource(
JacksonConfigManager manager
)
{
this.manager=manager;
}
@GET
@Produces("application/json")
public Response getDynamicConfigs()
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK)
.entity(
manager.watch(MasterSegmentSettings.CONFIG_KEY,MasterSegmentSettings.class).get()
);
return builder.build();
}
@POST
@Consumes("application/json")
public Response setDynamicConfigs(
final MasterSegmentSettings masterSegmentSettings
)
{
if (!manager.set(MasterSegmentSettings.CONFIG_KEY, masterSegmentSettings)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
return Response.status(Response.Status.OK).build();
}
}

View File

@ -25,6 +25,7 @@ import com.google.inject.Provides;
import com.google.inject.util.Providers; import com.google.inject.util.Providers;
import com.metamx.druid.client.InventoryView; import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMaster;
@ -43,6 +44,8 @@ public class MasterServletModule extends JerseyServletModule
private final DruidMaster master; private final DruidMaster master;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final IndexingServiceClient indexingServiceClient; private final IndexingServiceClient indexingServiceClient;
private final JacksonConfigManager configManager;
public MasterServletModule( public MasterServletModule(
InventoryView serverInventoryView, InventoryView serverInventoryView,
@ -50,7 +53,8 @@ public class MasterServletModule extends JerseyServletModule
DatabaseRuleManager databaseRuleManager, DatabaseRuleManager databaseRuleManager,
DruidMaster master, DruidMaster master,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
IndexingServiceClient indexingServiceClient IndexingServiceClient indexingServiceClient,
JacksonConfigManager configManager
) )
{ {
this.serverInventoryView = serverInventoryView; this.serverInventoryView = serverInventoryView;
@ -59,17 +63,20 @@ public class MasterServletModule extends JerseyServletModule
this.master = master; this.master = master;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.indexingServiceClient = indexingServiceClient; this.indexingServiceClient = indexingServiceClient;
this.configManager = configManager;
} }
@Override @Override
protected void configureServlets() protected void configureServlets()
{ {
bind(MasterSegmentSettingsResource.class);
bind(InfoResource.class); bind(InfoResource.class);
bind(MasterResource.class); bind(MasterResource.class);
bind(InventoryView.class).toInstance(serverInventoryView); bind(InventoryView.class).toInstance(serverInventoryView);
bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager); bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager);
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager); bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
bind(DruidMaster.class).toInstance(master); bind(DruidMaster.class).toInstance(master);
bind(JacksonConfigManager.class).toInstance(configManager);
if (indexingServiceClient == null) { if (indexingServiceClient == null) {
bind(IndexingServiceClient.class).toProvider(Providers.<IndexingServiceClient>of(null)); bind(IndexingServiceClient.class).toProvider(Providers.<IndexingServiceClient>of(null));
} }

View File

@ -94,6 +94,7 @@ public class DruidMaster
private final LoadQueueTaskMaster taskMaster; private final LoadQueueTaskMaster taskMaster;
private final Map<String, LoadQueuePeon> loadManagementPeons; private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch; private final AtomicReference<LeaderLatch> leaderLatch;
private volatile AtomicReference<MasterSegmentSettings> segmentSettingsAtomicReference;
public DruidMaster( public DruidMaster(
DruidMasterConfig config, DruidMasterConfig config,
@ -155,6 +156,7 @@ public class DruidMaster
this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d");
this.leaderLatch = new AtomicReference<LeaderLatch>(null); this.leaderLatch = new AtomicReference<LeaderLatch>(null);
this.segmentSettingsAtomicReference= new AtomicReference<MasterSegmentSettings>(null);
this.loadManagementPeons = loadQueuePeonMap; this.loadManagementPeons = loadQueuePeonMap;
} }
@ -464,7 +466,7 @@ public class DruidMaster
serverInventoryView.start(); serverInventoryView.start();
final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList(); final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList();
segmentSettingsAtomicReference = configManager.watch(MasterSegmentSettings.CONFIG_KEY, MasterSegmentSettings.class,new MasterSegmentSettings.Builder().build());
masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod()));
if (indexingServiceClient != null) { if (indexingServiceClient != null) {
@ -649,19 +651,15 @@ public class DruidMaster
} }
// Do master stuff. // Do master stuff.
DruidMasterRuntimeParams params = DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder() DruidMasterRuntimeParams.newBuilder()
.withStartTime(startTime) .withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory()) .withDatasources(databaseSegmentManager.getInventory())
.withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting()) .withMasterSegmentSettings(segmentSettingsAtomicReference.get())
.withEmitter(emitter) .withEmitter(emitter)
.withMergeBytesLimit(config.getMergeBytesLimit())
.withMergeSegmentsLimit(config.getMergeSegmentsLimit())
.withMaxSegmentsToMove(config.getMaxSegmentsToMove())
.withEmitBalancingCostParams(config.getEmitStats())
.build(); .build();
for (DruidMasterHelper helper : helpers) { for (DruidMasterHelper helper : helpers) {
params = helper.run(params); params = helper.run(params);
} }
@ -756,6 +754,9 @@ public class DruidMaster
.withLoadManagementPeons(loadManagementPeons) .withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTime.now()) .withBalancerReferenceTimestamp(DateTime.now())
.withMasterSegmentSettings(
segmentSettingsAtomicReference.get()
)
.build(); .build();
} }
}, },

View File

@ -79,7 +79,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
final MasterStats stats = new MasterStats(); final MasterStats stats = new MasterStats();
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp); final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
final int maxSegmentsToMove = params.getMaxSegmentsToMove(); final int maxSegmentsToMove = params.getMasterSegmentSettings().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry : for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
params.getDruidCluster().getCluster().entrySet()) { params.getDruidCluster().getCluster().entrySet()) {
@ -124,7 +124,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
} }
} }
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getEmitStats()) { if (params.getMasterSegmentSettings().isEmitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList); strategy.emitStats(tier, stats, serverHolderList);
} }

View File

@ -34,11 +34,6 @@ public abstract class DruidMasterConfig
@Default("PT600s") @Default("PT600s")
public abstract Duration getMasterStartDelay(); public abstract Duration getMasterStartDelay();
@Config("druid.master.emitStats")
public boolean getEmitStats(){
return false;
}
@Config("druid.master.period") @Config("druid.master.period")
@Default("PT60s") @Default("PT60s")
public abstract Duration getMasterPeriod(); public abstract Duration getMasterPeriod();
@ -47,12 +42,6 @@ public abstract class DruidMasterConfig
@Default("PT1800s") @Default("PT1800s")
public abstract Duration getMasterSegmentMergerPeriod(); public abstract Duration getMasterSegmentMergerPeriod();
@Config("druid.master.millisToWaitBeforeDeleting")
public long getMillisToWaitBeforeDeleting()
{
return 15 * 60 * 1000L;
}
@Config("druid.master.merger.on") @Config("druid.master.merger.on")
public boolean isMergeSegments() public boolean isMergeSegments()
{ {
@ -71,26 +60,16 @@ public abstract class DruidMasterConfig
return null; return null;
} }
@Config("druid.master.merge.threshold")
public long getMergeBytesLimit()
{
return 100000000L;
}
@Config("druid.master.merge.maxSegments")
public int getMergeSegmentsLimit()
{
return Integer.MAX_VALUE;
}
@Config("druid.master.balancer.maxSegmentsToMove")
@Default("5")
public abstract int getMaxSegmentsToMove();
@Config("druid.master.replicant.lifetime") @Config("druid.master.replicant.lifetime")
@Default("15") @Default("15")
public abstract int getReplicantLifetime(); public abstract int getReplicantLifetime();
@Config("druid.master.masterSegmentSettings")
public MasterSegmentSettings getMasterSegmentSettings()
{
return new MasterSegmentSettings.Builder().build();
}
@Config("druid.master.replicant.throttleLimit") @Config("druid.master.replicant.throttleLimit")
@Default("10") @Default("10")
public abstract int getReplicantThrottleLimit(); public abstract int getReplicantThrottleLimit();

View File

@ -46,13 +46,9 @@ public class DruidMasterRuntimeParams
private final Map<String, LoadQueuePeon> loadManagementPeons; private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ReplicationThrottler replicationManager; private final ReplicationThrottler replicationManager;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final long millisToWaitBeforeDeleting; private final MasterSegmentSettings masterSegmentSettings;
private final MasterStats stats; private final MasterStats stats;
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int maxSegmentsToMove;
private final DateTime balancerReferenceTimestamp; private final DateTime balancerReferenceTimestamp;
private final boolean emitStats;
private final BalancerStrategyFactory strategyFactory; private final BalancerStrategyFactory strategyFactory;
public DruidMasterRuntimeParams( public DruidMasterRuntimeParams(
@ -65,13 +61,9 @@ public class DruidMasterRuntimeParams
Map<String, LoadQueuePeon> loadManagementPeons, Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager, ReplicationThrottler replicationManager,
ServiceEmitter emitter, ServiceEmitter emitter,
long millisToWaitBeforeDeleting, MasterSegmentSettings masterSegmentSettings,
MasterStats stats, MasterStats stats,
long mergeBytesLimit,
int mergeSegmentsLimit,
int maxSegmentsToMove,
DateTime balancerReferenceTimestamp, DateTime balancerReferenceTimestamp,
boolean emitBalancingCostParams,
BalancerStrategyFactory strategyFactory BalancerStrategyFactory strategyFactory
) )
{ {
@ -84,21 +76,12 @@ public class DruidMasterRuntimeParams
this.loadManagementPeons = loadManagementPeons; this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager; this.replicationManager = replicationManager;
this.emitter = emitter; this.emitter = emitter;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats; this.stats = stats;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.balancerReferenceTimestamp = balancerReferenceTimestamp; this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.emitStats = emitBalancingCostParams;
this.strategyFactory = strategyFactory; this.strategyFactory = strategyFactory;
} }
public boolean getEmitStats()
{
return emitStats;
}
public long getStartTime() public long getStartTime()
{ {
return startTime; return startTime;
@ -144,9 +127,9 @@ public class DruidMasterRuntimeParams
return emitter; return emitter;
} }
public long getMillisToWaitBeforeDeleting() public MasterSegmentSettings getMasterSegmentSettings()
{ {
return millisToWaitBeforeDeleting; return masterSegmentSettings;
} }
public MasterStats getMasterStats() public MasterStats getMasterStats()
@ -154,21 +137,6 @@ public class DruidMasterRuntimeParams
return stats; return stats;
} }
public long getMergeBytesLimit()
{
return mergeBytesLimit;
}
public int getMergeSegmentsLimit()
{
return mergeSegmentsLimit;
}
public int getMaxSegmentsToMove()
{
return maxSegmentsToMove;
}
public DateTime getBalancerReferenceTimestamp() public DateTime getBalancerReferenceTimestamp()
{ {
return balancerReferenceTimestamp; return balancerReferenceTimestamp;
@ -181,7 +149,7 @@ public class DruidMasterRuntimeParams
public boolean hasDeletionWaitTimeElapsed() public boolean hasDeletionWaitTimeElapsed()
{ {
return (System.currentTimeMillis() - getStartTime() > getMillisToWaitBeforeDeleting()); return (System.currentTimeMillis() - getStartTime() > masterSegmentSettings.getMillisToWaitBeforeDeleting());
} }
public static Builder newBuilder() public static Builder newBuilder()
@ -201,13 +169,9 @@ public class DruidMasterRuntimeParams
loadManagementPeons, loadManagementPeons,
replicationManager, replicationManager,
emitter, emitter,
millisToWaitBeforeDeleting, masterSegmentSettings,
stats, stats,
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
balancerReferenceTimestamp, balancerReferenceTimestamp,
emitStats,
strategyFactory strategyFactory
); );
} }
@ -223,11 +187,8 @@ public class DruidMasterRuntimeParams
private final Map<String, LoadQueuePeon> loadManagementPeons; private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager; private ReplicationThrottler replicationManager;
private ServiceEmitter emitter; private ServiceEmitter emitter;
private long millisToWaitBeforeDeleting; private MasterSegmentSettings masterSegmentSettings;
private MasterStats stats; private MasterStats stats;
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int maxSegmentsToMove;
private DateTime balancerReferenceTimestamp; private DateTime balancerReferenceTimestamp;
private boolean emitBalancingCostParams; private boolean emitBalancingCostParams;
private BalancerStrategyFactory strategyFactory; private BalancerStrategyFactory strategyFactory;
@ -243,11 +204,8 @@ public class DruidMasterRuntimeParams
this.loadManagementPeons = Maps.newHashMap(); this.loadManagementPeons = Maps.newHashMap();
this.replicationManager = null; this.replicationManager = null;
this.emitter = null; this.emitter = null;
this.millisToWaitBeforeDeleting = 0;
this.stats = new MasterStats(); this.stats = new MasterStats();
this.mergeBytesLimit = 0; this.masterSegmentSettings = new MasterSegmentSettings.Builder().build();
this.mergeSegmentsLimit = 0;
this.maxSegmentsToMove = 0;
this.balancerReferenceTimestamp = null; this.balancerReferenceTimestamp = null;
this.emitBalancingCostParams = false; this.emitBalancingCostParams = false;
this.strategyFactory = new CostBalancerStrategyFactory(); this.strategyFactory = new CostBalancerStrategyFactory();
@ -263,13 +221,9 @@ public class DruidMasterRuntimeParams
Map<String, LoadQueuePeon> loadManagementPeons, Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager, ReplicationThrottler replicationManager,
ServiceEmitter emitter, ServiceEmitter emitter,
long millisToWaitBeforeDeleting, MasterSegmentSettings masterSegmentSettings,
MasterStats stats, MasterStats stats,
long mergeBytesLimit,
int mergeSegmentsLimit,
int maxSegmentsToMove,
DateTime balancerReferenceTimestamp, DateTime balancerReferenceTimestamp,
boolean emitBalancingCostParams,
BalancerStrategyFactory strategyFactory BalancerStrategyFactory strategyFactory
) )
{ {
@ -282,11 +236,8 @@ public class DruidMasterRuntimeParams
this.loadManagementPeons = loadManagementPeons; this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager; this.replicationManager = replicationManager;
this.emitter = emitter; this.emitter = emitter;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats; this.stats = stats;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.balancerReferenceTimestamp = balancerReferenceTimestamp; this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.emitBalancingCostParams = emitBalancingCostParams; this.emitBalancingCostParams = emitBalancingCostParams;
this.strategyFactory=strategyFactory; this.strategyFactory=strategyFactory;
@ -304,13 +255,9 @@ public class DruidMasterRuntimeParams
loadManagementPeons, loadManagementPeons,
replicationManager, replicationManager,
emitter, emitter,
millisToWaitBeforeDeleting, masterSegmentSettings,
stats, stats,
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
balancerReferenceTimestamp, balancerReferenceTimestamp,
emitBalancingCostParams,
strategyFactory strategyFactory
); );
} }
@ -381,33 +328,15 @@ public class DruidMasterRuntimeParams
return this; return this;
} }
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
return this;
}
public Builder withMasterStats(MasterStats stats) public Builder withMasterStats(MasterStats stats)
{ {
this.stats.accumulate(stats); this.stats.accumulate(stats);
return this; return this;
} }
public Builder withMergeBytesLimit(long mergeBytesLimit) public Builder withMasterSegmentSettings(MasterSegmentSettings configs)
{ {
this.mergeBytesLimit = mergeBytesLimit; this.masterSegmentSettings = configs;
return this;
}
public Builder withMergeSegmentsLimit(int mergeSegmentsLimit)
{
this.mergeSegmentsLimit = mergeSegmentsLimit;
return this;
}
public Builder withMaxSegmentsToMove(int maxSegmentsToMove)
{
this.maxSegmentsToMove = maxSegmentsToMove;
return this; return this;
} }

View File

@ -21,7 +21,6 @@ package com.metamx.druid.master;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultiset; import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -100,9 +99,9 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
for (int i = 0; i < timelineObjects.size(); i++) { for (int i = 0; i < timelineObjects.size(); i++) {
if (!segmentsToMerge.add(timelineObjects.get(i)) if (!segmentsToMerge.add(timelineObjects.get(i))
|| segmentsToMerge.getByteCount() > params.getMergeBytesLimit() || segmentsToMerge.getByteCount() > params.getMasterSegmentSettings().getMergeBytesLimit()
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) { || segmentsToMerge.getSegmentCount() >= params.getMasterSegmentSettings().getMergeSegmentsLimit()) {
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit()); i -= segmentsToMerge.backtrack(params.getMasterSegmentSettings().getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) { if (segmentsToMerge.getSegmentCount() > 1) {
stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey())); stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
@ -118,7 +117,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
} }
// Finish any timelineObjects to merge that may have not hit threshold // Finish any timelineObjects to merge that may have not hit threshold
segmentsToMerge.backtrack(params.getMergeBytesLimit()); segmentsToMerge.backtrack(params.getMasterSegmentSettings().getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) { if (segmentsToMerge.getSegmentCount() > 1) {
stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey())); stats.addToGlobalStat("mergedCount", mergeSegments(segmentsToMerge, entry.getKey()));
} }

View File

@ -0,0 +1,140 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class MasterSegmentSettings
{
public static final String CONFIG_KEY = "master.dynamicConfigs";
private long millisToWaitBeforeDeleting=15 * 60 * 1000L;
private long mergeBytesLimit= 100000000L;
private int mergeSegmentsLimit = Integer.MAX_VALUE;
private int maxSegmentsToMove = 5;
private boolean emitBalancingStats = false;
@JsonCreator
public MasterSegmentSettings(
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats
)
{
this.maxSegmentsToMove=maxSegmentsToMove;
this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
this.mergeSegmentsLimit=mergeSegmentsLimit;
this.mergeBytesLimit=mergeBytesLimit;
this.emitBalancingStats = emitBalancingStats;
}
public static String getConfigKey()
{
return CONFIG_KEY;
}
@JsonProperty
public long getMillisToWaitBeforeDeleting()
{
return millisToWaitBeforeDeleting;
}
@JsonProperty
public long getMergeBytesLimit()
{
return mergeBytesLimit;
}
public boolean isEmitBalancingStats()
{
return emitBalancingStats;
}
@JsonProperty
public int getMergeSegmentsLimit()
{
return mergeSegmentsLimit;
}
@JsonProperty
public int getMaxSegmentsToMove()
{
return maxSegmentsToMove;
}
public static class Builder
{
public static final String CONFIG_KEY = "master.dynamicConfigs";
private long millisToWaitBeforeDeleting;
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int maxSegmentsToMove;
private boolean emitBalancingStats;
public Builder()
{
this.millisToWaitBeforeDeleting=15 * 60 * 1000L;
this.mergeBytesLimit= 100000000L;
this.mergeSegmentsLimit= Integer.MAX_VALUE;
this.maxSegmentsToMove = 5;
this.emitBalancingStats = false;
}
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.emitBalancingStats = emitBalancingStats;
}
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
{
this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
return this;
}
public Builder withMergeBytesLimit(long mergeBytesLimit)
{
this.mergeBytesLimit=mergeBytesLimit;
return this;
}
public Builder withMergeSegmentsLimit(int mergeSegmentsLimit)
{
this.mergeSegmentsLimit=mergeSegmentsLimit;
return this;
}
public Builder withMaxSegmentsToMove(int maxSegmentsToMove)
{
this.maxSegmentsToMove=maxSegmentsToMove;
return this;
}
public MasterSegmentSettings build()
{
return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats);
}
}
}

View File

@ -37,6 +37,9 @@
<div> <div>
<a href="rules.html">Configure Compute Node Rules</a> <a href="rules.html">Configure Compute Node Rules</a>
</div> </div>
<div>
<a href="masterSegmentSettings.html">Configure Dynamic Master Parameters</a>
</div>
<div> <div>
<a href="enable.html">Enable/Disable Datasources</a> <a href="enable.html">Enable/Disable Datasources</a>
</div> </div>

View File

@ -0,0 +1,25 @@
$(function () {
$.get('../master/config', function (data) {
document.getElementById("millis").value=data["millisToWaitBeforeDeleting"];
document.getElementById("mergeBytes").value = data["mergeBytesLimit"];
document.getElementById("mergeSegments").value = data["mergeSegmentsLimit"];
document.getElementById("maxSegments").value = data["maxSegmentsToMove"];
});
$("#submit").click( function ()
{
values = {};
list = $('form').serializeArray();
for (var i=0;i< list.length;i++)
{
values[list[i]["name"]]=list[i]["value"];
}
$.ajax({
url:'../master/config',
type:"POST",
data: JSON.stringify(values),
contentType:"application/json; charset=utf-8",
dataType:"json"
});
});
});

View File

@ -78,4 +78,4 @@ function initDataTable(el) {
this.value = asInitVals[$("thead input").index(this)]; this.value = asInitVals[$("thead input").index(this)];
} }
}); });
} }

View File

@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
<title>Configure Dynamic Master Parameters</title>
</head>
<body>
<form action="process_config.html" autocomplete="on" id ="configs">
millisToWaitBeforeDeleting:<input type="text" name="millisToWaitBeforeDeleting" id="millis">
<br>
mergeBytesLimit: <input type="text" name="mergeBytesLimit" id="mergeBytes">
<br>
mergeSegmentsLimit: <input type="text" name = "mergeSegmentsLimit" id="mergeSegments">
<br>
maxSegmentsToMove: <input type= "text" name ="maxSegmentsToMove" id ="maxSegments">
<br>
<button type="button" id="submit"> Submit </button>
</form>
</body>
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/masterSegmentSettings.js"></script>
</script>
</html>

View File

@ -192,7 +192,7 @@ public class DruidMasterBalancerTest
) )
) )
.withAvailableSegments(segments.values()) .withAvailableSegments(segments.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();
@ -239,7 +239,6 @@ public class DruidMasterBalancerTest
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
DruidMasterRuntimeParams params = DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder() DruidMasterRuntimeParams.newBuilder()
.withDruidCluster( .withDruidCluster(
@ -265,7 +264,10 @@ public class DruidMasterBalancerTest
) )
) )
.withAvailableSegments(segments.values()) .withAvailableSegments(segments.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMasterSegmentSettings(
new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.build()
)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();
@ -355,7 +357,11 @@ public class DruidMasterBalancerTest
) )
) )
.withAvailableSegments(segments.values()) .withAvailableSegments(segments.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMasterSegmentSettings(
new MasterSegmentSettings.Builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();

View File

@ -177,8 +177,7 @@ public class DruidMasterRuleRunnerTest
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withMaxSegmentsToMove(5) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(5).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params); DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
@ -523,7 +522,7 @@ public class DruidMasterRuleRunnerTest
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)
@ -597,7 +596,7 @@ public class DruidMasterRuleRunnerTest
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)
@ -678,7 +677,7 @@ public class DruidMasterRuleRunnerTest
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)
@ -755,7 +754,7 @@ public class DruidMasterRuleRunnerTest
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)
@ -844,7 +843,7 @@ public class DruidMasterRuleRunnerTest
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments) .withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)
@ -1030,7 +1029,7 @@ public class DruidMasterRuleRunnerTest
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(longerAvailableSegments) .withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup) .withSegmentReplicantLookup(segmentReplicantLookup)

View File

@ -459,10 +459,15 @@ public class DruidMasterSegmentMergerTest
final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef); final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef);
final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder() final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder()
.withAvailableSegments(ImmutableSet.copyOf(segments)) .withAvailableSegments(ImmutableSet.copyOf(segments))
.withMergeBytesLimit(mergeBytesLimit) .withMasterSegmentSettings(
.withMergeSegmentsLimit(mergeSegmentsLimit) new MasterSegmentSettings.Builder().withMergeBytesLimit(
mergeBytesLimit
).withMergeSegmentsLimit(
mergeSegmentsLimit
)
.build()
)
.build(); .build();
merger.run(params); merger.run(params);
return retVal; return retVal;
} }

View File

@ -36,7 +36,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
/** /**
*/ */
@ -95,12 +94,6 @@ public class DruidMasterTest
return null; return null;
} }
@Override
public long getMillisToWaitBeforeDeleting()
{
return super.getMillisToWaitBeforeDeleting();
}
@Override @Override
public String getMergerServiceName() public String getMergerServiceName()
{ {
@ -108,9 +101,9 @@ public class DruidMasterTest
} }
@Override @Override
public int getMaxSegmentsToMove() public MasterSegmentSettings getMasterSegmentSettings()
{ {
return 0; return new MasterSegmentSettings.Builder().withMillisToWaitBeforeDeleting(super.getMasterSegmentSettings().getMillisToWaitBeforeDeleting()).withMaxSegmentsToMove(0).build();
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback; import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.LoadQueuePeon; import com.metamx.druid.master.LoadQueuePeon;
import com.metamx.druid.master.LoadQueuePeonTester; import com.metamx.druid.master.LoadQueuePeonTester;
import com.metamx.druid.master.MasterSegmentSettings;
import com.metamx.druid.master.ReplicationThrottler; import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.SegmentReplicantLookup; import com.metamx.druid.master.SegmentReplicantLookup;
import com.metamx.druid.master.ServerHolder; import com.metamx.druid.master.ServerHolder;
@ -161,7 +162,7 @@ public class DruidMasterBalancerProfiler
peonMap peonMap
) )
.withAvailableSegments(segmentMap.values()) .withAvailableSegments(segmentMap.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withEmitter(emitter) .withEmitter(emitter)
.withDatabaseRuleManager(manager) .withDatabaseRuleManager(manager)
@ -236,7 +237,7 @@ public class DruidMasterBalancerProfiler
) )
.withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon)) .withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
.withAvailableSegments(segments.values()) .withAvailableSegments(segments.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();
DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master); DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);