Merge pull request #79 from metamx/bug_fixes

Throttle replicants that are created by rules
This commit is contained in:
fjy 2013-02-06 16:50:04 -08:00
commit e2862fe1fb
8 changed files with 436 additions and 19 deletions

View File

@ -21,7 +21,6 @@ package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -36,7 +35,6 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
@ -56,7 +54,6 @@ import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -564,7 +561,6 @@ public class DruidMaster
DruidMasterRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withLoadManagementPeons(loadManagementPeons)
.withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting())
.withEmitter(emitter)
.withMergeBytesLimit(config.getMergeBytesLimit())
@ -667,11 +663,16 @@ public class DruidMaster
return params.buildFromExisting()
.withDruidCluster(cluster)
.withDatabaseRuleManager(databaseRuleManager)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
}
},
new DruidMasterRuleRunner(DruidMaster.this),
new DruidMasterRuleRunner(
DruidMaster.this,
config.getReplicantLifetime(),
config.getReplicantThrottleLimit()
),
new DruidMasterCleanup(DruidMaster.this),
new DruidMasterBalancer(DruidMaster.this, new BalancerAnalyzer()),
new DruidMasterLogger()

View File

@ -80,4 +80,12 @@ public abstract class DruidMasterConfig
{
return Integer.MAX_VALUE;
}
@Config("druid.master.replicant.lifetime")
@Default("15")
public abstract int getReplicantLifetime();
@Config("druid.master.replicant.throttleLimit")
@Default("10")
public abstract int getReplicantThrottleLimit();
}

View File

@ -33,11 +33,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
{
private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class);
private final ReplicationThrottler replicationManager;
private final DruidMaster master;
public DruidMasterRuleRunner(DruidMaster master)
public DruidMasterRuleRunner(DruidMaster master, int replicantLifeTime, int replicantThrottleLimit)
{
this.master = master;
this.replicationManager = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime);
}
@Override
@ -51,16 +54,25 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
return params;
}
for (String tier : cluster.getTierNames()) {
replicationManager.updateReplicationState(tier);
replicationManager.updateTerminationState(tier);
}
DruidMasterRuntimeParams paramsWithReplicationManager = params.buildFromExisting()
.withReplicationManager(replicationManager)
.build();
// Run through all matched rules for available segments
DateTime now = new DateTime();
DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
for (DataSegment segment : params.getAvailableSegments()) {
DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(master, params, segment));
stats.accumulate(rule.run(master, paramsWithReplicationManager, segment));
foundMatchingRule = true;
break;
}
@ -76,8 +88,8 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
}
}
return params.buildFromExisting()
.withMasterStats(stats)
.build();
return paramsWithReplicationManager.buildFromExisting()
.withMasterStats(stats)
.build();
}
}

View File

