Fix loadstatus?full double counting expected segments (#5667)

* fix loadstatus?full double counting expected segments

* remove possible flakiness from Thread.sleep() in test
This commit is contained in:
David Lim 2018-04-23 13:41:16 -06:00 committed by Nishant Bangarwa
parent a3a9ada843
commit 55b003e5e8
2 changed files with 150 additions and 17 deletions

View File

@ -252,6 +252,7 @@ public class DruidCoordinator
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()) .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
}); });
break; // only the first matching rule applies
} }
} }

View File

@ -37,16 +37,18 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.emitter.core.Event;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager; import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType; import io.druid.server.coordination.ServerType;
import io.druid.server.coordinator.rules.ForeverLoadRule; import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.lookup.cache.LookupCoordinatorManager; import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -96,6 +98,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
private JacksonConfigManager configManager; private JacksonConfigManager configManager;
private DruidNode druidNode; private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
private static final String LOADPATH = "/druid/loadqueue/localhost:1234"; private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
private static final long COORDINATOR_START_DELAY = 1; private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100; private static final long COORDINATOR_PERIOD = 100;
@ -185,7 +188,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView, serverInventoryView,
metadataRuleManager, metadataRuleManager,
curator, curator,
new NoopServiceEmitter(), serviceEmitter,
scheduledExecutorFactory, scheduledExecutorFactory,
null, null,
null, null,
@ -385,27 +388,17 @@ public class DruidCoordinatorTest extends CuratorTestBase
assignSegmentLatch.await(); assignSegmentLatch.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus()); Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier())); curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier()));
// Wait for coordinator thread to run so that replication status is updated
while (coordinator.getSegmentAvailability().getLong(dataSource) != 0) {
Thread.sleep(50);
}
Map segmentAvailability = coordinator.getSegmentAvailability(); Map segmentAvailability = coordinator.getSegmentAvailability();
Assert.assertEquals(1, segmentAvailability.size()); Assert.assertEquals(1, segmentAvailability.size());
Assert.assertEquals(0L, segmentAvailability.get(dataSource)); Assert.assertEquals(0L, segmentAvailability.get(dataSource));
while (coordinator.hasLoadPending(dataSource)) {
Thread.sleep(50);
}
// wait historical data to be updated
long startMillis = System.currentTimeMillis();
long coordinatorRunPeriodMillis = druidCoordinatorConfig.getCoordinatorPeriod().getMillis();
while (System.currentTimeMillis() - startMillis < coordinatorRunPeriodMillis) {
Thread.sleep(100);
}
Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus(); Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertNotNull(replicationStatus); Assert.assertNotNull(replicationStatus);
Assert.assertEquals(1, replicationStatus.entrySet().size()); Assert.assertEquals(1, replicationStatus.entrySet().size());
@ -428,6 +421,127 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager); EasyMock.verify(metadataRuleManager);
} }
@Test(timeout = 60_000L)
public void testCoordinatorTieredRun() throws Exception
{
final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold";
final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1));
final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1));
final String loadPathCold = "/druid/loadqueue/cold:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
druidCoordinatorConfig
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator, loadPathCold, true, true, Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold));
loadQueuePeonCold.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, databaseSegmentManager, serverInventoryView);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
pathChildrenCache.getListenable().addListener(
(client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
DataSegment segment = dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
if (segment != null) {
hotServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
assignSegmentLatchHot.countDown();
}
}
);
final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
pathChildrenCacheCold.getListenable().addListener(
(client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
DataSegment segment = dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
if (segment != null) {
coldServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
assignSegmentLatchCold.countDown();
}
}
);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertEquals(2, replicationStatus.entrySet().size());
Assert.assertEquals(0L, replicationStatus.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, replicationStatus.get(coldTierName).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
EasyMock.verify(serverInventoryView);
EasyMock.verify(databaseSegmentManager);
EasyMock.verify(metadataRuleManager);
}
@Test @Test
public void testOrderedAvailableDataSegments() public void testOrderedAvailableDataSegments()
{ {
@ -507,4 +621,22 @@ public class DruidCoordinatorTest extends CuratorTestBase
listener.stopBeingLeader(); listener.stopBeingLeader();
} }
} }
private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;
private LatchableServiceEmitter()
{
super("", "", null);
}
@Override
public void emit(Event event)
{
if (latch != null && "segment/count".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}
} }