Segment backed broadcast join IndexedTable (#10224)

* Segment backed broadcast join IndexedTable

* fix comments

* fix tests

* sharing is caring

* fix test

* i hope this doesn't fix it

* filter by schema to maybe fix test

* changes

* close join stuffs so it does not leak, allow table to directly make selector factory

* oops

* update comment

* review stuffs

* better check
Clint Wylie 2020-08-20 14:12:39 -07:00, committed by GitHub
parent 618c04a99e
commit 7620b0c54e
51 changed files with 2071 additions and 182 deletions

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import com.google.inject.Module;
@ -379,7 +380,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
new MapJoinableFactory(ImmutableMap.of()),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
retryConfig,
jsonMapper,
serverConfig,

View File

@ -39,3 +39,6 @@ druid_sql_avatica_enable=true
druid_server_https_crlPath=/tls/revocations.crl
druid_query_scheduler_laning_strategy=manual
druid_query_scheduler_laning_lanes_one=1
druid_segmentCache_locations=[{"path":"/shared/druid/brokerIndexCache","maxSize":1000000000}]
druid_server_maxSize=1000000000
druid_sql_planner_metadataRefreshPeriod=PT15S

View File

@ -33,6 +33,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
@ -376,6 +377,26 @@ public class CoordinatorResourceTestClient
}
}
public void postLoadRules(String datasourceName, List<Rule> rules) throws Exception
{
String url = StringUtils.format("%srules/%s", getCoordinatorURL(), datasourceName);
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(rules)
), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while setting dynamic config[%s] status[%s] content[%s]",
url,
response.getStatus(),
response.getContent()
);
}
}
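A minimal usage sketch of this new helper, mirroring how ITBroadcastJoinQueryTest (further down) marks a datasource for broadcast, and assuming the ImmutableList and ForeverBroadcastDistributionRule imports the test uses; the datasource name is illustrative:
coordinatorClient.postLoadRules(
    "broadcast_join_wikipedia_test",
    ImmutableList.of(new ForeverBroadcastDistributionRule())
);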
public CoordinatorDynamicConfig getDynamicConfig()
{
String url = StringUtils.format("%sconfig", getCoordinatorURL());

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.query;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@Test(groups = TestNGGroup.QUERY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class);
private static final String BROADCAST_JOIN_TASK = "/indexer/broadcast_join_index_task.json";
private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json";
private static final String BROADCAST_JOIN_QUERIES_RESOURCE = "/queries/broadcast_join_queries.json";
private static final String BROADCAST_JOIN_DATASOURCE = "broadcast_join_wikipedia_test";
@Inject
ServerDiscoveryFactory factory;
@Inject
CoordinatorResourceTestClient coordinatorClient;
@Inject
SqlTestQueryHelper queryHelper;
@Inject
@TestClient
HttpClient httpClient;
@Inject
IntegrationTestingConfig config;
@Test
public void testBroadcastJoin() throws Exception
{
final Closer closer = Closer.create();
try {
closer.register(unloader(BROADCAST_JOIN_DATASOURCE));
// prepare for broadcast
coordinatorClient.postLoadRules(
BROADCAST_JOIN_DATASOURCE,
ImmutableList.of(new ForeverBroadcastDistributionRule())
);
// load the data
String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE);
String taskId = indexer.submitTask(taskJson);
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load"
);
// query metadata until the druid schema is refreshed and the datasource is available and joinable
ITRetryUtil.retryUntilTrue(
() -> {
try {
queryHelper.testQueriesFromString(
queryHelper.getQueryURL(config.getRouterUrl()),
replaceJoinTemplate(
getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_RESOURCE),
BROADCAST_JOIN_DATASOURCE
),
1
);
return true;
}
catch (Exception ex) {
return false;
}
},
"waiting for SQL metadata refresh"
);
// now do some queries
queryHelper.testQueriesFromString(
queryHelper.getQueryURL(config.getRouterUrl()),
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), BROADCAST_JOIN_DATASOURCE),
1
);
}
finally {
closer.close();
}
}
private static String replaceJoinTemplate(String template, String joinDataSource)
{
return StringUtils.replace(
StringUtils.replace(template, "%%JOIN_DATASOURCE%%", joinDataSource),
"%%REGULAR_DATASOURCE%%",
ITWikipediaQueryTest.WIKIPEDIA_DATA_SOURCE
);
}
}

View File

@ -47,7 +47,7 @@ import java.util.concurrent.Future;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITWikipediaQueryTest
{
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
public static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final String WIKI_LOOKUP = "wiki-simple";
private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json";
private static final String WIKIPEDIA_LOOKUP_RESOURCE = "/queries/wiki-lookup-config.json";

View File

@ -0,0 +1,82 @@
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "%%JOIN_DATASOURCE%%",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
{
"type": "long",
"name": "added"
},
{
"type": "long",
"name": "deleted"
}
]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "sum_added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "sum_deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"segmentGranularity": "YEAR",
"queryGranularity": "second"
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "/resources/data/union_query/",
"filter": "wikipedia_index_data*"
},
"appendToExisting": false,
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"indexSpec": {
"segmentLoader": {
"type": "broadcastJoinableMMapSegmentFactory",
"keyColumns": ["user", "language", "added", "deleted"]
}
}
}
}
}
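The tuningConfig.indexSpec.segmentLoader block is the new piece here: it selects the segmentizer factory registered under the name broadcastJoinableMMapSegmentFactory (see SegmentizerModule below) and lists the key columns to index for join matching. A sketch of the equivalent Java construction, assuming an IndexIO instance is at hand (it is normally supplied through Jackson injection rather than built by hand):
SegmentizerFactory factory = new BroadcastJoinableMMappedQueryableSegmentizerFactory(
    indexIO,
    ImmutableSet.of("user", "language", "added", "deleted")
);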

View File

@ -16,6 +16,9 @@
"expectedResults": [
{
"server_type":"historical"
},
{
"server_type":"broker"
}
]
},

View File

@ -0,0 +1,26 @@
[
{
"description": "query information schema to make sure datasource is joinable and broadcast",
"query": {
"query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'"
},
"expectedResults": [
{
"TABLE_NAME": "%%JOIN_DATASOURCE%%",
"IS_JOINABLE": "YES",
"IS_BROADCAST": "YES"
}
]
},
{
"description": "query information schema to make sure druid schema is refreshed",
"query": {
"query": "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%'"
},
"expectedResults": [
{
"EXPR$0": 19
}
]
}
]

View File

@ -0,0 +1,29 @@
[
{
"description": "query broadcast join segment directly",
"query": {
"query": "SELECT \"%%JOIN_DATASOURCE%%\".\"user\", SUM(\"%%JOIN_DATASOURCE%%\".\"added\") FROM druid.\"%%JOIN_DATASOURCE%%\" GROUP BY 1 ORDER BY 2",
"resultFormat": "OBJECT"
},
"expectedResults": [
{"user":"stringer","EXPR$1":2},
{"user":"nuclear","EXPR$1":114},
{"user":"masterYi","EXPR$1":246},
{"user":"speed","EXPR$1":918},
{"user":"triplets","EXPR$1":1810}
]
},
{
"description": "regular datasource is lhs, broadcast join datasource is rhs",
"query": {
"query": "SELECT \"%%JOIN_DATASOURCE%%\".\"language\" as l1, \"%%REGULAR_DATASOURCE%%\".\"language\" as l2, SUM(\"%%JOIN_DATASOURCE%%\".\"sum_added\"), SUM(\"%%REGULAR_DATASOURCE%%\".\"added\") FROM druid.\"%%REGULAR_DATASOURCE%%\" INNER JOIN druid.\"%%JOIN_DATASOURCE%%\" ON \"%%REGULAR_DATASOURCE%%\".\"language\" = \"%%JOIN_DATASOURCE%%\".\"language\" GROUP BY 1, 2 ORDER BY 3 DESC",
"resultFormat": "OBJECT"
},
"expectedResults": [
{"l1":"en","l2":"en","EXPR$2":1.372562064E9,"EXPR$3":2.191945776E9},
{"l1":"zh","l2":"zh","EXPR$2":2.0833281E8,"EXPR$3":9.6017292E7},
{"l1":"ru","l2":"ru","EXPR$2":6.6673872E7,"EXPR$3":2.19902506E8},
{"l1":"ja","l2":"ja","EXPR$2":249728.0,"EXPR$3":8.3520802E7}
]
}
]

View File

@ -21,6 +21,9 @@
"expectedResults": [
{
"server_type":"historical"
},
{
"server_type":"broker"
}
]
}

View File

@ -8,5 +8,15 @@
"tier": "_default_tier",
"curr_size": 2208932412,
"max_size": 5000000000
},
{
"server": "172.172.172.8:8282",
"host": "172.172.172.8",
"plaintext_port": 8082,
"tls_port": 8282,
"server_type": "broker",
"tier": "_default_tier",
"curr_size": 0,
"max_size": 1000000000
}
]

View File

@ -21,6 +21,7 @@ package org.apache.druid.jackson;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory;
public class SegmentizerModule extends SimpleModule
@ -29,5 +30,8 @@ public class SegmentizerModule extends SimpleModule
{
super("SegmentizerModule");
registerSubtypes(new NamedType(MMappedQueryableSegmentizerFactory.class, "mMapSegmentFactory"));
registerSubtypes(
new NamedType(BroadcastJoinableMMappedQueryableSegmentizerFactory.class, "broadcastJoinableMMapSegmentFactory")
);
}
}
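With the subtype registered, the factory round-trips through Jackson via a segment's factory.json. A minimal sketch, assuming an ObjectMapper configured like the one in BroadcastSegmentIndexedTableTest further down (IndexIO supplied through InjectableValues):
ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerModule(new SegmentizerModule());
mapper.setInjectableValues(new InjectableValues.Std().addValue(IndexIO.class, indexIO));

String json = "{\"type\": \"broadcastJoinableMMapSegmentFactory\", \"keyColumns\": [\"user\"]}";
SegmentizerFactory factory = mapper.readValue(json, SegmentizerFactory.class);
// deserializes to a BroadcastJoinableMMappedQueryableSegmentizerFactory with keyColumns = [user]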

View File

@ -39,7 +39,7 @@ import java.util.function.Function;
* It's counterpart for incremental index is {@link
* org.apache.druid.segment.incremental.IncrementalIndexColumnSelectorFactory}.
*/
class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory
public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory
{
private final QueryableIndex index;
private final VirtualColumns virtualColumns;
@ -55,7 +55,7 @@ class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory
private final Map<DimensionSpec, DimensionSelector> dimensionSelectorCache;
private final Map<String, ColumnValueSelector> valueSelectorCache;
QueryableIndexColumnSelectorFactory(
public QueryableIndexColumnSelectorFactory(
QueryableIndex index,
VirtualColumns virtualColumns,
boolean descending,

View File

@ -23,7 +23,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.data.Offset;
import org.apache.druid.segment.data.ReadableOffset;
public class SimpleAscendingOffset extends Offset
public class SimpleAscendingOffset extends SimpleSettableOffset
{
private final int rowCount;
private final int initialOffset;
@ -53,7 +53,8 @@ public class SimpleAscendingOffset extends Offset
return currentOffset < rowCount;
}
void setCurrentOffset(int currentOffset)
@Override
public void setCurrentOffset(int currentOffset)
{
this.currentOffset = currentOffset;
}

View File

@ -23,13 +23,13 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.data.Offset;
import org.apache.druid.segment.data.ReadableOffset;
public class SimpleDescendingOffset extends Offset
public class SimpleDescendingOffset extends SimpleSettableOffset
{
private final int rowCount;
private final int initialOffset;
private int currentOffset;
SimpleDescendingOffset(int rowCount)
public SimpleDescendingOffset(int rowCount)
{
this(rowCount - 1, rowCount);
}
@ -77,6 +77,12 @@ public class SimpleDescendingOffset extends Offset
return currentOffset;
}
@Override
public void setCurrentOffset(int currentOffset)
{
this.currentOffset = currentOffset;
}
@Override
public String toString()
{

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.segment.data.Offset;
public abstract class SimpleSettableOffset extends Offset
{
public abstract void setCurrentOffset(int currentOffset);
}

View File

@ -143,6 +143,11 @@ public class RowSignature
return columnPositions.containsKey(columnName);
}
public boolean contains(final int columnNumber)
{
return 0 <= columnNumber && columnNumber < columnNames.size();
}
/**
* Returns the first position of {@code columnName} in this row signature, or -1 if it does not appear.
*

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
@ -51,14 +52,21 @@ public class HashJoinEngine
* not be queryable through the returned Cursor. This happens even if the right-hand joinable doesn't actually have a
* column with this name.
*/
public static Cursor makeJoinCursor(final Cursor leftCursor, final JoinableClause joinableClause)
public static Cursor makeJoinCursor(
final Cursor leftCursor,
final JoinableClause joinableClause,
final boolean descending,
final Closer closer
)
{
final ColumnSelectorFactory leftColumnSelectorFactory = leftCursor.getColumnSelectorFactory();
final JoinMatcher joinMatcher = joinableClause.getJoinable()
.makeJoinMatcher(
leftColumnSelectorFactory,
joinableClause.getCondition(),
joinableClause.getJoinType().isRighty()
joinableClause.getJoinType().isRighty(),
descending,
closer
);
class JoinColumnSelectorFactory implements ColumnSelectorFactory

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
@ -254,14 +255,15 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
queryMetrics
);
return Sequences.map(
Closer joinablesCloser = Closer.create();
return Sequences.<Cursor, Cursor>map(
baseCursorSequence,
cursor -> {
assert cursor != null;
Cursor retVal = cursor;
for (JoinableClause clause : clauses) {
retVal = HashJoinEngine.makeJoinCursor(retVal, clause);
retVal = HashJoinEngine.makeJoinCursor(retVal, clause, descending, joinablesCloser);
}
return PostJoinCursor.wrap(
@ -270,7 +272,7 @@ public class HashJoinSegmentStorageAdapter implements StorageAdapter
joinFilterSplit.getJoinTableFilter().isPresent() ? joinFilterSplit.getJoinTableFilter().get() : null
);
}
);
).withBaggage(joinablesCloser);
}
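withBaggage is the existing Sequence mechanism for tying resource lifetime to iteration: the registered Closer runs once the cursor sequence is fully consumed (or its yielder is closed), so per-query join resources cannot leak. A sketch of the pattern, with the cursor-building helper as a hypothetical stand-in:
Closer joinablesCloser = Closer.create();
// matchers register per-query resources (e.g. IndexedTable.Reader instances) on the closer as cursors are built
Sequence<Cursor> cursors = makeJoinCursorSequence(joinablesCloser); // hypothetical helper
return cursors.withBaggage(joinablesCloser); // the closer runs when iteration over the sequence finishes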
/**

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.join;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ReferenceCountedObject;
import org.apache.druid.segment.column.ColumnCapabilities;
@ -71,12 +72,17 @@ public interface Joinable extends ReferenceCountedObject
* @param remainderNeeded whether or not {@link JoinMatcher#matchRemainder()} will ever be called on the
* matcher. If we know it will not, additional optimizations are often possible.
*
* @param descending true if the join cursor is iterated in descending order
* @param closer closer that will run after the join cursor has completed, to clean up any per-query
* resources the joinable uses
* @return the matcher
*/
JoinMatcher makeJoinMatcher(
ColumnSelectorFactory leftColumnSelectorFactory,
JoinConditionAnalysis condition,
boolean remainderNeeded
boolean remainderNeeded,
boolean descending,
Closer closer
);
/**

View File

@ -26,7 +26,8 @@ import java.util.Optional;
/**
* Utility for creating {@link Joinable} objects.
*
* @see org.apache.druid.guice.DruidBinders#joinableFactoryBinder to register factories
* @see org.apache.druid.guice.DruidBinders#joinableFactoryMultiBinder to register factories
* @see org.apache.druid.guice.DruidBinders#joinableMappingBinder to register factory types with datasource types
*/
public interface JoinableFactory
{

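The DruidBinders helpers referenced above are not part of this diff. In plain Guice terms, the registration that feeds MapJoinableFactory's injected constructor (a Set<JoinableFactory> plus a factory-class-to-datasource-class map) looks roughly like this sketch, with MyJoinableFactory and MyDataSource as hypothetical stand-ins:
Multibinder<JoinableFactory> factories = Multibinder.newSetBinder(binder, JoinableFactory.class);
factories.addBinding().to(MyJoinableFactory.class);

MapBinder<Class<? extends JoinableFactory>, Class<? extends DataSource>> mappings = MapBinder.newMapBinder(
    binder,
    new TypeLiteral<Class<? extends JoinableFactory>>() {},
    new TypeLiteral<Class<? extends DataSource>>() {}
);
mappings.addBinding(MyJoinableFactory.class).toInstance(MyDataSource.class);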
View File

@ -19,49 +19,65 @@
package org.apache.druid.segment.join;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* A {@link JoinableFactory} that delegates to the appropriate factory based on the type of the datasource.
* A {@link JoinableFactory} that delegates to the appropriate factory based on the datasource.
*
* Datasources can register a factory via a DruidBinder
* Any number of {@link JoinableFactory} may be associated with the same class of {@link DataSource}, but for a specific
* datasource only a single {@link JoinableFactory} should be able to create a {@link Joinable} in the {@link #build}
* method.
*/
public class MapJoinableFactory implements JoinableFactory
{
private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories;
private final SetMultimap<Class<? extends DataSource>, JoinableFactory> joinableFactories;
@Inject
public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
public MapJoinableFactory(
Set<JoinableFactory> factories,
Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> factoryToDataSource
)
{
// Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
// Class doesn't override Object.equals().
this.joinableFactories = new IdentityHashMap<>(joinableFactories);
this.joinableFactories = HashMultimap.create();
factories.forEach(joinableFactory -> {
joinableFactories.put(factoryToDataSource.get(joinableFactory.getClass()), joinableFactory);
});
}
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
JoinableFactory factory = joinableFactories.get(dataSource.getClass());
if (factory == null) {
return false;
} else {
return factory.isDirectlyJoinable(dataSource);
Set<JoinableFactory> factories = joinableFactories.get(dataSource.getClass());
for (JoinableFactory factory : factories) {
if (factory.isDirectlyJoinable(dataSource)) {
return true;
}
}
return false;
}
@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
JoinableFactory factory = joinableFactories.get(dataSource.getClass());
if (factory == null) {
return Optional.empty();
} else {
return factory.build(dataSource, condition);
Set<JoinableFactory> factories = joinableFactories.get(dataSource.getClass());
Optional<Joinable> maybeJoinable = Optional.empty();
for (JoinableFactory factory : factories) {
Optional<Joinable> candidate = factory.build(dataSource, condition);
if (candidate.isPresent()) {
if (maybeJoinable.isPresent()) {
throw new ISE("Multiple joinable factories are valid for table[%s]", dataSource);
}
maybeJoinable = candidate;
}
}
return maybeJoinable;
}
}
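Constructing the factory now takes the factory set and the mapping separately, as the MapJoinableFactoryTest changes below show. A sketch of the resolution behavior, with factoryA and factoryB as hypothetical factories of distinct classes, both mapped to InlineDataSource:
MapJoinableFactory mapFactory = new MapJoinableFactory(
    ImmutableSet.of(factoryA, factoryB),
    ImmutableMap.of(
        factoryA.getClass(), InlineDataSource.class,
        factoryB.getClass(), InlineDataSource.class
    )
);
// every factory mapped to the datasource's class is polled; at most one may return a Joinable,
// otherwise build() throws ISE("Multiple joinable factories are valid for table[...]")
Optional<Joinable> joinable = mapFactory.build(dataSource, condition);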

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.join.lookup;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
@ -83,7 +84,9 @@ public class LookupJoinable implements Joinable
public JoinMatcher makeJoinMatcher(
final ColumnSelectorFactory leftSelectorFactory,
final JoinConditionAnalysis condition,
final boolean remainderNeeded
final boolean remainderNeeded,
boolean descending,
Closer closer
)
{
return LookupJoinMatcher.create(extractor, leftSelectorFactory, condition, remainderNeeded);

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.table;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ReadableOffset;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.chrono.ISOChronology;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class BroadcastSegmentIndexedTable implements IndexedTable
{
private static final Logger LOG = new Logger(BroadcastSegmentIndexedTable.class);
private final QueryableIndexSegment segment;
private final QueryableIndexStorageAdapter adapter;
private final QueryableIndex queryableIndex;
private final Set<String> keyColumns;
private final RowSignature rowSignature;
private final String version;
private final List<Map<Object, IntList>> keyColumnsIndex;
public BroadcastSegmentIndexedTable(final QueryableIndexSegment theSegment, final Set<String> keyColumns, final String version)
{
this.keyColumns = keyColumns;
this.version = version;
this.segment = Preconditions.checkNotNull(theSegment, "Segment must not be null");
this.adapter = Preconditions.checkNotNull(
(QueryableIndexStorageAdapter) segment.asStorageAdapter(),
"Segment[%s] must have a QueryableIndexStorageAdapter",
segment.getId()
);
this.queryableIndex = Preconditions.checkNotNull(
segment.asQueryableIndex(),
"Segment[%s] must have a QueryableIndexSegment",
segment.getId()
);
RowSignature.Builder sigBuilder = RowSignature.builder();
sigBuilder.add(ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG);
for (String column : queryableIndex.getColumnNames()) {
sigBuilder.add(column, adapter.getColumnCapabilities(column).getType());
}
this.rowSignature = sigBuilder.build();
// initialize key column index maps
this.keyColumnsIndex = new ArrayList<>(rowSignature.size());
final List<String> keyColumnNames = new ArrayList<>(keyColumns.size());
for (int i = 0; i < rowSignature.size(); i++) {
final Map<Object, IntList> m;
final String columnName = rowSignature.getColumnName(i);
if (keyColumns.contains(columnName)) {
m = new HashMap<>();
keyColumnNames.add(columnName);
} else {
m = null;
}
keyColumnsIndex.add(m);
}
// sort of like the dump segment tool, but builds key column indexes while reading the segment
final Sequence<Cursor> cursors = adapter.makeCursors(
Filters.toFilter(null),
queryableIndex.getDataInterval().withChronology(ISOChronology.getInstanceUTC()),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
final Sequence<Integer> sequence = Sequences.map(
cursors,
cursor -> {
if (cursor == null) {
return 0;
}
int rowNumber = 0;
ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
// this should really be optimized to use dimension selectors where possible to populate indexes from bitmap
// indexes, but that is an optimization for another day
final List<BaseObjectColumnValueSelector> selectors = keyColumnNames
.stream()
.map(columnName -> {
// multi-value dimensions are not currently supported
if (adapter.getColumnCapabilities(columnName).hasMultipleValues().isMaybeTrue()) {
return NilColumnValueSelector.instance();
}
return columnSelectorFactory.makeColumnValueSelector(columnName);
})
.collect(Collectors.toList());
while (!cursor.isDone()) {
for (int keyColumnSelectorIndex = 0; keyColumnSelectorIndex < selectors.size(); keyColumnSelectorIndex++) {
final String keyColumnName = keyColumnNames.get(keyColumnSelectorIndex);
final int columnPosition = rowSignature.indexOf(keyColumnName);
final Map<Object, IntList> keyColumnValueIndex = keyColumnsIndex.get(columnPosition);
final Object key = selectors.get(keyColumnSelectorIndex).getObject();
if (key != null) {
final IntList array = keyColumnValueIndex.computeIfAbsent(key, k -> new IntArrayList());
array.add(rowNumber);
}
}
if (rowNumber % 100_000 == 0) {
if (rowNumber == 0) {
LOG.debug("Indexed first row for table %s", theSegment.getId());
} else {
LOG.debug("Indexed row %s for table %s", rowNumber, theSegment.getId());
}
}
rowNumber++;
cursor.advance();
}
return rowNumber;
}
);
Integer totalRows = sequence.accumulate(0, (accumulated, in) -> accumulated += in);
LOG.info("Created BroadcastSegmentIndexedTable with %s rows.", totalRows);
}
@Override
public String version()
{
return version;
}
@Override
public Set<String> keyColumns()
{
return keyColumns;
}
@Override
public RowSignature rowSignature()
{
return rowSignature;
}
@Override
public int numRows()
{
return adapter.getNumRows();
}
@Override
public Index columnIndex(int column)
{
return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndex, rowSignature);
}
@Override
public Reader columnReader(int column)
{
if (!rowSignature.contains(column)) {
throw new IAE("Column[%d] is not a valid column for segment[%s]", column, segment.getId());
}
final SimpleAscendingOffset offset = new SimpleAscendingOffset(adapter.getNumRows());
final BaseColumn baseColumn = queryableIndex.getColumnHolder(rowSignature.getColumnName(column)).getColumn();
final BaseObjectColumnValueSelector<?> selector = baseColumn.makeColumnValueSelector(offset);
return new Reader()
{
@Nullable
@Override
public Object read(int row)
{
offset.setCurrentOffset(row);
return selector.getObject();
}
@Override
public void close() throws IOException
{
baseColumn.close();
}
};
}
@Nullable
@Override
public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, boolean descending, Closer closer)
{
return new QueryableIndexColumnSelectorFactory(
queryableIndex,
VirtualColumns.EMPTY,
descending,
closer,
offset,
new HashMap<>()
);
}
@Override
public void close()
{
// the segment will close itself when it is dropped, no need to do it here
}
@Override
public Optional<Closeable> acquireReferences()
{
return Optional.empty();
}
}
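Conceptually, the constructor above makes a single pass over the segment and builds one value-to-row-numbers map per key column; those maps back the Index returned by columnIndex. In miniature:
// per key column: value -> list of row numbers containing that value
Map<Object, IntList> userIndex = new HashMap<>();
// during the scan, for each row:
userIndex.computeIfAbsent(selector.getObject(), k -> new IntArrayList()).add(rowNumber);
// at join time, Index.find(key) is a single hash lookup returning the matching row numbers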

View File

@ -20,11 +20,15 @@
package org.apache.druid.segment.join.table;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ReferenceCountedObject;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.data.ReadableOffset;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
/**
@ -65,10 +69,24 @@ public interface IndexedTable extends ReferenceCountedObject, Closeable
/**
* Returns a reader for a particular column. The provided column number must be that column's position in
* {@link #rowSignature()}.
* {@link #rowSignature()}. Don't forget to close your {@link Reader} when finished reading, to clean up any
* resources.
*/
Reader columnReader(int column);
/**
* This method allows a table to directly provide an optimized {@link ColumnSelectorFactory} for
* {@link IndexedTableJoinMatcher} to create selectors. If this method returns null, the default
* {@link IndexedTableColumnSelectorFactory}, which creates {@link IndexedTableDimensionSelector} or
* {@link IndexedTableColumnValueSelector} as appropriate, both backed by a {@link #columnReader}, will be used
* instead.
*/
@Nullable
default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, boolean descending, Closer closer)
{
return null;
}
/**
* Indexes support fast lookups on key columns.
*/
@ -83,7 +101,7 @@ public interface IndexedTable extends ReferenceCountedObject, Closeable
/**
* Readers support reading values out of any column.
*/
interface Reader
interface Reader extends Closeable
{
/**
* Read the value at a particular row number. Throws an exception if the row is out of bounds (must be between zero
@ -91,5 +109,11 @@ public interface IndexedTable extends ReferenceCountedObject, Closeable
*/
@Nullable
Object read(int row);
@Override
default void close() throws IOException
{
// nothing to close
}
}
}
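Since Reader is now Closeable, call sites either register readers on a per-query Closer (as the IndexedTable selector classes below do) or close them directly. A minimal try-with-resources sketch:
try (IndexedTable.Reader reader = table.columnReader(columnNumber)) {
  for (int row = 0; row < table.numRows(); row++) {
    Object value = reader.read(row);
    // ... use value ...
  }
}
catch (IOException e) {
  throw new RuntimeException(e);
}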

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.join.table;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
@ -36,11 +37,13 @@ public class IndexedTableColumnSelectorFactory implements ColumnSelectorFactory
{
private final IndexedTable table;
private final IntSupplier currentRow;
private final Closer closer;
IndexedTableColumnSelectorFactory(IndexedTable table, IntSupplier currentRow)
IndexedTableColumnSelectorFactory(IndexedTable table, IntSupplier currentRow, Closer closer)
{
this.table = table;
this.currentRow = currentRow;
this.closer = closer;
}
@Nullable
@ -79,7 +82,8 @@ public class IndexedTableColumnSelectorFactory implements ColumnSelectorFactory
table,
currentRow,
columnNumber,
dimensionSpec.getExtractionFn()
dimensionSpec.getExtractionFn(),
closer
);
return dimensionSpec.decorate(undecoratedSelector);
@ -95,7 +99,7 @@ public class IndexedTableColumnSelectorFactory implements ColumnSelectorFactory
if (columnNumber < 0) {
return NilColumnValueSelector.instance();
} else {
return new IndexedTableColumnValueSelector(table, currentRow, columnNumber);
return new IndexedTableColumnValueSelector(table, currentRow, columnNumber, closer);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.join.table;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
@ -31,10 +32,11 @@ public class IndexedTableColumnValueSelector implements ColumnValueSelector<Obje
private final IntSupplier currentRow;
private final IndexedTable.Reader columnReader;
IndexedTableColumnValueSelector(IndexedTable table, IntSupplier currentRow, int columnNumber)
IndexedTableColumnValueSelector(IndexedTable table, IntSupplier currentRow, int columnNumber, Closer closer)
{
this.currentRow = currentRow;
this.columnReader = table.columnReader(columnNumber);
closer.register(columnReader);
}
@Override

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.join.table;
import com.google.common.base.Predicate;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -46,13 +47,15 @@ public class IndexedTableDimensionSelector implements DimensionSelector
IndexedTable table,
IntSupplier currentRow,
int columnNumber,
@Nullable ExtractionFn extractionFn
@Nullable ExtractionFn extractionFn,
Closer closer
)
{
this.table = table;
this.currentRow = currentRow;
this.extractionFn = extractionFn;
this.columnReader = table.columnReader(columnNumber);
closer.register(columnReader);
this.currentIndexedInts = new SingleIndexedInt();
}

View File

@ -30,6 +30,7 @@ import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
@ -40,6 +41,9 @@ import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleDescendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.join.Equality;
@ -78,16 +82,24 @@ public class IndexedTableJoinMatcher implements JoinMatcher
@Nullable
private IntIterator currentIterator;
private int currentRow;
private final SimpleSettableOffset joinableOffset;
IndexedTableJoinMatcher(
final IndexedTable table,
final ColumnSelectorFactory leftSelectorFactory,
final JoinConditionAnalysis condition,
final boolean remainderNeeded
final boolean remainderNeeded,
final boolean descending,
final Closer closer
)
{
this.table = table;
this.currentRow = UNINITIALIZED_CURRENT_ROW;
if (descending) {
this.joinableOffset = new SimpleDescendingOffset(table.numRows());
} else {
this.joinableOffset = new SimpleAscendingOffset(table.numRows());
}
reset();
if (condition.isAlwaysTrue()) {
this.conditionMatchers = Collections.singletonList(() -> IntIterators.fromTo(0, table.numRows()));
@ -106,7 +118,10 @@ public class IndexedTableJoinMatcher implements JoinMatcher
}
this.currentMatchedRows = new IntIterator[conditionMatchers.size()];
this.selectorFactory = new IndexedTableColumnSelectorFactory(table, () -> currentRow);
ColumnSelectorFactory selectorFactory = table.makeColumnSelectorFactory(joinableOffset, descending, closer);
this.selectorFactory = selectorFactory != null
? selectorFactory
: new IndexedTableColumnSelectorFactory(table, () -> currentRow, closer);
if (remainderNeeded) {
this.matchedRows = new IntRBTreeSet();
@ -243,6 +258,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
currentIterator = null;
currentRow = UNINITIALIZED_CURRENT_ROW;
matchingRemainder = false;
joinableOffset.reset();
}
private void advanceCurrentRow()
@ -252,6 +268,7 @@ public class IndexedTableJoinMatcher implements JoinMatcher
} else {
currentIterator = null;
currentRow = UNINITIALIZED_CURRENT_ROW;
joinableOffset.setCurrentOffset(currentRow);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.join.table;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.join.JoinConditionAnalysis;
@ -28,6 +29,7 @@ import org.apache.druid.segment.join.Joinable;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -71,14 +73,18 @@ public class IndexedTableJoinable implements Joinable
public JoinMatcher makeJoinMatcher(
final ColumnSelectorFactory leftColumnSelectorFactory,
final JoinConditionAnalysis condition,
final boolean remainderNeeded
final boolean remainderNeeded,
boolean descending,
Closer closer
)
{
return new IndexedTableJoinMatcher(
table,
leftColumnSelectorFactory,
condition,
remainderNeeded
remainderNeeded,
descending,
closer
);
}
@ -97,41 +103,48 @@ public class IndexedTableJoinable implements Joinable
if (filterColumnPosition < 0 || correlatedColumnPosition < 0) {
return Optional.empty();
}
Set<String> correlatedValues = new HashSet<>();
if (table.keyColumns().contains(searchColumnName)) {
IndexedTable.Index index = table.columnIndex(filterColumnPosition);
IndexedTable.Reader reader = table.columnReader(correlatedColumnPosition);
IntList rowIndex = index.find(searchColumnValue);
for (int i = 0; i < rowIndex.size(); i++) {
int rowNum = rowIndex.getInt(i);
String correlatedDimVal = Objects.toString(reader.read(rowNum), null);
correlatedValues.add(correlatedDimVal);
if (correlatedValues.size() > maxCorrelationSetSize) {
return Optional.empty();
}
}
return Optional.of(correlatedValues);
} else {
if (!allowNonKeyColumnSearch) {
return Optional.empty();
}
IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition);
IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition);
for (int i = 0; i < table.numRows(); i++) {
String dimVal = Objects.toString(dimNameReader.read(i), null);
if (searchColumnValue.equals(dimVal)) {
String correlatedDimVal = Objects.toString(correlatedColumnReader.read(i), null);
try (final Closer closer = Closer.create()) {
Set<String> correlatedValues = new HashSet<>();
if (table.keyColumns().contains(searchColumnName)) {
IndexedTable.Index index = table.columnIndex(filterColumnPosition);
IndexedTable.Reader reader = table.columnReader(correlatedColumnPosition);
closer.register(reader);
IntList rowIndex = index.find(searchColumnValue);
for (int i = 0; i < rowIndex.size(); i++) {
int rowNum = rowIndex.getInt(i);
String correlatedDimVal = Objects.toString(reader.read(rowNum), null);
correlatedValues.add(correlatedDimVal);
if (correlatedValues.size() > maxCorrelationSetSize) {
return Optional.empty();
}
}
}
return Optional.of(correlatedValues);
} else {
if (!allowNonKeyColumnSearch) {
return Optional.empty();
}
return Optional.of(correlatedValues);
IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition);
IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition);
closer.register(dimNameReader);
closer.register(correlatedColumnReader);
for (int i = 0; i < table.numRows(); i++) {
String dimVal = Objects.toString(dimNameReader.read(i), null);
if (searchColumnValue.equals(dimVal)) {
String correlatedDimVal = Objects.toString(correlatedColumnReader.read(i), null);
correlatedValues.add(correlatedDimVal);
if (correlatedValues.size() > maxCorrelationSetSize) {
return Optional.empty();
}
}
}
return Optional.of(correlatedValues);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.table;
import org.apache.druid.segment.ReferenceCountingCloseableObject;
import org.apache.druid.segment.column.RowSignature;
import java.io.Closeable;
import java.util.Optional;
import java.util.Set;
public class ReferenceCountingIndexedTable extends ReferenceCountingCloseableObject<IndexedTable>
implements IndexedTable
{
public ReferenceCountingIndexedTable(IndexedTable indexedTable)
{
super(indexedTable);
}
@Override
public String version()
{
return baseObject.version();
}
@Override
public Set<String> keyColumns()
{
return baseObject.keyColumns();
}
@Override
public RowSignature rowSignature()
{
return baseObject.rowSignature();
}
@Override
public int numRows()
{
return baseObject.numRows();
}
@Override
public Index columnIndex(int column)
{
return baseObject.columnIndex(column);
}
@Override
public Reader columnReader(int column)
{
return baseObject.columnReader(column);
}
@Override
public Optional<Closeable> acquireReferences()
{
return incrementReferenceAndDecrementOnceCloseable();
}
}

View File

@ -33,7 +33,6 @@ import org.apache.druid.segment.column.ValueType;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -69,10 +68,6 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
this.keyColumns = keyColumns;
this.version = version;
if (new HashSet<>(keyColumns).size() != keyColumns.size()) {
throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns);
}
if (!ImmutableSet.copyOf(rowSignature.getColumnNames()).containsAll(keyColumns)) {
throw new ISE(
"keyColumns[%s] must all be contained in rowSignature[%s]",
@ -132,29 +127,7 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
@Override
public Index columnIndex(int column)
{
final Map<Object, IntList> indexMap = index.get(column);
if (indexMap == null) {
throw new IAE("Column[%d] is not a key column", column);
}
final ValueType columnType =
rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
return key -> {
final Object convertedKey = DimensionHandlerUtils.convertObjectToType(key, columnType, false);
if (convertedKey != null) {
final IntList found = indexMap.get(convertedKey);
if (found != null) {
return found;
} else {
return IntLists.EMPTY_LIST;
}
} else {
return IntLists.EMPTY_LIST;
}
};
return getKeyColumnIndex(column, index, rowSignature);
}
@Override
@ -187,4 +160,31 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
{
// nothing to close
}
static Index getKeyColumnIndex(int column, List<Map<Object, IntList>> keyColumnsIndex, RowSignature rowSignature)
{
final Map<Object, IntList> indexMap = keyColumnsIndex.get(column);
if (indexMap == null) {
throw new IAE("Column[%d] is not a key column", column);
}
final ValueType columnType =
rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
return key -> {
final Object convertedKey = DimensionHandlerUtils.convertObjectToType(key, columnType, false);
if (convertedKey != null) {
final IntList found = indexMap.get(convertedKey);
if (found != null) {
return found;
} else {
return IntLists.EMPTY_LIST;
}
} else {
return IntLists.EMPTY_LIST;
}
};
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
public class BroadcastJoinableMMappedQueryableSegmentizerFactory implements SegmentizerFactory
{
private final IndexIO indexIO;
private final Set<String> keyColumns;
@JsonCreator
public BroadcastJoinableMMappedQueryableSegmentizerFactory(
@JacksonInject IndexIO indexIO,
@JsonProperty("keyColumns") Set<String> keyColumns
)
{
this.indexIO = indexIO;
this.keyColumns = keyColumns;
}
@JsonProperty
public Set<String> getKeyColumns()
{
return keyColumns;
}
@Override
public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
{
try {
return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()) {
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
if (clazz.equals(IndexedTable.class)) {
return (T) new BroadcastSegmentIndexedTable(this, keyColumns, dataSegment.getVersion());
}
return super.as(clazz);
}
};
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BroadcastJoinableMMappedQueryableSegmentizerFactory that = (BroadcastJoinableMMappedQueryableSegmentizerFactory) o;
return Objects.equals(keyColumns, that.keyColumns);
}
@Override
public int hashCode()
{
return Objects.hash(keyColumns);
}
}
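The factorize override is what exposes the table to the rest of the system: callers probe a loaded segment for an IndexedTable through Segment.as. A sketch, assuming a segment loaded through this factory:
Segment segment = factory.factorize(dataSegment, segmentDir, false);
IndexedTable table = segment.as(IndexedTable.class);
// non-null only for segments loaded through this factory; others fall through to super.as and return null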

View File

@ -20,7 +20,7 @@
package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.DataSource;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.InlineDataSource;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@ -31,20 +31,11 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Map;
import java.util.Optional;
@RunWith(EasyMockRunner.class)
public class MapJoinableFactoryTest
{
/**
* A utility to create a {@link MapJoinableFactory} to be used by tests.
*/
public static MapJoinableFactory fromMap(Map<Class<? extends DataSource>, JoinableFactory> map)
{
return new MapJoinableFactory(map);
}
@Mock
private InlineDataSource inlineDataSource;
@Mock(MockType.NICE)
@ -63,7 +54,9 @@ public class MapJoinableFactoryTest
noopDataSource = new NoopDataSource();
target = new MapJoinableFactory(
ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
ImmutableSet.of(noopJoinableFactory),
ImmutableMap.of(noopJoinableFactory.getClass(), NoopDataSource.class)
);
}

View File

@ -0,0 +1,336 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join.table;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.jackson.SegmentizerModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentizerFactory;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class BroadcastSegmentIndexedTableTest extends InitializedNullHandlingTest
{
private static final String STRING_COL_1 = "market";
private static final String LONG_COL_1 = "longNumericNull";
private static final String DOUBLE_COL_1 = "doubleNumericNull";
private static final String FLOAT_COL_1 = "floatNumericNull";
private static final String STRING_COL_2 = "partial_null_column";
private static final String MULTI_VALUE_COLUMN = "placementish";
private static final String DIM_NOT_EXISTS = "DIM_NOT_EXISTS";
private static final String DATASOURCE = "DATASOURCE";
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private QueryableIndexSegment backingSegment;
private BroadcastSegmentIndexedTable broadcastTable;
private List<String> columnNames;
private final Set<String> keyColumns = ImmutableSet.<String>builder()
.add(STRING_COL_1)
.add(STRING_COL_2)
.add(LONG_COL_1)
.add(DOUBLE_COL_1)
.add(FLOAT_COL_1)
.add(MULTI_VALUE_COLUMN)
.add(DIM_NOT_EXISTS)
.build();
@Before
public void setup() throws IOException, SegmentLoadingException
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerModule(new SegmentizerModule());
final IndexIO indexIO = new IndexIO(mapper, () -> 0);
mapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), mapper)
.addValue(IndexIO.class, indexIO)
.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
);
final IndexMerger indexMerger =
new IndexMergerV9(mapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
Interval testInterval = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
File segment = new File(temporaryFolder.newFolder(), "segment");
File persisted = indexMerger.persist(
data,
testInterval,
segment,
new IndexSpec(),
null
);
File factoryJson = new File(persisted, "factory.json");
Assert.assertTrue(factoryJson.exists());
SegmentizerFactory factory = mapper.readValue(factoryJson, SegmentizerFactory.class);
Assert.assertTrue(factory instanceof MMappedQueryableSegmentizerFactory);
DataSegment dataSegment = new DataSegment(
DATASOURCE,
testInterval,
DateTimes.nowUtc().toString(),
ImmutableMap.of(),
columnNames,
ImmutableList.of(),
null,
null,
segment.getTotalSpace()
);
backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false);
columnNames = ImmutableList.<String>builder().add(ColumnHolder.TIME_COLUMN_NAME)
.addAll(backingSegment.asQueryableIndex().getColumnNames()).build();
broadcastTable = new BroadcastSegmentIndexedTable(backingSegment, keyColumns, dataSegment.getVersion());
}
@Test
public void testInitShouldGenerateCorrectTable()
{
Assert.assertEquals(1209, broadcastTable.numRows());
}
@Test
public void testStringKeyColumn()
{
// lets try a few values out
final String[] vals = new String[] {"spot", "total_market", "upfront"};
checkIndexAndReader(STRING_COL_1, vals);
}
@Test
public void testNullableStringKeyColumn()
{
final String[] vals = new String[] {null, "value"};
checkIndexAndReader(STRING_COL_2, vals);
}
@Test
public void testMultiValueStringKeyColumn()
{
final Object[] nonMatchingVals = new Object[] {ImmutableList.of("a", "preferred")};
checkIndexAndReader(MULTI_VALUE_COLUMN, new Object[0], nonMatchingVals);
}
@Test
public void testLongKeyColumn()
{
final Long[] vals = new Long[] {NullHandling.replaceWithDefault() ? 0L : null, 10L, 20L};
checkIndexAndReader(LONG_COL_1, vals);
}
@Test
public void testFloatKeyColumn()
{
final Float[] vals = new Float[] {NullHandling.replaceWithDefault() ? 0.0f : null, 10.0f, 20.0f};
checkIndexAndReader(FLOAT_COL_1, vals);
}
@Test
public void testDoubleKeyColumn()
{
final Double[] vals = new Double[] {NullHandling.replaceWithDefault() ? 0.0 : null, 10.0, 20.0};
checkIndexAndReader(DOUBLE_COL_1, vals);
}
@Test
public void testTimestampColumn()
{
checkNonIndexedReader(ColumnHolder.TIME_COLUMN_NAME);
}
@Test
public void testStringNonKeyColumn()
{
checkNonIndexedReader("qualityNumericString");
}
@Test
public void testLongNonKeyColumn()
{
checkNonIndexedReader("qualityLong");
}
@Test
public void testFloatNonKeyColumn()
{
checkNonIndexedReader("qualityFloat");
}
@Test
public void testDoubleNonKeyColumn()
{
checkNonIndexedReader("qualityDouble");
}
@Test
public void testNonexistentColumn()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Column[-1] is not a valid column");
broadcastTable.columnReader(columnNames.indexOf(DIM_NOT_EXISTS));
}
@Test
public void testNonexistentColumnOutOfRange()
{
final int non = columnNames.size();
expectedException.expect(IAE.class);
expectedException.expectMessage(StringUtils.format("Column[%s] is not a valid column", non));
broadcastTable.columnReader(non);
}
private void checkIndexAndReader(String columnName, Object[] vals)
{
checkIndexAndReader(columnName, vals, new Object[0]);
}
private void checkIndexAndReader(String columnName, Object[] vals, Object[] nonMatchingVals)
{
checkColumnSelectorFactory(columnName);
try (final Closer closer = Closer.create()) {
final int columnIndex = columnNames.indexOf(columnName);
final IndexedTable.Reader reader = broadcastTable.columnReader(columnIndex);
closer.register(reader);
final IndexedTable.Index valueIndex = broadcastTable.columnIndex(columnIndex);
// let's try a few values out
for (Object val : vals) {
final IntList valIndex = valueIndex.find(val);
if (val == null) {
Assert.assertEquals(0, valIndex.size());
} else {
Assert.assertTrue(valIndex.size() > 0);
for (int i = 0; i < valIndex.size(); i++) {
Assert.assertEquals(val, reader.read(valIndex.getInt(i)));
}
}
}
for (Object val : nonMatchingVals) {
final IntList valIndex = valueIndex.find(val);
Assert.assertEquals(0, valIndex.size());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private void checkNonIndexedReader(String columnName)
{
checkColumnSelectorFactory(columnName);
try (final Closer closer = Closer.create()) {
final int columnIndex = columnNames.indexOf(columnName);
final int numRows = backingSegment.asStorageAdapter().getNumRows();
final IndexedTable.Reader reader = broadcastTable.columnReader(columnIndex);
closer.register(reader);
final SimpleAscendingOffset offset = new SimpleAscendingOffset(numRows);
final BaseColumn theColumn = backingSegment.asQueryableIndex()
.getColumnHolder(columnName)
.getColumn();
closer.register(theColumn);
final BaseObjectColumnValueSelector<?> selector = theColumn.makeColumnValueSelector(offset);
// compare with the selector to make sure the reader reads the correct values
for (int row = 0; row < numRows; row++) {
offset.setCurrentOffset(row);
Assert.assertEquals(selector.getObject(), reader.read(row));
}
// make sure it doesn't have an index since it isn't a key column
try {
Assert.assertNull(broadcastTable.columnIndex(columnIndex));
}
catch (IAE iae) {
Assert.assertEquals(StringUtils.format("Column[%d] is not a key column", columnIndex), iae.getMessage());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private void checkColumnSelectorFactory(String columnName)
{
try (final Closer closer = Closer.create()) {
final int numRows = backingSegment.asStorageAdapter().getNumRows();
final SimpleAscendingOffset offset = new SimpleAscendingOffset(numRows);
final BaseColumn theColumn = backingSegment.asQueryableIndex()
.getColumnHolder(columnName)
.getColumn();
closer.register(theColumn);
final BaseObjectColumnValueSelector<?> selector = theColumn.makeColumnValueSelector(offset);
ColumnSelectorFactory tableFactory = broadcastTable.makeColumnSelectorFactory(offset, false, closer);
final BaseObjectColumnValueSelector<?> tableSelector = tableFactory.makeColumnValueSelector(columnName);
// compare with the base segment selector to make sure the table's selector reads the correct values
for (int row = 0; row < numRows; row++) {
offset.setCurrentOffset(row);
Assert.assertEquals(selector.getObject(), tableSelector.getObject());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
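For reference, the key-column lookup flow these helpers exercise, as a minimal free-standing sketch: it reuses the test's broadcastTable and columnNames fields, "market" and "spot" come from the sample data, and IOException handling for Closer.close() is elided.
// minimal sketch (not part of the test): look up rows by key value, then read them back
try (Closer closer = Closer.create()) {
final int columnIndex = columnNames.indexOf("market");
// the reader maps row number -> value; the index maps value -> matching row numbers
final IndexedTable.Reader reader = closer.register(broadcastTable.columnReader(columnIndex));
final IndexedTable.Index index = broadcastTable.columnIndex(columnIndex);
final IntList matches = index.find("spot");
for (int i = 0; i < matches.size(); i++) {
Assert.assertEquals("spot", reader.read(matches.getInt(i)));
}
}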

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -108,6 +109,7 @@ public class IndexedTableJoinableTest
{
target = new IndexedTableJoinable(indexedTable);
}
@Test
public void getAvailableColumns()
{
@ -169,7 +171,13 @@ public class IndexedTableJoinableTest
PREFIX,
ExprMacroTable.nil()
);
final JoinMatcher joinMatcher = target.makeJoinMatcher(dummyColumnSelectorFactory, condition, false);
final JoinMatcher joinMatcher = target.makeJoinMatcher(
dummyColumnSelectorFactory,
condition,
false,
false,
Closer.create()
);
final DimensionSelector selector = joinMatcher.getColumnSelectorFactory()
.makeDimensionSelector(DefaultDimensionSpec.of("str"));

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.jackson.SegmentizerModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class BroadcastJoinableMMappedQueryableSegmentizerFactoryTest extends InitializedNullHandlingTest
{
private static final String TABLE_NAME = "test";
private static final Set<String> KEY_COLUMNS =
ImmutableSet.of("market", "longNumericNull", "doubleNumericNull", "floatNumericNull", "partial_null_column");
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testSegmentizer() throws IOException, SegmentLoadingException
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerModule(new SegmentizerModule());
final IndexIO indexIO = new IndexIO(mapper, () -> 0);
mapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), mapper)
.addValue(IndexIO.class, indexIO)
.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
);
IndexMerger indexMerger = new IndexMergerV9(mapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
SegmentizerFactory expectedFactory = new BroadcastJoinableMMappedQueryableSegmentizerFactory(
indexIO,
KEY_COLUMNS
);
Interval testInterval = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
List<String> columnNames = data.getColumnNames();
File segment = new File(temporaryFolder.newFolder(), "segment");
File persistedSegmentRoot = indexMerger.persist(
data,
testInterval,
segment,
new IndexSpec(
null,
null,
null,
null,
expectedFactory
),
null
);
File factoryJson = new File(persistedSegmentRoot, "factory.json");
Assert.assertTrue(factoryJson.exists());
SegmentizerFactory factory = mapper.readValue(factoryJson, SegmentizerFactory.class);
Assert.assertTrue(factory instanceof BroadcastJoinableMMappedQueryableSegmentizerFactory);
Assert.assertEquals(expectedFactory, factory);
// load a segment
final DataSegment dataSegment = new DataSegment(
TABLE_NAME,
testInterval,
DateTimes.nowUtc().toString(),
ImmutableMap.of(),
columnNames,
ImmutableList.of(),
null,
null,
persistedSegmentRoot.getTotalSpace()
);
final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false);
final BroadcastSegmentIndexedTable table = (BroadcastSegmentIndexedTable) loaded.as(IndexedTable.class);
Assert.assertNotNull(table);
}
}

View File

@ -71,12 +71,20 @@ public class DruidBinders
);
}
public static MapBinder<Class<? extends DataSource>, JoinableFactory> joinableFactoryBinder(Binder binder)
public static Multibinder<JoinableFactory> joinableFactoryMultiBinder(Binder binder)
{
return MapBinder.newMapBinder(
return Multibinder.newSetBinder(
binder,
new TypeLiteral<Class<? extends DataSource>>() {},
new TypeLiteral<JoinableFactory>() {}
);
}
public static MapBinder<Class<? extends JoinableFactory>, Class<? extends DataSource>> joinableMappingBinder(Binder binder)
{
return MapBinder.newMapBinder(
binder,
new TypeLiteral<Class<? extends JoinableFactory>>() {},
new TypeLiteral<Class<? extends DataSource>>() {}
);
}
}

View File

@ -25,9 +25,12 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.join.BroadcastTableJoinableFactory;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
@ -47,17 +50,22 @@ public class JoinableFactoryModule implements Module
static final Map<Class<? extends DataSource>, Class<? extends JoinableFactory>> FACTORY_MAPPINGS =
ImmutableMap.of(
InlineDataSource.class, InlineJoinableFactory.class,
LookupDataSource.class, LookupJoinableFactory.class
LookupDataSource.class, LookupJoinableFactory.class,
GlobalTableDataSource.class, BroadcastTableJoinableFactory.class
);
@Override
public void configure(Binder binder)
{
MapBinder<Class<? extends DataSource>, JoinableFactory> joinableFactories =
DruidBinders.joinableFactoryBinder(binder);
// this binder maps JoinableFactory implementations to the type of DataSource they can handle
MapBinder<Class<? extends JoinableFactory>, Class<? extends DataSource>> joinableFactoryMappingBinder =
DruidBinders.joinableMappingBinder(binder);
Multibinder<JoinableFactory> joinableFactoryMultibinder = DruidBinders.joinableFactoryMultiBinder(binder);
FACTORY_MAPPINGS.forEach((ds, factory) -> {
joinableFactories.addBinding(ds).to(factory);
joinableFactoryMultibinder.addBinding().to(factory);
joinableFactoryMappingBinder.addBinding(factory).toInstance(ds);
binder.bind(factory).in(LazySingleton.class);
});
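An extension module can register its own factory through the same two binders; a minimal sketch, where MyDataSource and MyJoinableFactory are hypothetical placeholder classes:
public class MyJoinableFactoryModule implements Module
{
@Override
public void configure(Binder binder)
{
// register the factory itself and declare which DataSource type it handles
DruidBinders.joinableFactoryMultiBinder(binder).addBinding().to(MyJoinableFactory.class);
DruidBinders.joinableMappingBinder(binder).addBinding(MyJoinableFactory.class).toInstance(MyDataSource.class);
binder.bind(MyJoinableFactory.class).in(LazySingleton.class);
}
}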

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.collect.Iterators;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
import org.apache.druid.server.SegmentManager;
import java.util.Iterator;
import java.util.Optional;
public class BroadcastTableJoinableFactory implements JoinableFactory
{
private final SegmentManager segmentManager;
@Inject
public BroadcastTableJoinableFactory(SegmentManager segmentManager)
{
this.segmentManager = segmentManager;
}
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
if (dataSource instanceof GlobalTableDataSource) {
return segmentManager.hasIndexedTables(((GlobalTableDataSource) dataSource).getName());
}
return false;
}
@Override
public Optional<Joinable> build(
DataSource dataSource,
JoinConditionAnalysis condition
)
{
GlobalTableDataSource broadcastDatasource = (GlobalTableDataSource) dataSource;
if (condition.canHashJoin()) {
DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(broadcastDatasource);
return segmentManager.getIndexedTables(analysis).map(tables -> {
Iterator<ReferenceCountingIndexedTable> tableIterator = tables.iterator();
if (!tableIterator.hasNext()) {
return null;
}
try {
return new IndexedTableJoinable(Iterators.getOnlyElement(tableIterator));
}
catch (IllegalArgumentException iae) {
throw new ISE(
"Currently only single segment datasources are supported for broadcast joins, dataSource[%s] has multiple segments. Reingest the data so that it is entirely contained within a single segment to use in JOIN queries.",
broadcastDatasource.getName()
);
}
});
}
return Optional.empty();
}
}
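A minimal usage sketch (the joinableFactory instance and the "test" datasource name are placeholders; the condition mirrors the tests later in this patch):
// hedged sketch: the result is empty unless the condition can hash-join
// and the broadcast datasource has a loaded indexed table
JoinConditionAnalysis condition = JoinConditionAnalysis.forExpression(
"market == \"j0.market\"",
"j0",
ExprMacroTable.nil()
);
Optional<Joinable> maybeJoinable = joinableFactory.build(new GlobalTableDataSource("test"), condition);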

View File

@ -24,24 +24,33 @@ import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* This class is responsible for managing data sources and their states like timeline, total segment size, and number of
@ -61,6 +70,8 @@ public class SegmentManager
{
private final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline =
new VersionedIntervalTimeline<>(Ordering.natural());
private final ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tablesLookup = new ConcurrentHashMap<>();
private long totalSegmentSize;
private long numSegments;
@ -81,6 +92,11 @@ public class SegmentManager
return timeline;
}
public ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> getTablesLookup()
{
return tablesLookup;
}
public long getTotalSegmentSize()
{
return totalSegmentSize;
@ -155,13 +171,44 @@ public class SegmentManager
*/
public Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> getTimeline(DataSourceAnalysis analysis)
{
final TableDataSource tableDataSource =
analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
final TableDataSource tableDataSource = getTableDataSource(analysis);
return Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline);
}
/**
* Returns the collection of {@link IndexedTable} for the entire timeline (since join conditions do not currently
* consider the query's intervals), if the timeline exists, for each of its segments that is joinable.
*/
public Optional<Stream<ReferenceCountingIndexedTable>> getIndexedTables(DataSourceAnalysis analysis)
{
return getTimeline(analysis).map(timeline -> {
// join doesn't currently consider intervals, so just consider all segments
final Stream<ReferenceCountingSegment> segments =
timeline.lookup(Intervals.ETERNITY)
.stream()
.flatMap(x -> StreamSupport.stream(x.getObject().payloads().spliterator(), false));
final TableDataSource tableDataSource = getTableDataSource(analysis);
ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tables =
Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTablesLookup)
.orElseThrow(() -> new ISE("Datasource %s does not have IndexedTables", tableDataSource.getName()));
return segments.map(segment -> tables.get(segment.getId())).filter(Objects::nonNull);
});
}
public boolean hasIndexedTables(String dataSourceName)
{
// a single get avoids a containsKey/get race on the concurrent map
final DataSourceState dataSourceState = dataSources.get(dataSourceName);
return dataSourceState != null && dataSourceState.tablesLookup.size() > 0;
}
private TableDataSource getTableDataSource(DataSourceAnalysis analysis)
{
return analysis.getBaseTableDataSource()
.orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
}
/**
* Load a single segment.
*
@ -194,6 +241,17 @@ public class SegmentManager
log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
resultSupplier.set(false);
} else {
IndexedTable table = adapter.as(IndexedTable.class);
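// all-or-none invariant: a datasource can back a broadcast join table only if every
// loaded segment is joinable, so refuse to mix joinable and non-joinable segments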
if (table != null) {
if (dataSourceState.isEmpty() || dataSourceState.numSegments == dataSourceState.tablesLookup.size()) {
dataSourceState.tablesLookup.put(segment.getId(), new ReferenceCountingIndexedTable(table));
} else {
log.error("Cannot load segment[%s] with IndexedTable, no existing segments are joinable", segment.getId());
}
} else if (dataSourceState.tablesLookup.size() > 0) {
log.error("Cannot load segment[%s] without IndexedTable, all existing segments are joinable", segment.getId());
}
loadedIntervals.add(
segment.getInterval(),
segment.getVersion(),
@ -203,7 +261,9 @@ public class SegmentManager
);
dataSourceState.addSegment(segment);
resultSupplier.set(true);
}
return dataSourceState;
}
);
@ -254,10 +314,18 @@ public class SegmentManager
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
dataSourceState.removeSegment(segment);
log.info("Attempting to close segment %s", segment.getId());
oldQueryable.close();
try (final Closer closer = Closer.create()) {
dataSourceState.removeSegment(segment);
closer.register(oldQueryable);
log.info("Attempting to close segment %s", segment.getId());
final ReferenceCountingIndexedTable oldTable = dataSourceState.tablesLookup.remove(segment.getId());
if (oldTable != null) {
closer.register(oldTable);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.",

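Callers drive the new lookup methods roughly as follows; a sketch, not code from this patch, assuming a loaded broadcast datasource named "test" and an available log instance:
// hedged sketch: resolve the joinable tables for a broadcast datasource
DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(new GlobalTableDataSource("test"));
if (segmentManager.hasIndexedTables("test")) {
segmentManager.getIndexedTables(analysis)
.ifPresent(tables -> tables.forEach(table -> log.info("found table with [%d] rows", table.numRows())));
}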
View File

@ -27,13 +27,20 @@ import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.join.BroadcastTableJoinableFactory;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.join.NoopDataSource;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.hamcrest.CoreMatchers;
import org.apache.druid.server.SegmentManager;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -64,28 +71,44 @@ public class JoinableFactoryModuleTest
@Test
public void testInjectDefaultBindingsShouldBeInjected()
{
Map<Class<? extends DataSource>, JoinableFactory> joinableFactories =
injector.getInstance(Key.get(new TypeLiteral<Map<Class<? extends DataSource>, JoinableFactory>>() {}));
Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size(), joinableFactories.size());
final Set<Map.Entry<Class<? extends DataSource>, Class<? extends JoinableFactory>>> expectedEntries =
JoinableFactoryModule.FACTORY_MAPPINGS.entrySet();
for (Map.Entry<Class<? extends DataSource>, Class<? extends JoinableFactory>> entry : expectedEntries) {
Assert.assertThat(joinableFactories.get(entry.getKey()), CoreMatchers.instanceOf(entry.getValue()));
}
final Set<JoinableFactory> factories =
injector.getInstance(Key.get(new TypeLiteral<Set<JoinableFactory>>() {}));
Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size(), factories.size());
Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> joinableFactoriesMappings = injector.getInstance(
Key.get(new TypeLiteral<Map<Class<? extends JoinableFactory>, Class<? extends DataSource>>>() {})
);
assertDefaultFactories(joinableFactoriesMappings);
}
@Test
public void testJoinableFactoryCanBind()
{
injector = makeInjectorWithProperties(
binder -> DruidBinders
.joinableFactoryBinder(binder).addBinding(NoopDataSource.class).toInstance(NoopJoinableFactory.INSTANCE));
Map<Class<? extends DataSource>, JoinableFactory> joinableFactories =
injector.getInstance(Key.get(new TypeLiteral<Map<Class<? extends DataSource>, JoinableFactory>>() {}));
Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size() + 1, joinableFactories.size());
Assert.assertEquals(NoopJoinableFactory.INSTANCE, joinableFactories.get(NoopDataSource.class));
binder -> {
DruidBinders.joinableFactoryMultiBinder(binder).addBinding().toInstance(NoopJoinableFactory.INSTANCE);
DruidBinders.joinableMappingBinder(binder).addBinding(NoopJoinableFactory.class).toInstance(NoopDataSource.class);
}
);
Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> joinableFactoriesMappings = injector.getInstance(
Key.get(new TypeLiteral<Map<Class<? extends JoinableFactory>, Class<? extends DataSource>>>() {})
);
Set<JoinableFactory> factories = injector.getInstance(Key.get(new TypeLiteral<Set<JoinableFactory>>() {}));
Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size() + 1, factories.size());
Assert.assertEquals(NoopDataSource.class, joinableFactoriesMappings.get(NoopJoinableFactory.class));
assertDefaultFactories(joinableFactoriesMappings);
}
private void assertDefaultFactories(
Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> joinableFactoriesMappings
)
{
Assert.assertEquals(LookupDataSource.class, joinableFactoriesMappings.get(LookupJoinableFactory.class));
Assert.assertEquals(InlineDataSource.class, joinableFactoriesMappings.get(InlineJoinableFactory.class));
Assert.assertEquals(
GlobalTableDataSource.class,
joinableFactoriesMappings.get(BroadcastTableJoinableFactory.class)
);
}
private Injector makeInjectorWithProperties(Module... otherModules)
@ -97,6 +120,7 @@ public class JoinableFactoryModuleTest
ImmutableList.<Module>builder()
.add(new JoinableFactoryModule())
.add(binder -> binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupProvider))
.add(binder -> binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class)))
.add(binder -> binder.bindScope(LazySingleton.class, Scopes.SINGLETON));
for (final Module otherModule : otherModules) {

View File

@ -22,6 +22,7 @@ package org.apache.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -656,23 +657,26 @@ public class ClientQuerySegmentWalkerTest
.build()
);
final JoinableFactory joinableFactory = new MapJoinableFactory(
ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
.put(InlineDataSource.class, new InlineJoinableFactory())
.put(GlobalTableDataSource.class, new JoinableFactory()
{
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL);
}
final JoinableFactory globalFactory = new JoinableFactory()
{
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL);
}
@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
return Optional.empty();
}
})
@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
return Optional.empty();
}
};
final JoinableFactory joinableFactory = new MapJoinableFactory(
ImmutableSet.of(globalFactory, new InlineJoinableFactory()),
ImmutableMap.<Class<? extends JoinableFactory>, Class<? extends DataSource>>builder()
.put(InlineJoinableFactory.class, InlineDataSource.class)
.put(globalFactory.getClass(), GlobalTableDataSource.class)
.build()
);

View File

@ -20,6 +20,7 @@
package org.apache.druid.server;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.Pair;
@ -68,7 +69,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@ -78,6 +79,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
/**
* Utilities for creating query-stack objects for tests.
@ -303,22 +305,31 @@ public class QueryStackTests
LookupExtractorFactoryContainerProvider lookupProvider
)
{
return makeJoinableFactoryFromDefault(lookupProvider, null);
return makeJoinableFactoryFromDefault(lookupProvider, null, null);
}
public static JoinableFactory makeJoinableFactoryFromDefault(
@Nullable LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable Map<Class<? extends DataSource>, JoinableFactory> custom
@Nullable Set<JoinableFactory> customFactories,
@Nullable Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> customMappings
)
{
ImmutableMap.Builder<Class<? extends DataSource>, JoinableFactory> builder = ImmutableMap.builder();
builder.put(InlineDataSource.class, new InlineJoinableFactory());
ImmutableSet.Builder<JoinableFactory> setBuilder = ImmutableSet.builder();
ImmutableMap.Builder<Class<? extends JoinableFactory>, Class<? extends DataSource>> mapBuilder =
ImmutableMap.builder();
setBuilder.add(new InlineJoinableFactory());
mapBuilder.put(InlineJoinableFactory.class, InlineDataSource.class);
if (lookupProvider != null) {
builder.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider));
setBuilder.add(new LookupJoinableFactory(lookupProvider));
mapBuilder.put(LookupJoinableFactory.class, LookupDataSource.class);
}
if (custom != null) {
builder.putAll(custom);
if (customFactories != null) {
setBuilder.addAll(customFactories);
}
return MapJoinableFactoryTest.fromMap(builder.build());
if (customMappings != null) {
mapBuilder.putAll(customMappings);
}
return new MapJoinableFactory(setBuilder.build(), mapBuilder.build());
}
}

View File

@ -0,0 +1,343 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.jackson.SegmentizerModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.BroadcastTableJoinableFactory;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.Joinable;
import org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.SegmentizerFactory;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNullHandlingTest
{
private static final String TABLE_NAME = "test";
private static final String PREFIX = "j0";
private static final Set<String> KEY_COLUMNS =
ImmutableSet.of("market", "longNumericNull", "doubleNumericNull", "floatNumericNull", "partial_null_column");
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private LocalDataSegmentPuller segmentPuller;
private ObjectMapper objectMapper;
private IndexIO indexIO;
private File segmentCacheDir;
private File segmentDeepStorageDir;
private SegmentLoaderLocalCacheManager segmentLoader;
private SegmentManager segmentManager;
private BroadcastTableJoinableFactory joinableFactory;
@Before
public void setup() throws IOException
{
segmentPuller = new LocalDataSegmentPuller();
objectMapper = new DefaultObjectMapper()
.registerModule(new SegmentizerModule())
.registerModule(
new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local"))
);
indexIO = new IndexIO(objectMapper, () -> 0);
objectMapper.setInjectableValues(
new InjectableValues.Std().addValue(LocalDataSegmentPuller.class, segmentPuller)
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), objectMapper)
.addValue(IndexIO.class, indexIO)
);
segmentCacheDir = temporaryFolder.newFolder();
segmentDeepStorageDir = temporaryFolder.newFolder();
segmentLoader = new SegmentLoaderLocalCacheManager(
indexIO,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
new StorageLocationConfig(segmentCacheDir, null, null)
);
}
},
objectMapper
);
segmentManager = new SegmentManager(segmentLoader);
joinableFactory = new BroadcastTableJoinableFactory(segmentManager);
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
@After
public void teardown() throws IOException
{
FileUtils.deleteDirectory(segmentCacheDir);
}
@Test
public void testLoadIndexedTable() throws IOException, SegmentLoadingException
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource));
final String version = DateTimes.nowUtc().toString();
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv");
final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
DataSegment segment = createSegment(data, interval, version);
Assert.assertTrue(segmentManager.loadSegment(segment, false));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
Joinable joinable = maybeJoinable.get();
// cardinality is currently tied to the number of rows
Assert.assertEquals(1210, joinable.getCardinality("market"));
Assert.assertEquals(1210, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
// dropping the segment should make the table no longer available
segmentManager.dropSegment(segment);
maybeJoinable = makeJoinable(dataSource);
Assert.assertFalse(maybeJoinable.isPresent());
}
@Test
public void testLoadMultipleIndexedTableOverwrite() throws IOException, SegmentLoadingException
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource));
// larger interval overwrites smaller interval
final String version = DateTimes.nowUtc().toString();
final String version2 = DateTimes.nowUtc().plus(1000L).toString();
final String interval = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z";
final String interval2 = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top");
IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
DataSegment segment1 = createSegment(data, interval, version);
DataSegment segment2 = createSegment(data2, interval2, version2);
Assert.assertTrue(segmentManager.loadSegment(segment1, false));
Assert.assertTrue(segmentManager.loadSegment(segment2, false));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
Joinable joinable = maybeJoinable.get();
// cardinality is currently tied to the number of rows
Assert.assertEquals(733, joinable.getCardinality("market"));
Assert.assertEquals(733, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
segmentManager.dropSegment(segment2);
// if the new segment is dropped (which should rarely, if ever, happen), the old table should still exist
maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
joinable = maybeJoinable.get();
// cardinality is currently tied to the number of rows
Assert.assertEquals(478, joinable.getCardinality("market"));
Assert.assertEquals(478, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
}
@Test
public void testLoadMultipleIndexedTable() throws IOException, SegmentLoadingException
{
final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME);
Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource));
final String version = DateTimes.nowUtc().toString();
final String version2 = DateTimes.nowUtc().plus(1000L).toString();
final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z";
final String interval2 = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z";
IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom");
IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top");
Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false));
Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource));
Optional<Joinable> maybeJoinable = makeJoinable(dataSource);
Assert.assertTrue(maybeJoinable.isPresent());
Joinable joinable = maybeJoinable.get();
// cardinality is currently tied to the number of rows
Assert.assertEquals(733, joinable.getCardinality("market"));
Assert.assertEquals(733, joinable.getCardinality("placement"));
Assert.assertEquals(
Optional.of(ImmutableSet.of("preferred")),
joinable.getCorrelatedColumnValues(
"market",
"spot",
"placement",
Long.MAX_VALUE,
false
)
);
// add another segment with a smaller interval; it only partially overshadows, so there will be 2 segments in the timeline
Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false));
expectedException.expect(ISE.class);
expectedException.expectMessage(
StringUtils.format(
"Currently only single segment datasources are supported for broadcast joins, dataSource[%s] has multiple segments. Reingest the data so that it is entirely contained within a single segment to use in JOIN queries.",
TABLE_NAME
)
);
// this will throw because the datasource has multiple segments, which is an invalid state for the joinable factory
makeJoinable(dataSource);
}
private Optional<Joinable> makeJoinable(DataSource dataSource)
{
return joinableFactory.build(
dataSource,
JoinConditionAnalysis.forExpression(
StringUtils.format("market == \"%s.market\"", PREFIX),
PREFIX,
ExprMacroTable.nil()
)
);
}
private DataSegment createSegment(IncrementalIndex data, String interval, String version) throws IOException
{
final DataSegment tmpSegment = new DataSegment(
TABLE_NAME,
Intervals.of(interval),
version,
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
new NumberedShardSpec(0, 0),
9,
100
);
final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false);
final File segmentDir = new File(segmentDeepStorageDir, storageDir);
org.apache.commons.io.FileUtils.forceMkdir(segmentDir);
IndexMerger indexMerger =
new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
SegmentizerFactory factory = new BroadcastJoinableMMappedQueryableSegmentizerFactory(indexIO, KEY_COLUMNS);
indexMerger.persist(
data,
Intervals.of(interval),
segmentDir,
new IndexSpec(
null,
null,
null,
null,
factory
),
null
);
final File factoryJson = new File(segmentDir, "factory.json");
objectMapper.writeValue(factoryJson, factory);
return tmpSegment.withLoadSpec(
ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath())
);
}
}

View File

@ -254,7 +254,7 @@ public class SegmentManagerThreadSafetyTest
@Override
public <T> T as(Class<T> clazz)
{
throw new UnsupportedOperationException();
return null;
}
@Override

View File

@ -105,7 +105,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
binder -> {
binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory);
binder.bind(TimelineServerView.class).toInstance(serverView);
binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of()));
binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()));
binder.bind(PlannerConfig.class).toInstance(plannerConfig);
binder.bind(ViewManager.class).toInstance(viewManager);
binder.bind(Escalator.class).toInstance(escalator);

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.join.MapJoinableFactory;
@ -55,7 +56,7 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
),
new TestServerInventoryView(Collections.emptyList()),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
new MapJoinableFactory(ImmutableMap.of()),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()

View File

@ -254,7 +254,7 @@ public class DruidSchemaTest extends CalciteTestBase
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)),
new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()

View File

@ -248,7 +248,7 @@ public class SystemSchemaTest extends CalciteTestBase
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
new TestServerInventoryView(walker.getSegments(), realtimeSegments),
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
new MapJoinableFactory(ImmutableMap.of()),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()

View File

@ -729,10 +729,8 @@ public class CalciteTests
{
return QueryStackTests.makeJoinableFactoryFromDefault(
INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class),
ImmutableMap.of(
GlobalTableDataSource.class,
CUSTOM_ROW_TABLE_JOINABLE
)
ImmutableSet.of(CUSTOM_ROW_TABLE_JOINABLE),
ImmutableMap.of(CUSTOM_ROW_TABLE_JOINABLE.getClass(), GlobalTableDataSource.class)
);
}