fix DruidSchema incorrectly listing tables with no segments (#10660)

* fix race condition with DruidSchema tables and dataSourcesNeedingRebuild

* rework to see if it passes analysis

* more better

* maybe this

* re-arrange and comments
This commit is contained in:
Clint Wylie 2020-12-11 14:14:00 -08:00 committed by GitHub
parent 753fa6b3bd
commit 64f97e7003
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 10 deletions

View File

@ -24,7 +24,6 @@ import com.google.inject.Inject;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.IntegrationTestingConfig;
@ -42,9 +41,9 @@ import org.testng.annotations.Test;
@Guice(moduleFactory = DruidTestModuleFactory.class) @Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITBroadcastJoinQueryTest extends AbstractIndexerTest public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
{ {
private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class);
private static final String BROADCAST_JOIN_TASK = "/indexer/broadcast_join_index_task.json"; private static final String BROADCAST_JOIN_TASK = "/indexer/broadcast_join_index_task.json";
private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json"; private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json";
private static final String BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE = "/queries/broadcast_join_after_drop_metadata_queries.json";
private static final String BROADCAST_JOIN_QUERIES_RESOURCE = "/queries/broadcast_join_queries.json"; private static final String BROADCAST_JOIN_QUERIES_RESOURCE = "/queries/broadcast_join_queries.json";
private static final String BROADCAST_JOIN_DATASOURCE = "broadcast_join_wikipedia_test"; private static final String BROADCAST_JOIN_DATASOURCE = "broadcast_join_wikipedia_test";
@ -71,8 +70,19 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
final Closer closer = Closer.create(); final Closer closer = Closer.create();
try { try {
closer.register(unloader(BROADCAST_JOIN_DATASOURCE)); closer.register(unloader(BROADCAST_JOIN_DATASOURCE));
closer.register(() -> {
// remove broadcast rule
try {
coordinatorClient.postLoadRules(
BROADCAST_JOIN_DATASOURCE,
ImmutableList.of()
);
}
catch (Exception ignored) {
}
});
// prepare for broadcast // prepare for broadcast by adding forever broadcast load rule
coordinatorClient.postLoadRules( coordinatorClient.postLoadRules(
BROADCAST_JOIN_DATASOURCE, BROADCAST_JOIN_DATASOURCE,
ImmutableList.of(new ForeverBroadcastDistributionRule()) ImmutableList.of(new ForeverBroadcastDistributionRule())
@ -80,7 +90,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
// load the data // load the data
String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE); String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE);
String taskId = indexer.submitTask(taskJson); indexer.submitTask(taskJson);
ITRetryUtil.retryUntilTrue( ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load" () -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load"
@ -114,6 +124,26 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
} }
finally { finally {
closer.close(); closer.close();
// query metadata until druid schema is refreshed and datasource is no longer available
ITRetryUtil.retryUntilTrue(
() -> {
try {
queryHelper.testQueriesFromString(
queryHelper.getQueryURL(config.getRouterUrl()),
replaceJoinTemplate(
getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE),
BROADCAST_JOIN_DATASOURCE
)
);
return true;
}
catch (Exception ex) {
return false;
}
},
"waiting for SQL metadata refresh"
);
} }
} }

View File

@ -0,0 +1,9 @@
[
{
"description": "query information schema to make sure datasource is joinable and broadcast",
"query": {
"query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%' AND IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'"
},
"expectedResults": []
}
]

View File

@ -2,7 +2,7 @@
{ {
"description": "query information schema to make sure datasource is joinable and broadcast", "description": "query information schema to make sure datasource is joinable and broadcast",
"query": { "query": {
"query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'" "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%' AND IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'"
}, },
"expectedResults": [ "expectedResults": [
{ {

View File

@ -363,6 +363,8 @@ public class DruidSchema extends AbstractSchema
void addSegment(final DruidServerMetadata server, final DataSegment segment) void addSegment(final DruidServerMetadata server, final DataSegment segment)
{ {
synchronized (lock) { synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
// broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately
if (server.getType().equals(ServerType.BROKER)) { if (server.getType().equals(ServerType.BROKER)) {
// a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the
// historical, however mark the datasource for refresh because it needs to be globalized // historical, however mark the datasource for refresh because it needs to be globalized
@ -423,7 +425,6 @@ public class DruidSchema extends AbstractSchema
synchronized (lock) { synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getId()); log.debug("Segment[%s] is gone.", segment.getId());
dataSourcesNeedingRebuild.add(segment.getDataSource());
segmentsNeedingRefresh.remove(segment.getId()); segmentsNeedingRefresh.remove(segment.getId());
mutableSegments.remove(segment.getId()); mutableSegments.remove(segment.getId());
@ -437,6 +438,8 @@ public class DruidSchema extends AbstractSchema
segmentMetadataInfo.remove(segment.getDataSource()); segmentMetadataInfo.remove(segment.getDataSource());
tables.remove(segment.getDataSource()); tables.remove(segment.getDataSource());
log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource()); log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource());
} else {
dataSourcesNeedingRebuild.add(segment.getDataSource());
} }
lock.notifyAll(); lock.notifyAll();
@ -448,12 +451,18 @@ public class DruidSchema extends AbstractSchema
{ {
synchronized (lock) { synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
// broker served segments in the timeline, to ensure that removeSegment the event is triggered accurately
if (server.getType().equals(ServerType.BROKER)) { if (server.getType().equals(ServerType.BROKER)) {
// for brokers, if the segment drops from all historicals before the broker this could be null.
if (knownSegments != null && !knownSegments.isEmpty()) {
// a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the
// historical, however mark the datasource for refresh because it might no longer be broadcast or something // historical, however mark the datasource for refresh because it might no longer be broadcast or something
dataSourcesNeedingRebuild.add(segment.getDataSource()); dataSourcesNeedingRebuild.add(segment.getDataSource());
}
} else { } else {
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId());
final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas(); final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
final ImmutableSet<DruidServerMetadata> servers = FluentIterable final ImmutableSet<DruidServerMetadata> servers = FluentIterable