From 68aa3841904cc08c2d4ab43b37994a868574fb91 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Jun 2020 17:58:05 -0700 Subject: [PATCH] global table datasource for broadcast segments (#10020) * global table datasource for broadcast segments * tests * fix * fix test * comments and javadocs * review stuffs * use generated equals and hashcode --- .../org/apache/druid/query/DataSource.java | 3 +- .../druid/query/GlobalTableDataSource.java | 58 +++ .../apache/druid/query/TableDataSource.java | 17 +- .../apache/druid/query/DataSourceTest.java | 1 - .../query/GlobalTableDataSourceTest.java | 67 ++++ .../druid/query/TableDataSourceTest.java | 2 +- .../apache/druid/client/BrokerServerView.java | 55 +-- .../druid/server/LocalQuerySegmentWalker.java | 1 + .../apache/druid/server/SegmentManager.java | 6 + .../druid/server/SegmentManagerTest.java | 26 +- .../druid/sql/calcite/schema/DruidSchema.java | 336 +++++++++--------- .../schema/DruidCalciteSchemaModuleTest.java | 4 + .../schema/DruidSchemaNoDataInitTest.java | 4 + .../sql/calcite/schema/DruidSchemaTest.java | 123 +++++-- .../sql/calcite/schema/SystemSchemaTest.java | 3 + .../druid/sql/calcite/util/CalciteTests.java | 4 + .../calcite/util/TestServerInventoryView.java | 78 +++- 17 files changed, 545 insertions(+), 243 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java create mode 100644 processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index b78a45fc36c..c3edd1c7964 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -35,7 +35,8 @@ import java.util.Set; @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"), @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"), @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"), - @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline") + @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"), + @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable") }) public interface DataSource { diff --git a/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java new file mode 100644 index 00000000000..da5f1390ca3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a + * {@link org.apache.druid.segment.join.JoinableFactory} that can create an + * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows + * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be + * pushed down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using + * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the + * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join, + * they will be treated as any normal table datasource. + */ +@JsonTypeName("globalTable") +public class GlobalTableDataSource extends TableDataSource +{ + @JsonCreator + public GlobalTableDataSource(@JsonProperty("name") String name) + { + super(name); + } + + @Override + public boolean isGlobal() + { + return true; + } + + @Override + public String toString() + { + return "GlobalTableDataSource{" + + "name='" + getName() + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/TableDataSource.java b/processing/src/main/java/org/apache/druid/query/TableDataSource.java index f9a07a181ee..469d5be2171 100644 --- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; @JsonTypeName("table") @@ -99,27 +100,21 @@ public class TableDataSource implements DataSource } @Override - public final boolean equals(Object o) + public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof TableDataSource)) { + if (o == null || getClass() != o.getClass()) { return false; } - TableDataSource that = (TableDataSource) o; - - if (!name.equals(that.name)) { - return false; - } - - return true; + return name.equals(that.name); } @Override - public final int hashCode() + public int hashCode() { - return name.hashCode(); + return Objects.hash(name); } } diff --git a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java index 090570db7ac..7c7f50f281b 100644 --- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java @@ -99,5 +99,4 @@ public class DataSourceTest final DataSource serde = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), DataSource.class); Assert.assertEquals(dataSource, serde); } - } diff --git a/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java new file mode 100644 index 00000000000..fea379015ad --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +public class GlobalTableDataSourceTest +{ + private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new GlobalTableDataSource("foo"); + + @Test + public void testEquals() + { + EqualsVerifier.forClass(GlobalTableDataSource.class) + .usingGetClass() + .withNonnullFields("name") + .verify(); + } + + @Test + public void testGlobalTableIsNotEqualsTable() + { + TableDataSource tbl = new TableDataSource(GLOBAL_TABLE_DATA_SOURCE.getName()); + Assert.assertNotEquals(GLOBAL_TABLE_DATA_SOURCE, tbl); + Assert.assertNotEquals(tbl, GLOBAL_TABLE_DATA_SOURCE); + } + + @Test + public void testIsGlobal() + { + Assert.assertTrue(GLOBAL_TABLE_DATA_SOURCE.isGlobal()); + } + + @Test + public void testSerde() throws JsonProcessingException + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + final GlobalTableDataSource deserialized = (GlobalTableDataSource) jsonMapper.readValue( + jsonMapper.writeValueAsString(GLOBAL_TABLE_DATA_SOURCE), + DataSource.class + ); + + Assert.assertEquals(GLOBAL_TABLE_DATA_SOURCE, deserialized); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java index ec8d27d2902..b5aeeb11bab 100644 --- a/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/TableDataSourceTest.java @@ -86,7 +86,7 @@ public class TableDataSourceTest @Test public void test_equals() { - EqualsVerifier.forClass(TableDataSource.class).withNonnullFields("name").verify(); + EqualsVerifier.forClass(TableDataSource.class).usingGetClass().withNonnullFields("name").verify(); } @Test diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 3b5d40872cd..debd72131c2 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -218,51 +218,54 @@ public class BrokerServerView implements TimelineServerView private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree - // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query - // loop... 
- return; - } SegmentId segmentId = segment.getId(); synchronized (lock) { - log.debug("Adding segment[%s] for server[%s]", segment, server); + // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree + // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query + // loop... + if (!server.getType().equals(ServerType.BROKER)) { + log.debug("Adding segment[%s] for server[%s]", segment, server); + ServerSelector selector = selectors.get(segmentId); + if (selector == null) { + selector = new ServerSelector(segment, tierSelectorStrategy); - ServerSelector selector = selectors.get(segmentId); - if (selector == null) { - selector = new ServerSelector(segment, tierSelectorStrategy); + VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + timelines.put(segment.getDataSource(), timeline); + } - VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); - if (timeline == null) { - timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - timelines.put(segment.getDataSource(), timeline); + timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); + selectors.put(segmentId, selector); } - timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); - selectors.put(segmentId, selector); + QueryableDruidServer queryableDruidServer = clients.get(server.getName()); + if (queryableDruidServer == null) { + queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); + } + selector.addServerAndUpdateSegment(queryableDruidServer, segment); } - - QueryableDruidServer queryableDruidServer = clients.get(server.getName()); - if (queryableDruidServer == null) { - queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); - } - selector.addServerAndUpdateSegment(queryableDruidServer, segment); + // run the callbacks, even if the segment came from a broker, lets downstream watchers decide what to do with it runTimelineCallbacks(callback -> callback.segmentAdded(server, segment)); } } private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // might as well save the trouble of grabbing a lock for something that isn't there.. 
- return; - } + SegmentId segmentId = segment.getId(); final ServerSelector selector; synchronized (lock) { log.debug("Removing segment[%s] from server[%s].", segmentId, server); + // we don't store broker segments here, but still run the callbacks for the segment being removed from the server + // since the broker segments are not stored on the timeline, do not fire segmentRemoved event + if (server.getType().equals(ServerType.BROKER)) { + runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment)); + return; + } + selector = selectors.get(segmentId); if (selector == null) { log.warn("Told to remove non-existant segment[%s]", segmentId); diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 5e3928c799e..8283524040f 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -129,6 +129,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker .applyPostMergeDecoration() .emitCPUTimeMetric(emitter, cpuAccumulator); } + @Override public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 45b6538bb61..a5484f21c9a 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -40,6 +40,7 @@ import org.apache.druid.utils.CollectionUtils; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -121,6 +122,11 @@ public class SegmentManager return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize); } + public Set getDataSourceNames() + { + return dataSources.keySet(); + } + /** * Returns a map of dataSource to the number of segments managed by this segmentManager. This method should be * carefully because the returned map might be different from the actual data source states. diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 562fc1dfffb..a91411b8db7 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -421,17 +422,19 @@ public class SegmentManagerTest @SuppressWarnings("RedundantThrows") // TODO remove when the bug in intelliJ is fixed. 
private void assertResult(List expectedExistingSegments) throws SegmentLoadingException { - final Map expectedDataSourceSizes = expectedExistingSegments - .stream() - .collect(Collectors.toMap(DataSegment::getDataSource, DataSegment::getSize, Long::sum)); - final Map expectedDataSourceCounts = expectedExistingSegments - .stream() - .collect(Collectors.toMap(DataSegment::getDataSource, segment -> 1L, Long::sum)); - final Map> expectedDataSources - = new HashMap<>(); + final Map expectedDataSourceSizes = + expectedExistingSegments.stream() + .collect(Collectors.toMap(DataSegment::getDataSource, DataSegment::getSize, Long::sum)); + final Map expectedDataSourceCounts = + expectedExistingSegments.stream() + .collect(Collectors.toMap(DataSegment::getDataSource, segment -> 1L, Long::sum)); + final Set expectedDataSourceNames = expectedExistingSegments.stream() + .map(DataSegment::getDataSource) + .collect(Collectors.toSet()); + final Map> expectedTimelines = new HashMap<>(); for (DataSegment segment : expectedExistingSegments) { final VersionedIntervalTimeline expectedTimeline = - expectedDataSources.computeIfAbsent( + expectedTimelines.computeIfAbsent( segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural()) ); @@ -444,11 +447,12 @@ public class SegmentManagerTest ); } + Assert.assertEquals(expectedDataSourceNames, segmentManager.getDataSourceNames()); Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceCounts()); Assert.assertEquals(expectedDataSourceSizes, segmentManager.getDataSourceSizes()); final Map dataSources = segmentManager.getDataSources(); - Assert.assertEquals(expectedDataSources.size(), dataSources.size()); + Assert.assertEquals(expectedTimelines.size(), dataSources.size()); dataSources.forEach( (sourceName, dataSourceState) -> { @@ -458,7 +462,7 @@ public class SegmentManagerTest dataSourceState.getTotalSegmentSize() ); Assert.assertEquals( - expectedDataSources.get(sourceName).getAllTimelineEntries(), + expectedTimelines.get(sourceName).getAllTimelineEntries(), dataSourceState.getTimeline().getAllTimelineEntries() ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 35037e2ff2c..6762aaf13c9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; @@ -56,6 +57,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.AuthenticationResult; @@ -100,6 +102,7 @@ public class DruidSchema extends AbstractSchema private final QueryLifecycleFactory 
queryLifecycleFactory; private final PlannerConfig config; + private final SegmentManager segmentManager; private final ViewManager viewManager; private final ExecutorService cacheExec; private final ConcurrentMap tables; @@ -117,26 +120,34 @@ public class DruidSchema extends AbstractSchema private int totalSegments = 0; // All mutable segments. + @GuardedBy("lock") private final Set mutableSegments = new TreeSet<>(SEGMENT_ORDER); // All dataSources that need tables regenerated. + @GuardedBy("lock") private final Set dataSourcesNeedingRebuild = new HashSet<>(); // All segments that need to be refreshed. + @GuardedBy("lock") private final TreeSet segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER); // Escalator, so we can attach an authentication result to queries we generate. private final Escalator escalator; + @GuardedBy("lock") private boolean refreshImmediately = false; + @GuardedBy("lock") private long lastRefresh = 0L; + @GuardedBy("lock") private long lastFailure = 0L; + @GuardedBy("lock") private boolean isServerViewInitialized = false; @Inject public DruidSchema( final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, + final SegmentManager segmentManager, final PlannerConfig config, final ViewManager viewManager, final Escalator escalator @@ -144,6 +155,7 @@ public class DruidSchema extends AbstractSchema { this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); Preconditions.checkNotNull(serverView, "serverView"); + this.segmentManager = segmentManager; this.config = Preconditions.checkNotNull(config, "config"); this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); @@ -196,118 +208,113 @@ public class DruidSchema extends AbstractSchema public void start() throws InterruptedException { cacheExec.submit( - new Runnable() - { - @Override - public void run() - { - try { - while (!Thread.currentThread().isInterrupted()) { - final Set segmentsToRefresh = new TreeSet<>(); - final Set dataSourcesToRebuild = new TreeSet<>(); + () -> { + try { + while (!Thread.currentThread().isInterrupted()) { + final Set segmentsToRefresh = new TreeSet<>(); + final Set dataSourcesToRebuild = new TreeSet<>(); - try { - synchronized (lock) { - final long nextRefreshNoFuzz = DateTimes - .utc(lastRefresh) - .plus(config.getMetadataRefreshPeriod()) - .getMillis(); + try { + synchronized (lock) { + final long nextRefreshNoFuzz = DateTimes + .utc(lastRefresh) + .plus(config.getMetadataRefreshPeriod()) + .getMillis(); - // Fuzz a bit to spread load out when we have multiple brokers. - final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); + // Fuzz a bit to spread load out when we have multiple brokers. + final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10); - while (true) { - // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). - final boolean wasRecentFailure = DateTimes.utc(lastFailure) - .plus(config.getMetadataRefreshPeriod()) - .isAfterNow(); + while (true) { + // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure). 
+ final boolean wasRecentFailure = DateTimes.utc(lastFailure) + .plus(config.getMetadataRefreshPeriod()) + .isAfterNow(); - if (isServerViewInitialized && - !wasRecentFailure && - (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && - (refreshImmediately || nextRefresh < System.currentTimeMillis())) { - // We need to do a refresh. Break out of the waiting loop. - break; - } - - if (isServerViewInitialized) { - // Server view is initialized, but we don't need to do a refresh. Could happen if there are - // no segments in the system yet. Just mark us as initialized, then. - initialized.countDown(); - } - - // Wait some more, we'll wake up when it might be time to do another refresh. - lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); + if (isServerViewInitialized && + !wasRecentFailure && + (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) && + (refreshImmediately || nextRefresh < System.currentTimeMillis())) { + // We need to do a refresh. Break out of the waiting loop. + break; } - segmentsToRefresh.addAll(segmentsNeedingRefresh); - segmentsNeedingRefresh.clear(); - - // Mutable segments need a refresh every period, since new columns could be added dynamically. - segmentsNeedingRefresh.addAll(mutableSegments); - - lastFailure = 0L; - lastRefresh = System.currentTimeMillis(); - refreshImmediately = false; - } - - // Refresh the segments. - final Set refreshed = refreshSegments(segmentsToRefresh); - - synchronized (lock) { - // Add missing segments back to the refresh list. - segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); - - // Compute the list of dataSources to rebuild tables for. - dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); - refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); - dataSourcesNeedingRebuild.clear(); - - lock.notifyAll(); - } - - // Rebuild the dataSources. - for (String dataSource : dataSourcesToRebuild) { - final DruidTable druidTable = buildDruidTable(dataSource); - final DruidTable oldTable = tables.put(dataSource, druidTable); - if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { - log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature()); - } else { - log.debug("dataSource [%s] signature is unchanged.", dataSource); + if (isServerViewInitialized) { + // Server view is initialized, but we don't need to do a refresh. Could happen if there are + // no segments in the system yet. Just mark us as initialized, then. + initialized.countDown(); } + + // Wait some more, we'll wake up when it might be time to do another refresh. + lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis())); } - initialized.countDown(); - } - catch (InterruptedException e) { - // Fall through. - throw e; - } - catch (Exception e) { - log.warn(e, "Metadata refresh failed, trying again soon."); + segmentsToRefresh.addAll(segmentsNeedingRefresh); + segmentsNeedingRefresh.clear(); - synchronized (lock) { - // Add our segments and dataSources back to their refresh and rebuild lists. - segmentsNeedingRefresh.addAll(segmentsToRefresh); - dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); - lastFailure = System.currentTimeMillis(); - lock.notifyAll(); + // Mutable segments need a refresh every period, since new columns could be added dynamically. 
+ segmentsNeedingRefresh.addAll(mutableSegments); + + lastFailure = 0L; + lastRefresh = System.currentTimeMillis(); + refreshImmediately = false; + } + + // Refresh the segments. + final Set refreshed = refreshSegments(segmentsToRefresh); + + synchronized (lock) { + // Add missing segments back to the refresh list. + segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed)); + + // Compute the list of dataSources to rebuild tables for. + dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild); + refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource())); + dataSourcesNeedingRebuild.clear(); + + lock.notifyAll(); + } + + // Rebuild the dataSources. + for (String dataSource : dataSourcesToRebuild) { + final DruidTable druidTable = buildDruidTable(dataSource); + final DruidTable oldTable = tables.put(dataSource, druidTable); + if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { + log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature()); + } else { + log.debug("dataSource [%s] signature is unchanged.", dataSource); } } + + initialized.countDown(); + } + catch (InterruptedException e) { + // Fall through. + throw e; + } + catch (Exception e) { + log.warn(e, "Metadata refresh failed, trying again soon."); + + synchronized (lock) { + // Add our segments and dataSources back to their refresh and rebuild lists. + segmentsNeedingRefresh.addAll(segmentsToRefresh); + dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild); + lastFailure = System.currentTimeMillis(); + lock.notifyAll(); + } } } - catch (InterruptedException e) { - // Just exit. - } - catch (Throwable e) { - // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like - // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. - log.makeAlert(e, "Metadata refresh failed permanently").emit(); - throw e; - } - finally { - log.info("Metadata refresh stopped."); - } + } + catch (InterruptedException e) { + // Just exit. + } + catch (Throwable e) { + // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like + // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata. + log.makeAlert(e, "Metadata refresh failed permanently").emit(); + throw e; + } + finally { + log.info("Metadata refresh stopped."); } } ); @@ -350,51 +357,51 @@ public class DruidSchema extends AbstractSchema @VisibleForTesting void addSegment(final DruidServerMetadata server, final DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree - // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata - // loop... - return; - } synchronized (lock) { - final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); - AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; - if (segmentMetadata == null) { - // segmentReplicatable is used to determine if segments are served by historical or realtime servers - long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; - segmentMetadata = AvailableSegmentMetadata.builder( - segment, - isRealtime, - ImmutableSet.of(server), - null, - DEFAULT_NUM_ROWS - ).build(); - // Unknown segment. 
- setAvailableSegmentMetadata(segment.getId(), segmentMetadata); - segmentsNeedingRefresh.add(segment.getId()); - if (!server.isSegmentReplicationTarget()) { - log.debug("Added new mutable segment[%s].", segment.getId()); - mutableSegments.add(segment.getId()); - } else { - log.debug("Added new immutable segment[%s].", segment.getId()); - } + if (server.getType().equals(ServerType.BROKER)) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it needs to be globalized + dataSourcesNeedingRebuild.add(segment.getDataSource()); } else { - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = new ImmutableSet.Builder() - .addAll(segmentServers) - .add(server) - .build(); - final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - knownSegments.put(segment.getId(), metadataWithNumReplicas); - if (server.isSegmentReplicationTarget()) { - // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, - // even if it's also available on non-replicatable (realtime) servers. - mutableSegments.remove(segment.getId()); - log.debug("Segment[%s] has become immutable.", segment.getId()); + final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); + AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; + if (segmentMetadata == null) { + // segmentReplicatable is used to determine if segments are served by historical or realtime servers + long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1; + segmentMetadata = AvailableSegmentMetadata.builder( + segment, + isRealtime, + ImmutableSet.of(server), + null, + DEFAULT_NUM_ROWS + ).build(); + // Unknown segment. + setAvailableSegmentMetadata(segment.getId(), segmentMetadata); + segmentsNeedingRefresh.add(segment.getId()); + if (!server.isSegmentReplicationTarget()) { + log.debug("Added new mutable segment[%s].", segment.getId()); + mutableSegments.add(segment.getId()); + } else { + log.debug("Added new immutable segment[%s].", segment.getId()); + } + } else { + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = new ImmutableSet.Builder() + .addAll(segmentServers) + .add(server) + .build(); + final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + knownSegments.put(segment.getId(), metadataWithNumReplicas); + if (server.isSegmentReplicationTarget()) { + // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable, + // even if it's also available on non-replicatable (realtime) servers. 
+ mutableSegments.remove(segment.getId()); + log.debug("Segment[%s] has become immutable.", segment.getId()); + } } } if (!tables.containsKey(segment.getDataSource())) { @@ -434,26 +441,28 @@ public class DruidSchema extends AbstractSchema @VisibleForTesting void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) { - if (server.getType().equals(ServerType.BROKER)) { - // cheese it - return; - } synchronized (lock) { log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); - final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); - final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); - final Set segmentServers = segmentMetadata.getReplicas(); - final ImmutableSet servers = FluentIterable - .from(segmentServers) - .filter(Predicates.not(Predicates.equalTo(server))) - .toSet(); + if (server.getType().equals(ServerType.BROKER)) { + // a segment on a broker means a broadcast datasource, skip metadata because we'll also see this segment on the + // historical, however mark the datasource for refresh because it might no longer be broadcast or something + dataSourcesNeedingRebuild.add(segment.getDataSource()); + } else { + final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); + final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId()); + final Set segmentServers = segmentMetadata.getReplicas(); + final ImmutableSet servers = FluentIterable + .from(segmentServers) + .filter(Predicates.not(Predicates.equalTo(server))) + .toSet(); - final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata - .from(segmentMetadata) - .withReplicas(servers) - .withRealtime(recomputeIsRealtime(servers)) - .build(); - knownSegments.put(segment.getId(), metadataWithNumReplicas); + final AvailableSegmentMetadata metadataWithNumReplicas = AvailableSegmentMetadata + .from(segmentMetadata) + .withReplicas(servers) + .withRealtime(recomputeIsRealtime(servers)) + .build(); + knownSegments.put(segment.getId(), metadataWithNumReplicas); + } lock.notifyAll(); } } @@ -592,7 +601,7 @@ public class DruidSchema extends AbstractSchema } } - private DruidTable buildDruidTable(final String dataSource) + protected DruidTable buildDruidTable(final String dataSource) { synchronized (lock) { final Map segmentMap = segmentMetadataInfo.get(dataSource); @@ -616,7 +625,14 @@ public class DruidSchema extends AbstractSchema final RowSignature.Builder builder = RowSignature.builder(); columnTypes.forEach(builder::add); - return new DruidTable(new TableDataSource(dataSource), builder.build()); + + final TableDataSource tableDataSource; + if (segmentManager.getDataSourceNames().contains(dataSource)) { + tableDataSource = new GlobalTableDataSource(dataSource); + } else { + tableDataSource = new TableDataSource(dataSource); + } + return new DruidTable(tableDataSource, builder.build()); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index edba60a35c2..11c2b97f424 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -39,6 +39,7 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import 
org.apache.druid.query.lookup.LookupReferencesManager; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Escalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -84,6 +85,8 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase private ObjectMapper objectMapper; @Mock private LookupReferencesManager lookupReferencesManager; + @Mock + private SegmentManager segmentManager; private DruidCalciteSchemaModule target; private Injector injector; @@ -104,6 +107,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase binder.bind(Escalator.class).toInstance(escalator); binder.bind(AuthorizerMapper.class).toInstance(authorizerMapper); binder.bind(InventoryView.class).toInstance(serverInventoryView); + binder.bind(SegmentManager.class).toInstance(segmentManager); binder.bind(DruidLeaderClient.class) .annotatedWith(Coordinator.class) .toInstance(coordinatorDruidLeaderClient); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index 713dfffedf6..fd20fb594e8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -22,7 +22,9 @@ package org.apache.druid.sql.calcite.schema; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.util.CalciteTestBase; @@ -30,6 +32,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -50,6 +53,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase conglomerate ), new TestServerInventoryView(Collections.emptyList()), + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index ee3bca1a4f8..8455965ef5e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -22,18 +22,20 @@ package org.apache.druid.sql.calcite.schema; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; import 
org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; @@ -41,8 +43,10 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.NoopEscalator; @@ -57,8 +61,9 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; -import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; +import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -72,11 +77,21 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class DruidSchemaTest extends CalciteTestBase { - private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig(); + private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig() + { + @Override + public Period getMetadataRefreshPeriod() + { + return new Period("PT1S"); + } + }; private static final List ROWS1 = ImmutableList.of( CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")), @@ -93,7 +108,10 @@ public class DruidSchemaTest extends CalciteTestBase private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; + private TestServerInventoryView serverView; private List druidServers; + private CountDownLatch getDatasourcesLatch = new CountDownLatch(1); + private CountDownLatch buildTableLatch = new CountDownLatch(1); @BeforeClass public static void setUpClass() @@ -113,10 +131,13 @@ public class DruidSchemaTest extends CalciteTestBase private SpecificSegmentsQuerySegmentWalker walker = null; private DruidSchema schema = null; + private SegmentManager segmentManager; + private Set dataSourceNames; @Before public void setUp() throws Exception { + dataSourceNames = Sets.newConcurrentHashSet(); final File tmpDir = temporaryFolder.newFolder(); final QueryableIndex index1 = IndexBuilder.create() .tmpDir(new File(tmpDir, "1")) @@ -146,6 +167,16 @@ public class DruidSchemaTest extends CalciteTestBase .rows(ROWS2) .buildMMappedIndex(); + 
segmentManager = new SegmentManager(EasyMock.createMock(SegmentLoader.class)) + { + @Override + public Set getDataSourceNames() + { + getDatasourcesLatch.countDown(); + return dataSourceNames; + } + }; + walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(CalciteTests.DATASOURCE1) @@ -188,16 +219,26 @@ public class DruidSchemaTest extends CalciteTestBase PruneSpecsHolder.DEFAULT ); final List realtimeSegments = ImmutableList.of(segment1); - final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); + serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, + segmentManager, PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() - ); + ) + { + @Override + protected DruidTable buildDruidTable(String dataSource) + { + DruidTable table = super.buildDruidTable(dataSource); + buildTableLatch.countDown(); + return table; + } + }; schema.start(); schema.awaitInitialization(); @@ -420,34 +461,60 @@ public class DruidSchemaTest extends CalciteTestBase } @Test - public void testAvailableSegmentFromBrokerIsIgnored() + public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException { + DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); - Assert.assertEquals(4, schema.getTotalSegments()); - - DruidServerMetadata metadata = new DruidServerMetadata( - "broker", - "localhost:0", + final DataSegment someNewBrokerSegment = new DataSegment( + "foo", + Intervals.of("2012/2013"), + "version1", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + new NumberedShardSpec(2, 3), null, - 1000L, - ServerType.BROKER, - "broken", - 0 - ); - - DataSegment segment = new DataSegment( - "test", - Intervals.of("2011-04-01/2011-04-11"), - "v1", - ImmutableMap.of(), - ImmutableList.of(), - ImmutableList.of(), - NoneShardSpec.instance(), 1, - 100L + 100L, + PruneSpecsHolder.DEFAULT ); - schema.addSegment(metadata, segment); - Assert.assertEquals(4, schema.getTotalSegments()); + dataSourceNames.add("foo"); + serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); + // wait for build + buildTableLatch.await(1, TimeUnit.SECONDS); + buildTableLatch = new CountDownLatch(1); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + + // now remove it + dataSourceNames.remove("foo"); + serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); + + // wait for build + buildTableLatch.await(1, TimeUnit.SECONDS); + buildTableLatch = new CountDownLatch(1); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new 
CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); } + } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 80bf83030f3..3e1356af7b9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -67,9 +67,11 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; @@ -239,6 +241,7 @@ public class SystemSchemaTest extends CalciteTestBase druidSchema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 588d5adb8e3..0ca415b8746 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -73,11 +73,13 @@ import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; @@ -107,6 +109,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.calcite.view.ViewManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.chrono.ISOChronology; @@ -866,6 +869,7 @@ public class CalciteTests final DruidSchema schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments()), + new SegmentManager(EasyMock.createMock(SegmentLoader.class)), 
plannerConfig, viewManager, TEST_AUTHENTICATOR_ESCALATOR diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index a8e498beb1f..170d205f449 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -26,6 +26,7 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -63,18 +64,31 @@ public class TestServerInventoryView implements TimelineServerView "dummy", 0 ); - private final List segments; + private static final DruidServerMetadata DUMMY_BROKER = new DruidServerMetadata( + "dummy3", + "dummy3", + null, + 0, + ServerType.BROKER, + "dummy", + 0 + ); + private List segments = new ArrayList<>(); private List realtimeSegments = new ArrayList<>(); + private List brokerSegments = new ArrayList<>(); + + private List> segmentCallbackExecs = new ArrayList<>(); + private List> timelineCallbackExecs = new ArrayList<>(); public TestServerInventoryView(List segments) { - this.segments = ImmutableList.copyOf(segments); + this.segments.addAll(segments); } public TestServerInventoryView(List segments, List realtimeSegments) { - this.segments = ImmutableList.copyOf(segments); - this.realtimeSegments = ImmutableList.copyOf(realtimeSegments); + this.segments.addAll(segments); + this.realtimeSegments.addAll(realtimeSegments); } @Override @@ -87,6 +101,7 @@ public class TestServerInventoryView implements TimelineServerView @Override public List getDruidServers() { + // do not return broker on purpose to mimic behavior of BrokerServerView final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments); final ImmutableDruidServer server = new ImmutableDruidServer( DUMMY_SERVER, @@ -118,6 +133,7 @@ public class TestServerInventoryView implements TimelineServerView exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment)); } exec.execute(callback::segmentViewInitialized); + segmentCallbackExecs.add(new Pair<>(exec, callback)); } @Override @@ -130,6 +146,7 @@ public class TestServerInventoryView implements TimelineServerView exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment)); } exec.execute(callback::timelineInitialized); + timelineCallbackExecs.add(new Pair<>(exec, callback)); } @Override @@ -143,4 +160,57 @@ public class TestServerInventoryView implements TimelineServerView { // Do nothing } + + public void addSegment(DataSegment segment, ServerType serverType) + { + final Pair> whichServerAndSegments = + getDummyServerAndSegmentsForType(serverType); + final DruidServerMetadata whichServer = whichServerAndSegments.lhs; + whichServerAndSegments.rhs.add(segment); + segmentCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentAdded(whichServer, segment)) + ); + timelineCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentAdded(whichServer, segment)) + ); + } + + public void removeSegment(DataSegment 
segment, ServerType serverType) + { + final Pair> whichServerAndSegments = + getDummyServerAndSegmentsForType(serverType); + final DruidServerMetadata whichServer = whichServerAndSegments.lhs; + whichServerAndSegments.rhs.remove(segment); + segmentCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentRemoved(whichServer, segment)) + ); + timelineCallbackExecs.forEach( + execAndCallback -> execAndCallback.lhs.execute(() -> { + execAndCallback.rhs.serverSegmentRemoved(whichServer, segment); + // assume that all replicas have been removed and fire this one too + execAndCallback.rhs.segmentRemoved(segment); + }) + ); + } + + private Pair> getDummyServerAndSegmentsForType(ServerType serverType) + { + final DruidServerMetadata whichServer; + final List whichSegments; + switch (serverType) { + case BROKER: + whichServer = DUMMY_BROKER; + whichSegments = brokerSegments; + break; + case REALTIME: + whichServer = DUMMY_SERVER_REALTIME; + whichSegments = realtimeSegments; + break; + default: + whichServer = DUMMY_SERVER; + whichSegments = segments; + break; + } + return new Pair<>(whichServer, whichSegments); + } }
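
---

For reference, a minimal sketch (not part of the patch) of how the new "globalTable" datasource type round-trips through JSON, mirroring GlobalTableDataSourceTest#testSerde above. It assumes a plain Jackson ObjectMapper is sufficient because the polymorphic handling is annotation-driven (@JsonTypeInfo/@JsonSubTypes on DataSource plus @JsonTypeName("globalTable") on the new class); the class name GlobalTableSerdeExample and the datasource name "broadcast_dim" are made up for illustration.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.query.DataSource;
    import org.apache.druid.query.GlobalTableDataSource;
    import org.apache.druid.query.TableDataSource;

    public class GlobalTableSerdeExample
    {
      public static void main(String[] args) throws Exception
      {
        // Annotation-driven subtype resolution; the Druid tests use TestHelper.makeJsonMapper()
        // for the same round-trip.
        final ObjectMapper mapper = new ObjectMapper();

        // "broadcast_dim" is a hypothetical broadcast datasource name.
        final GlobalTableDataSource ds = new GlobalTableDataSource("broadcast_dim");

        final String json = mapper.writeValueAsString(ds);
        System.out.println(json); // {"type":"globalTable","name":"broadcast_dim"}

        final DataSource roundTrip = mapper.readValue(json, DataSource.class);
        System.out.println(roundTrip.isGlobal());                          // true
        System.out.println(roundTrip.equals(ds));                          // true
        System.out.println(new TableDataSource("broadcast_dim").equals(roundTrip)); // false: equals() is getClass()-based
      }
    }

Because isGlobal() returns true, a join whose right-hand side is a broadcast table can be pushed down to historicals as a JoinDataSource rather than being rewritten into an InlineDataSource subquery on the broker, as described in the GlobalTableDataSource Javadoc above; when queried directly or used on the left-hand side of a join it behaves like an ordinary table datasource.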