Add unit test for config `druid.broker.segment.watchedTiers` (#11555)

This commit is contained in:
Kashif Faraz 2021-08-07 00:12:40 +05:30 committed by GitHub
parent 59c8430d29
commit 39a3db7943
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 97 additions and 37 deletions

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
@ -58,7 +59,9 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -97,18 +100,7 @@ public class BrokerServerViewTest extends CuratorTestBase
setupViews();
final DruidServer druidServer = new DruidServer(
"localhost:1234",
"localhost:1234",
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
);
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
final DruidServer druidServer = setupHistoricalServer("default_tier", "localhost:1234", 0);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
final int partition = segment.getShardSpec().getPartitionNum();
final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D");
@ -159,21 +151,9 @@ public class BrokerServerViewTest extends CuratorTestBase
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
input -> new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
)
hostname -> setupHistoricalServer("default_tier", hostname, 0)
);
for (DruidServer druidServer : druidServers) {
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
}
final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
Pair.of("2011-04-01/2011-04-03", "v1"),
@ -268,21 +248,10 @@ public class BrokerServerViewTest extends CuratorTestBase
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
input -> new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
)
hostname -> setupHistoricalServer("default_tier", hostname, 0)
);
setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
for (DruidServer druidServer : druidServers) {
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
}
final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
@ -352,6 +321,85 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
}
@Test
public void testMultipleTiers() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(4);
segmentRemovedLatch = new CountDownLatch(0);
// Setup a Broker that watches only Tier 2
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2));
// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1);
final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper);
final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper);
announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper);
final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper);
// Wait for the segments to be added
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
).get();
// Verify that the timeline has no entry for the interval of segment 1
Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());
// Verify that there is one entry for the interval of segment 2
List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
timeline.lookup(segment2.getInterval());
Assert.assertEquals(1, timelineHolders.size());
TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());
PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
Assert.assertTrue(partitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(partitionHolder));
ServerSelector selector = (partitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment2, selector.getSegment());
// Verify that the ServerSelector always picks Tier 1
for (int i = 0; i < 5; ++i) {
Assert.assertEquals(server21, selector.pick(null).getServer());
}
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
}
/**
* Creates a DruidServer of type HISTORICAL and sets up a ZNode for it.
*/
private DruidServer setupHistoricalServer(String tier, String name, int priority)
{
final DruidServer historical = new DruidServer(
name,
name,
null,
1000000,
ServerType.HISTORICAL,
tier,
priority
);
setupZNodeForServer(historical, zkPathsConfig, jsonMapper);
return historical;
}
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
@ -389,6 +437,11 @@ public class BrokerServerViewTest extends CuratorTestBase
}
private void setupViews() throws Exception
{
setupViews(null);
}
private void setupViews(Set<String> watchedTiers) throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
@ -441,6 +494,13 @@ public class BrokerServerViewTest extends CuratorTestBase
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
new NoopServiceEmitter(),
new BrokerSegmentWatcherConfig()
{
@Override
public Set<String> getWatchedTiers()
{
return watchedTiers;
}
}
);
baseView.start();