Coordinator loadstatus API full format does not consider Broadcast rules (#10048)

* Coordinator loadstatus API full format does not consider Broadcast rules

* address comments

* fix checkstyle

* minor optimization

* address comments
This commit is contained in:
Maytas Monsereenusorn 2020-06-18 14:52:33 -10:00 committed by GitHub
parent b5e6569d2c
commit 857e5204bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 242 additions and 65 deletions

View File

@ -69,6 +69,7 @@ import org.apache.druid.server.coordinator.duty.LogUsedSegments;
import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments; import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.initialization.ZkPathsConfig;
@ -262,6 +263,11 @@ public class DruidCoordinator
} }
/** /**
* segmentReplicantLookup use in this method could potentially be stale since it is only updated on coordinator runs.
* However, this is ok as long as the {@param dataSegments} is refreshed/latest as this would at least still ensure
* that the stale data in segmentReplicantLookup would be under counting replication levels,
* rather than potentially falsely reporting that everything is available.
*
* @return tier -> { dataSource -> underReplicationCount } map * @return tier -> { dataSource -> underReplicationCount } map
*/ */
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments( public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(
@ -269,6 +275,13 @@ public class DruidCoordinator
) )
{ {
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
final Set<String> decommissioningServers = getDynamicConfigs().getDecommissioningNodes();
final List<ImmutableDruidServer> broadcastTargetServers = serverInventoryView
.getInventory()
.stream()
.filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost()))
.map(DruidServer::toImmutableDruidServer)
.collect(Collectors.toList());
if (segmentReplicantLookup == null) { if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier; return underReplicationCountsPerDataSourcePerTier;
@ -280,20 +293,38 @@ public class DruidCoordinator
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) { for (final Rule rule : rules) {
if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) { if (!rule.appliesTo(segment, now)) {
continue; continue;
} }
((LoadRule) rule) if (rule instanceof LoadRule) {
.getTieredReplicants() ((LoadRule) rule)
.forEach((final String tier, final Integer ruleReplicants) -> { .getTieredReplicants()
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier); .forEach((final String tier, final Integer ruleReplicants) -> {
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()); Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
}
if (rule instanceof BroadcastDistributionRule) {
for (ImmutableDruidServer server : broadcastTargetServers) {
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>());
if (server.getSegment(segment.getId()) == null) {
((Object2LongOpenHashMap<String>) underReplicationPerDataSource) ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); .addTo(segment.getDataSource(), 1);
}); } else {
break; // only the first matching rule applies // This make sure that every datasource has a entry even if the all segments are loaded
underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
}
}
}
// only the first matching rule applies
break;
} }
} }

View File

