Add Broker config `druid.broker.segment.watchRealtimeNodes` (#11732)

The new config is an extension of the concept of "watchedTiers" where
the Broker can choose to add the info of only the specified tiers to its timeline.
Similarly, with this config, Broker can choose to skip the realtime nodes and
thus it would query only Historical processes for any given segment.
This commit is contained in:
Kashif Faraz 2021-11-02 12:38:42 +05:30 committed by GitHub
parent 5e1dc843d1
commit a22687ecbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 90 additions and 14 deletions

View File

@ -1855,6 +1855,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
|`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none|
|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none|
|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchRealtimeTasks`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeTasks` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a broker to exclude segments served by realtime processes, set `watchRealtimeTasks` to false. |true|
|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|
## Cache Configuration ## Cache Configuration

View File

@ -36,6 +36,9 @@ public class BrokerSegmentWatcherConfig
@JsonProperty @JsonProperty
private Set<String> watchedDataSources = null; private Set<String> watchedDataSources = null;
@JsonProperty
private boolean watchRealtimeTasks = true;
@JsonProperty @JsonProperty
private boolean awaitInitializationOnStart = true; private boolean awaitInitializationOnStart = true;
@ -54,6 +57,11 @@ public class BrokerSegmentWatcherConfig
return watchedDataSources; return watchedDataSources;
} }
public boolean isWatchRealtimeTasks()
{
return watchRealtimeTasks;
}
public boolean isAwaitInitializationOnStart() public boolean isAwaitInitializationOnStart()
{ {
return awaitInitializationOnStart; return awaitInitializationOnStart;

View File

@ -115,6 +115,7 @@ public class BrokerServerView implements TimelineServerView
this.segmentWatcherConfig = segmentWatcherConfig; this.segmentWatcherConfig = segmentWatcherConfig;
this.segmentFilter = (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> { this.segmentFilter = (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> {
// Include only watched tiers if specified // Include only watched tiers if specified
if (segmentWatcherConfig.getWatchedTiers() != null if (segmentWatcherConfig.getWatchedTiers() != null
&& !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) {
@ -133,7 +134,9 @@ public class BrokerServerView implements TimelineServerView
return false; return false;
} }
return true; // Include realtime tasks only if they are watched
return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR
|| segmentWatcherConfig.isWatchRealtimeTasks();
}; };
ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
baseView.registerSegmentCallback( baseView.registerSegmentCallback(

View File

@ -45,10 +45,11 @@ public class BrokerSegmentWatcherConfigTest
); );
Assert.assertNull(config.getWatchedTiers()); Assert.assertNull(config.getWatchedTiers());
Assert.assertTrue(config.isWatchRealtimeTasks());
Assert.assertNull(config.getIgnoredTiers()); Assert.assertNull(config.getIgnoredTiers());
//non-defaults //non-defaults
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeTasks\": false }";
config = MAPPER.readValue( config = MAPPER.readValue(
MAPPER.writeValueAsString( MAPPER.writeValueAsString(
@ -60,6 +61,7 @@ public class BrokerSegmentWatcherConfigTest
Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers());
Assert.assertNull(config.getIgnoredTiers()); Assert.assertNull(config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
Assert.assertFalse(config.isWatchRealtimeTasks());
// json with ignoredTiers // json with ignoredTiers
json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
@ -74,5 +76,6 @@ public class BrokerSegmentWatcherConfigTest
Assert.assertNull(config.getWatchedTiers()); Assert.assertNull(config.getWatchedTiers());
Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
Assert.assertTrue(config.isWatchRealtimeTasks());
} }
} }

View File

@ -332,7 +332,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Setup a Broker that watches only Tier 2 // Setup a Broker that watches only Tier 2
final String tier1 = "tier1"; final String tier1 = "tier1";
final String tier2 = "tier2"; final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2)); setupViews(Sets.newHashSet(tier2), null, true);
// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
@ -384,6 +384,66 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
} }
@Test
public void testRealtimeTasksNotWatched() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(4);
segmentRemovedLatch = new CountDownLatch(0);
// Setup a Broker that watches only Historicals
setupViews(null, null, false);
// Historical has segments 2 and 3, Realtime has segments 1 and 2
final DruidServer realtimeServer = setupDruidServer(ServerType.INDEXER_EXECUTOR, null, "realtime:1", 1);
final DruidServer historicalServer = setupHistoricalServer("tier1", "historical:2", 1);
final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
announceSegmentForServer(realtimeServer, segment1, zkPathsConfig, jsonMapper);
final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
announceSegmentForServer(realtimeServer, segment2, zkPathsConfig, jsonMapper);
announceSegmentForServer(historicalServer, segment2, zkPathsConfig, jsonMapper);
final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
announceSegmentForServer(historicalServer, segment3, zkPathsConfig, jsonMapper);
// Wait for the segments to be added
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
).get();
// Verify that the timeline has no entry for the interval of segment 1
Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());
// Verify that there is one entry for the interval of segment 2
List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
timeline.lookup(segment2.getInterval());
Assert.assertEquals(1, timelineHolders.size());
TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());
PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
Assert.assertTrue(partitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(partitionHolder));
ServerSelector selector = (partitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment2, selector.getSegment());
// Verify that the ServerSelector always picks the Historical server
for (int i = 0; i < 5; ++i) {
Assert.assertEquals(historicalServer, selector.pick(null).getServer());
}
Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2));
}
@Test @Test
public void testIgnoredTiers() throws Exception public void testIgnoredTiers() throws Exception
{ {
@ -394,7 +454,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Setup a Broker that does not watch Tier 1 // Setup a Broker that does not watch Tier 1
final String tier1 = "tier1"; final String tier1 = "tier1";
final String tier2 = "tier2"; final String tier2 = "tier2";
setupViews(null, Sets.newHashSet(tier1)); setupViews(null, Sets.newHashSet(tier1), false);
// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
@ -452,19 +512,19 @@ public class BrokerServerViewTest extends CuratorTestBase
// Verify that specifying both ignoredTiers and watchedTiers fails startup // Verify that specifying both ignoredTiers and watchedTiers fails startup
final String tier1 = "tier1"; final String tier1 = "tier1";
final String tier2 = "tier2"; final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1)); setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1), true);
} }
@Test(expected = ISE.class) @Test(expected = ISE.class)
public void testEmptyWatchedTiersConfig() throws Exception public void testEmptyWatchedTiersConfig() throws Exception
{ {
setupViews(Collections.emptySet(), null); setupViews(Collections.emptySet(), null, true);
} }
@Test(expected = ISE.class) @Test(expected = ISE.class)
public void testEmptyIgnoredTiersConfig() throws Exception public void testEmptyIgnoredTiersConfig() throws Exception
{ {
setupViews(null, Collections.emptySet()); setupViews(null, Collections.emptySet(), true);
} }
/** /**
@ -530,15 +590,10 @@ public class BrokerServerViewTest extends CuratorTestBase
private void setupViews() throws Exception private void setupViews() throws Exception
{ {
setupViews(null); setupViews(null, null, true);
} }
private void setupViews(Set<String> watchedTiers) throws Exception private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers, boolean watchRealtimeTasks) throws Exception
{
setupViews(watchedTiers, null);
}
private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers) throws Exception
{ {
baseView = new BatchServerInventoryView( baseView = new BatchServerInventoryView(
zkPathsConfig, zkPathsConfig,
@ -598,6 +653,12 @@ public class BrokerServerViewTest extends CuratorTestBase
return watchedTiers; return watchedTiers;
} }
@Override
public boolean isWatchRealtimeTasks()
{
return watchRealtimeTasks;
}
@Override @Override
public Set<String> getIgnoredTiers() public Set<String> getIgnoredTiers()
{ {