From ef48aceff8bb519aa426dad1958ef5727d66e98f Mon Sep 17 00:00:00 2001
From: George Shiqi Wu
Date: Fri, 1 Mar 2024 15:38:27 -0500
Subject: [PATCH] Fix segment/unavailable/count (#16020)

---
Review notes (placed below the "---" marker, so they are not part of the
commit message):

  * DruidCoordinator: while iterating all used segments, a segment now adds 0
    to its datasource's unavailable count when its cluster-wide replica count
    is non-null and either totalLoaded() > 0 or required() == 0. Previously
    only totalLoaded() > 0 qualified. Every other segment still adds 1.
  * DruidCoordinatorTest: testCoordinatorRun_queryFromDeepStorage registers
    an interval load rule asking for 1 replica on "hotTier" and a forever load
    rule asking for 0 replicas on "coldTier", adds one segment dated
    2010-01-01 and one dated 2010-02-01, and runs the coordinator with one
    historical per tier. It asserts an unavailable count of 1 for the
    datasource, an under-replicated count of 1 on "hotTier" and 0 on
    "coldTier".
  * NOTE(review): which rule each segment matches is inferred from the
    intervals and from the test's own inline comment; confirm against the
    rule-matching code.

 .../server/coordinator/DruidCoordinator.java  |   2 +-
 .../coordinator/DruidCoordinatorTest.java     | 107 ++++++++++++++++++
 2 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index caa1c7fb66f..725fbefd6d6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -242,7 +242,7 @@ public class DruidCoordinator
     final Iterable<DataSegment> dataSegments = metadataManager.segments().iterateAllUsedSegments();
     for (DataSegment segment : dataSegments) {
       SegmentReplicaCount replicaCount = segmentReplicationStatus.getReplicaCountsInCluster(segment.getId());
-      if (replicaCount != null && replicaCount.totalLoaded() > 0) {
+      if (replicaCount != null && (replicaCount.totalLoaded() > 0 || replicaCount.required() == 0)) {
         datasourceToUnavailableSegments.addTo(segment.getDataSource(), 0);
       } else {
         datasourceToUnavailableSegments.addTo(segment.getDataSource(), 1);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index a9e85ca1b23..d2c70cb4cc4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -798,6 +798,113 @@ public class DruidCoordinatorTest extends CuratorTestBase
     latch2.await();
   }
 
+  @Test(timeout = 60_000L)
+  public void testCoordinatorRun_queryFromDeepStorage() throws Exception
+  {
+    String dataSource = "dataSource1";
+
+    String coldTier = "coldTier";
+    String hotTier = "hotTier";
+
+    // Setup MetadataRuleManager
+    Rule intervalLoadRule = new IntervalLoadRule(Intervals.of("2010-02-01/P1M"), ImmutableMap.of(hotTier, 1), null);
+    Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(coldTier, 0), null);
+    EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
+            .andReturn(ImmutableList.of(intervalLoadRule, foreverLoadRule)).atLeastOnce();
+
+    metadataRuleManager.stop();
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(metadataRuleManager);
+
+    // Setup SegmentsMetadataManager
+    DruidDataSource[] dataSources = {
+        new DruidDataSource(dataSource, Collections.emptyMap())
+
+    };
+    final DataSegment dataSegment = new DataSegment(
+        dataSource,
+        Intervals.of("2010-01-01/P1D"),
+        "v1",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        0
+    );
+    final DataSegment dataSegmentHot = new DataSegment(
+        dataSource,
+        Intervals.of("2010-02-01/P1D"),
+        "v1",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        0
+    );
+    dataSources[0].addSegment(dataSegment).addSegment(dataSegmentHot);
+
+    setupSegmentsMetadataMock(dataSources[0]);
+    ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
+    EasyMock.expect(immutableDruidDataSource.getSegments())
+            .andReturn(ImmutableSet.of(dataSegment, dataSegmentHot)).atLeastOnce();
+    EasyMock.replay(immutableDruidDataSource);
+
+    // Setup ServerInventoryView
+    druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0);
+    DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0);
+    setupPeons(ImmutableMap.of("server1", loadQueuePeon, "server2", loadQueuePeon));
+    EasyMock.expect(serverInventoryView.getInventory()).andReturn(
+        ImmutableList.of(druidServer, druidServer2)
+    ).atLeastOnce();
+    EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
+    EasyMock.replay(serverInventoryView, loadQueueTaskMaster);
+
+    coordinator.start();
+
+    // Wait for this coordinator to become leader
+    leaderAnnouncerLatch.await();
+
+    // This coordinator should be leader by now
+    Assert.assertTrue(coordinator.isLeader());
+    Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
+    pathChildrenCache.start();
+
+    final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
+    serviceEmitter.latch = coordinatorRunLatch;
+    coordinatorRunLatch.await();
+
+    Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
+        coordinator.getDatasourceToUnavailableSegmentCount();
+    Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
+    // The cold tier segment should not be unavailable, the hot one should be unavailable
+    Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
+
+    Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+        coordinator.getTierToDatasourceToUnderReplicatedCount(false);
+    Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
+    Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
+
+    Object2LongMap<String> underRepliicationCountsPerDataSourceHotTier = underReplicationCountsPerDataSourcePerTier.get(hotTier);
+    Assert.assertNotNull(underRepliicationCountsPerDataSourceHotTier);
+    Assert.assertEquals(1, underRepliicationCountsPerDataSourceHotTier.getLong(dataSource));
+
+    Object2LongMap<String> underRepliicationCountsPerDataSourceColdTier = underReplicationCountsPerDataSourcePerTier.get(coldTier);
+    Assert.assertNotNull(underRepliicationCountsPerDataSourceColdTier);
+    Assert.assertEquals(0, underRepliicationCountsPerDataSourceColdTier.getLong(dataSource));
+
+    coordinator.stop();
+    leaderUnannouncerLatch.await();
+
+    Assert.assertFalse(coordinator.isLeader());
+    Assert.assertNull(coordinator.getCurrentLeader());
+
+    EasyMock.verify(serverInventoryView);
+    EasyMock.verify(metadataRuleManager);
+  }
+
   private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
       int latchCount,
       PathChildrenCache pathChildrenCache,