global table datasource for broadcast segments (#10020)

* global table datasource for broadcast segments

* tests

* fix

* fix test

* comments and javadocs

* review stuffs

* use generated equals and hashcode
Clint Wylie 2020-06-16 17:58:05 -07:00 committed by GitHub
parent 4e483a70b4
commit 68aa384190
17 changed files with 545 additions and 243 deletions

View File

@ -35,7 +35,8 @@ import java.util.Set;
@JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
@JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable")
})
public interface DataSource
{

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
* {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a
* {@link org.apache.druid.segment.join.JoinableFactory} that can create an
* {@link org.apache.druid.segment.join.table.IndexedTable} (via DruidBinders.joinableFactoryBinder), this datasource
* type allows joins against broadcast segments to be planned optimally: because the segments are global, the join
* can be pushed down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join that uses an
* {@link InlineDataSource} to build an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the
* broker. Because it is also a {@link TableDataSource}, when queried directly or used on the left-hand side of a
* join, it is treated like any normal table datasource.
*/
@JsonTypeName("globalTable")
public class GlobalTableDataSource extends TableDataSource
{
@JsonCreator
public GlobalTableDataSource(@JsonProperty("name") String name)
{
super(name);
}
@Override
public boolean isGlobal()
{
return true;
}
@Override
public String toString()
{
return "GlobalTableDataSource{" +
"name='" + getName() + '\'' +
'}';
}
}
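
A minimal sketch (not part of this patch) of what the new datasource type looks like on the wire, given the @JsonTypeName above and the @JsonSubTypes registration added to DataSource earlier in this diff. The datasource name and class name below are made up for illustration, and a plain Jackson ObjectMapper is assumed:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;

public class GlobalTableWireFormatSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    DataSource broadcast = new GlobalTableDataSource("broadcast_dim");

    // Serializes to roughly {"type":"globalTable","name":"broadcast_dim"}; the "globalTable"
    // tag comes from @JsonTypeName and the subtype registration on the DataSource interface.
    String json = mapper.writeValueAsString(broadcast);
    System.out.println(json);

    // Round-trips back to a GlobalTableDataSource: isGlobal() is true, and it is deliberately
    // not equal to a plain TableDataSource of the same name (see the equals() change below).
    DataSource roundTripped = mapper.readValue(json, DataSource.class);
    System.out.println(roundTripped.isGlobal());
    System.out.println(roundTripped.equals(new TableDataSource("broadcast_dim")));
  }
}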

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@JsonTypeName("table")
@ -99,27 +100,21 @@ public class TableDataSource implements DataSource
}
@Override
public final boolean equals(Object o)
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TableDataSource)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TableDataSource that = (TableDataSource) o;
if (!name.equals(that.name)) {
return false;
}
return true;
return name.equals(that.name);
}
@Override
public final int hashCode()
public int hashCode()
{
return name.hashCode();
return Objects.hash(name);
}
}
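
A brief illustration (not part of this patch) of why TableDataSource#equals dropped its final modifier and switched from an instanceof check to a getClass() comparison: a broadcast table must not compare equal to a plain table of the same name, which the new GlobalTableDataSourceTest below asserts explicitly.

import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;

public class TableEqualitySketch
{
  public static void main(String[] args)
  {
    TableDataSource plain = new TableDataSource("foo");
    TableDataSource broadcast = new GlobalTableDataSource("foo");

    // With the old final, instanceof-based equals() both of these would have printed true;
    // with the getClass()-based version both print false.
    System.out.println(plain.equals(broadcast));
    System.out.println(broadcast.equals(plain));
  }
}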

View File

@ -99,5 +99,4 @@ public class DataSourceTest
final DataSource serde = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(dataSource), DataSource.class);
Assert.assertEquals(dataSource, serde);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
public class GlobalTableDataSourceTest
{
private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new GlobalTableDataSource("foo");
@Test
public void testEquals()
{
EqualsVerifier.forClass(GlobalTableDataSource.class)
.usingGetClass()
.withNonnullFields("name")
.verify();
}
@Test
public void testGlobalTableIsNotEqualsTable()
{
TableDataSource tbl = new TableDataSource(GLOBAL_TABLE_DATA_SOURCE.getName());
Assert.assertNotEquals(GLOBAL_TABLE_DATA_SOURCE, tbl);
Assert.assertNotEquals(tbl, GLOBAL_TABLE_DATA_SOURCE);
}
@Test
public void testIsGlobal()
{
Assert.assertTrue(GLOBAL_TABLE_DATA_SOURCE.isGlobal());
}
@Test
public void testSerde() throws JsonProcessingException
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final GlobalTableDataSource deserialized = (GlobalTableDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(GLOBAL_TABLE_DATA_SOURCE),
DataSource.class
);
Assert.assertEquals(GLOBAL_TABLE_DATA_SOURCE, deserialized);
}
}

View File

@ -86,7 +86,7 @@ public class TableDataSourceTest
@Test
public void test_equals()
{
EqualsVerifier.forClass(TableDataSource.class).withNonnullFields("name").verify();
EqualsVerifier.forClass(TableDataSource.class).usingGetClass().withNonnullFields("name").verify();
}
@Test

View File

@ -218,16 +218,13 @@ public class BrokerServerView implements TimelineServerView
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
// loop...
return;
}
SegmentId segmentId = segment.getId();
synchronized (lock) {
// in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
// loop...
if (!server.getType().equals(ServerType.BROKER)) {
log.debug("Adding segment[%s] for server[%s]", segment, server);
ServerSelector selector = selectors.get(segmentId);
if (selector == null) {
selector = new ServerSelector(segment, tierSelectorStrategy);
@ -247,22 +244,28 @@ public class BrokerServerView implements TimelineServerView
queryableDruidServer = addServer(baseView.getInventoryValue(server.getName()));
}
selector.addServerAndUpdateSegment(queryableDruidServer, segment);
}
// run the callbacks even if the segment came from a broker, and let downstream watchers decide what to do with it
runTimelineCallbacks(callback -> callback.segmentAdded(server, segment));
}
}
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// might as well save the trouble of grabbing a lock for something that isn't there..
return;
}
SegmentId segmentId = segment.getId();
final ServerSelector selector;
synchronized (lock) {
log.debug("Removing segment[%s] from server[%s].", segmentId, server);
// we don't store broker segments here, but still run the callbacks for the segment being removed from the server
// since the broker segments are not stored on the timeline, do not fire segmentRemoved event
if (server.getType().equals(ServerType.BROKER)) {
runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment));
return;
}
selector = selectors.get(segmentId);
if (selector == null) {
log.warn("Told to remove non-existant segment[%s]", segmentId);

View File

@ -129,6 +129,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter, cpuAccumulator);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{

View File

@ -40,6 +40,7 @@ import org.apache.druid.utils.CollectionUtils;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -121,6 +122,11 @@ public class SegmentManager
return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
}
public Set<String> getDataSourceNames()
{
return dataSources.keySet();
}
/**
* Returns a map of dataSource to the number of segments managed by this segmentManager. This method should be used
* carefully because the returned map might be different from the actual data source states.

View File

@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -421,17 +422,19 @@ public class SegmentManagerTest
@SuppressWarnings("RedundantThrows") // TODO remove when the bug in intelliJ is fixed.
private void assertResult(List<DataSegment> expectedExistingSegments) throws SegmentLoadingException
{
final Map<String, Long> expectedDataSourceSizes = expectedExistingSegments
.stream()
final Map<String, Long> expectedDataSourceSizes =
expectedExistingSegments.stream()
.collect(Collectors.toMap(DataSegment::getDataSource, DataSegment::getSize, Long::sum));
final Map<String, Long> expectedDataSourceCounts = expectedExistingSegments
.stream()
final Map<String, Long> expectedDataSourceCounts =
expectedExistingSegments.stream()
.collect(Collectors.toMap(DataSegment::getDataSource, segment -> 1L, Long::sum));
final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> expectedDataSources
= new HashMap<>();
final Set<String> expectedDataSourceNames = expectedExistingSegments.stream()
.map(DataSegment::getDataSource)
.collect(Collectors.toSet());
final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> expectedTimelines = new HashMap<>();
for (DataSegment segment : expectedExistingSegments) {
final VersionedIntervalTimeline<String, ReferenceCountingSegment> expectedTimeline =
expectedDataSources.computeIfAbsent(
expectedTimelines.computeIfAbsent(
segment.getDataSource(),
k -> new VersionedIntervalTimeline<>(Ordering.natural())
);
@ -444,11 +447,12 @@ public class SegmentManagerTest
);
}
Assert.assertEquals(expectedDataSourceNames, segmentManager.getDataSourceNames());
Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceCounts());
Assert.assertEquals(expectedDataSourceSizes, segmentManager.getDataSourceSizes());
final Map<String, DataSourceState> dataSources = segmentManager.getDataSources();
Assert.assertEquals(expectedDataSources.size(), dataSources.size());
Assert.assertEquals(expectedTimelines.size(), dataSources.size());
dataSources.forEach(
(sourceName, dataSourceState) -> {
@ -458,7 +462,7 @@ public class SegmentManagerTest
dataSourceState.getTotalSegmentSize()
);
Assert.assertEquals(
expectedDataSources.get(sourceName).getAllTimelineEntries(),
expectedTimelines.get(sourceName).getAllTimelineEntries(),
dataSourceState.getTimeline().getAllTimelineEntries()
);
}

View File

@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
@ -56,6 +57,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult;
@ -100,6 +102,7 @@ public class DruidSchema extends AbstractSchema
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
private final SegmentManager segmentManager;
private final ViewManager viewManager;
private final ExecutorService cacheExec;
private final ConcurrentMap<String, DruidTable> tables;
@ -117,26 +120,34 @@ public class DruidSchema extends AbstractSchema
private int totalSegments = 0;
// All mutable segments.
@GuardedBy("lock")
private final Set<SegmentId> mutableSegments = new TreeSet<>(SEGMENT_ORDER);
// All dataSources that need tables regenerated.
@GuardedBy("lock")
private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
// All segments that need to be refreshed.
@GuardedBy("lock")
private final TreeSet<SegmentId> segmentsNeedingRefresh = new TreeSet<>(SEGMENT_ORDER);
// Escalator, so we can attach an authentication result to queries we generate.
private final Escalator escalator;
@GuardedBy("lock")
private boolean refreshImmediately = false;
@GuardedBy("lock")
private long lastRefresh = 0L;
@GuardedBy("lock")
private long lastFailure = 0L;
@GuardedBy("lock")
private boolean isServerViewInitialized = false;
@Inject
public DruidSchema(
final QueryLifecycleFactory queryLifecycleFactory,
final TimelineServerView serverView,
final SegmentManager segmentManager,
final PlannerConfig config,
final ViewManager viewManager,
final Escalator escalator
@ -144,6 +155,7 @@ public class DruidSchema extends AbstractSchema
{
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
Preconditions.checkNotNull(serverView, "serverView");
this.segmentManager = segmentManager;
this.config = Preconditions.checkNotNull(config, "config");
this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
@ -196,11 +208,7 @@ public class DruidSchema extends AbstractSchema
public void start() throws InterruptedException
{
cacheExec.submit(
new Runnable()
{
@Override
public void run()
{
() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
@ -309,7 +317,6 @@ public class DruidSchema extends AbstractSchema
log.info("Metadata refresh stopped.");
}
}
}
);
if (config.isAwaitInitializationOnStart()) {
@ -350,13 +357,12 @@ public class DruidSchema extends AbstractSchema
@VisibleForTesting
void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata
// loop...
return;
}
synchronized (lock) {
if (server.getType().equals(ServerType.BROKER)) {
// a segment on a broker means a broadcast datasource; skip the metadata since we'll also see this segment on a
// historical, but mark the datasource for refresh because it needs to be globalized
dataSourcesNeedingRebuild.add(segment.getDataSource());
} else {
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
if (segmentMetadata == null) {
@ -397,6 +403,7 @@ public class DruidSchema extends AbstractSchema
log.debug("Segment[%s] has become immutable.", segment.getId());
}
}
}
if (!tables.containsKey(segment.getDataSource())) {
refreshImmediately = true;
}
@ -434,12 +441,13 @@ public class DruidSchema extends AbstractSchema
@VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// cheese it
return;
}
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
if (server.getType().equals(ServerType.BROKER)) {
// a segment on a broker means a broadcast datasource; skip the metadata since we'll also see this segment on a
// historical, but mark the datasource for refresh because it might no longer be broadcast
dataSourcesNeedingRebuild.add(segment.getDataSource());
} else {
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final AvailableSegmentMetadata segmentMetadata = knownSegments.get(segment.getId());
final Set<DruidServerMetadata> segmentServers = segmentMetadata.getReplicas();
@ -454,6 +462,7 @@ public class DruidSchema extends AbstractSchema
.withRealtime(recomputeIsRealtime(servers))
.build();
knownSegments.put(segment.getId(), metadataWithNumReplicas);
}
lock.notifyAll();
}
}
@ -592,7 +601,7 @@ public class DruidSchema extends AbstractSchema
}
}
private DruidTable buildDruidTable(final String dataSource)
protected DruidTable buildDruidTable(final String dataSource)
{
synchronized (lock) {
final Map<SegmentId, AvailableSegmentMetadata> segmentMap = segmentMetadataInfo.get(dataSource);
@ -616,7 +625,14 @@ public class DruidSchema extends AbstractSchema
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
return new DruidTable(new TableDataSource(dataSource), builder.build());
final TableDataSource tableDataSource;
if (segmentManager.getDataSourceNames().contains(dataSource)) {
tableDataSource = new GlobalTableDataSource(dataSource);
} else {
tableDataSource = new TableDataSource(dataSource);
}
return new DruidTable(tableDataSource, builder.build());
}
}

View File

@ -39,6 +39,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -84,6 +85,8 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
private ObjectMapper objectMapper;
@Mock
private LookupReferencesManager lookupReferencesManager;
@Mock
private SegmentManager segmentManager;
private DruidCalciteSchemaModule target;
private Injector injector;
@ -104,6 +107,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
binder.bind(Escalator.class).toInstance(escalator);
binder.bind(AuthorizerMapper.class).toInstance(authorizerMapper);
binder.bind(InventoryView.class).toInstance(serverInventoryView);
binder.bind(SegmentManager.class).toInstance(segmentManager);
binder.bind(DruidLeaderClient.class)
.annotatedWith(Coordinator.class)
.toInstance(coordinatorDruidLeaderClient);

View File

@ -22,7 +22,9 @@ package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
@ -30,6 +32,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@ -50,6 +53,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
conglomerate
),
new TestServerInventoryView(Collections.emptyList()),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()

View File

@ -22,18 +22,20 @@ package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@ -41,8 +43,10 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.NoopEscalator;
@ -57,8 +61,9 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -72,11 +77,21 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class DruidSchemaTest extends CalciteTestBase
{
private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig()
{
@Override
public Period getMetadataRefreshPeriod()
{
return new Period("PT1S");
}
};
private static final List<InputRow> ROWS1 = ImmutableList.of(
CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")),
@ -93,7 +108,10 @@ public class DruidSchemaTest extends CalciteTestBase
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
private TestServerInventoryView serverView;
private List<ImmutableDruidServer> druidServers;
private CountDownLatch getDatasourcesLatch = new CountDownLatch(1);
private CountDownLatch buildTableLatch = new CountDownLatch(1);
@BeforeClass
public static void setUpClass()
@ -113,10 +131,13 @@ public class DruidSchemaTest extends CalciteTestBase
private SpecificSegmentsQuerySegmentWalker walker = null;
private DruidSchema schema = null;
private SegmentManager segmentManager;
private Set<String> dataSourceNames;
@Before
public void setUp() throws Exception
{
dataSourceNames = Sets.newConcurrentHashSet();
final File tmpDir = temporaryFolder.newFolder();
final QueryableIndex index1 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
@ -146,6 +167,16 @@ public class DruidSchemaTest extends CalciteTestBase
.rows(ROWS2)
.buildMMappedIndex();
segmentManager = new SegmentManager(EasyMock.createMock(SegmentLoader.class))
{
@Override
public Set<String> getDataSourceNames()
{
getDatasourcesLatch.countDown();
return dataSourceNames;
}
};
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE1)
@ -188,16 +219,26 @@ public class DruidSchemaTest extends CalciteTestBase
PruneSpecsHolder.DEFAULT
);
final List<DataSegment> realtimeSegments = ImmutableList.of(segment1);
final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments);
druidServers = serverView.getDruidServers();
schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
);
)
{
@Override
protected DruidTable buildDruidTable(String dataSource)
{
DruidTable table = super.buildDruidTable(dataSource);
buildTableLatch.countDown();
return table;
}
};
schema.start();
schema.awaitInitialization();
@ -420,34 +461,60 @@ public class DruidSchemaTest extends CalciteTestBase
}
@Test
public void testAvailableSegmentFromBrokerIsIgnored()
public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException
{
DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo");
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
Assert.assertEquals(4, schema.getTotalSegments());
DruidServerMetadata metadata = new DruidServerMetadata(
"broker",
"localhost:0",
final DataSegment someNewBrokerSegment = new DataSegment(
"foo",
Intervals.of("2012/2013"),
"version1",
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
new NumberedShardSpec(2, 3),
null,
1000L,
ServerType.BROKER,
"broken",
0
);
DataSegment segment = new DataSegment(
"test",
Intervals.of("2011-04-01/2011-04-11"),
"v1",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
NoneShardSpec.instance(),
1,
100L
100L,
PruneSpecsHolder.DEFAULT
);
schema.addSegment(metadata, segment);
Assert.assertEquals(4, schema.getTotalSegments());
dataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
// wait for build
buildTableLatch.await(1, TimeUnit.SECONDS);
buildTableLatch = new CountDownLatch(1);
buildTableLatch.await(1, TimeUnit.SECONDS);
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
getDatasourcesLatch = new CountDownLatch(1);
getDatasourcesLatch.await(1, TimeUnit.SECONDS);
fooTable = (DruidTable) schema.getTableMap().get("foo");
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource);
// now remove it
dataSourceNames.remove("foo");
serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
// wait for build
buildTableLatch.await(1, TimeUnit.SECONDS);
buildTableLatch = new CountDownLatch(1);
buildTableLatch.await(1, TimeUnit.SECONDS);
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
getDatasourcesLatch = new CountDownLatch(1);
getDatasourcesLatch.await(1, TimeUnit.SECONDS);
fooTable = (DruidTable) schema.getTableMap().get("foo");
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource);
Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource);
}
}
}

View File

@ -67,9 +67,11 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@ -239,6 +241,7 @@ public class SystemSchemaTest extends CalciteTestBase
druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments(), realtimeSegments),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()

View File

@ -73,11 +73,13 @@ import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
@ -107,6 +109,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.chrono.ISOChronology;
@ -866,6 +869,7 @@ public class CalciteTests
final DruidSchema schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments()),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
plannerConfig,
viewManager,
TEST_AUTHENTICATOR_ESCALATOR

View File

@ -26,6 +26,7 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.DruidServerMetadata;
@ -63,18 +64,31 @@ public class TestServerInventoryView implements TimelineServerView
"dummy",
0
);
private final List<DataSegment> segments;
private static final DruidServerMetadata DUMMY_BROKER = new DruidServerMetadata(
"dummy3",
"dummy3",
null,
0,
ServerType.BROKER,
"dummy",
0
);
private List<DataSegment> segments = new ArrayList<>();
private List<DataSegment> realtimeSegments = new ArrayList<>();
private List<DataSegment> brokerSegments = new ArrayList<>();
private List<Pair<Executor, SegmentCallback>> segmentCallbackExecs = new ArrayList<>();
private List<Pair<Executor, TimelineCallback>> timelineCallbackExecs = new ArrayList<>();
public TestServerInventoryView(List<DataSegment> segments)
{
this.segments = ImmutableList.copyOf(segments);
this.segments.addAll(segments);
}
public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
{
this.segments = ImmutableList.copyOf(segments);
this.realtimeSegments = ImmutableList.copyOf(realtimeSegments);
this.segments.addAll(segments);
this.realtimeSegments.addAll(realtimeSegments);
}
@Override
@ -87,6 +101,7 @@ public class TestServerInventoryView implements TimelineServerView
@Override
public List<ImmutableDruidServer> getDruidServers()
{
// do not return broker on purpose to mimic behavior of BrokerServerView
final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments);
final ImmutableDruidServer server = new ImmutableDruidServer(
DUMMY_SERVER,
@ -118,6 +133,7 @@ public class TestServerInventoryView implements TimelineServerView
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
}
exec.execute(callback::segmentViewInitialized);
segmentCallbackExecs.add(new Pair<>(exec, callback));
}
@Override
@ -130,6 +146,7 @@ public class TestServerInventoryView implements TimelineServerView
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
}
exec.execute(callback::timelineInitialized);
timelineCallbackExecs.add(new Pair<>(exec, callback));
}
@Override
@ -143,4 +160,57 @@ public class TestServerInventoryView implements TimelineServerView
{
// Do nothing
}
public void addSegment(DataSegment segment, ServerType serverType)
{
final Pair<DruidServerMetadata, List<DataSegment>> whichServerAndSegments =
getDummyServerAndSegmentsForType(serverType);
final DruidServerMetadata whichServer = whichServerAndSegments.lhs;
whichServerAndSegments.rhs.add(segment);
segmentCallbackExecs.forEach(
execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentAdded(whichServer, segment))
);
timelineCallbackExecs.forEach(
execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentAdded(whichServer, segment))
);
}
public void removeSegment(DataSegment segment, ServerType serverType)
{
final Pair<DruidServerMetadata, List<DataSegment>> whichServerAndSegments =
getDummyServerAndSegmentsForType(serverType);
final DruidServerMetadata whichServer = whichServerAndSegments.lhs;
whichServerAndSegments.rhs.remove(segment);
segmentCallbackExecs.forEach(
execAndCallback -> execAndCallback.lhs.execute(() -> execAndCallback.rhs.segmentRemoved(whichServer, segment))
);
timelineCallbackExecs.forEach(
execAndCallback -> execAndCallback.lhs.execute(() -> {
execAndCallback.rhs.serverSegmentRemoved(whichServer, segment);
// assume that all replicas have been removed and fire this one too
execAndCallback.rhs.segmentRemoved(segment);
})
);
}
private Pair<DruidServerMetadata, List<DataSegment>> getDummyServerAndSegmentsForType(ServerType serverType)
{
final DruidServerMetadata whichServer;
final List<DataSegment> whichSegments;
switch (serverType) {
case BROKER:
whichServer = DUMMY_BROKER;
whichSegments = brokerSegments;
break;
case REALTIME:
whichServer = DUMMY_SERVER_REALTIME;
whichSegments = realtimeSegments;
break;
default:
whichServer = DUMMY_SERVER;
whichSegments = segments;
break;
}
return new Pair<>(whichServer, whichSegments);
}
}