ignore brokers in broker views (#10017)

This commit is contained in:
Clint Wylie 2020-06-10 12:29:30 -07:00 committed by GitHub
parent 5da78d13af
commit 96eb69e475
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 171 additions and 25 deletions

View File

@ -42,6 +42,7 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -217,6 +218,12 @@ public class BrokerServerView implements TimelineServerView
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
// loop...
return;
}
SegmentId segmentId = segment.getId();
synchronized (lock) {
log.debug("Adding segment[%s] for server[%s]", segment, server);
@ -246,6 +253,10 @@ public class BrokerServerView implements TimelineServerView
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// might as well save the trouble of grabbing a lock for something that isn't there..
return;
}
SegmentId segmentId = segment.getId();
final ServerSelector selector;

View File

@ -22,7 +22,6 @@ package org.apache.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -161,22 +160,15 @@ public class BrokerServerViewTest extends CuratorTestBase
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
new Function<String, DruidServer>()
{
@Override
public DruidServer apply(String input)
{
return new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
);
}
}
input -> new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
)
);
for (DruidServer druidServer : druidServers) {
@ -190,14 +182,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Pair.of("2011-04-01/2011-04-09", "v2"),
Pair.of("2011-04-06/2011-04-09", "v3"),
Pair.of("2011-04-01/2011-04-02", "v3")
), new Function<Pair<String, String>, DataSegment>()
{
@Override
public DataSegment apply(Pair<String, String> input)
{
return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
}
}
), input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
);
for (int i = 0; i < 5; ++i) {
@ -261,6 +246,114 @@ public class BrokerServerViewTest extends CuratorTestBase
);
}
@Test
public void testMultipleServerAndBroker() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(6);
// temporarily set latch count to 1
segmentRemovedLatch = new CountDownLatch(1);
setupViews();
final DruidServer druidBroker = new DruidServer(
"localhost:5",
"localhost:5",
null,
10000000L,
ServerType.BROKER,
"default_tier",
0
);
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
input -> new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
)
);
setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
for (DruidServer druidServer : druidServers) {
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
}
final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
Pair.of("2011-04-01/2011-04-03", "v1"),
Pair.of("2011-04-03/2011-04-06", "v1"),
Pair.of("2011-04-01/2011-04-09", "v2"),
Pair.of("2011-04-06/2011-04-09", "v3"),
Pair.of("2011-04-01/2011-04-02", "v3")
),
input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
);
DataSegment brokerSegment = dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-11", "v4");
announceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig, jsonMapper);
for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
).get();
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
),
(List<TimelineObjectHolder>) timeline.lookup(
Intervals.of(
"2011-04-01/2011-04-09"
)
)
);
// unannounce the broker segment should do nothing to announcements
unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
// renew segmentRemovedLatch since we still have 5 segments to unannounce
segmentRemovedLatch = new CountDownLatch(5);
timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
).get();
// expect same set of segments as before
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
),
(List<TimelineObjectHolder>) timeline.lookup(
Intervals.of(
"2011-04-01/2011-04-09"
)
)
);
// unannounce all the segments
for (int i = 0; i < 5; ++i) {
unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
}
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
String version,

View File

@ -350,6 +350,12 @@ public class DruidSchema extends AbstractSchema
@VisibleForTesting
void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata
// loop...
return;
}
synchronized (lock) {
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
@ -428,6 +434,10 @@ public class DruidSchema extends AbstractSchema
@VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// cheese it
return;
}
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());

View File

@ -57,6 +57,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After;
import org.junit.AfterClass;
@ -418,4 +419,35 @@ public class DruidSchemaTest extends CalciteTestBase
Assert.assertEquals(0L, currentMetadata.isRealtime());
}
@Test
public void testAvailableSegmentFromBrokerIsIgnored()
{
Assert.assertEquals(4, schema.getTotalSegments());
DruidServerMetadata metadata = new DruidServerMetadata(
"broker",
"localhost:0",
null,
1000L,
ServerType.BROKER,
"broken",
0
);
DataSegment segment = new DataSegment(
"test",
Intervals.of("2011-04-01/2011-04-11"),
"v1",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
NoneShardSpec.instance(),
1,
100L
);
schema.addSegment(metadata, segment);
Assert.assertEquals(4, schema.getTotalSegments());
}
}