From a22687ecbebe79196f6a246cb62d2ddb19bbb7a3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 2 Nov 2021 12:38:42 +0530 Subject: [PATCH] Add Broker config `druid.broker.segment.watchRealtimeTasks` (#11732) The new config is an extension of the concept of "watchedTiers" where the Broker can choose to add the info of only the specified tiers to its timeline. Similarly, with this config, Broker can choose to skip the realtime nodes and thus it would query only Historical processes for any given segment. --- docs/configuration/index.md | 1 + .../client/BrokerSegmentWatcherConfig.java | 8 ++ .../apache/druid/client/BrokerServerView.java | 5 +- .../BrokerSegmentWatcherConfigTest.java | 5 +- .../druid/client/BrokerServerViewTest.java | 85 ++++++++++++++++--- 5 files changed, 90 insertions(+), 14 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c3eb5761c16..1eba2dc898d 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1855,6 +1855,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. 
This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.watchRealtimeTasks`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeTasks` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a Broker to exclude segments served by realtime processes, set `watchRealtimeTasks` to false.|true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. 
See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| ## Cache Configuration diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index c487c713f23..5f3dbd3abe7 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -36,6 +36,9 @@ public class BrokerSegmentWatcherConfig @JsonProperty private Set watchedDataSources = null; + @JsonProperty + private boolean watchRealtimeTasks = true; + @JsonProperty private boolean awaitInitializationOnStart = true; @@ -54,6 +57,11 @@ public class BrokerSegmentWatcherConfig return watchedDataSources; } + public boolean isWatchRealtimeTasks() + { + return watchRealtimeTasks; + } + public boolean isAwaitInitializationOnStart() { return awaitInitializationOnStart; diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index f4269157b6a..5fcd8cdc85f 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -115,6 +115,7 @@ public class BrokerServerView implements TimelineServerView this.segmentWatcherConfig = segmentWatcherConfig; this.segmentFilter = (Pair metadataAndSegment) -> { + // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { @@ -133,7 +134,9 @@ public class BrokerServerView implements TimelineServerView return false; } - return true; + // Include realtime tasks only if they are watched + return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR + || segmentWatcherConfig.isWatchRealtimeTasks(); }; ExecutorService exec = 
Execs.singleThreaded("BrokerServerView-%s"); baseView.registerSegmentCallback( diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index 8eab4629489..cc5398c85fa 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -45,10 +45,11 @@ public class BrokerSegmentWatcherConfigTest ); Assert.assertNull(config.getWatchedTiers()); + Assert.assertTrue(config.isWatchRealtimeTasks()); Assert.assertNull(config.getIgnoredTiers()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeTasks\": false }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -60,6 +61,7 @@ public class BrokerSegmentWatcherConfigTest Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); Assert.assertNull(config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + Assert.assertFalse(config.isWatchRealtimeTasks()); // json with ignoredTiers json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; @@ -74,5 +76,6 @@ public class BrokerSegmentWatcherConfigTest Assert.assertNull(config.getWatchedTiers()); Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + Assert.assertTrue(config.isWatchRealtimeTasks()); } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 6e93c372b34..0a0e6b83cac 100644 --- 
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -332,7 +332,7 @@ public class BrokerServerViewTest extends CuratorTestBase // Setup a Broker that watches only Tier 2 final String tier1 = "tier1"; final String tier2 = "tier2"; - setupViews(Sets.newHashSet(tier2)); + setupViews(Sets.newHashSet(tier2), null, true); // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); @@ -384,6 +384,66 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); } + @Test + public void testRealtimeTasksNotWatched() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(4); + segmentRemovedLatch = new CountDownLatch(0); + + // Setup a Broker that watches only Historicals + setupViews(null, null, false); + + // Historical has segments 2 and 3, Realtime has segments 1 and 2 + final DruidServer realtimeServer = setupDruidServer(ServerType.INDEXER_EXECUTOR, null, "realtime:1", 1); + final DruidServer historicalServer = setupHistoricalServer("tier1", "historical:2", 1); + + final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1"); + announceSegmentForServer(realtimeServer, segment1, zkPathsConfig, jsonMapper); + + final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1"); + announceSegmentForServer(realtimeServer, segment2, zkPathsConfig, jsonMapper); + announceSegmentForServer(historicalServer, segment2, zkPathsConfig, jsonMapper); + + final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1"); + announceSegmentForServer(historicalServer, segment3, zkPathsConfig, jsonMapper); + + // Wait for the segments to be added + 
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + // Get the timeline for the datasource + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource())) + ).get(); + + // Verify that the timeline has no entry for the interval of segment 1 + Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty()); + + // Verify that there is one entry for the interval of segment 2 + List> timelineHolders = + timeline.lookup(segment2.getInterval()); + Assert.assertEquals(1, timelineHolders.size()); + + TimelineObjectHolder timelineHolder = timelineHolders.get(0); + Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval()); + Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion()); + + PartitionHolder partitionHolder = timelineHolder.getObject(); + Assert.assertTrue(partitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(partitionHolder)); + + ServerSelector selector = (partitionHolder.iterator().next()).getObject(); + Assert.assertFalse(selector.isEmpty()); + Assert.assertEquals(segment2, selector.getSegment()); + + // Verify that the ServerSelector always picks the Historical server + for (int i = 0; i < 5; ++i) { + Assert.assertEquals(historicalServer, selector.pick(null).getServer()); + } + Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2)); + } + @Test public void testIgnoredTiers() throws Exception { @@ -394,7 +454,7 @@ public class BrokerServerViewTest extends CuratorTestBase // Setup a Broker that does not watch Tier 1 final String tier1 = "tier1"; final String tier2 = "tier2"; - setupViews(null, Sets.newHashSet(tier1)); + setupViews(null, Sets.newHashSet(tier1), false); // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 final DruidServer server11 = 
setupHistoricalServer(tier1, "localhost:1", 1); @@ -452,19 +512,19 @@ public class BrokerServerViewTest extends CuratorTestBase // Verify that specifying both ignoredTiers and watchedTiers fails startup final String tier1 = "tier1"; final String tier2 = "tier2"; - setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1)); + setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1), true); } @Test(expected = ISE.class) public void testEmptyWatchedTiersConfig() throws Exception { - setupViews(Collections.emptySet(), null); + setupViews(Collections.emptySet(), null, true); } @Test(expected = ISE.class) public void testEmptyIgnoredTiersConfig() throws Exception { - setupViews(null, Collections.emptySet()); + setupViews(null, Collections.emptySet(), true); } /** @@ -530,15 +590,10 @@ public class BrokerServerViewTest extends CuratorTestBase private void setupViews() throws Exception { - setupViews(null); + setupViews(null, null, true); } - private void setupViews(Set watchedTiers) throws Exception - { - setupViews(watchedTiers, null); - } - - private void setupViews(Set watchedTiers, Set ignoredTiers) throws Exception + private void setupViews(Set watchedTiers, Set ignoredTiers, boolean watchRealtimeTasks) throws Exception { baseView = new BatchServerInventoryView( zkPathsConfig, @@ -598,6 +653,12 @@ public class BrokerServerViewTest extends CuratorTestBase return watchedTiers; } + @Override + public boolean isWatchRealtimeTasks() + { + return watchRealtimeTasks; + } + @Override public Set getIgnoredTiers() {