From 64f97e7003ebc922e404ff49a31969993669747c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 11 Dec 2020 14:14:00 -0800 Subject: [PATCH] fix DruidSchema incorrectly listing tables with no segments (#10660) * fix race condition with DruidSchema tables and dataSourcesNeedingRebuild * rework to see if it passes analysis * more better * maybe this * re-arrange and comments --- .../tests/query/ITBroadcastJoinQueryTest.java | 38 +++++++++++++++++-- ...cast_join_after_drop_metadata_queries.json | 9 +++++ .../broadcast_join_metadata_queries.json | 2 +- .../druid/sql/calcite/schema/DruidSchema.java | 19 +++++++--- 4 files changed, 58 insertions(+), 10 deletions(-) create mode 100644 integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java index 66f98e85cee..8b62fa18e4a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java @@ -24,7 +24,6 @@ import com.google.inject.Inject; import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; import org.apache.druid.testing.IntegrationTestingConfig; @@ -42,9 +41,9 @@ import org.testng.annotations.Test; @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITBroadcastJoinQueryTest extends AbstractIndexerTest { - private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class); private static final String BROADCAST_JOIN_TASK = 
"/indexer/broadcast_join_index_task.json"; private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json"; + private static final String BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE = "/queries/broadcast_join_after_drop_metadata_queries.json"; private static final String BROADCAST_JOIN_QUERIES_RESOURCE = "/queries/broadcast_join_queries.json"; private static final String BROADCAST_JOIN_DATASOURCE = "broadcast_join_wikipedia_test"; @@ -71,8 +70,19 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest final Closer closer = Closer.create(); try { closer.register(unloader(BROADCAST_JOIN_DATASOURCE)); + closer.register(() -> { + // remove broadcast rule + try { + coordinatorClient.postLoadRules( + BROADCAST_JOIN_DATASOURCE, + ImmutableList.of() + ); + } + catch (Exception ignored) { + } + }); - // prepare for broadcast + // prepare for broadcast by adding forever broadcast load rule coordinatorClient.postLoadRules( BROADCAST_JOIN_DATASOURCE, ImmutableList.of(new ForeverBroadcastDistributionRule()) @@ -80,7 +90,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest // load the data String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE); - String taskId = indexer.submitTask(taskJson); + indexer.submitTask(taskJson); ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load" @@ -114,6 +124,26 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest } finally { closer.close(); + + // query metadata until druid schema is refreshed and datasource is no longer available + ITRetryUtil.retryUntilTrue( + () -> { + try { + queryHelper.testQueriesFromString( + queryHelper.getQueryURL(config.getRouterUrl()), + replaceJoinTemplate( + getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE), + BROADCAST_JOIN_DATASOURCE + ) + ); + return true; + } + 
catch (Exception ex) { + return false; + } + }, + "waiting for SQL metadata refresh" + ); } } diff --git a/integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json b/integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json new file mode 100644 index 00000000000..26e36885c6f --- /dev/null +++ b/integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json @@ -0,0 +1,9 @@ +[ + { + "description": "query information schema to make sure datasource is joinable and broadcast", + "query": { + "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%' AND IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'" + }, + "expectedResults": [] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json b/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json index c2c32b31832..af036c76638 100644 --- a/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json +++ b/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json @@ -2,7 +2,7 @@ { "description": "query information schema to make sure datasource is joinable and broadcast", "query": { - "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'" + "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%' AND IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'" }, "expectedResults": [ { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index b264749308c..215120aa02b 100644 --- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -363,6 +363,8 @@ public class DruidSchema extends AbstractSchema void addSegment(final DruidServerMetadata server, final DataSegment segment) { synchronized (lock) { + // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking + // broker served segments in the timeline, to ensure that the removeSegment event is triggered accurately if (server.getType().equals(ServerType.BROKER)) { // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the // historical, however mark the datasource for refresh because it needs to be globalized @@ -423,7 +425,6 @@ public class DruidSchema extends AbstractSchema synchronized (lock) { log.debug("Segment[%s] is gone.", segment.getId()); - dataSourcesNeedingRebuild.add(segment.getDataSource()); segmentsNeedingRefresh.remove(segment.getId()); mutableSegments.remove(segment.getId()); @@ -437,6 +438,8 @@ public class DruidSchema extends AbstractSchema segmentMetadataInfo.remove(segment.getDataSource()); tables.remove(segment.getDataSource()); log.info("dataSource[%s] no longer exists, all metadata removed.", segment.getDataSource()); + } else { + dataSourcesNeedingRebuild.add(segment.getDataSource()); } lock.notifyAll(); @@ -448,12 +451,18 @@ public class DruidSchema extends AbstractSchema { synchronized (lock) { log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); + final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); + + // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking + // broker served segments in the timeline, to ensure that the removeSegment event is triggered accurately if (server.getType().equals(ServerType.BROKER)) { - // a segment on a broker means a broadcast 
datasource, skip metadata because we'll also see this segment on the - // historical, however mark the datasource for refresh because it might no longer be broadcast or something - dataSourcesNeedingRebuild.add(segment.getDataSource()); + // for brokers, if the segment drops from all historicals before the broker this could be null. + if (knownSegments != null && !knownSegments.isEmpty()) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it might no longer be broadcast or something + dataSourcesNeedingRebuild.add(segment.getDataSource()); + } } else { - final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); final Set segmentServers = segmentMetadata.getReplicas(); final ImmutableSet servers = FluentIterable