@ -28,8 +28,6 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer; import org.apache.druid.client.DruidServer;
@ -52,6 +50,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.coordinator.rules.Rule;
@ -104,10 +103,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
private DruidNode druidNode; private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter(); private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
private boolean serverAddedCountExpected = true;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
serverAddedCountExpected = true;
druidServer = EasyMock.createMock(DruidServer.class); druidServer = EasyMock.createMock(DruidServer.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class); serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class); segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
@ -375,37 +376,22 @@ public class DruidCoordinatorTest extends CuratorTestBase
// This coordinator should be leader by now // This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader()); Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader()); Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
final CountDownLatch assignSegmentLatch = new CountDownLatch(1);
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
{
if (CuratorUtils.isChildAdded(event)) {
if (assignSegmentLatch.getCount() > 0) {
//Coordinator should try to assign segment to druidServer historical
//Simulate historical loading segment
druidServer.addDataSegment(dataSegment);
assignSegmentLatch.countDown();
} else {
Assert.fail("The same segment is assigned to the same server multiple times");
}
}
}
}
);
pathChildrenCache.start(); pathChildrenCache.start();
final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
1,
pathChildrenCache,
ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment),
druidServer
);
assignSegmentLatch.await(); assignSegmentLatch.await();
Assert.assertTrue(serverAddedCountExpected);
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch; serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await(); coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus()); Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getId().toString()));
Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource = Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
coordinator.computeNumsUnavailableUsedSegmentsPerDataSource(); coordinator.computeNumsUnavailableUsedSegmentsPerDataSource();
@ -496,39 +482,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
coordinator.start(); coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2); final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer);
pathChildrenCache.getListenable().addListener( final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer);
(client, event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
hotServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
assignSegmentLatchHot.countDown();
}
}
);
final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
pathChildrenCacheCold.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
coldServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
assignSegmentLatchCold.countDown();
}
}
);
assignSegmentLatchHot.await(); assignSegmentLatchHot.await();
assignSegmentLatchCold.await(); assignSegmentLatchCold.await();
Assert.assertTrue(serverAddedCountExpected);
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch; serviceEmitter.latch = coordinatorRunLatch;
@ -550,6 +508,194 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager); EasyMock.verify(metadataRuleManager);
} }
@Test(timeout = 60_000L)
public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWithBroadcastRule() throws Exception
{
final String dataSource = "dataSource";
final String hotTierName = "hot";
final String coldTierName = "cold";
final String tierName1 = "tier1";
final String tierName2 = "tier2";
final Rule broadcastDistributionRule = new ForeverBroadcastDistributionRule();
final String loadPathCold = "/druid/loadqueue/cold:1234";
final String loadPathBroker1 = "/druid/loadqueue/broker1:1234";
final String loadPathBroker2 = "/druid/loadqueue/broker2:1234";
final String loadPathPeon = "/druid/loadqueue/peon:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0);
final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0);
final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0);
final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
druidCoordinatorConfig
);
final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon(
curator,
loadPathBroker1,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"),
druidCoordinatorConfig
);
final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon(
curator,
loadPathBroker2,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"),
druidCoordinatorConfig
);
final LoadQueuePeon loadQueuePeonPoenServer = new CuratorLoadQueuePeon(
curator,
loadPathPeon,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"),
druidCoordinatorConfig
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator,
loadPathCold,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache(
curator,
loadPathBroker1,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d")
);
final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache(
curator,
loadPathBroker2,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d")
);
final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache(
curator,
loadPathPeon,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d")
);
loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon,
"cold", loadQueuePeonCold,
"broker1", loadQueuePeonBroker1,
"broker2", loadQueuePeonBroker2,
"peon", loadQueuePeonPoenServer));
loadQueuePeonCold.start();
loadQueuePeonBroker1.start();
loadQueuePeonBroker2.start();
loadQueuePeonPoenServer.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
pathChildrenCacheBroker1.start();
pathChildrenCacheBroker2.start();
pathChildrenCachePeon.start();
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
setupSegmentsMetadataMock(druidDataSources[0]);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(broadcastDistributionRule)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(broadcastDistributionRule))).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, serverInventoryView);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer);
final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1);
final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2);
final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
assignSegmentLatchBroker1.await();
assignSegmentLatchBroker2.await();
assignSegmentLatchPeon.await();
Assert.assertTrue(serverAddedCountExpected);
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
EasyMock.verify(serverInventoryView);
EasyMock.verify(segmentsMetadataManager);
EasyMock.verify(metadataRuleManager);
}
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
DruidServer server)
{
final CountDownLatch countDownLatch = new CountDownLatch(latchCount);
pathChildrenCache.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
if (CuratorUtils.isChildAdded(event)) {
if (countDownLatch.getCount() > 0) {
DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event);
if (segment != null) {
server.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
countDownLatch.countDown();
} else {
// The segment is assigned to the server more times than expected
serverAddedCountExpected = false;
}
}
}
);
return countDownLatch;
}
private void setupSegmentsMetadataMock(DruidDataSource dataSource) private void setupSegmentsMetadataMock(DruidDataSource dataSource)
{ {
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes(); EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();