@ -25,7 +25,6 @@ import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.RuleMap;
import com.metamx.emitter.service.ServiceEmitter;
import java.util.Collection;
@ -44,6 +43,7 @@ public class DruidMasterRuntimeParams
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ReplicationThrottler replicationManager;
private final ServiceEmitter emitter;
private final long millisToWaitBeforeDeleting;
private final MasterStats stats;
@ -58,6 +58,7 @@ public class DruidMasterRuntimeParams
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
long millisToWaitBeforeDeleting,
MasterStats stats,
@ -72,6 +73,7 @@ public class DruidMasterRuntimeParams
this.dataSources = dataSources;
this.availableSegments = availableSegments;
this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager;
this.emitter = emitter;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.stats = stats;
@ -114,6 +116,11 @@ public class DruidMasterRuntimeParams
return loadManagementPeons;
}
public ReplicationThrottler getReplicationManager()
{
return replicationManager;
}
public ServiceEmitter getEmitter()
{
return emitter;
@ -159,6 +166,7 @@ public class DruidMasterRuntimeParams
dataSources,
availableSegments,
loadManagementPeons,
replicationManager,
emitter,
millisToWaitBeforeDeleting,
stats,
@ -176,6 +184,7 @@ public class DruidMasterRuntimeParams
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager;
private ServiceEmitter emitter;
private long millisToWaitBeforeDeleting;
private MasterStats stats;
@ -191,6 +200,7 @@ public class DruidMasterRuntimeParams
this.dataSources = Sets.newHashSet();
this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
this.loadManagementPeons = Maps.newHashMap();
this.replicationManager = null;
this.emitter = null;
this.millisToWaitBeforeDeleting = 0;
this.stats = new MasterStats();
@ -206,6 +216,7 @@ public class DruidMasterRuntimeParams
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
long millisToWaitBeforeDeleting,
MasterStats stats,
@ -220,6 +231,7 @@ public class DruidMasterRuntimeParams
this.dataSources = dataSources;
this.availableSegments = availableSegments;
this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager;
this.emitter = emitter;
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.stats = stats;
@ -237,6 +249,7 @@ public class DruidMasterRuntimeParams
dataSources,
availableSegments,
loadManagementPeons,
replicationManager,
emitter,
millisToWaitBeforeDeleting,
stats,
@ -287,6 +300,12 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withReplicationManager(ReplicationThrottler replicationManager)
{
this.replicationManager = replicationManager;
return this;
}
public Builder withEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;

View File

@ -0,0 +1,166 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.Maps;
import com.metamx.emitter.EmittingLogger;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
*/
public class ReplicationThrottler
{
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
private final int maxReplicants;
private final int maxLifetime;
private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
public ReplicationThrottler(int maxReplicants, int maxLifetime)
{
this.maxReplicants = maxReplicants;
this.maxLifetime = maxLifetime;
}
public void updateReplicationState(String tier)
{
update(tier, currentlyReplicating, replicatingLookup, "create");
}
public void updateTerminationState(String tier)
{
update(tier, currentlyTerminating, terminatingLookup, "terminate");
}
private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type)
{
int size = holder.getNumProcessing(tier);
if (size != 0) {
log.info(
"[%s]: Replicant %s queue still has %d segments. Lifetime[%d]",
tier,
type,
size,
holder.getLifetime(tier)
);
holder.reduceLifetime(tier);
lookup.put(tier, false);
if (holder.getLifetime(tier) < 0) {
log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
}
} else {
log.info("[%s]: Replicant %s queue is empty.", tier, type);
lookup.put(tier, true);
}
}
public boolean canAddReplicant(String tier)
{
return replicatingLookup.get(tier);
}
public boolean canDestroyReplicant(String tier)
{
return terminatingLookup.get(tier);
}
public boolean registerReplicantCreation(String tier, String segmentId)
{
return currentlyReplicating.addSegment(tier, segmentId);
}
public void unregisterReplicantCreation(String tier, String segmentId)
{
currentlyReplicating.removeSegment(tier, segmentId);
}
public boolean registerReplicantTermination(String tier, String segmentId)
{
return currentlyTerminating.addSegment(tier, segmentId);
}
public void unregisterReplicantTermination(String tier, String segmentId)
{
currentlyTerminating.removeSegment(tier, segmentId);
}
private class ReplicatorSegmentHolder
{
private final Map<String, ConcurrentSkipListSet<String>> currentlyProcessingSegments = Maps.newHashMap();
private final Map<String, Integer> lifetimes = Maps.newHashMap();
public boolean addSegment(String tier, String segmentId)
{
ConcurrentSkipListSet<String> segments = currentlyProcessingSegments.get(tier);
if (segments == null) {
segments = new ConcurrentSkipListSet<String>();
currentlyProcessingSegments.put(tier, segments);
}
if (segments.size() < maxReplicants) {
segments.add(segmentId);
return true;
}
return false;
}
public void removeSegment(String tier, String segmentId)
{
Set<String> segments = currentlyProcessingSegments.get(tier);
if (segments != null) {
segments.remove(segmentId);
}
}
public int getNumProcessing(String tier)
{
Set<String> segments = currentlyProcessingSegments.get(tier);
return (segments == null) ? 0 : segments.size();
}
public int getLifetime(String tier)
{
Integer lifetime = lifetimes.get(tier);
if (lifetime == null) {
lifetime = maxLifetime;
lifetimes.put(tier, lifetime);
}
return lifetime;
}
public void reduceLifetime(String tier)
{
Integer lifetime = lifetimes.get(tier);
if (lifetime == null) {
lifetime = maxLifetime;
lifetimes.put(tier, lifetime);
}
lifetimes.put(tier, --lifetime);
}
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.MasterStats;
@ -55,23 +56,31 @@ public abstract class LoadRule implements Rule
return stats;
}
stats.accumulate(assign(expectedReplicants, totalReplicants, serverQueue, segment));
stats.accumulate(assign(params.getReplicationManager(), expectedReplicants, totalReplicants, serverQueue, segment));
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
return stats;
}
private MasterStats assign(
final ReplicationThrottler replicationManager,
int expectedReplicants,
int totalReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue,
DataSegment segment
final DataSegment segment
)
{
MasterStats stats = new MasterStats();
List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) {
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
break;
}
}
ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
@ -116,6 +125,7 @@ public abstract class LoadRule implements Rule
@Override
protected void execute()
{
replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier());
}
}
);
@ -132,11 +142,12 @@ public abstract class LoadRule implements Rule
private MasterStats drop(
int expectedReplicants,
int clusterReplicants,
DataSegment segment,
DruidMasterRuntimeParams params
final DataSegment segment,
final DruidMasterRuntimeParams params
)
{
MasterStats stats = new MasterStats();
final ReplicationThrottler replicationManager = params.getReplicationManager();
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
@ -162,6 +173,13 @@ public abstract class LoadRule implements Rule
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
break;
}
}
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@ -176,6 +194,7 @@ public abstract class LoadRule implements Rule
@Override
protected void execute()
{
replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier());
}
}
);

