mirror of https://github.com/apache/druid.git
Fix segment/unavailable/count (#16020)
parent bf0995f846
commit ef48aceff8
@@ -242,7 +242,7 @@ public class DruidCoordinator
       final Iterable<DataSegment> dataSegments = metadataManager.segments().iterateAllUsedSegments();
       for (DataSegment segment : dataSegments) {
         SegmentReplicaCount replicaCount = segmentReplicationStatus.getReplicaCountsInCluster(segment.getId());
-        if (replicaCount != null && replicaCount.totalLoaded() > 0) {
+        if (replicaCount != null && (replicaCount.totalLoaded() > 0 || replicaCount.required() == 0)) {
           datasourceToUnavailableSegments.addTo(segment.getDataSource(), 0);
         } else {
           datasourceToUnavailableSegments.addTo(segment.getDataSource(), 1);
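The one-line change above widens the "available" condition: a used segment whose matching load rules require zero replicas (for example, a datasource kept only in deep storage under a ForeverLoadRule with a replica count of 0) no longer contributes to the segment/unavailable/count metric. A minimal, dependency-free sketch of that rule, using a hypothetical ReplicaCount record as a stand-in for Druid's SegmentReplicaCount:

import java.util.LinkedHashMap;
import java.util.Map;

public class UnavailableCountSketch
{
  // Hypothetical stand-in for Druid's SegmentReplicaCount, reduced to the two values the check reads.
  record ReplicaCount(int totalLoaded, int required) {}

  // A segment is unavailable only if nothing has loaded it AND its rules still require at least one replica.
  // This is the negation (by De Morgan) of the new condition in the diff above.
  static boolean isUnavailable(ReplicaCount count)
  {
    return count == null || (count.totalLoaded() == 0 && count.required() > 0);
  }

  public static void main(String[] args)
  {
    Map<String, ReplicaCount> segments = new LinkedHashMap<>();
    segments.put("hotSegment_notLoadedYet", new ReplicaCount(0, 1));      // still unavailable
    segments.put("hotSegment_loaded", new ReplicaCount(1, 1));            // available
    segments.put("coldSegment_deepStorageOnly", new ReplicaCount(0, 0));  // no longer counted after this fix

    long unavailable = segments.values().stream()
                               .filter(UnavailableCountSketch::isUnavailable)
                               .count();
    System.out.println("segment/unavailable/count = " + unavailable);     // prints 1
  }
}

Before this change, a deep-storage-only segment like the last entry would have been reported as unavailable indefinitely, since a tier with zero required replicas never loads it.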
@@ -798,6 +798,113 @@ public class DruidCoordinatorTest extends CuratorTestBase
     latch2.await();
   }
 
+  @Test(timeout = 60_000L)
+  public void testCoordinatorRun_queryFromDeepStorage() throws Exception
+  {
+    String dataSource = "dataSource1";
+
+    String coldTier = "coldTier";
+    String hotTier = "hotTier";
+
+    // Setup MetadataRuleManager
+    Rule intervalLoadRule = new IntervalLoadRule(Intervals.of("2010-02-01/P1M"), ImmutableMap.of(hotTier, 1), null);
+    Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(coldTier, 0), null);
+    EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
+            .andReturn(ImmutableList.of(intervalLoadRule, foreverLoadRule)).atLeastOnce();
+
+    metadataRuleManager.stop();
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(metadataRuleManager);
+
+    // Setup SegmentsMetadataManager
+    DruidDataSource[] dataSources = {
+        new DruidDataSource(dataSource, Collections.emptyMap())
+    };
+    final DataSegment dataSegment = new DataSegment(
+        dataSource,
+        Intervals.of("2010-01-01/P1D"),
+        "v1",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        0
+    );
+    final DataSegment dataSegmentHot = new DataSegment(
+        dataSource,
+        Intervals.of("2010-02-01/P1D"),
+        "v1",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        0
+    );
+    dataSources[0].addSegment(dataSegment).addSegment(dataSegmentHot);
+
+    setupSegmentsMetadataMock(dataSources[0]);
+    ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
+    EasyMock.expect(immutableDruidDataSource.getSegments())
+            .andReturn(ImmutableSet.of(dataSegment, dataSegmentHot)).atLeastOnce();
+    EasyMock.replay(immutableDruidDataSource);
+
+    // Setup ServerInventoryView
+    druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0);
+    DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0);
+    setupPeons(ImmutableMap.of("server1", loadQueuePeon, "server2", loadQueuePeon));
+    EasyMock.expect(serverInventoryView.getInventory()).andReturn(
+        ImmutableList.of(druidServer, druidServer2)
+    ).atLeastOnce();
+    EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
+    EasyMock.replay(serverInventoryView, loadQueueTaskMaster);
+
+    coordinator.start();
+
+    // Wait for this coordinator to become leader
+    leaderAnnouncerLatch.await();
+
+    // This coordinator should be leader by now
+    Assert.assertTrue(coordinator.isLeader());
+    Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
+    pathChildrenCache.start();
+
+    final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
+    serviceEmitter.latch = coordinatorRunLatch;
+    coordinatorRunLatch.await();
+
+    Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
+        coordinator.getDatasourceToUnavailableSegmentCount();
+    Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
+    // The cold tier segment should not be unavailable; the hot one should be unavailable
+    Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
+
+    Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+        coordinator.getTierToDatasourceToUnderReplicatedCount(false);
+    Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
+    Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
+
+    Object2LongMap<String> underReplicationCountsPerDataSourceHotTier = underReplicationCountsPerDataSourcePerTier.get(hotTier);
+    Assert.assertNotNull(underReplicationCountsPerDataSourceHotTier);
+    Assert.assertEquals(1, underReplicationCountsPerDataSourceHotTier.getLong(dataSource));
+
+    Object2LongMap<String> underReplicationCountsPerDataSourceColdTier = underReplicationCountsPerDataSourcePerTier.get(coldTier);
+    Assert.assertNotNull(underReplicationCountsPerDataSourceColdTier);
+    Assert.assertEquals(0, underReplicationCountsPerDataSourceColdTier.getLong(dataSource));
+
+    coordinator.stop();
+    leaderUnannouncerLatch.await();
+
+    Assert.assertFalse(coordinator.isLeader());
+    Assert.assertNull(coordinator.getCurrentLeader());
+
+    EasyMock.verify(serverInventoryView);
+    EasyMock.verify(metadataRuleManager);
+  }
+
   private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
       int latchCount,
       PathChildrenCache pathChildrenCache,
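For context on the assertions in the new test: Druid evaluates retention rules in order and applies the first rule that covers a segment's interval, so dataSegmentHot (2010-02-01/P1D) falls under the hot-tier IntervalLoadRule requiring one replica, while dataSegment (2010-01-01/P1D) only matches the cold-tier ForeverLoadRule requiring zero. A rough sketch of that first-match behaviour, with simplified stand-in types rather than Druid's actual Rule API:

import java.util.List;
import java.util.Optional;

public class RuleMatchSketch
{
  // Simplified stand-in for a load rule: a tier, a required replica count, and an interval.
  // ISO-8601 date strings are compared lexicographically just to keep the sketch dependency-free.
  record SimpleRule(String tier, int requiredReplicas, String start, String end)
  {
    boolean covers(String segmentStart, String segmentEnd)
    {
      return start.compareTo(segmentStart) <= 0 && end.compareTo(segmentEnd) >= 0;
    }
  }

  // First matching rule wins, mirroring how Druid walks the rule chain top to bottom.
  static Optional<SimpleRule> firstMatching(List<SimpleRule> rules, String segStart, String segEnd)
  {
    return rules.stream().filter(r -> r.covers(segStart, segEnd)).findFirst();
  }

  public static void main(String[] args)
  {
    // Mirrors the test's rule chain: Feb 2010 on the hot tier (1 replica), then a catch-all cold-tier rule (0 replicas).
    List<SimpleRule> rules = List.of(
        new SimpleRule("hotTier", 1, "2010-02-01", "2010-03-01"),
        new SimpleRule("coldTier", 0, "0000-01-01", "9999-12-31")
    );

    // dataSegmentHot (2010-02-01/P1D) hits the interval rule: 1 replica required.
    System.out.println(firstMatching(rules, "2010-02-01", "2010-02-02").orElseThrow());
    // dataSegment (2010-01-01/P1D) only hits the forever rule: 0 replicas required.
    System.out.println(firstMatching(rules, "2010-01-01", "2010-01-02").orElseThrow());
  }
}

Since the mocked peons never load anything, only the hot segment ends up unavailable (hence the expected count of 1), and the per-tier under-replication counts come out as 1 for hotTier and 0 for coldTier.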