bug fix for dropping segments in master

This commit is contained in:
Fangjin Yang 2012-12-14 17:06:03 -08:00
parent b4a1bb4a00
commit 74a977504e
4 changed files with 162 additions and 24 deletions

View File

@ -43,8 +43,8 @@ import java.util.Map;
public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
{
public static final Interval MY_Y2K_INTERVAL = new Interval(
new DateTime(Long.MIN_VALUE),
new DateTime(Long.MAX_VALUE)
new DateTime("0000-01-01"),
new DateTime("9000-01-01")
);
public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime";

View File

@ -94,9 +94,14 @@ public class ServerHolder implements Comparable<ServerHolder>
return availableSize;
}
public boolean isServingSegment(DataSegment segment)
{
return (server.getSegment(segment.getIdentifier()) != null);
}
public boolean containsSegment(DataSegment segment)
{
return (server.getSegment(segment.getIdentifier()) != null || peon.getSegmentsToLoad().contains(segment));
return isServingSegment(segment) || peon.getSegmentsToLoad().contains(segment);
}
@Override

View File

@ -163,23 +163,25 @@ public abstract class LoadRule implements Rule
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? Do I have no servers[%s]?", serverQueue);
continue;
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
{
@Override
protected void execute()
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
{
@Override
protected void execute()
{
}
}
}
);
);
--actualNumReplicantsForType;
stats.addToTieredStat("droppedCount", tier, 1);
}
droppedServers.add(holder);
--actualNumReplicantsForType;
stats.addToTieredStat("droppedCount", tier, 1);
}
serverQueue.addAll(droppedServers);
}

View File

@ -31,7 +31,6 @@ import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -86,14 +85,6 @@ public class DruidMasterRuleRunnerTest
}
ruleRunner = new DruidMasterRuleRunner(master);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().anyTimes();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
}
@After
@ -113,6 +104,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
@ -202,6 +199,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
@ -284,6 +287,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersWithExistingSegments() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@ -356,6 +365,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersTierDoesNotExist() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(12);
EasyMock.replay(emitter);
@ -455,6 +470,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropRemove() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
master.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
@ -513,6 +534,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropTooManyInSameTier() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
@ -581,6 +608,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@ -653,6 +688,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDontDropInDifferentTiers() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@ -717,4 +760,92 @@ public class DruidMasterRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
}
@Test
public void testDropServerActuallyServesSegment() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
1000,
"historical",
"normal"
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
DruidServer server3 = new DruidServer(
"serverNorm3",
"hostNorm3",
1000,
"historical",
"normal"
);
server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.replay(anotherMockPeon);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
),
new ServerHolder(
server2,
anotherMockPeon
),
new ServerHolder(
server3,
anotherMockPeon
)
)
)
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
}
}