View File

@ -86,7 +86,7 @@ public class DruidMasterRuleRunnerTest
start = start.plusHours(1);
}
ruleRunner = new DruidMasterRuleRunner(master);
ruleRunner = new DruidMasterRuleRunner(master, 1, 24);
}
@After
@ -850,4 +850,185 @@ public class DruidMasterRuleRunnerTest
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
}
/**
* Nodes:
* hot - 2 replicants
*
* @throws Exception
*/
@Test
public void testReplicantThrottle() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), 2, "hot")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot"
),
mockPeon
),
new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
1000,
"historical",
"hot"
),
mockPeon
)
)
)
)
);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 48);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
DataSegment overFlowSegment = new DataSegment(
"test",
new Interval("2012-02-01/2012-02-02"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
1
);
afterParams = ruleRunner.run(
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withEmitter(emitter)
.withAvailableSegments(Arrays.asList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build()
);
stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
}
@Test
public void testDropReplicantThrottle() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment overFlowSegment = new DataSegment(
"test",
new Interval("2012-02-01/2012-02-02"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
1
);
List<DataSegment> longerAvailableSegments = Lists.newArrayList(availableSegments);
longerAvailableSegments.add(overFlowSegment);
DruidServer server1 = new DruidServer(
"serverNorm1",
"hostNorm1",
1000,
"historical",
"normal"
);
for (DataSegment availableSegment : longerAvailableSegments) {
server1.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
for (DataSegment availableSegment : longerAvailableSegments) {
server2.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
),
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
EasyMock.verify(mockPeon);
}
}

View File

@ -20,7 +20,6 @@
package com.metamx.druid.master;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryManager;
@ -124,6 +123,18 @@ public class DruidMasterTest
{
return "";
}
@Override
public int getReplicantLifetime()
{
return 0;
}
@Override
public int getReplicantThrottleLimit()
{
return 0;
}
},
null,
null,