fix some bugs from last commit

Fangjin Yang 2013-02-06 13:57:56 -08:00
parent 97999c06c4
commit 84baef0462
8 changed files with 279 additions and 62 deletions

DruidMaster.java

@@ -21,7 +21,6 @@ package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -36,7 +35,6 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
@@ -56,13 +54,11 @@ import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -565,7 +561,6 @@ public class DruidMaster
DruidMasterRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withLoadManagementPeons(loadManagementPeons)
.withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting())
.withEmitter(emitter)
.withMergeBytesLimit(config.getMergeBytesLimit())
@@ -623,7 +618,6 @@ public class DruidMaster
// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
final Map<String, ConcurrentSkipListSet<String>> currentlyReplicatingSegments;
for (DruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = yp.combineParts(Arrays.asList(config.getLoadQueuePath(), server.getName()));
@@ -670,11 +664,16 @@
return params.buildFromExisting()
.withDruidCluster(cluster)
.withDatabaseRuleManager(databaseRuleManager)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
}
},
new DruidMasterRuleRunner(DruidMaster.this),
new DruidMasterRuleRunner(
DruidMaster.this,
config.getReplicantLifetime(),
config.getReplicantThrottleLimit()
),
new DruidMasterCleanup(DruidMaster.this),
new DruidMasterBalancer(DruidMaster.this, new BalancerAnalyzer()),
new DruidMasterLogger()
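For context: each entry in the list above is a DruidMasterHelper, and the master threads a single DruidMasterRuntimeParams object through the helpers in sequence, each one returning rebuilt params for the next. A minimal sketch of that loop, assuming run(params) is the helper entry point (the tests below invoke ruleRunner.run(params) the same way):

// Sketch only: how the master presumably drives its helper chain each cycle.
DruidMasterRuntimeParams runHelpers(
    List<DruidMasterHelper> helpers, // rule runner, cleanup, balancer, logger
    DruidMasterRuntimeParams params
)
{
  for (DruidMasterHelper helper : helpers) {
    params = helper.run(params); // each helper returns rebuilt params
  }
  return params;
}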

DruidMasterConfig.java

@@ -80,4 +80,12 @@ public abstract class DruidMasterConfig
{
return Integer.MAX_VALUE;
}
@Config("druid.master.replicant.lifetime")
@Default("15")
public abstract int getReplicantLifetime();
@Config("druid.master.replicant.throttleLimit")
@Default("10")
public abstract int getReplicantThrottleLimit();
}
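The @Config and @Default annotations here come from skife config-magic, so the two new knobs map straight to runtime properties. A minimal sketch of the binding, assuming config-magic's ConfigurationObjectFactory (the property values are illustrative):

import java.util.Properties;
import org.skife.config.ConfigurationObjectFactory;

// Sketch only: binding DruidMasterConfig from runtime properties.
Properties props = new Properties();
props.setProperty("druid.master.replicant.lifetime", "15");
props.setProperty("druid.master.replicant.throttleLimit", "10");
DruidMasterConfig config = new ConfigurationObjectFactory(props).build(DruidMasterConfig.class);
// Omitting either property falls back to the @Default values above (15 and 10).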

DruidMasterRuleRunner.java

@@ -33,13 +33,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
{
private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class);
private final DruidMasterReplicationManager replicationManager = new DruidMasterReplicationManager(10, 15);
private final ReplicationThrottler replicationManager;
private final DruidMaster master;
public DruidMasterRuleRunner(DruidMaster master)
public DruidMasterRuleRunner(DruidMaster master, int replicantLifeTime, int replicantThrottleLimit)
{
this.master = master;
this.replicationManager = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime);
}
@Override
@@ -53,8 +54,8 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
return params;
}
for (String tier : params.getDruidCluster().getTierNames()) {
replicationManager.updateCreationState(tier);
for (String tier : cluster.getTierNames()) {
replicationManager.updateReplicationState(tier);
replicationManager.updateTerminationState(tier);
}
@@ -64,14 +65,14 @@
// Run through all matched rules for available segments
DateTime now = new DateTime();
DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
for (DataSegment segment : params.getAvailableSegments()) {
DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(master, params, segment));
stats.accumulate(rule.run(master, paramsWithReplicationManager, segment));
foundMatchingRule = true;
break;
}
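Note that paramsWithReplicationManager is used here but built outside the visible hunks; presumably it derives from the incoming params with the throttler attached, roughly:

// Sketch only: presumed construction, using builder methods that appear in
// DruidMasterRuntimeParams below (buildFromExisting, withReplicationManager).
DruidMasterRuntimeParams paramsWithReplicationManager =
    params.buildFromExisting()
          .withReplicationManager(replicationManager)
          .build();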

DruidMasterRuntimeParams.java

@@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*/
@@ -44,7 +43,7 @@ public class DruidMasterRuntimeParams
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final DruidMasterReplicationManager replicationManager;
private final ReplicationThrottler replicationManager;
private final ServiceEmitter emitter;
private final long millisToWaitBeforeDeleting;
private final MasterStats stats;
@@ -59,7 +58,7 @@ public class DruidMasterRuntimeParams
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
DruidMasterReplicationManager replicationManager,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
long millisToWaitBeforeDeleting,
MasterStats stats,
@@ -117,7 +116,7 @@ public class DruidMasterRuntimeParams
return loadManagementPeons;
}
public DruidMasterReplicationManager getReplicationManager()
public ReplicationThrottler getReplicationManager()
{
return replicationManager;
}
@@ -185,7 +184,7 @@ public class DruidMasterRuntimeParams
private final Set<DruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private DruidMasterReplicationManager replicationManager;
private ReplicationThrottler replicationManager;
private ServiceEmitter emitter;
private long millisToWaitBeforeDeleting;
private MasterStats stats;
@@ -217,7 +216,7 @@ public class DruidMasterRuntimeParams
Set<DruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
DruidMasterReplicationManager replicationManager,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
long millisToWaitBeforeDeleting,
MasterStats stats,
@@ -301,7 +300,7 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withReplicationManager(DruidMasterReplicationManager replicationManager)
public Builder withReplicationManager(ReplicationThrottler replicationManager)
{
this.replicationManager = replicationManager;
return this;

DruidMasterReplicationManager.java → ReplicationThrottler.java

@@ -27,11 +27,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
* The DruidMasterReplicationManager is used to throttle the number of replicants that are created and destroyed.
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
*/
public class DruidMasterReplicationManager
public class ReplicationThrottler
{
private static final EmittingLogger log = new EmittingLogger(DruidMasterReplicationManager.class);
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
private final int maxReplicants;
private final int maxLifetime;
@@ -40,7 +40,7 @@ public class DruidMasterReplicationManager
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
public DruidMasterReplicationManager(int maxReplicants, int maxLifetime)
public ReplicationThrottler(int maxReplicants, int maxLifetime)
{
this.maxReplicants = maxReplicants;
this.maxLifetime = maxLifetime;
@@ -60,16 +60,23 @@
{
int size = holder.getNumProcessing(tier);
if (size != 0) {
log.info("[%s]: Replicant %s queue still has %d segments", tier, type, size);
holder.reduceLifetime();
if (holder.getLifetime() < 0) {
log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxReplicants).emit();
}
log.info(
"[%s]: Replicant %s queue still has %d segments. Lifetime[%d]",
tier,
type,
size,
holder.getLifetime(tier)
);
holder.reduceLifetime(tier);
lookup.put(tier, false);
}
lookup.put(tier, true);
if (holder.getLifetime(tier) < 0) {
log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
}
} else {
log.info("[%s]: Replicant %s queue is empty.", tier, type);
lookup.put(tier, true);
}
}
public boolean canAddReplicant(String tier)
@@ -105,15 +112,16 @@
private class ReplicatorSegmentHolder
{
private final Map<String, ConcurrentSkipListSet<String>> currentlyProcessingSegments = Maps.newHashMap();
private volatile int lifetime = maxLifetime;
private final Map<String, Integer> lifetimes = Maps.newHashMap();
public boolean addSegment(String tier, String segmentId)
{
if (currentlyProcessingSegments.size() < maxReplicants) {
Set<String> segments = currentlyProcessingSegments.get(tier);
if (segments == null) {
segments = new ConcurrentSkipListSet<String>();
}
ConcurrentSkipListSet<String> segments = currentlyProcessingSegments.get(tier);
if (segments == null) {
segments = new ConcurrentSkipListSet<String>();
currentlyProcessingSegments.put(tier, segments);
}
if (segments.size() < maxReplicants) {
segments.add(segmentId);
return true;
}
@@ -132,17 +140,27 @@
public int getNumProcessing(String tier)
{
Set<String> segments = currentlyProcessingSegments.get(tier);
return (segments == null) ? 0 : currentlyProcessingSegments.size();
return (segments == null) ? 0 : segments.size();
}
public int getLifetime()
public int getLifetime(String tier)
{
Integer lifetime = lifetimes.get(tier);
if (lifetime == null) {
lifetime = maxLifetime;
lifetimes.put(tier, lifetime);
}
return lifetime;
}
public void reduceLifetime()
public void reduceLifetime(String tier)
{
lifetime--;
Integer lifetime = lifetimes.get(tier);
if (lifetime == null) {
lifetime = maxLifetime;
lifetimes.put(tier, lifetime);
}
lifetimes.put(tier, --lifetime);
}
}
}
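Two real fixes land in this file besides the rename: getNumProcessing(tier) previously returned the size of the whole tier map rather than the tier's own segment set, and the single shared lifetime counter becomes a per-tier map, so one stuck tier can no longer burn down another tier's lifetime. A minimal sketch of the check-then-register handshake a rule performs against the throttler, using only methods visible in this commit (the tier and segment id are illustrative):

// Sketch only: one replicant-creation attempt against the throttler.
ReplicationThrottler throttler = new ReplicationThrottler(10, 15);
throttler.updateReplicationState("hot"); // refresh the tier's lifetime and lookup
if (throttler.canAddReplicant("hot")
    && throttler.registerReplicantCreation("hot", "someDataSource_2013-01-01")) {
  // under the limit: safe to queue this replicant for loading
}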

LoadRule.java

@@ -24,7 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterReplicationManager;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.MasterStats;
@@ -63,7 +63,7 @@ public abstract class LoadRule implements Rule
}
private MasterStats assign(
final DruidMasterReplicationManager replicationManager,
final ReplicationThrottler replicationManager,
int expectedReplicants,
int totalReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue,
@@ -74,6 +74,13 @@
List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) {
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
break;
}
}
ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
@@ -111,13 +118,6 @@
break;
}
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
break;
}
}
holder.getPeon().loadSegment(
segment,
new LoadPeonCallback()
@@ -147,7 +147,7 @@
)
{
MasterStats stats = new MasterStats();
final DruidMasterReplicationManager replicationManager = params.getReplicationManager();
final ReplicationThrottler replicationManager = params.getReplicationManager();
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
@@ -173,19 +173,19 @@
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
if (actualNumReplicantsForType > 1) { // don't throttle unless we are removing extra replicants
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
break;
}
}
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(
segment,
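The ordering change in both loops above is the actual bug fix: the throttle check now runs before serverQueue.pollFirst()/pollLast(), so a throttled iteration breaks out without first removing a server from the queue. The drop loop also switches its guard from actualNumReplicantsForType > 1 to expectedNumReplicantsForType > 0, so drops go unthrottled only when the rule wants zero copies. Distilled shape of the corrected assign loop, with names taken from this file:

// Sketch only: throttle first, dequeue second.
while (totalReplicants < expectedReplicants) {
  if (totalReplicants > 0) { // never throttle the only copy of a segment
    if (!replicationManager.canAddReplicant(getTier()) ||
        !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
      break; // nothing polled yet, so the server queue is left intact
    }
  }
  ServerHolder holder = serverQueue.pollFirst(); // only now take a server
  // ... load the segment on holder and track it for re-insertion ...
}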

DruidMasterRuleRunnerTest.java

@@ -84,7 +84,7 @@ public class DruidMasterRuleRunnerTest
start = start.plusHours(1);
}
ruleRunner = new DruidMasterRuleRunner(master);
ruleRunner = new DruidMasterRuleRunner(master, 1, 24);
}
@After
@@ -848,4 +848,185 @@ public class DruidMasterRuleRunnerTest
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
}
/**
* Nodes:
* hot - 2 replicants
*
* @throws Exception
*/
@Test
public void testReplicantThrottle() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), 2, "hot")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot"
),
mockPeon
),
new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
1000,
"historical",
"hot"
),
mockPeon
)
)
)
)
);
DruidMasterRuntimeParams params =
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 48);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
DataSegment overFlowSegment = new DataSegment(
"test",
new Interval("2012-02-01/2012-02-02"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
1
);
afterParams = ruleRunner.run(
new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withEmitter(emitter)
.withAvailableSegments(Arrays.asList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build()
);
stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
}
@Test
public void testDropReplicantThrottle() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment overFlowSegment = new DataSegment(
"test",
new Interval("2012-02-01/2012-02-02"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
1
);
List<DataSegment> longerAvailableSegments = Lists.newArrayList(availableSegments);
longerAvailableSegments.add(overFlowSegment);
DruidServer server1 = new DruidServer(
"serverNorm1",
"hostNorm1",
1000,
"historical",
"normal"
);
for (DataSegment availableSegment : longerAvailableSegments) {
server1.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
for (DataSegment availableSegment : longerAvailableSegments) {
server2.addDataSegment(availableSegment.getIdentifier(), availableSegment);
}
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
),
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
EasyMock.verify(mockPeon);
}
}
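On the arithmetic in testReplicantThrottle: setUp() presumably builds 24 hourly segments, and the rule asks for 2 replicants on the hot tier, so an unthrottled run would assign 24 × 2 = 48. With the throttle limit of 24 passed to the rule runner above, the first copy of each segment bypasses the throttle (24 assignments) and all 24 second copies still fit under the limit, which is why the first run lands on exactly 48. In the second run those 24 registrations are still in flight, since the mocked peon never drains its queue, so the throttler refuses new registrations and only the overflow segment's unthrottled first copy is assigned, hence the assertion of 1.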

DruidMasterTest.java

@@ -20,7 +20,6 @@
package com.metamx.druid.master;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryManager;
@@ -124,6 +123,18 @@ public class DruidMasterTest
{
return "";
}
@Override
public int getReplicantLifetime()
{
return 0;
}
@Override
public int getReplicantThrottleLimit()
{
return 0;
}
},
null,
null,