From 857e5204bf2e71b83da6eecb71105f4947bbd1d6 Mon Sep 17 00:00:00 2001
From: Maytas Monsereenusorn
Date: Thu, 18 Jun 2020 14:52:33 -1000
Subject: [PATCH] Coordinator loadstatus API full format does not consider Broadcast rules (#10048)

* Coordinator loadstatus API full format does not consider Broadcast rules

* address comments

* fix checkstyle

* minor optimization

* address comments
---
 .../server/coordinator/DruidCoordinator.java |  51 +++-
 .../coordinator/DruidCoordinatorTest.java    | 256 ++++++++++++++----
 2 files changed, 242 insertions(+), 65 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index c4de3644c64..0dcf636b744 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -69,6 +69,7 @@ import org.apache.druid.server.coordinator.duty.LogUsedSegments;
 import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
 import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
+import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -262,6 +263,11 @@ public class DruidCoordinator
   }
 
   /**
+   * The segmentReplicantLookup used in this method may be stale, since it is only updated on coordinator runs.
+   * However, this is acceptable as long as {@param dataSegments} is up to date: stale data in
+   * segmentReplicantLookup can then only under-count replication levels, rather than falsely
+   * report that everything is available.
+   *
    * @return tier -> { dataSource -> underReplicationCount } map
    */
  public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(
@@ -269,6 +275,13 @@
  )
  {
    final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+    final Set<String> decommissioningServers = getDynamicConfigs().getDecommissioningNodes();
+    final List<ImmutableDruidServer> broadcastTargetServers = serverInventoryView
+        .getInventory()
+        .stream()
+        .filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost()))
+        .map(DruidServer::toImmutableDruidServer)
+        .collect(Collectors.toList());
 
    if (segmentReplicantLookup == null) {
      return underReplicationCountsPerDataSourcePerTier;
@@ -280,20 +293,38 @@
      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
 
      for (final Rule rule : rules) {
-        if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
+        if (!rule.appliesTo(segment, now)) {
          continue;
        }
 
-        ((LoadRule) rule)
-            .getTieredReplicants()
-            .forEach((final String tier, final Integer ruleReplicants) -> {
-              int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
-              Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
-                  .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+        if (rule instanceof LoadRule) {
+          ((LoadRule) rule)
+              .getTieredReplicants()
+              .forEach((final String tier, final Integer ruleReplicants) -> {
+                int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
+                Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                    .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+                ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
+                    .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
+              });
+        }
+
+        if (rule instanceof BroadcastDistributionRule) {
+          for (ImmutableDruidServer server : broadcastTargetServers) {
+            Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                .computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>());
+            if (server.getSegment(segment.getId()) == null) {
              ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
-                  .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
-            });
-        break; // only the first matching rule applies
+                  .addTo(segment.getDataSource(), 1);
+            } else {
+              // This makes sure that every datasource has an entry even if all of its segments are loaded
+              underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
+            }
+          }
+        }
+
+        // only the first matching rule applies
+        break;
      }
    }
 
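The hunk above counts a broadcast segment as under-replicated once for every eligible (non-decommissioning) broadcast target that does not yet host it, and it still records an explicit zero per datasource when everything is loaded. A minimal, self-contained sketch of that counting behavior follows; the types and names here (Server, countBroadcastUnderReplication, plain HashMap) are hypothetical stand-ins invented for illustration, not the real Druid classes (ImmutableDruidServer, Object2LongOpenHashMap) used in the patch.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BroadcastUnderReplicationSketch
{
  /** Stand-in for a broadcast target server: its tier and the ids of the segments it hosts. */
  static class Server
  {
    final String tier;
    final Set<String> hostedSegmentIds;

    Server(String tier, Set<String> hostedSegmentIds)
    {
      this.tier = tier;
      this.hostedSegmentIds = hostedSegmentIds;
    }
  }

  /** Returns tier -> dataSource -> number of broadcast segment copies still missing. */
  static Map<String, Map<String, Long>> countBroadcastUnderReplication(
      String dataSource,
      List<String> segmentIds,
      List<Server> broadcastTargets
  )
  {
    final Map<String, Map<String, Long>> underReplicationPerTier = new HashMap<>();
    for (String segmentId : segmentIds) {
      for (Server server : broadcastTargets) {
        Map<String, Long> perDataSource =
            underReplicationPerTier.computeIfAbsent(server.tier, ignored -> new HashMap<>());
        if (!server.hostedSegmentIds.contains(segmentId)) {
          // A broadcast segment is expected on every eligible broadcast target,
          // so each missing copy counts as one under-replicated segment in that tier.
          perDataSource.merge(dataSource, 1L, Long::sum);
        } else {
          // Keep an explicit zero entry so the datasource shows up even when fully loaded.
          perDataSource.putIfAbsent(dataSource, 0L);
        }
      }
    }
    return underReplicationPerTier;
  }

  public static void main(String[] args)
  {
    List<Server> brokers = Arrays.asList(
        new Server("tier1", new HashSet<>(Arrays.asList("seg1", "seg2"))), // fully loaded
        new Server("tier2", new HashSet<>(Arrays.asList("seg1")))          // missing seg2
    );
    // Prints {tier1={ds=0}, tier2={ds=1}} (map ordering may vary).
    System.out.println(countBroadcastUnderReplication("ds", Arrays.asList("seg1", "seg2"), brokers));
  }
}

This mirrors why a stale segmentReplicantLookup is tolerable: missing entries can only inflate the under-replication count, never hide a missing copy.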
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index e48bf943021..0077fc1123a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -28,8 +28,6 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
@@ -52,6 +50,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
@@ -104,10 +103,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private ObjectMapper objectMapper;
   private DruidNode druidNode;
   private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
+  private boolean serverAddedCountExpected = true;
 
   @Before
   public void setUp() throws Exception
   {
+    serverAddedCountExpected = true;
     druidServer = EasyMock.createMock(DruidServer.class);
     serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
     segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
@@ -375,37 +376,22 @@
     // This coordinator should be leader by now
     Assert.assertTrue(coordinator.isLeader());
     Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
-
-    final CountDownLatch assignSegmentLatch = new CountDownLatch(1);
-    pathChildrenCache.getListenable().addListener(
-        new PathChildrenCacheListener()
-        {
-          @Override
-          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
-          {
-            if (CuratorUtils.isChildAdded(event)) {
-              if (assignSegmentLatch.getCount() > 0) {
-                //Coordinator should try to assign segment to druidServer historical
-                //Simulate historical loading segment
-                druidServer.addDataSegment(dataSegment);
-                assignSegmentLatch.countDown();
-              } else {
-                Assert.fail("The same segment is assigned to the same server multiple times");
-              }
-            }
-          }
-        }
-    );
     pathChildrenCache.start();
+    final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
+        1,
+        pathChildrenCache,
+        ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment),
+        druidServer
+    );
     assignSegmentLatch.await();
+    Assert.assertTrue(serverAddedCountExpected);
 
     final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
     serviceEmitter.latch = coordinatorRunLatch;
     coordinatorRunLatch.await();
 
     Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
-    curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getId().toString()));
 
     Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
         coordinator.computeNumsUnavailableUsedSegmentsPerDataSource();
@@ -496,39 +482,11 @@
     coordinator.start();
     leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
 
-    final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
-    pathChildrenCache.getListenable().addListener(
-        (client, event) -> {
-          if (CuratorUtils.isChildAdded(event)) {
-            DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
-            if (segment != null) {
-              hotServer.addDataSegment(segment);
-              curator.delete().guaranteed().forPath(event.getData().getPath());
-            }
-
-            assignSegmentLatchHot.countDown();
-          }
-        }
-    );
-
-    final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
-    pathChildrenCacheCold.getListenable().addListener(
-        (CuratorFramework client, PathChildrenCacheEvent event) -> {
-          if (CuratorUtils.isChildAdded(event)) {
-            DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
-
-            if (segment != null) {
-              coldServer.addDataSegment(segment);
-              curator.delete().guaranteed().forPath(event.getData().getPath());
-            }
-
-            assignSegmentLatchCold.countDown();
-          }
-        }
-    );
-
+    final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer);
+    final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer);
     assignSegmentLatchHot.await();
     assignSegmentLatchCold.await();
+    Assert.assertTrue(serverAddedCountExpected);
 
     final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
     serviceEmitter.latch = coordinatorRunLatch;
@@ -550,6 +508,194 @@
     EasyMock.verify(metadataRuleManager);
   }
 
+  @Test(timeout = 60_000L)
+  public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWithBroadcastRule() throws Exception
+  {
+    final String dataSource = "dataSource";
+    final String hotTierName = "hot";
+    final String coldTierName = "cold";
+    final String tierName1 = "tier1";
+    final String tierName2 = "tier2";
+    final Rule broadcastDistributionRule = new ForeverBroadcastDistributionRule();
+    final String loadPathCold = "/druid/loadqueue/cold:1234";
+    final String loadPathBroker1 = "/druid/loadqueue/broker1:1234";
+    final String loadPathBroker2 = "/druid/loadqueue/broker2:1234";
+    final String loadPathPeon = "/druid/loadqueue/peon:1234";
+    final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
+    final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
+    final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0);
+    final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0);
+    final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0);
+
+    final Map<String, DataSegment> dataSegments = ImmutableMap.of(
+        "2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
+        new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
+        "2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
+        new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
+        "2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
+        new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
+    );
+
+    final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
+        curator,
+        loadPathCold,
+        objectMapper,
+        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
+        Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
+        druidCoordinatorConfig
+    );
+
+    final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon(
+        curator,
+        loadPathBroker1,
+        objectMapper,
+        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"),
+        Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"),
+        druidCoordinatorConfig
+    );
+
+    final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon(
+        curator,
+        loadPathBroker2,
+        objectMapper,
+        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"),
+        Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"),
+        druidCoordinatorConfig
+    );
+
+    final LoadQueuePeon loadQueuePeonPoenServer = new CuratorLoadQueuePeon(
+        curator,
+        loadPathPeon,
+        objectMapper,
+        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"),
+        Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"),
+        druidCoordinatorConfig
+    );
+    final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
+        curator,
+        loadPathCold,
+        true,
+        true,
+        Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
+    );
+    final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache(
+        curator,
+        loadPathBroker1,
+        true,
+        true,
+        Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d")
+    );
+    final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache(
+        curator,
+        loadPathBroker2,
+        true,
+        true,
+        Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d")
+    );
+    final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache(
+        curator,
+        loadPathPeon,
+        true,
+        true,
+        Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d")
+    );
+
+    loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon,
+                                               "cold", loadQueuePeonCold,
+                                               "broker1", loadQueuePeonBroker1,
+                                               "broker2", loadQueuePeonBroker2,
+                                               "peon", loadQueuePeonPoenServer));
+
+    loadQueuePeonCold.start();
+    loadQueuePeonBroker1.start();
+    loadQueuePeonBroker2.start();
+    loadQueuePeonPoenServer.start();
+    pathChildrenCache.start();
+    pathChildrenCacheCold.start();
+    pathChildrenCacheBroker1.start();
+    pathChildrenCacheBroker2.start();
+    pathChildrenCachePeon.start();
+
+    DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
+    dataSegments.values().forEach(druidDataSources[0]::addSegment);
+
+    setupSegmentsMetadataMock(druidDataSources[0]);
+
+    EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
+            .andReturn(ImmutableList.of(broadcastDistributionRule)).atLeastOnce();
+    EasyMock.expect(metadataRuleManager.getAllRules())
+            .andReturn(ImmutableMap.of(dataSource, ImmutableList.of(broadcastDistributionRule))).atLeastOnce();
+
+    EasyMock.expect(serverInventoryView.getInventory())
+            .andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer))
+            .atLeastOnce();
+    EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
+
+    EasyMock.replay(metadataRuleManager, serverInventoryView);
+
+    coordinator.start();
+    leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
+
+    final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer);
+    final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer);
+    final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1);
+    final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2);
+    final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer);
+    assignSegmentLatchHot.await();
+    assignSegmentLatchCold.await();
+    assignSegmentLatchBroker1.await();
+    assignSegmentLatchBroker2.await();
+    assignSegmentLatchPeon.await();
+    Assert.assertTrue(serverAddedCountExpected);
+
+    final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
+    serviceEmitter.latch = coordinatorRunLatch;
+    coordinatorRunLatch.await();
+
+    Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
+
+    Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+        coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
+    Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTier.size());
+    Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
+    Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
+    Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource));
+    Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource));
+
+    coordinator.stop();
+    leaderUnannouncerLatch.await();
+
+    EasyMock.verify(serverInventoryView);
+    EasyMock.verify(segmentsMetadataManager);
+    EasyMock.verify(metadataRuleManager);
+  }
+
+  private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
+                                                                                      PathChildrenCache pathChildrenCache,
+                                                                                      Map<String, DataSegment> segments,
+                                                                                      DruidServer server)
+  {
+    final CountDownLatch countDownLatch = new CountDownLatch(latchCount);
+    pathChildrenCache.getListenable().addListener(
+        (CuratorFramework client, PathChildrenCacheEvent event) -> {
+          if (CuratorUtils.isChildAdded(event)) {
+            if (countDownLatch.getCount() > 0) {
+              DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event);
+              if (segment != null) {
+                server.addDataSegment(segment);
+                curator.delete().guaranteed().forPath(event.getData().getPath());
+              }
+              countDownLatch.countDown();
+            } else {
+              // The segment is assigned to the server more times than expected
+              serverAddedCountExpected = false;
+            }
+          }
+        }
+    );
+    return countDownLatch;
+  }
+
   private void setupSegmentsMetadataMock(DruidDataSource dataSource)
   {
     EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();