Add Broker config `druid.broker.segment.ignoredTiers` (#11766)

The new config is an extension of the concept of "watchedTiers" where
the Broker can choose to add the info of only the specified tiers to its timeline.
Similarly, with this config, Broker can choose to ignore the segments being served
by the specified historical tiers. By default, no tier is ignored.

This config is useful when you want a completely isolated tier amongst many other tiers.

Say there are several tiers of historicals Tier T1, Tier T2 ... Tier Tn
and there are several brokers Broker B1, Broker B2 .... Broker Bm

If we want only Broker B1 to query Tier T1, instead of setting a long list of watchedTiers
on each of the other Brokers B2 ... Bm, we could just set druid.broker.segment.ignoredTiers=["T1"]
for these Brokers, while Broker B1 could have druid.broker.segment.watchedTiers=["T1"]
This commit is contained in:
Kashif Faraz 2021-10-06 10:06:32 +05:30 committed by GitHub
parent 104c9a07f0
commit b688db790b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 173 additions and 6 deletions

View File

@ -1834,7 +1834,8 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch|
|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none|
|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none|
|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|

View File

@ -30,6 +30,9 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedTiers = null;
@JsonProperty
private Set<String> ignoredTiers = null;
@JsonProperty
private Set<String> watchedDataSources = null;
@ -41,6 +44,11 @@ public class BrokerSegmentWatcherConfig
return watchedTiers;
}
public Set<String> getIgnoredTiers()
{
return ignoredTiers;
}
public Set<String> getWatchedDataSources()
{
return watchedDataSources;

View File

@ -106,17 +106,28 @@ public class BrokerServerView implements TimelineServerView
this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter;
this.segmentWatcherConfig = segmentWatcherConfig;
this.clients = new ConcurrentHashMap<>();
this.selectors = new HashMap<>();
this.timelines = new HashMap<>();
// Validate and set the segment watcher config
validateSegmentWatcherConfig(segmentWatcherConfig);
this.segmentWatcherConfig = segmentWatcherConfig;
this.segmentFilter = (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> {
// Include only watched tiers if specified
if (segmentWatcherConfig.getWatchedTiers() != null
&& !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) {
return false;
}
// Exclude ignored tiers if specified
if (segmentWatcherConfig.getIgnoredTiers() != null
&& segmentWatcherConfig.getIgnoredTiers().contains(metadataAndSegment.lhs.getTier())) {
return false;
}
// Include only watched datasources if specified
if (segmentWatcherConfig.getWatchedDataSources() != null
&& !segmentWatcherConfig.getWatchedDataSources().contains(metadataAndSegment.rhs.getDataSource())) {
return false;
@ -184,6 +195,35 @@ public class BrokerServerView implements TimelineServerView
initialized.await();
}
/**
* Validates the given BrokerSegmentWatcherConfig.
* <ul>
* <li>At most one of watchedTiers or ignoredTiers can be set</li>
* <li>If set, watchedTiers must be non-empty</li>
* <li>If set, ignoredTiers must be non-empty</li>
* </ul>
*/
private void validateSegmentWatcherConfig(BrokerSegmentWatcherConfig watcherConfig)
{
if (watcherConfig.getWatchedTiers() != null
&& watcherConfig.getIgnoredTiers() != null) {
throw new ISE(
"At most one of 'druid.broker.segment.watchedTiers' "
+ "and 'druid.broker.segment.ignoredTiers' can be configured."
);
}
if (watcherConfig.getWatchedTiers() != null
&& watcherConfig.getWatchedTiers().isEmpty()) {
throw new ISE("If configured, 'druid.broker.segment.watchedTiers' must be non-empty");
}
if (watcherConfig.getIgnoredTiers() != null
&& watcherConfig.getIgnoredTiers().isEmpty()) {
throw new ISE("If configured, 'druid.broker.segment.ignoredTiers' must be non-empty");
}
}
private QueryableDruidServer addServer(DruidServer server)
{
QueryableDruidServer retVal = new QueryableDruidServer<>(server, makeDirectClient(server));

View File

@ -45,6 +45,7 @@ public class BrokerSegmentWatcherConfigTest
);
Assert.assertNull(config.getWatchedTiers());
Assert.assertNull(config.getIgnoredTiers());
//non-defaults
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
@ -57,7 +58,21 @@ public class BrokerSegmentWatcherConfigTest
);
Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers());
Assert.assertNull(config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
// json with ignoredTiers
json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
config = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(json, BrokerSegmentWatcherConfig.class)
),
BrokerSegmentWatcherConfig.class
);
Assert.assertNull(config.getWatchedTiers());
Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
}
}

View File

@ -33,6 +33,7 @@ import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.http.client.HttpClient;
@ -383,22 +384,113 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
}
@Test
public void testIgnoredTiers() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(4);
segmentRemovedLatch = new CountDownLatch(0);
// Setup a Broker that does not watch Tier 1
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(null, Sets.newHashSet(tier1));
// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1);
final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper);
final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper);
announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper);
final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper);
// Wait for the segments to be added
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
).get();
// Verify that the timeline has no entry for the interval of segment 1
Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());
// Verify that there is one entry for the interval of segment 2
List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
timeline.lookup(segment2.getInterval());
Assert.assertEquals(1, timelineHolders.size());
TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());
PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
Assert.assertTrue(partitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(partitionHolder));
ServerSelector selector = (partitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment2, selector.getSegment());
// Verify that the ServerSelector always picks Tier 1
for (int i = 0; i < 5; ++i) {
Assert.assertEquals(server21, selector.pick(null).getServer());
}
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
}
@Test(expected = ISE.class)
public void testInvalidWatchedTiersConfig() throws Exception
{
// Verify that specifying both ignoredTiers and watchedTiers fails startup
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1));
}
@Test(expected = ISE.class)
public void testEmptyWatchedTiersConfig() throws Exception
{
setupViews(Collections.emptySet(), null);
}
@Test(expected = ISE.class)
public void testEmptyIgnoredTiersConfig() throws Exception
{
setupViews(null, Collections.emptySet());
}
/**
* Creates a DruidServer of type HISTORICAL and sets up a ZNode for it.
*/
private DruidServer setupHistoricalServer(String tier, String name, int priority)
{
final DruidServer historical = new DruidServer(
return setupDruidServer(ServerType.HISTORICAL, tier, name, priority);
}
/**
* Creates a DruidServer of the specified type and sets up a ZNode for it.
*/
private DruidServer setupDruidServer(ServerType serverType, String tier, String name, int priority)
{
final DruidServer druidServer = new DruidServer(
name,
name,
null,
1000000,
ServerType.HISTORICAL,
serverType,
tier,
priority
);
setupZNodeForServer(historical, zkPathsConfig, jsonMapper);
return historical;
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
return druidServer;
}
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
@ -442,6 +534,11 @@ public class BrokerServerViewTest extends CuratorTestBase
}
private void setupViews(Set<String> watchedTiers) throws Exception
{
setupViews(watchedTiers, null);
}
private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers) throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
@ -500,6 +597,12 @@ public class BrokerServerViewTest extends CuratorTestBase
{
return watchedTiers;
}
@Override
public Set<String> getIgnoredTiers()
{
return ignoredTiers;
}
}
);