passing tests from loading across tiers

This commit is contained in:
fjy 2014-01-02 11:33:05 -08:00
parent 488a118f3f
commit 2d3c4812e5
8 changed files with 311 additions and 37 deletions

View File

@ -34,9 +34,6 @@ public class DruidServerConfig
@JsonProperty
private String tier = DruidServer.DEFAULT_TIER;
@JsonProperty
private String zone = DruidServer.DEFAULT_ZONE;
public long getMaxSize()
{
return maxSize;
@ -46,8 +43,4 @@ public class DruidServerConfig
{
return tier;
}
public String getZone() {
return zone;
}
}

View File

@ -73,7 +73,8 @@ public class IntervalLoadRule extends LoadRule
@Override
public int getNumReplicants(String tier)
{
return tieredReplicants.get(tier);
final Integer retVal = tieredReplicants.get(tier);
return retVal == null ? 0 : retVal;
}
@JsonProperty

View File

@ -19,6 +19,7 @@
package io.druid.server.coordinator.rules;
import com.google.api.client.util.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
@ -47,13 +48,12 @@ public abstract class LoadRule implements Rule
{
CoordinatorStats stats = new CoordinatorStats();
final Map<String, Integer> loadStatus = Maps.newHashMap();
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
final int expectedReplicants = entry.getValue();
int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
int clusterReplicants = params.getSegmentReplicantLookup()
.getClusterReplicants(segment.getIdentifier(), tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
@ -78,8 +78,13 @@ public abstract class LoadRule implements Rule
);
}
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
int clusterReplicants = params.getSegmentReplicantLookup()
.getClusterReplicants(segment.getIdentifier(), tier);
loadStatus.put(tier, expectedReplicants - clusterReplicants);
}
// Remove over-replication
stats.accumulate(drop(loadStatus, segment, params));
return stats;
}
@ -146,24 +151,26 @@ public abstract class LoadRule implements Rule
}
private CoordinatorStats drop(
int expectedReplicants,
int clusterReplicants,
final Map<String, Integer> loadStatus,
final DataSegment segment,
final DruidCoordinatorRuntimeParams params
)
{
CoordinatorStats stats = new CoordinatorStats();
final ReplicationThrottler replicationManager = params.getReplicationManager();
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
}
// Make sure we have enough actual replicants in the correct tier in the cluster before doing anything
if (clusterReplicants < expectedReplicants) {
return stats;
// Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) {
return stats;
}
}
final ReplicationThrottler replicationManager = params.getReplicationManager();
// Find all instances of this segment across tiers
Map<String, Integer> replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());

View File

@ -44,7 +44,7 @@ public class PeriodLoadRule extends LoadRule
@JsonProperty("period") Period period,
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
// The following two vars need to be deprecated
@JsonProperty("replicants") Integer replicants,
@JsonProperty("replicants") int replicants,
@JsonProperty("tier") String tier
)
{
@ -80,7 +80,8 @@ public class PeriodLoadRule extends LoadRule
@Override
public int getNumReplicants(String tier)
{
return tieredReplicants.get(tier);
final Integer retVal = tieredReplicants.get(tier);
return retVal == null ? 0 : retVal;
}
@Override

View File

@ -54,7 +54,7 @@ public class DruidCoordinatorBalancerProfiler
Map<String, DataSegment> segments = Maps.newHashMap();
ServiceEmitter emitter;
DatabaseRuleManager manager;
PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), 3, "normal");
PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), null, 3, "normal");
List<Rule> rules = ImmutableList.<Rule>of(loadRule);
@Before

View File

@ -115,9 +115,9 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), null, 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "cold")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -211,8 +211,8 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), null, 2, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "cold")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -300,8 +300,8 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -383,8 +383,8 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -434,7 +434,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -488,7 +488,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -549,7 +549,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -626,7 +626,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -705,7 +705,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -776,7 +776,7 @@ public class DruidCoordinatorRuleRunnerTest
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), null, 0, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -877,7 +877,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), 2, "hot")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), null, 2, "hot")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -968,7 +968,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);

View File

@ -0,0 +1,270 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.SegmentReplicantLookup;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
/**
*/
public class LoadRuleTest
{
private LoadQueuePeon mockPeon;
private ReplicationThrottler throttler;
private DataSegment segment;
@Before
public void setUp() throws Exception
{
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
throttler = new ReplicationThrottler(1, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
throttler.updateTerminationState(tier);
}
segment = new DataSegment(
"foo",
new Interval("0/3000"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0
);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(mockPeon);
}
@Test
public void testLoad() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
"hot", 1,
DruidServer.DEFAULT_TIER, 2
);
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
};
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot"
),
mockPeon
)
)
),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER
),
mockPeon
)
)
)
)
);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withReplicationManager(throttler)
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2);
}
@Test
public void testDrop() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
"hot", 0,
DruidServer.DEFAULT_TIER, 0
);
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
};
DruidServer server1 = new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot"
);
server1.addDataSegment(segment.getIdentifier(), segment);
DruidServer server2 = new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER
);
server2.addDataSegment(segment.getIdentifier(), segment);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
)
)
),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
}
}

View File

@ -42,6 +42,7 @@ public class PeriodLoadRuleTest
DateTime now = new DateTime("2013-01-01");
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P5000Y"),
null,
0,
""
);
@ -57,6 +58,7 @@ public class PeriodLoadRuleTest
DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P1M"),
null,
0,
""
);