diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java index 83a13e9a0a6..8afcea4a4c3 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java @@ -21,7 +21,6 @@ package org.apache.druid.benchmark; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -42,6 +41,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.generator.GeneratorColumnSchema; import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.openjdk.jmh.annotations.Benchmark; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java index ea7ac3bdac8..eea1804292a 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java @@ -20,7 +20,6 @@ package org.apache.druid.benchmark; import com.google.common.collect.ImmutableList; -import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -39,6 +38,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.generator.GeneratorColumnSchema; import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; import org.openjdk.jmh.annotations.Benchmark; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java index 0ee1db3669c..98440fcac94 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java @@ -20,7 +20,6 @@ package org.apache.druid.benchmark; import com.google.common.collect.ImmutableList; -import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -41,6 +40,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.generator.GeneratorColumnSchema; import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; import 
org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index daa614ff4dd..877aca5a796 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -27,7 +27,6 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; -import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidServer; @@ -104,6 +103,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java index bd63a1a20e9..55dc74c81d2 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java @@ -22,7 +22,6 @@ package org.apache.druid.benchmark.query; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.calcite.schema.SchemaPlus; -import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; @@ -32,6 +31,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthenticationResult; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java index 2e89c4dda48..32fa2152ec1 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java @@ -21,7 +21,6 @@ package org.apache.druid.benchmark.query; import com.google.common.collect.ImmutableList; import org.apache.calcite.schema.SchemaPlus; -import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ 
-38,6 +37,7 @@ import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthenticationResult; diff --git a/pom.xml b/pom.xml index 71c771dd1bd..bbceed61c7f 100644 --- a/pom.xml +++ b/pom.xml @@ -1277,6 +1277,9 @@ org/apache/druid/benchmark/**/* org/apache/druid/**/*Benchmark* org/apache/druid/testing/**/* + + + org/apache/druid/query/TruncatedResponseContextException.class diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index 68be7ba48fd..1682f9e296c 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -199,7 +199,7 @@ public abstract class BaseQuery implements Query return computeOverriddenContext(getContext(), overrides); } - protected static Map computeOverriddenContext( + public static Map computeOverriddenContext( final Map context, final Map overrides ) @@ -247,7 +247,7 @@ public abstract class BaseQuery implements Query } @Override - public Query withId(String id) + public Query withId(String id) { return withOverriddenContext(ImmutableMap.of(QUERY_ID, id)); } diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index 522f26b3689..21db123d89e 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -22,6 +22,7 @@ package org.apache.druid.query; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.druid.java.util.common.granularity.Granularities; @@ -59,6 +60,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; /** */ @@ -259,7 +261,18 @@ public class Druids public TimeseriesQueryBuilder context(Map c) { - context = c; + this.context = c; + return this; + } + + public TimeseriesQueryBuilder randomQueryId() + { + return queryId(UUID.randomUUID().toString()); + } + + public TimeseriesQueryBuilder queryId(String queryId) + { + context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId)); return this; } @@ -466,7 +479,18 @@ public class Druids public SearchQueryBuilder context(Map c) { - context = c; + this.context = c; + return this; + } + + public SearchQueryBuilder randomQueryId() + { + return queryId(UUID.randomUUID().toString()); + } + + public SearchQueryBuilder queryId(String queryId) + { + context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId)); return this; } } @@ -572,7 +596,18 @@ public class Druids public TimeBoundaryQueryBuilder context(Map c) { - context = c; + this.context = c; + return this; + } + + public TimeBoundaryQueryBuilder randomQueryId() + { + return queryId(UUID.randomUUID().toString()); + } + + public TimeBoundaryQueryBuilder queryId(String queryId) + { + context = 
BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId)); return this; } } @@ -721,7 +756,7 @@ public class Druids public SegmentMetadataQueryBuilder context(Map c) { - context = c; + this.context = c; return this; } } @@ -839,7 +874,7 @@ public class Druids public ScanQueryBuilder context(Map c) { - context = c; + this.context = c; return this; } @@ -967,7 +1002,7 @@ public class Druids public DataSourceMetadataQueryBuilder context(Map c) { - context = c; + this.context = c; return this; } } diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index 03e26fe92a9..93b24ce45ce 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -21,6 +21,7 @@ package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import org.apache.druid.guice.annotations.ExtensionPoint; @@ -45,6 +46,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutorService; @ExtensionPoint @@ -136,6 +138,11 @@ public interface Query */ Query withSubQueryId(String subQueryId); + default Query withDefaultSubQueryId() + { + return withSubQueryId(UUID.randomUUID().toString()); + } + /** * Returns the subQueryId of this query. This is set by ClientQuerySegmentWalker (the entry point for the Broker's * query stack) on any subqueries that it issues. It is null for the main query. @@ -154,6 +161,17 @@ public interface Query return null; } + /** + * Returns a most specific ID of this query; if it is a subquery, this will return its subquery ID. + * If it is a regular query without subqueries, this will return its query ID. + * This method should be called after the relevant ID is assigned using {@link #withId} or {@link #withSubQueryId}. + */ + default String getMostSpecificId() + { + final String subqueryId = getSubQueryId(); + return subqueryId == null ? 
Preconditions.checkNotNull(getId(), "queryId") : subqueryId; + } + Query withDataSource(DataSource dataSource); default Query optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext) diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 36675935fc8..fb67305c5aa 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -53,6 +53,7 @@ public class QueryContexts public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters"; public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize"; public static final String USE_FILTER_CNF_KEY = "useFilterCNF"; + public static final String SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT_KEY = "shouldFailOnTruncatedResponseContext"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; @@ -70,6 +71,7 @@ public class QueryContexts public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false; public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000; public static final boolean DEFAULT_USE_FILTER_CNF = false; + public static final boolean DEFAULT_SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT = false; @SuppressWarnings("unused") // Used by Jackson serialization public enum Vectorize @@ -344,6 +346,19 @@ public class QueryContexts return defaultTimeout; } + public static Query setFailOnTruncatedResponseContext(Query query) + { + return query.withOverriddenContext(ImmutableMap.of(SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT_KEY, true)); + } + + public static boolean shouldFailOnTruncatedResponseContext(Query query) + { + return query.getContextBoolean( + SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT_KEY, + DEFAULT_SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT + ); + } + static long parseLong(Query query, String key, long defaultValue) { final Object val = query.getContextValue(key); diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java index acb86a64a95..206e1145333 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java @@ -50,6 +50,7 @@ public class QueryInterruptedException extends QueryException public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded"; public static final String UNAUTHORIZED = "Unauthorized request."; public static final String UNSUPPORTED_OPERATION = "Unsupported operation"; + public static final String TRUNCATED_RESPONSE_CONTEXT = "Truncated response context"; public static final String UNKNOWN_EXCEPTION = "Unknown exception"; @JsonCreator @@ -105,6 +106,8 @@ public class QueryInterruptedException extends QueryException return RESOURCE_LIMIT_EXCEEDED; } else if (e instanceof UnsupportedOperationException) { return UNSUPPORTED_OPERATION; + } else if (e instanceof TruncatedResponseContextException) { + return TRUNCATED_RESPONSE_CONTEXT; } else { return UNKNOWN_EXCEPTION; } diff --git a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java deleted file mode 100644 index fa337d04789..00000000000 --- 
a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterables; -import org.apache.druid.java.util.common.guava.MergeSequence; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.YieldingAccumulator; -import org.apache.druid.java.util.common.guava.YieldingSequenceBase; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.segment.SegmentMissingException; - -import java.util.ArrayList; -import java.util.List; - -public class RetryQueryRunner implements QueryRunner -{ - private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class); - - private final QueryRunner baseRunner; - private final RetryQueryRunnerConfig config; - private final ObjectMapper jsonMapper; - - public RetryQueryRunner( - QueryRunner baseRunner, - RetryQueryRunnerConfig config, - ObjectMapper jsonMapper - ) - { - this.baseRunner = baseRunner; - this.config = config; - this.jsonMapper = jsonMapper; - } - - @Override - public Sequence run(final QueryPlus queryPlus, final ResponseContext context) - { - final List> listOfSequences = new ArrayList<>(); - listOfSequences.add(baseRunner.run(queryPlus, context)); - - return new YieldingSequenceBase() - { - @Override - public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) - { - List missingSegments = getMissingSegments(context); - - if (!missingSegments.isEmpty()) { - for (int i = 0; i < config.getNumTries(); i++) { - log.info("[%,d] missing segments found. 
Retry attempt [%,d]", missingSegments.size(), i); - - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - final QueryPlus retryQueryPlus = queryPlus.withQuery( - Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments) - ); - Sequence retrySequence = baseRunner.run(retryQueryPlus, context); - listOfSequences.add(retrySequence); - missingSegments = getMissingSegments(context); - if (missingSegments.isEmpty()) { - break; - } - } - - final List finalMissingSegs = getMissingSegments(context); - if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) { - throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs); - } - - return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), Sequences.simple(listOfSequences)) - .toYielder(initValue, accumulator); - } else { - return Iterables.getOnlyElement(listOfSequences).toYielder(initValue, accumulator); - } - } - }; - } - - private List getMissingSegments(final ResponseContext context) - { - final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS); - if (maybeMissingSegments == null) { - return new ArrayList<>(); - } - - return jsonMapper.convertValue( - maybeMissingSegments, - new TypeReference>() - { - } - ); - } -} diff --git a/processing/src/main/java/org/apache/druid/query/TruncatedResponseContextException.java b/processing/src/main/java/org/apache/druid/query/TruncatedResponseContextException.java new file mode 100644 index 00000000000..15a26f494e3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/TruncatedResponseContextException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import org.apache.druid.java.util.common.StringUtils; + +/** + * This exception is thrown when {@link org.apache.druid.query.context.ResponseContext} is truncated after serialization + * in historicals or realtime tasks. The serialized response context can be truncated if its size is larger than + * {@code QueryResource#RESPONSE_CTX_HEADER_LEN_LIMIT}. + * + * @see org.apache.druid.query.context.ResponseContext#serializeWith + * @see QueryContexts#shouldFailOnTruncatedResponseContext + */ +public class TruncatedResponseContextException extends RuntimeException +{ + public TruncatedResponseContextException(String message, Object... 
arguments)
+  {
+    super(StringUtils.nonStrictFormat(message, arguments));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
index f0b2cb55d8a..f0340698059 100644
--- a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
@@ -55,7 +55,12 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
           public Sequence<T> apply(DataSource singleSource)
           {
             return baseRunner.run(
-                queryPlus.withQuery(query.withDataSource(singleSource)),
+                queryPlus.withQuery(
+                    query.withDataSource(singleSource)
+                        // assign the subqueryId. this will be used to validate that every query server
+                        // has responded per subquery in RetryQueryRunner
+                        .withDefaultSubQueryId()
+                ),
                 responseContext
             );
           }
diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
index a399233e70b..c0d90b31f55 100644
--- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
+++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
@@ -28,10 +28,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -40,6 +42,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
 
 /**
@@ -112,6 +115,30 @@ public abstract class ResponseContext
         "uncoveredIntervalsOverflowed",
         (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
     ),
+    /**
+     * Map of the most specific query ID to the remaining number of responses expected from query nodes.
+     * The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query
+     * nodes, and is updated whenever they respond ({@code DirectDruidClient}). {@code RetryQueryRunner} uses this
+     * value to check whether {@link #MISSING_SEGMENTS} is valid.
+     *
+     * Currently the broker doesn't run subqueries in parallel, so the remaining number of responses is updated one
+     * subquery at a time. However, since subqueries could run simultaneously in the future, the counters are stored
+     * in a ConcurrentHashMap.
+     *
+     * @see org.apache.druid.query.Query#getMostSpecificId
+     */
+    REMAINING_RESPONSES_FROM_QUERY_SERVERS(
+        "remainingResponsesFromQueryServers",
+        (totalRemainingPerId, idAndNumResponses) -> {
+          final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
+          final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
+          map.compute(
+              pair.lhs,
+              (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
+          );
+          return map;
+        }
+    ),
     /**
      * Lists missing segments.
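The merge function registered above for REMAINING_RESPONSES_FROM_QUERY_SERVERS folds (queryId, delta) pairs into a per-ID counter. A minimal standalone sketch of that accumulation using only JDK classes; the class and method names here are illustrative and not part of this patch:

```java
import java.util.concurrent.ConcurrentHashMap;

public class RemainingResponsesSketch
{
  // Mirrors the key's merge function: add the delta, or seed the counter when the ID is new.
  static void accumulate(ConcurrentHashMap<String, Integer> map, String queryId, int delta)
  {
    map.compute(queryId, (id, remaining) -> remaining == null ? delta : remaining + delta);
  }

  public static void main(String[] args)
  {
    final ConcurrentHashMap<String, Integer> remaining = new ConcurrentHashMap<>();
    accumulate(remaining, "subquery-1", 3);   // the broker registers 3 expected responses
    accumulate(remaining, "subquery-1", -1);  // one server responds
    accumulate(remaining, "subquery-1", -1);  // another server responds
    System.out.println(remaining);            // {subquery-1=1}
  }
}
```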
*/ @@ -335,7 +362,7 @@ public abstract class ResponseContext { final String fullSerializedString = objectMapper.writeValueAsString(getDelegate()); if (fullSerializedString.length() <= maxCharsNumber) { - return new SerializationResult(fullSerializedString, fullSerializedString); + return new SerializationResult(null, fullSerializedString); } else { // Indicates that the context is truncated during serialization. add(Key.TRUNCATED, true); @@ -411,18 +438,22 @@ public abstract class ResponseContext */ public static class SerializationResult { + @Nullable private final String truncatedResult; private final String fullResult; - SerializationResult(String truncatedResult, String fullResult) + SerializationResult(@Nullable String truncatedResult, String fullResult) { this.truncatedResult = truncatedResult; this.fullResult = fullResult; } - public String getTruncatedResult() + /** + * Returns the truncated result if it exists otherwise returns the full result. + */ + public String getResult() { - return truncatedResult; + return isTruncated() ? truncatedResult : fullResult; } public String getFullResult() @@ -430,9 +461,9 @@ public abstract class ResponseContext return fullResult; } - public Boolean isReduced() + public boolean isTruncated() { - return !truncatedResult.equals(fullResult); + return truncatedResult != null; } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index 462644425dc..54388b53cfe 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -27,6 +27,7 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; @@ -78,6 +79,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -867,7 +869,7 @@ public class GroupByQuery extends BaseQuery private List postAggregatorSpecs; @Nullable private HavingSpec havingSpec; - + @Nullable private Map context; @Nullable @@ -1115,6 +1117,17 @@ public class GroupByQuery extends BaseQuery return this; } + public Builder randomQueryId() + { + return queryId(UUID.randomUUID().toString()); + } + + public Builder queryId(String queryId) + { + context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId)); + return this; + } + public Builder overrideContext(Map contextOverride) { this.context = computeOverriddenContext(context, contextOverride); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryBuilder.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryBuilder.java index c6715df8817..700ab58b0f8 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryBuilder.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryBuilder.java @@ -19,9 +19,11 @@ package org.apache.druid.query.topn; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import 
org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -43,6 +45,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; /** * A Builder for TopNQuery. @@ -276,8 +279,18 @@ public class TopNQueryBuilder public TopNQueryBuilder context(Map c) { - context = c; + this.context = c; return this; } + public TopNQueryBuilder randomQueryId() + { + return queryId(UUID.randomUUID().toString()); + } + + public TopNQueryBuilder queryId(String queryId) + { + context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId)); + return this; + } } diff --git a/processing/src/test/java/org/apache/druid/query/DruidsTest.java b/processing/src/test/java/org/apache/druid/query/DruidsTest.java new file mode 100644 index 00000000000..f4144d2d55f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/DruidsTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
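The same queryId/randomQueryId pair is added to several builders in this change (Druids.TimeseriesQueryBuilder, SearchQueryBuilder, TimeBoundaryQueryBuilder, GroupByQuery.Builder, TopNQueryBuilder). A small usage sketch of the merging behavior that the DruidsTest cases below assert; the data source and interval values are placeholders and the class name is illustrative:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;

public class QueryIdBuilderSketch
{
  public static void main(String[] args)
  {
    final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
        .dataSource("datasource")
        .intervals(new MultipleSpecificSegmentSpec(
            ImmutableList.of(new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0))
        ))
        .granularity(Granularities.ALL)
        .context(ImmutableMap.of("my", "context"))
        .queryId("realQueryId")   // merged into the existing context, overriding any previous queryId entry
        .build();

    // The context now holds both entries: "my" -> "context" and "queryId" -> "realQueryId".
    // Calling context(...) *after* queryId(...) replaces the whole map and drops the ID,
    // which is what the "ContextAfterSettingQueryId" tests below verify.
    System.out.println(query.getContext());

    // randomQueryId() is shorthand for queryId(UUID.randomUUID().toString()).
  }
}
```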
+ */ + +package org.apache.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids.SearchQueryBuilder; +import org.apache.druid.query.Druids.TimeBoundaryQueryBuilder; +import org.apache.druid.query.Druids.TimeseriesQueryBuilder; +import org.apache.druid.query.search.SearchQuery; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.timeboundary.TimeBoundaryQuery; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; + +@RunWith(Enclosed.class) +public class DruidsTest +{ + private static final String DATASOURCE = "datasource"; + private static final QuerySegmentSpec QUERY_SEGMENT_SPEC = new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ) + ); + + public static class TimeseriesQueryBuilderTest + { + private TimeseriesQueryBuilder builder; + + @Before + public void setup() + { + builder = Druids.newTimeseriesQueryBuilder() + .dataSource(DATASOURCE) + .intervals(QUERY_SEGMENT_SPEC) + .granularity(Granularities.ALL); + } + + @Test + public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId() + { + final TimeseriesQuery query = builder + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext() + { + final TimeseriesQuery query = builder + .context(ImmutableMap.of("my", "context")) + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext() + { + final TimeseriesQuery query = builder + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId")) + .queryId("realQueryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + + @Test + public void testContextAfterSettingQueryIdReturnContextWithoutQueryId() + { + final TimeseriesQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context")) + .build(); + Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext()); + } + + @Test + public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId() + { + final TimeseriesQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId")) + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + } + + public static class SearchQueryBuilderTest + { + private SearchQueryBuilder builder; + + @Before + public void setup() + { + builder = Druids.newSearchQueryBuilder() + .dataSource(DATASOURCE) + .intervals(QUERY_SEGMENT_SPEC) + .granularity(Granularities.ALL); + } + + @Test + public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId() + { + final SearchQuery query = 
builder + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext() + { + final SearchQuery query = builder + .context(ImmutableMap.of("my", "context")) + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext() + { + final SearchQuery query = builder + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId")) + .queryId("realQueryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + + @Test + public void testContextAfterSettingQueryIdReturnContextWithoutQueryId() + { + final SearchQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context")) + .build(); + Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext()); + } + + @Test + public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId() + { + final SearchQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId")) + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + } + + public static class TimeBoundaryBuilderTest + { + private TimeBoundaryQueryBuilder builder; + + @Before + public void setup() + { + builder = Druids.newTimeBoundaryQueryBuilder() + .dataSource(DATASOURCE) + .intervals(QUERY_SEGMENT_SPEC); + } + + @Test + public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId() + { + final TimeBoundaryQuery query = builder + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext() + { + final TimeBoundaryQuery query = builder + .context(ImmutableMap.of("my", "context")) + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext() + { + final TimeBoundaryQuery query = builder + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId")) + .queryId("realQueryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + + @Test + public void testContextAfterSettingQueryIdReturnContextWithoutQueryId() + { + final TimeBoundaryQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context")) + .build(); + Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext()); + } + + @Test + public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId() + { + final TimeBoundaryQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId")) + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 
49df451fb1e..f2c6601964a 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -107,4 +107,18 @@ public class QueryContextsTest QueryContexts.withMaxScatterGatherBytes(query, 100); } + + @Test + public void testSetFailOnTruncatedResponseContext() + { + final Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + null + ); + Assert.assertFalse(QueryContexts.shouldFailOnTruncatedResponseContext(query)); + final Query queryWithFlag = QueryContexts.setFailOnTruncatedResponseContext(query); + Assert.assertTrue(QueryContexts.shouldFailOnTruncatedResponseContext(queryWithFlag)); + } } diff --git a/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java deleted file mode 100644 index 6379b6a6bc4..00000000000 --- a/processing/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
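The flag exercised in testSetFailOnTruncatedResponseContext above is meant to be consulted where the response context is serialized into the response header. A rough sketch of that intended consumption, combining the new QueryContexts helpers, SerializationResult, and TruncatedResponseContextException; the method and variable names are hypothetical and this is not the actual QueryResource code:

```java
// Hypothetical helper; Query, ResponseContext, QueryContexts, and TruncatedResponseContextException
// are the Druid classes touched by this patch, ObjectMapper/JsonProcessingException are Jackson's.
static String serializeResponseContext(
    Query<?> query,
    ResponseContext responseContext,
    ObjectMapper jsonMapper,
    int maxHeaderLength
) throws JsonProcessingException
{
  final ResponseContext.SerializationResult serialized =
      responseContext.serializeWith(jsonMapper, maxHeaderLength);

  if (serialized.isTruncated() && QueryContexts.shouldFailOnTruncatedResponseContext(query)) {
    // The query opted in via "shouldFailOnTruncatedResponseContext", so fail fast rather than
    // returning a silently shortened context.
    throw new TruncatedResponseContextException(
        "Serialized response context exceeds the limit of [%,d] characters",
        maxHeaderLength
    );
  }

  // The (possibly truncated) JSON for the X-Druid-Response-Context header; the complete JSON
  // remains available through serialized.getFullResult().
  return serialized.getResult();
}
```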
- */ - -package org.apache.druid.query; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.context.ConcurrentResponseContext; -import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; -import org.apache.druid.query.timeseries.TimeseriesQuery; -import org.apache.druid.query.timeseries.TimeseriesResultValue; -import org.apache.druid.segment.SegmentMissingException; -import org.apache.druid.segment.TestHelper; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - -public class RetryQueryRunnerTest -{ - private static class TestRetryQueryRunnerConfig extends RetryQueryRunnerConfig - { - private int numTries; - private boolean returnPartialResults; - - public TestRetryQueryRunnerConfig(int numTries, boolean returnPartialResults) - { - this.numTries = numTries; - this.returnPartialResults = returnPartialResults; - } - - @Override - public int getNumTries() - { - return numTries; - } - - @Override - public boolean isReturnPartialResults() - { - return returnPartialResults; - } - } - - private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - - final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() - .dataSource(QueryRunnerTestHelper.DATA_SOURCE) - .granularity(QueryRunnerTestHelper.DAY_GRAN) - .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) - .aggregators( - Arrays.asList( - QueryRunnerTestHelper.ROWS_COUNT, - new LongSumAggregatorFactory( - "idx", - "index" - ), - QueryRunnerTestHelper.QUALITY_UNIQUES - ) - ) - .build(); - - - @Test - public void testRunWithMissingSegments() - { - ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new QueryRunner>() - { - @Override - public Sequence> run(QueryPlus queryPlus, ResponseContext context) - { - context.add( - ResponseContext.Key.MISSING_SEGMENTS, - Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) - ); - return Sequences.empty(); - } - }, - new RetryQueryRunnerConfig() - { - @Override - public int getNumTries() - { - return 0; - } - - @Override - public boolean isReturnPartialResults() - { - return true; - } - }, - jsonMapper - ); - - Iterable> actualResults = runner.run(QueryPlus.wrap(query), context).toList(); - - Assert.assertTrue( - "Should have one entry in the list of missing segments", - ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 1 - ); - Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0); - } - - - @Test - public void testRetry() - { - ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new QueryRunner>() - { - @Override - public Sequence> run( - QueryPlus> queryPlus, - ResponseContext context - ) - { - if ((int) 
context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 0) { - context.add( - ResponseContext.Key.MISSING_SEGMENTS, - Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) - ); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1); - return Sequences.empty(); - } else { - return Sequences.simple( - Collections.singletonList( - new Result<>( - DateTimes.nowUtc(), - new TimeseriesResultValue( - new HashMap<>() - ) - ) - ) - ); - } - } - }, - new TestRetryQueryRunnerConfig(1, true), - jsonMapper - ); - - Iterable> actualResults = runner.run(QueryPlus.wrap(query), context).toList(); - - Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); - Assert.assertTrue( - "Should have nothing in missingSegment list", - ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0 - ); - } - - @Test - public void testRetryMultiple() - { - ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new QueryRunner>() - { - @Override - public Sequence> run( - QueryPlus> queryPlus, - ResponseContext context - ) - { - if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) < 3) { - context.add( - ResponseContext.Key.MISSING_SEGMENTS, - Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) - ); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, (int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) + 1); - return Sequences.empty(); - } else { - return Sequences.simple( - Collections.singletonList( - new Result<>( - DateTimes.nowUtc(), - new TimeseriesResultValue( - new HashMap<>() - ) - ) - ) - ); - } - } - }, - new TestRetryQueryRunnerConfig(4, true), - jsonMapper - ); - - Iterable> actualResults = runner.run(QueryPlus.wrap(query), context).toList(); - - Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1); - Assert.assertTrue( - "Should have nothing in missingSegment list", - ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0 - ); - } - - @Test(expected = SegmentMissingException.class) - public void testException() - { - ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new QueryRunner>() - { - @Override - public Sequence> run( - QueryPlus> queryPlus, - ResponseContext context - ) - { - context.add( - ResponseContext.Key.MISSING_SEGMENTS, - Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1)) - ); - return Sequences.empty(); - } - }, - new TestRetryQueryRunnerConfig(1, false), - jsonMapper - ); - - runner.run(QueryPlus.wrap(query), context).toList(); - - Assert.assertTrue( - "Should have one entry in the list of missing segments", - ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 1 - ); - } - - @Test - public void testNoDuplicateRetry() - { - ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - RetryQueryRunner> runner = new RetryQueryRunner<>( - new QueryRunner>() - { - @Override - public Sequence> run( - QueryPlus> queryPlus, - ResponseContext context - ) - { - final Query> query = 
queryPlus.getQuery(); - if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 0) { - // assume 2 missing segments at first run - context.add( - ResponseContext.Key.MISSING_SEGMENTS, - Arrays.asList( - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1), - new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2) - ) - ); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1); - return Sequences.simple( - Collections.singletonList( - new Result<>( - DateTimes.nowUtc(), - new TimeseriesResultValue( - new HashMap<>() - ) - ) - ) - ); - } else if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 1) { - // this is first retry - Assert.assertTrue("Should retry with 2 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 2); - // assume only left 1 missing at first retry - context.add( - ResponseContext.Key.MISSING_SEGMENTS, - Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2)) - ); - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 2); - return Sequences.simple( - Collections.singletonList( - new Result<>( - DateTimes.nowUtc(), - new TimeseriesResultValue( - new HashMap<>() - ) - ) - ) - ); - } else { - // this is second retry - Assert.assertTrue("Should retry with 1 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 1); - // assume no more missing at second retry - context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 3); - return Sequences.simple( - Collections.singletonList( - new Result<>( - DateTimes.nowUtc(), - new TimeseriesResultValue( - new HashMap<>() - ) - ) - ) - ); - } - } - }, - new TestRetryQueryRunnerConfig(2, false), - jsonMapper - ); - - Iterable> actualResults = runner.run(QueryPlus.wrap(query), context).toList(); - - Assert.assertTrue("Should return a list with 3 elements", ((List) actualResults).size() == 3); - Assert.assertTrue( - "Should have nothing in missingSegment list", - ((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0 - ); - } -} diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 92bf2efee85..8c329441734 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -23,7 +23,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.context.ResponseContext.Key; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -33,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; public class ResponseContextTest @@ -133,6 +136,18 @@ public class ResponseContextTest ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() ); + final String queryId = "queryId"; + final String queryId2 = "queryId2"; + ctx.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, 
new NonnullPair<>(queryId, 3)); + ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId2, 4)); + ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -1)); + ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -2)); + Assert.assertEquals( + ImmutableMap.of(queryId, 0, queryId2, 4), + ctx.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS) + ); + final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); Assert.assertArrayEquals( @@ -214,14 +229,14 @@ public class ResponseContextTest final DefaultObjectMapper mapper = new DefaultObjectMapper(); Assert.assertEquals( mapper.writeValueAsString(ImmutableMap.of("ETag", "string-value")), - ctx1.serializeWith(mapper, Integer.MAX_VALUE).getTruncatedResult() + ctx1.serializeWith(mapper, Integer.MAX_VALUE).getResult() ); final ResponseContext ctx2 = ResponseContext.createEmpty(); ctx2.add(ResponseContext.Key.NUM_SCANNED_ROWS, 100); Assert.assertEquals( mapper.writeValueAsString(ImmutableMap.of("count", 100)), - ctx2.serializeWith(mapper, Integer.MAX_VALUE).getTruncatedResult() + ctx2.serializeWith(mapper, Integer.MAX_VALUE).getResult() ); } @@ -234,7 +249,7 @@ public class ResponseContextTest final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE); - Assert.assertEquals(fullString, res1.getTruncatedResult()); + Assert.assertEquals(fullString, res1.getResult()); final ResponseContext ctxCopy = ResponseContext.createEmpty(); ctxCopy.merge(ctx); final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 30); @@ -242,7 +257,7 @@ public class ResponseContextTest ctxCopy.put(ResponseContext.Key.TRUNCATED, true); Assert.assertEquals( ctxCopy.getDelegate(), - ResponseContext.deserialize(res2.getTruncatedResult(), objectMapper).getDelegate() + ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate() ); } @@ -262,7 +277,7 @@ public class ResponseContextTest final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE); - Assert.assertEquals(fullString, res1.getTruncatedResult()); + Assert.assertEquals(fullString, res1.getResult()); final ResponseContext ctxCopy = ResponseContext.createEmpty(); ctxCopy.merge(ctx); final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 70); @@ -271,7 +286,7 @@ public class ResponseContextTest ctxCopy.put(ResponseContext.Key.TRUNCATED, true); Assert.assertEquals( ctxCopy.getDelegate(), - ResponseContext.deserialize(res2.getTruncatedResult(), objectMapper).getDelegate() + ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate() ); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryBuilderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryBuilderTest.java new file mode 100644 index 00000000000..91d22184343 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryBuilderTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
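With getTruncatedResult() removed, callers switch to getResult() and isTruncated() as the assertions above do. A small runnable sketch of the new accessor semantics; the class name is illustrative:

```java
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.context.ResponseContext;

public class SerializationResultSketch
{
  public static void main(String[] args) throws Exception
  {
    final DefaultObjectMapper mapper = new DefaultObjectMapper();
    final ResponseContext ctx = ResponseContext.createEmpty();
    ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 100);

    // Large limit: nothing is truncated, the truncated slot stays null, and getResult()
    // simply hands back the full JSON.
    final ResponseContext.SerializationResult result = ctx.serializeWith(mapper, Integer.MAX_VALUE);
    System.out.println(result.isTruncated());  // false
    System.out.println(result.getResult());    // {"count":100}

    // With a limit smaller than the serialized form, serializeWith() shortens the context,
    // isTruncated() turns true, and getResult() returns the shortened JSON while
    // getFullResult() still carries the complete one.
  }
}
```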
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery.Builder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class GroupByQueryBuilderTest +{ + private Builder builder; + + @Before + public void setup() + { + builder = new Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .setQuerySegmentSpec(QueryRunnerTestHelper.EMPTY_INTERVAL) + .setDimensions( + new DefaultDimensionSpec(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.MARKET_DIMENSION) + ) + .setAggregatorSpecs(new DoubleMaxAggregatorFactory("index", "index")); + } + + @Test + public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId() + { + final GroupByQuery query = builder + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext() + { + final GroupByQuery query = builder + .setContext(ImmutableMap.of("my", "context")) + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext() + { + final GroupByQuery query = builder + .setContext(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId")) + .queryId("realQueryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + + @Test + public void testContextAfterSettingQueryIdReturnContextWithoutQueryId() + { + final GroupByQuery query = builder + .queryId("queryId") + .setContext(ImmutableMap.of("my", "context")) + .build(); + Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext()); + } + + @Test + public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId() + { + final GroupByQuery query = builder + .queryId("queryId") + .setContext(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId")) + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryBuilderTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryBuilderTest.java new file mode 100644 index 00000000000..e7fbeeacff7 --- /dev/null +++ 
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryBuilderTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.topn; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.QueryRunnerTestHelper; +import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TopNQueryBuilderTest +{ + private TopNQueryBuilder builder; + + @Before + public void setup() + { + builder = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .intervals(QueryRunnerTestHelper.EMPTY_INTERVAL) + .threshold(4) + .aggregators(new DoubleMaxAggregatorFactory("index", "index")); + } + + @Test + public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId() + { + final TopNQuery query = builder + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext() + { + final TopNQuery query = builder + .context(ImmutableMap.of("my", "context")) + .queryId("queryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext()); + } + + @Test + public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext() + { + final TopNQuery query = builder + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId")) + .queryId("realQueryId") + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } + + @Test + public void testContextAfterSettingQueryIdReturnContextWithoutQueryId() + { + final TopNQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context")) + .build(); + Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext()); + } + + @Test + public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId() + { + final TopNQuery query = builder + .queryId("queryId") + .context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId")) + .build(); + Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext()); + } +} diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java similarity index 98% rename from 
benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java rename to processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java index f6d018d9bb8..e2bee73dc66 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java +++ b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.benchmark.datagen; +package org.apache.druid.segment.generator; import com.google.common.hash.Hashing; import org.apache.druid.common.config.NullHandling; @@ -35,8 +35,6 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexIndexableAdapter; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; -import org.apache.druid.segment.generator.DataGenerator; -import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index ded9cf09d4e..2922f2f16c3 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -40,6 +40,7 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -65,6 +66,7 @@ import org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; @@ -188,7 +190,21 @@ public class CachingClusteredClient implements QuerySegmentWalker final UnaryOperator> timelineConverter ) { - return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter); + final ClusterQueryResult result = new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter); + initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers); + return result.sequence; + } + + private static void initializeNumRemainingResponsesInResponseContext( + final Query query, + final ResponseContext responseContext, + final int numQueryServers + ) + { + responseContext.add( + Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, + new NonnullPair<>(query.getMostSpecificId(), numQueryServers) + ); } @Override @@ -221,6 +237,18 @@ public class CachingClusteredClient implements QuerySegmentWalker }; } + private static class ClusterQueryResult + { + private final Sequence sequence; + private final int numQueryServers; + + private ClusterQueryResult(Sequence sequence, int numQueryServers) + { + this.sequence = sequence; + 
this.numQueryServers = numQueryServers; + } + } + /** * This class essentially encapsulates the major part of the logic of {@link CachingClusteredClient}. It's state and * methods couldn't belong to {@link CachingClusteredClient} itself, because they depend on the specific query object @@ -283,13 +311,23 @@ public class CachingClusteredClient implements QuerySegmentWalker return contextBuilder.build(); } - Sequence run(final UnaryOperator> timelineConverter) + /** + * Builds a query distribution and merge plan. + * + * This method returns an empty sequence if the query datasource is unknown or there is matching result-level cache. + * Otherwise, it creates a sequence merging sequences from the regular broker cache and remote servers. If parallel + * merge is enabled, it can merge and *combine* the underlying sequences in parallel. + * + * @return a pair of a sequence merging results from remote query servers and the number of remote servers + * participating in query processing. + */ + ClusterQueryResult run(final UnaryOperator> timelineConverter) { final Optional> maybeTimeline = serverView.getTimeline( dataSourceAnalysis ); if (!maybeTimeline.isPresent()) { - return Sequences.empty(); + return new ClusterQueryResult<>(Sequences.empty(), 0); } final TimelineLookup timeline = timelineConverter.apply(maybeTimeline.get()); @@ -306,7 +344,7 @@ public class CachingClusteredClient implements QuerySegmentWalker @Nullable final String currentEtag = computeCurrentEtag(segmentServers, queryCacheKey); if (currentEtag != null && currentEtag.equals(prevEtag)) { - return Sequences.empty(); + return new ClusterQueryResult<>(Sequences.empty(), 0); } } @@ -324,7 +362,7 @@ public class CachingClusteredClient implements QuerySegmentWalker return merge(sequencesByInterval); }); - return scheduler.run(query, mergedResultSequence); + return new ClusterQueryResult<>(scheduler.run(query, mergedResultSequence), segmentsByServer.size()); } private Sequence merge(List> sequencesByInterval) @@ -613,6 +651,10 @@ public class CachingClusteredClient implements QuerySegmentWalker } } + /** + * Create sequences that reads from remote query servers (historicals and tasks). Note that the broker will + * hold an HTTP connection per server after this method is called. 
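Editor's note, not part of the patch: the per-server HTTP connections mentioned above are what the new REMAINING_RESPONSES_FROM_QUERY_SERVERS bookkeeping counts. A minimal standalone sketch of that bookkeeping, assuming only the merge behaviour asserted in the ResponseContextTest changes earlier in this patch (a non-negative value registers the expected number of responses for a query ID, a negative value decrements it):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the remaining-responses counter. In the patch, ResponseContext keeps this
// map under Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS and updates it with NonnullPair values.
class RemainingResponsesSketch
{
  private final Map<String, Integer> remaining = new ConcurrentHashMap<>();

  // Mirrors the merge behaviour asserted in ResponseContextTest: a non-negative value (re)registers
  // the count for a query ID, a negative value is added to the current count (one -1 per response).
  void add(String queryId, int value)
  {
    remaining.merge(queryId, value, (current, delta) -> delta >= 0 ? delta : current + delta);
  }

  public static void main(String[] args)
  {
    RemainingResponsesSketch ctx = new RemainingResponsesSketch();
    ctx.add("queryId", 3);   // broker: the distribution tree spans three servers
    ctx.add("queryId", -1);  // a data node responded
    ctx.add("queryId", -1);
    ctx.add("queryId", -1);
    // RetryQueryRunner only trusts MISSING_SEGMENTS once this count has reached 0.
    System.out.println(ctx.remaining); // {queryId=0}
  }
}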
+ */ private void addSequencesFromServer( final List> listOfSequences, final SortedMap> segmentsByServer diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 7c674f0e3ab..e9dd1cad5ac 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -53,6 +54,7 @@ import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.server.QueryResource; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -69,6 +71,7 @@ import java.io.SequenceInputStream; import java.net.URL; import java.util.Enumeration; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -86,6 +89,7 @@ public class DirectDruidClient implements QueryRunner public static final String QUERY_FAIL_TIME = "queryFailTime"; private static final Logger log = new Logger(DirectDruidClient.class); + private static final int VAL_TO_REDUCE_REMAINING_RESPONSES = -1; private final QueryToolChestWarehouse warehouse; private final QueryWatcher queryWatcher; @@ -105,12 +109,14 @@ public class DirectDruidClient implements QueryRunner public static void removeMagicResponseContextFields(ResponseContext responseContext) { responseContext.remove(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED); + responseContext.remove(ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS); } public static ResponseContext makeResponseContextForQuery() { final ResponseContext responseContext = ConcurrentResponseContext.createEmpty(); responseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); return responseContext; } @@ -231,7 +237,17 @@ public class DirectDruidClient implements QueryRunner final boolean continueReading; try { + log.trace( + "Got a response from [%s] for query ID[%s], subquery ID[%s]", + url, + query.getId(), + query.getSubQueryId() + ); final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT); + context.add( + ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, + new NonnullPair<>(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES) + ); // context may be null in case of error or query timeout if (responseContext != null) { context.merge(ResponseContext.deserialize(responseContext, objectMapper)); @@ -438,11 +454,17 @@ public class DirectDruidClient implements QueryRunner throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); } + // Some logics in brokers such as retry on missing 
segments rely on the response context, + // and thus truncated response contexts are not allowed. + final Query queryToSend = QueryContexts.setFailOnTruncatedResponseContext( + QueryContexts.withTimeout(query, timeLeft) + ); + future = httpClient.go( new Request( HttpMethod.POST, new URL(url) - ).setContent(objectMapper.writeValueAsBytes(QueryContexts.withTimeout(query, timeLeft))) + ).setContent(objectMapper.writeValueAsBytes(queryToSend)) .setHeader( HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON diff --git a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java new file mode 100644 index 00000000000..f18764f0830 --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.BaseSequence; +import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker; +import org.apache.druid.java.util.common.guava.MergeSequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.java.util.common.guava.YieldingSequenceBase; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; +import org.apache.druid.segment.SegmentMissingException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; + +public class RetryQueryRunner implements QueryRunner +{ + private static final Logger LOG = new Logger(RetryQueryRunner.class); + + private final QueryRunner baseRunner; + private final BiFunction, List, QueryRunner> retryRunnerCreateFn; + private final RetryQueryRunnerConfig config; + private final ObjectMapper jsonMapper; + + /** + * Runnable executed after the broker creates query distribution tree for the first attempt. This is only + * for testing and must not be used in production code. 
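Editor's note, not part of the patch: for orientation, the production-side wiring of this runner appears further down in this patch in ClientQuerySegmentWalker. A rough sketch of it follows; the surrounding names (baseClusterRunner, clusterClient, retryConfig, objectMapper) are the broker's injected components, so this fragment is not runnable on its own:

// Sketch of the wiring added in ClientQuerySegmentWalker later in this patch.
// baseClusterRunner wraps CachingClusteredClient; the method reference builds a runner
// restricted to the segments reported missing, which is what a retry re-queries.
final QueryRunner<T> retryingRunner = new RetryQueryRunner<>(
    baseClusterRunner,
    clusterClient::getQueryRunnerForSegments,
    retryConfig,
    objectMapper
);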
+ */ + private final Runnable runnableAfterFirstAttempt; + + private int totalNumRetries; + + public RetryQueryRunner( + QueryRunner baseRunner, + BiFunction, List, QueryRunner> retryRunnerCreateFn, + RetryQueryRunnerConfig config, + ObjectMapper jsonMapper + ) + { + this(baseRunner, retryRunnerCreateFn, config, jsonMapper, () -> {}); + } + + /** + * Constructor only for testing. + */ + @VisibleForTesting + RetryQueryRunner( + QueryRunner baseRunner, + BiFunction, List, QueryRunner> retryRunnerCreateFn, + RetryQueryRunnerConfig config, + ObjectMapper jsonMapper, + Runnable runnableAfterFirstAttempt + ) + { + this.baseRunner = baseRunner; + this.retryRunnerCreateFn = retryRunnerCreateFn; + this.config = config; + this.jsonMapper = jsonMapper; + this.runnableAfterFirstAttempt = runnableAfterFirstAttempt; + } + + @VisibleForTesting + int getTotalNumRetries() + { + return totalNumRetries; + } + + @Override + public Sequence run(final QueryPlus queryPlus, final ResponseContext context) + { + return new YieldingSequenceBase() + { + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + final Sequence> retryingSequence = new BaseSequence<>( + new IteratorMaker, RetryingSequenceIterator>() + { + @Override + public RetryingSequenceIterator make() + { + return new RetryingSequenceIterator(queryPlus, context, baseRunner, runnableAfterFirstAttempt); + } + + @Override + public void cleanup(RetryingSequenceIterator iterFromMake) + { + totalNumRetries = iterFromMake.retryCount; + } + } + ); + return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), retryingSequence) + .toYielder(initValue, accumulator); + } + }; + } + + private List getMissingSegments(QueryPlus queryPlus, final ResponseContext context) + { + // Sanity check before retrieving missingSegments from responseContext. + // The missingSegments in the responseContext is only valid when all servers have responded to the broker. + // The remainingResponses MUST be not null but 0 in the responseContext at this point. + final ConcurrentHashMap idToRemainingResponses = + (ConcurrentHashMap) Preconditions.checkNotNull( + context.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS), + "%s in responseContext", + Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName() + ); + + final int remainingResponses = Preconditions.checkNotNull( + idToRemainingResponses.get(queryPlus.getQuery().getMostSpecificId()), + "Number of remaining responses for query[%s]", + queryPlus.getQuery().getMostSpecificId() + ); + if (remainingResponses > 0) { + throw new ISE("Failed to check missing segments due to missing responses from [%d] servers", remainingResponses); + } + + final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS); + if (maybeMissingSegments == null) { + return Collections.emptyList(); + } + + return jsonMapper.convertValue( + maybeMissingSegments, + new TypeReference>() + { + } + ); + } + + /** + * A lazy iterator populating {@link Sequence} by retrying the query. The first returned sequence is always the base + * sequence from the baseQueryRunner. Subsequent sequences are created dynamically whenever it retries the query. All + * the sequences populated by this iterator will be merged (not combined) with the base sequence. + * + * The design of this iterator depends on how {@link MergeSequence} works; the MergeSequence pops an item from + * each underlying sequence and pushes them to a {@link java.util.PriorityQueue}. 
Whenever it pops from the queue, + * it pushes a new item from the sequence where the returned item was originally from. Since the first returned + * sequence from this iterator is always the base sequence, the MergeSequence will call {@link Sequence#toYielder} + * on the base sequence first which in turn initializing query distribution tree. Once this tree is built, the query + * servers (historicals and realtime tasks) will lock all segments to read and report missing segments to the broker. + * If there are missing segments reported, this iterator will rewrite the query with those reported segments and + * reissue the rewritten query. + * + * @see org.apache.druid.client.CachingClusteredClient + * @see org.apache.druid.client.DirectDruidClient + */ + private class RetryingSequenceIterator implements Iterator> + { + private final QueryPlus queryPlus; + private final ResponseContext context; + private final QueryRunner baseQueryRunner; + private final Runnable runnableAfterFirstAttempt; + + private boolean first = true; + private Sequence sequence = null; + private int retryCount = 0; + + private RetryingSequenceIterator( + QueryPlus queryPlus, + ResponseContext context, + QueryRunner baseQueryRunner, + Runnable runnableAfterFirstAttempt + ) + { + this.queryPlus = queryPlus; + this.context = context; + this.baseQueryRunner = baseQueryRunner; + this.runnableAfterFirstAttempt = runnableAfterFirstAttempt; + } + + @Override + public boolean hasNext() + { + if (first) { + sequence = baseQueryRunner.run(queryPlus, context); + // runnableAfterFirstAttempt is only for testing, it must be no-op for production code. + runnableAfterFirstAttempt.run(); + first = false; + return true; + } else if (sequence != null) { + return true; + } else { + final List missingSegments = getMissingSegments(queryPlus, context); + if (missingSegments.isEmpty()) { + return false; + } else if (retryCount >= config.getNumTries()) { + if (!config.isReturnPartialResults()) { + throw new SegmentMissingException("No results found for segments[%s]", missingSegments); + } else { + return false; + } + } else { + retryCount++; + LOG.info("[%,d] missing segments found. 
Retry attempt [%,d]", missingSegments.size(), retryCount); + + context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); + final QueryPlus retryQueryPlus = queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments) + ); + sequence = retryRunnerCreateFn.apply(retryQueryPlus.getQuery(), missingSegments).run(retryQueryPlus, context); + return true; + } + } + } + + @Override + public Sequence next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + final Sequence next = sequence; + sequence = null; + return next; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/RetryQueryRunnerConfig.java b/server/src/main/java/org/apache/druid/query/RetryQueryRunnerConfig.java similarity index 100% rename from processing/src/main/java/org/apache/druid/query/RetryQueryRunnerConfig.java rename to server/src/main/java/org/apache/druid/query/RetryQueryRunnerConfig.java diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index c7318a60f7f..85da1ff41c7 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.client.CachingClusteredClient; +import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.java.util.common.ISE; @@ -61,7 +62,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Stack; -import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -329,7 +329,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker } } else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) { // Subquery needs to be inlined. Assign it a subquery id and run it. 
- final Query subQueryWithId = subQuery.withSubQueryId(UUID.randomUUID().toString()); + final Query subQueryWithId = subQuery.withDefaultSubQueryId(); final Sequence queryResults; @@ -337,7 +337,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker queryResults = Sequences.empty(); } else { final QueryRunner subqueryRunner = subQueryWithId.getRunner(this); - queryResults = subqueryRunner.run(QueryPlus.wrap(subQueryWithId)); + queryResults = subqueryRunner.run( + QueryPlus.wrap(subQueryWithId), + DirectDruidClient.makeResponseContextForQuery() + ); } return toInlineDataSource( @@ -396,6 +399,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker serverConfig, new RetryQueryRunner<>( baseClusterRunner, + clusterClient::getQueryRunnerForSegments, retryConfig, objectMapper ) diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 131ebf134f4..f667639bc25 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -45,6 +45,7 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryUnsupportedException; +import org.apache.druid.query.TruncatedResponseContextException; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.server.metrics.QueryCountStatsProvider; import org.apache.druid.server.security.Access; @@ -284,16 +285,28 @@ public class QueryResource implements QueryCountStatsProvider jsonMapper, RESPONSE_CTX_HEADER_LEN_LIMIT ); - if (serializationResult.isReduced()) { - log.info( - "Response Context truncated for id [%s] . Full context is [%s].", + + if (serializationResult.isTruncated()) { + final String logToPrint = StringUtils.format( + "Response Context truncated for id [%s]. 
Full context is [%s].", queryId, serializationResult.getFullResult() ); + if (QueryContexts.shouldFailOnTruncatedResponseContext(query)) { + log.error(logToPrint); + throw new QueryInterruptedException( + new TruncatedResponseContextException( + "Serialized response context exceeds the max size[%s]", + RESPONSE_CTX_HEADER_LEN_LIMIT + ) + ); + } else { + log.warn(logToPrint); + } } return responseBuilder - .header(HEADER_RESPONSE_CONTEXT, serializationResult.getTruncatedResult()) + .header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult()) .build(); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/druid/server/SetAndVerifyContextQueryRunner.java b/server/src/main/java/org/apache/druid/server/SetAndVerifyContextQueryRunner.java index aa27a85c0f7..f408ef6a0b9 100644 --- a/server/src/main/java/org/apache/druid/server/SetAndVerifyContextQueryRunner.java +++ b/server/src/main/java/org/apache/druid/server/SetAndVerifyContextQueryRunner.java @@ -66,6 +66,11 @@ public class SetAndVerifyContextQueryRunner implements QueryRunner ), serverConfig.getMaxQueryTimeout() ); - return newQuery.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, this.startTimeMillis + QueryContexts.getTimeout(newQuery))); + return newQuery.withOverriddenContext( + ImmutableMap.of( + DirectDruidClient.QUERY_FAIL_TIME, + this.startTimeMillis + QueryContexts.getTimeout(newQuery) + ) + ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index b0b8b17b3a0..7b86dd49ee0 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -250,8 +250,15 @@ public class ServerManager implements QuerySegmentWalker final AtomicLong cpuTimeAccumulator ) { - SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); - SegmentId segmentId = segment.getId(); + final SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); + final SegmentId segmentId = segment.getId(); + final Interval segmentInterval = segment.getDataInterval(); + // ReferenceCountingSegment can return null for ID or interval if it's already closed. + // Here, we check one more time if the segment is closed. + // If the segment is closed after this line, ReferenceCountingSegmentQueryRunner will handle and do the right thing. 
+ if (segmentId == null || segmentInterval == null) { + return new ReportTimelineMissingSegmentQueryRunner<>(segmentDescriptor); + } String segmentIdString = segmentId.toString(); MetricsEmittingQueryRunner metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>( @@ -275,7 +282,7 @@ public class ServerManager implements QuerySegmentWalker BySegmentQueryRunner bySegmentQueryRunner = new BySegmentQueryRunner<>( segmentId, - segment.getDataInterval().getStart(), + segmentInterval.getStart(), cachingQueryRunner ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 64168e11d76..806ba043204 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -766,7 +766,6 @@ public class DruidCoordinator .withDruidCluster(cluster) .withLoadManagementPeons(loadManagementPeons) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerReferenceTimestamp(DateTimes.nowUtc()) .build(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index d73febeca0e..89bc3bf8c6d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -23,12 +23,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DataSourcesSnapshot; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; -import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.Arrays; @@ -70,7 +68,6 @@ public class DruidCoordinatorRuntimeParams private final CoordinatorDynamicConfig coordinatorDynamicConfig; private final CoordinatorCompactionConfig coordinatorCompactionConfig; private final CoordinatorStats stats; - private final DateTime balancerReferenceTimestamp; private final BalancerStrategy balancerStrategy; private final Set broadcastDatasources; @@ -87,7 +84,6 @@ public class DruidCoordinatorRuntimeParams CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorCompactionConfig coordinatorCompactionConfig, CoordinatorStats stats, - DateTime balancerReferenceTimestamp, BalancerStrategy balancerStrategy, Set broadcastDatasources ) @@ -104,7 +100,6 @@ public class DruidCoordinatorRuntimeParams this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.coordinatorCompactionConfig = coordinatorCompactionConfig; this.stats = stats; - this.balancerReferenceTimestamp = balancerReferenceTimestamp; this.balancerStrategy = balancerStrategy; this.broadcastDatasources = broadcastDatasources; } @@ -175,11 +170,6 @@ public class DruidCoordinatorRuntimeParams return stats; } - public DateTime getBalancerReferenceTimestamp() - { - return balancerReferenceTimestamp; - } - public BalancerStrategy getBalancerStrategy() { return balancerStrategy; @@ -225,7 +215,6 @@ public class DruidCoordinatorRuntimeParams coordinatorDynamicConfig, 
coordinatorCompactionConfig, stats, - balancerReferenceTimestamp, balancerStrategy, broadcastDatasources ); @@ -246,7 +235,6 @@ public class DruidCoordinatorRuntimeParams coordinatorDynamicConfig, coordinatorCompactionConfig, stats, - balancerReferenceTimestamp, balancerStrategy, broadcastDatasources ); @@ -266,7 +254,6 @@ public class DruidCoordinatorRuntimeParams private CoordinatorDynamicConfig coordinatorDynamicConfig; private CoordinatorCompactionConfig coordinatorCompactionConfig; private CoordinatorStats stats; - private DateTime balancerReferenceTimestamp; private BalancerStrategy balancerStrategy; private Set broadcastDatasources; @@ -284,7 +271,6 @@ public class DruidCoordinatorRuntimeParams this.stats = new CoordinatorStats(); this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build(); this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty(); - this.balancerReferenceTimestamp = DateTimes.nowUtc(); this.broadcastDatasources = new HashSet<>(); } @@ -301,7 +287,6 @@ public class DruidCoordinatorRuntimeParams CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorCompactionConfig coordinatorCompactionConfig, CoordinatorStats stats, - DateTime balancerReferenceTimestamp, BalancerStrategy balancerStrategy, Set broadcastDatasources ) @@ -318,7 +303,6 @@ public class DruidCoordinatorRuntimeParams this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.coordinatorCompactionConfig = coordinatorCompactionConfig; this.stats = stats; - this.balancerReferenceTimestamp = balancerReferenceTimestamp; this.balancerStrategy = balancerStrategy; this.broadcastDatasources = broadcastDatasources; } @@ -339,7 +323,6 @@ public class DruidCoordinatorRuntimeParams coordinatorDynamicConfig, coordinatorCompactionConfig, stats, - balancerReferenceTimestamp, balancerStrategy, broadcastDatasources ); @@ -442,12 +425,6 @@ public class DruidCoordinatorRuntimeParams return this; } - public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestamp) - { - this.balancerReferenceTimestamp = balancerReferenceTimestamp; - return this; - } - public Builder withBalancerStrategy(BalancerStrategy balancerStrategy) { this.balancerStrategy = balancerStrategy; diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 5303989b80c..53f5b987d2a 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -342,8 +342,9 @@ public class CachingClusteredClientFunctionalityTest final ResponseContext responseContext ) { - return client.getQueryRunnerForIntervals(query, query.getIntervals()).run( - QueryPlus.wrap(query), + final Query theQuery = query.withId("queryId"); + return client.getQueryRunnerForIntervals(theQuery, theQuery.getIntervals()).run( + QueryPlus.wrap(theQuery), responseContext ); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 30fb7659c11..452dd8cadd6 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -86,6 +86,7 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import 
org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.BoundDimFilter; @@ -156,6 +157,7 @@ import java.util.Optional; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -431,7 +433,8 @@ public class CachingClusteredClientTest .granularity(GRANULARITY) .aggregators(AGGS) .postAggregators(POST_AGGS) - .context(CONTEXT); + .context(CONTEXT) + .randomQueryId(); QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); @@ -472,7 +475,7 @@ public class CachingClusteredClientTest testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000), Intervals.of("2011-01-02/2011-01-03"), makeTimeResults(DateTimes.of("2011-01-02"), 30, 6000), Intervals.of("2011-01-04/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-04"), 23, 85312), @@ -500,6 +503,7 @@ public class CachingClusteredClientTest TimeseriesQuery query = builder.intervals("2011-01-01/2011-01-10") .aggregators(RENAMED_AGGS) .postAggregators(RENAMED_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeRenamedTimeResults( @@ -536,9 +540,10 @@ public class CachingClusteredClientTest .aggregators(AGGS) .postAggregators(POST_AGGS) .context(CONTEXT) + .randomQueryId() .build(); - final ResponseContext context = ResponseContext.createEmpty(); + final ResponseContext context = initializeResponseContext(); final Cache cache = EasyMock.createStrictMock(Cache.class); final Capture> cacheKeyCapture = EasyMock.newCapture(); EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture))) @@ -594,7 +599,7 @@ public class CachingClusteredClientTest testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-05/2011-01-10"), makeTimeResults( DateTimes.of("2011-01-05T02"), 80, 100, @@ -617,6 +622,7 @@ public class CachingClusteredClientTest .intervals("2011-01-05/2011-01-10") .aggregators(RENAMED_AGGS) .postAggregators(RENAMED_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeRenamedTimeResults( @@ -652,7 +658,7 @@ public class CachingClusteredClientTest testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-11-04/2011-11-08"), makeTimeResults( new DateTime("2011-11-04", TIMEZONE), 50, 5000, @@ -665,6 +671,7 @@ public class CachingClusteredClientTest .intervals("2011-11-04/2011-11-08") .aggregators(RENAMED_AGGS) .postAggregators(RENAMED_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeRenamedTimeResults( @@ -700,7 +707,7 @@ public class CachingClusteredClientTest "useCache", "false", "populateCache", "true" ) - ).build(), + ).randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000) ); @@ -719,7 +726,7 @@ public class CachingClusteredClientTest "useCache", "false", "populateCache", "false" ) - ).build(), + 
).randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000) ); @@ -736,7 +743,7 @@ public class CachingClusteredClientTest "useCache", "true", "populateCache", "false" ) - ).build(), + ).randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000) ); @@ -768,7 +775,7 @@ public class CachingClusteredClientTest testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTopNResultsWithoutRename(DateTimes.of("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998), @@ -798,6 +805,7 @@ public class CachingClusteredClientTest .metric("imps") .aggregators(RENAMED_AGGS) .postAggregators(DIFF_ORDER_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeRenamedTopNResults( @@ -841,7 +849,7 @@ public class CachingClusteredClientTest testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-11-04/2011-11-08"), makeTopNResultsWithoutRename( new DateTime("2011-11-04", TIMEZONE), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, @@ -855,6 +863,7 @@ public class CachingClusteredClientTest .metric("imps") .aggregators(RENAMED_AGGS) .postAggregators(DIFF_ORDER_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeRenamedTopNResults( @@ -908,6 +917,7 @@ public class CachingClusteredClientTest .metric("b") .threshold(3) .aggregators(new CountAggregatorFactory("b")) + .randomQueryId() .build(), sequences ) @@ -942,7 +952,7 @@ public class CachingClusteredClientTest ); testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTopNResultsWithoutRename(), @@ -973,6 +983,7 @@ public class CachingClusteredClientTest .metric("imps") .aggregators(RENAMED_AGGS) .postAggregators(DIFF_ORDER_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeRenamedTopNResults( @@ -1013,7 +1024,7 @@ public class CachingClusteredClientTest testQueryCaching( runner, - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeTopNResultsWithoutRename(), @@ -1044,6 +1055,7 @@ public class CachingClusteredClientTest .metric("avg_imps_per_row_double") .aggregators(AGGS) .postAggregators(DIFF_ORDER_POST_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeTopNResultsWithoutRename( @@ -1077,7 +1089,7 @@ public class CachingClusteredClientTest testQueryCaching( getDefaultQueryRunner(), - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeSearchResults(TOP_DIM, DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4), @@ -1126,7 +1138,7 @@ public class CachingClusteredClientTest DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4, DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 ), - runner.run(QueryPlus.wrap(builder.intervals("2011-01-01/2011-01-10").build())) + runner.run(QueryPlus.wrap(builder.randomQueryId().intervals("2011-01-01/2011-01-10").build())) ); } @@ -1145,7 +1157,7 @@ public class CachingClusteredClientTest testQueryCaching( getDefaultQueryRunner(), - builder.build(), + builder.randomQueryId().build(), Intervals.of("2011-01-01/2011-01-02"), makeSearchResults(TOP_DIM, DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4), @@ -1178,7 +1190,7 @@ public class 
CachingClusteredClientTest new SearchQueryQueryToolChest(new SearchQueryConfig()) ); - ResponseContext context = ResponseContext.createEmpty(); + ResponseContext context = initializeResponseContext(); TestHelper.assertExpectedResults( makeSearchResults( TOP_DIM, @@ -1195,11 +1207,12 @@ public class CachingClusteredClientTest DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4, DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4 ), - runner.run(QueryPlus.wrap(builder.intervals("2011-01-01/2011-01-10").build()), context) + runner.run(QueryPlus.wrap(builder.randomQueryId().intervals("2011-01-01/2011-01-10").build()), context) ); SearchQuery query = builder .intervals("2011-01-01/2011-01-10") .dimensions(new DefaultDimensionSpec(TOP_DIM, "new_dim")) + .randomQueryId() .build(); TestHelper.assertExpectedResults( makeSearchResults( @@ -1244,7 +1257,7 @@ public class CachingClusteredClientTest collector.add(hashFn.hashString("abc123", StandardCharsets.UTF_8).asBytes()); collector.add(hashFn.hashString("123abc", StandardCharsets.UTF_8).asBytes()); - final GroupByQuery query = builder.build(); + final GroupByQuery query = builder.randomQueryId().build(); testQueryCaching( getDefaultQueryRunner(), @@ -1322,7 +1335,7 @@ public class CachingClusteredClientTest DateTimes.of("2011-01-09T01"), ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector) ), - runner.run(QueryPlus.wrap(builder.setInterval("2011-01-05/2011-01-10").build())), + runner.run(QueryPlus.wrap(builder.randomQueryId().setInterval("2011-01-05/2011-01-10").build())), "" ); } @@ -1336,6 +1349,7 @@ public class CachingClusteredClientTest .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) .context(CachingClusteredClientTest.CONTEXT) + .randomQueryId() .build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeBoundaryResult(DateTimes.of("2011-01-01"), DateTimes.of("2011-01-01"), DateTimes.of("2011-01-02")), @@ -1357,6 +1371,7 @@ public class CachingClusteredClientTest .intervals(CachingClusteredClientTest.SEG_SPEC) .context(CachingClusteredClientTest.CONTEXT) .bound(TimeBoundaryQuery.MAX_TIME) + .randomQueryId() .build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-02")), @@ -1375,6 +1390,7 @@ public class CachingClusteredClientTest .intervals(CachingClusteredClientTest.SEG_SPEC) .context(CachingClusteredClientTest.CONTEXT) .bound(TimeBoundaryQuery.MIN_TIME) + .randomQueryId() .build(), Intervals.of("2011-01-01/2011-01-02"), makeTimeBoundaryResult(DateTimes.of("2011-01-01"), DateTimes.of("2011-01-01"), null), @@ -1436,7 +1452,7 @@ public class CachingClusteredClientTest testQueryCachingWithFilter( runner, 3, - builder.build(), + builder.randomQueryId().build(), expectedResult, Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000), Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-02"), 10, 1252), @@ -1477,7 +1493,7 @@ public class CachingClusteredClientTest .aggregators(RENAMED_AGGS) .postAggregators(RENAMED_POST_AGGS); - TimeseriesQuery query = builder.build(); + TimeseriesQuery query = builder.randomQueryId().build(); final Interval interval1 = Intervals.of("2011-01-06/2011-01-07"); final Interval interval2 = Intervals.of("2011-01-07/2011-01-08"); @@ -1768,7 +1784,6 @@ public class CachingClusteredClientTest Object... 
args // does this assume query intervals must be ordered? ) { - final List queryIntervals = Lists.newArrayListWithCapacity(args.length / 2); final List>>> expectedResults = Lists.newArrayListWithCapacity(queryIntervals.size()); @@ -1922,7 +1937,8 @@ public class CachingClusteredClientTest query.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(ImmutableList.of(actualQueryInterval)) ) - ) + ), + initializeResponseContext() ) ); if (queryCompletedCallback != null) { @@ -2739,6 +2755,7 @@ public class CachingClusteredClientTest .dataSource(CachingClusteredClientTest.DATA_SOURCE) .intervals(CachingClusteredClientTest.SEG_SPEC) .context(CachingClusteredClientTest.CONTEXT) + .randomQueryId() .build(), Intervals.of("1970-01-01/1970-01-02"), makeTimeBoundaryResult(DateTimes.of("1970-01-01"), DateTimes.of("1970-01-01"), DateTimes.of("1970-01-02")), @@ -2760,6 +2777,7 @@ public class CachingClusteredClientTest .intervals(CachingClusteredClientTest.SEG_SPEC) .context(CachingClusteredClientTest.CONTEXT) .bound(TimeBoundaryQuery.MAX_TIME) + .randomQueryId() .build(), Intervals.of("1970-01-01/2011-01-02"), makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-02")), @@ -2778,6 +2796,7 @@ public class CachingClusteredClientTest .intervals(CachingClusteredClientTest.SEG_SPEC) .context(CachingClusteredClientTest.CONTEXT) .bound(TimeBoundaryQuery.MIN_TIME) + .randomQueryId() .build(), Intervals.of("1970-01-01/2011-01-02"), makeTimeBoundaryResult(DateTimes.of("1970-01-01"), DateTimes.of("1970-01-01"), null), @@ -2804,7 +2823,7 @@ public class CachingClusteredClientTest .setAggregatorSpecs(AGGS) .setContext(CONTEXT); - final GroupByQuery query1 = builder.build(); + final GroupByQuery query1 = builder.randomQueryId().build(); testQueryCaching( getDefaultQueryRunner(), query1, @@ -2847,7 +2866,7 @@ public class CachingClusteredClientTest getDefaultQueryRunner(), WAREHOUSE.getToolChest(query1) ); - ResponseContext context = ResponseContext.createEmpty(); + final ResponseContext context = initializeResponseContext(); TestHelper.assertExpectedObjects( makeGroupByResults( query1, @@ -2862,13 +2881,14 @@ public class CachingClusteredClientTest DateTimes.of("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7), DateTimes.of("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7) ), - runner.run(QueryPlus.wrap(builder.setInterval("2011-01-05/2011-01-10").build()), context), + runner.run(QueryPlus.wrap(builder.randomQueryId().setInterval("2011-01-05/2011-01-10").build()), context), "" ); final GroupByQuery query2 = builder .setInterval("2011-01-05/2011-01-10").setDimensions(new DefaultDimensionSpec("a", "output2")) .setAggregatorSpecs(RENAMED_AGGS) + .randomQueryId() .build(); TestHelper.assertExpectedObjects( makeGroupByResults( @@ -2918,10 +2938,11 @@ public class CachingClusteredClientTest .dataSource(DATA_SOURCE) .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval))) .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .randomQueryId() .build(); - ResponseContext responseContext = ResponseContext.createEmpty(); + final ResponseContext responseContext = initializeResponseContext(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.Key.ETAG)); @@ -2958,16 +2979,18 @@ public class CachingClusteredClientTest .dataSource(DATA_SOURCE) .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval))) .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .randomQueryId() .build(); final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder() .dataSource(DATA_SOURCE) .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2))) .context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0=")) + .randomQueryId() .build(); - final ResponseContext responseContext = ResponseContext.createEmpty(); + final ResponseContext responseContext = initializeResponseContext(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); final Object etag1 = responseContext.get(ResponseContext.Key.ETAG); @@ -2989,4 +3012,11 @@ public class CachingClusteredClientTest } }; } + + private static ResponseContext initializeResponseContext() + { + final ResponseContext context = ResponseContext.createEmpty(); + context.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + return context; + } } diff --git a/server/src/test/java/org/apache/druid/client/SimpleQueryRunner.java b/server/src/test/java/org/apache/druid/client/SimpleQueryRunner.java new file mode 100644 index 00000000000..294a57103c6 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/SimpleQueryRunner.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.BySegmentQueryRunner; +import org.apache.druid.query.FinalizeResultsQueryRunner; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.timeline.SegmentId; + +/** + * A simple query runner for testing that can process only one segment. 
+ */ +public class SimpleQueryRunner implements QueryRunner +{ + private final QueryRunnerFactoryConglomerate conglomerate; + private final QueryableIndexSegment segment; + + public SimpleQueryRunner( + QueryRunnerFactoryConglomerate conglomerate, + SegmentId segmentId, + QueryableIndex queryableIndex + ) + { + this.conglomerate = conglomerate; + this.segment = new QueryableIndexSegment(queryableIndex, segmentId); + } + + @Override + public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) + { + final QueryRunnerFactory factory = conglomerate.findFactory(queryPlus.getQuery()); + //noinspection unchecked + return factory.getToolchest().preMergeQueryDecoration( + new FinalizeResultsQueryRunner<>( + new BySegmentQueryRunner<>( + segment.getId(), + segment.getDataInterval().getStart(), + factory.createRunner(segment) + ), + factory.getToolchest() + ) + ).run(queryPlus, responseContext); + } +} diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java b/server/src/test/java/org/apache/druid/client/SimpleServerView.java new file mode 100644 index 00000000000..65da4afc79c --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; +import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.QueryableDruidServer; +import org.apache.druid.client.selector.RandomServerSelectorStrategy; +import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.client.selector.TierSelectorStrategy; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineLookup; +import org.apache.druid.timeline.VersionedIntervalTimeline; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executor; + +/** + * A simple broker server view for testing which you can manually update the server view. 
+ */ +public class SimpleServerView implements TimelineServerView +{ + private static final QueryWatcher NOOP_QUERY_WATCHER = (query, future) -> {}; + private final TierSelectorStrategy tierSelectorStrategy = new HighestPriorityTierSelectorStrategy( + new RandomServerSelectorStrategy() + ); + // server -> queryRunner + private final Map servers = new HashMap<>(); + // segmentId -> serverSelector + private final Map selectors = new HashMap<>(); + // dataSource -> version -> serverSelector + private final Map> timelines = new HashMap<>(); + + private final QueryToolChestWarehouse warehouse; + private final ObjectMapper objectMapper; + private final HttpClient httpClient; + + public SimpleServerView( + QueryToolChestWarehouse warehouse, + ObjectMapper objectMapper, + HttpClient httpClient + ) + { + this.warehouse = warehouse; + this.objectMapper = objectMapper; + this.httpClient = httpClient; + } + + public void addServer(DruidServer server, DataSegment dataSegment) + { + servers.put( + server, + new QueryableDruidServer<>( + server, + new DirectDruidClient<>( + warehouse, + NOOP_QUERY_WATCHER, + objectMapper, + httpClient, + server.getScheme(), + server.getHost(), + new NoopServiceEmitter() + ) + ) + ); + addSegmentToServer(server, dataSegment); + } + + public void removeServer(DruidServer server) + { + servers.remove(server); + } + + public void unannounceSegmentFromServer(DruidServer server, DataSegment segment) + { + final QueryableDruidServer queryableDruidServer = servers.get(server); + if (queryableDruidServer == null) { + throw new ISE("Unknown server [%s]", server); + } + final ServerSelector selector = selectors.get(segment.getId().toString()); + if (selector == null) { + throw new ISE("Unknown segment [%s]", segment.getId()); + } + if (!selector.removeServer(queryableDruidServer)) { + throw new ISE("Failed to remove segment[%s] from server[%s]", segment.getId(), server); + } + final VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); + if (timeline == null) { + throw new ISE("Unknown datasource [%s]", segment.getDataSource()); + } + timeline.remove(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); + } + + private void addSegmentToServer(DruidServer server, DataSegment segment) + { + final ServerSelector selector = selectors.computeIfAbsent( + segment.getId().toString(), + k -> new ServerSelector(segment, tierSelectorStrategy) + ); + selector.addServerAndUpdateSegment(servers.get(server), segment); + timelines.computeIfAbsent(segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural())) + .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); + } + + @Override + public Optional> getTimeline(DataSourceAnalysis analysis) + { + return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName())); + } + + @Override + public List getDruidServers() + { + return Collections.emptyList(); + } + + @Override + public QueryRunner getQueryRunner(DruidServer server) + { + final QueryableDruidServer queryableDruidServer = Preconditions.checkNotNull(servers.get(server), "server"); + return (QueryRunner) queryableDruidServer.getQueryRunner(); + } + + @Override + public void registerTimelineCallback(Executor exec, TimelineCallback callback) + { + // do nothing + } + + @Override + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + { + // do nothing + } + + @Override + public void 
registerSegmentCallback(Executor exec, SegmentCallback callback) + { + // do nothing + } + + public static DruidServer createServer(int nameSuffix) + { + return new DruidServer( + "server_" + nameSuffix, + "127.0.0." + nameSuffix, + null, + Long.MAX_VALUE, + ServerType.HISTORICAL, + "default", + 0 + ); + } +} diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java new file mode 100644 index 00000000000..7828b647028 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.ClientResponse; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler.TrafficCop; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.server.QueryResource; +import org.apache.druid.timeline.DataSegment; +import org.jboss.netty.buffer.HeapChannelBufferFactory; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +/** + * An HTTP client for testing that emulates querying data nodes (historicals or realtime tasks).
+ */ +public class TestHttpClient implements HttpClient +{ + private static final TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L; + private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024; + + private final Map servers = new HashMap<>(); + private final ObjectMapper objectMapper; + + public TestHttpClient(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + public void addServerAndRunner(DruidServer server, SimpleServerManager serverManager) + { + servers.put(computeUrl(server), serverManager); + } + + @Nullable + public SimpleServerManager getServerManager(DruidServer server) + { + return servers.get(computeUrl(server)); + } + + public Map getServers() + { + return servers; + } + + private static URL computeUrl(DruidServer server) + { + try { + return new URL(StringUtils.format("%s://%s/druid/v2/", server.getScheme(), server.getHost())); + } + catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + @Override + public ListenableFuture go( + Request request, + HttpResponseHandler handler + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture go( + Request request, + HttpResponseHandler handler, + Duration readTimeout + ) + { + try { + final Query query = objectMapper.readValue(request.getContent().array(), Query.class); + final QueryRunner queryRunner = servers.get(request.getUrl()).getQueryRunner(); + if (queryRunner == null) { + throw new ISE("Can't find queryRunner for url[%s]", request.getUrl()); + } + final ResponseContext responseContext = ResponseContext.createEmpty(); + final Sequence sequence = queryRunner.run(QueryPlus.wrap(query), responseContext); + final byte[] serializedContent; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + objectMapper.writeValue(baos, sequence); + serializedContent = baos.toByteArray(); + } + final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith( + objectMapper, + RESPONSE_CTX_HEADER_LEN_LIMIT + ); + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().add(QueryResource.HEADER_RESPONSE_CONTEXT, serializationResult.getResult()); + response.setContent( + HeapChannelBufferFactory.getInstance().getBuffer(serializedContent, 0, serializedContent.length) + ); + final ClientResponse intermClientResponse = handler.handleResponse(response, NOOP_TRAFFIC_COP); + final ClientResponse finalClientResponse = handler.done(intermClientResponse); + return Futures.immediateFuture(finalClientResponse.getObj()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * A simple server manager for testing, from which a segment can be manually dropped. Currently used for + * testing {@link org.apache.druid.query.RetryQueryRunner}.
+ */ + public static class SimpleServerManager + { + private final QueryRunnerFactoryConglomerate conglomerate; + private final DataSegment segment; + private final QueryableIndex queryableIndex; + + private boolean isSegmentDropped = false; + + public SimpleServerManager( + QueryRunnerFactoryConglomerate conglomerate, + DataSegment segment, + QueryableIndex queryableIndex + ) + { + this.conglomerate = conglomerate; + this.segment = segment; + this.queryableIndex = queryableIndex; + } + + private QueryRunner getQueryRunner() + { + if (isSegmentDropped) { + return new ReportTimelineMissingSegmentQueryRunner( + new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getId().getPartitionNum()) + ); + } else { + return new SimpleQueryRunner(conglomerate, segment.getId(), queryableIndex); + } + } + + public NonnullPair dropSegment() + { + this.isSegmentDropped = true; + return new NonnullPair<>(segment, queryableIndex); + } + } +} diff --git a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java new file mode 100644 index 00000000000..7ea13f22dcd --- /dev/null +++ b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.CachingClusteredClient; +import org.apache.druid.client.DirectDruidClient; +import org.apache.druid.client.DruidServer; +import org.apache.druid.client.SimpleServerView; +import org.apache.druid.client.TestHttpClient; +import org.apache.druid.client.TestHttpClient.SimpleServerManager; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.client.cache.CachePopulatorStats; +import org.apache.druid.client.cache.ForegroundCachePopulator; +import org.apache.druid.client.cache.MapCache; +import org.apache.druid.guice.http.DruidHttpClientConfig; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.context.ConcurrentResponseContext; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContext.Key; +import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.SegmentMissingException; +import org.apache.druid.segment.generator.GeneratorBasicSchemas; +import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; +import org.apache.druid.server.QueryStackTests; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class RetryQueryRunnerTest +{ + private static final Closer CLOSER = Closer.create(); + private static final String DATASOURCE = "datasource"; + private static final GeneratorSchemaInfo SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic"); + private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper objectMapper = new DefaultObjectMapper(); + private final QueryToolChestWarehouse toolChestWarehouse; + private final QueryRunnerFactoryConglomerate conglomerate; + + private SegmentGenerator segmentGenerator; + private TestHttpClient httpClient; + private SimpleServerView simpleServerView; + private CachingClusteredClient cachingClusteredClient; + private List servers; + + public RetryQueryRunnerTest() + { + conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, USE_PARALLEL_MERGE_POOL_CONFIGURED); + + toolChestWarehouse = new QueryToolChestWarehouse() + { + @Override + public > QueryToolChest getToolChest(final QueryType query) + { + return 
conglomerate.findFactory(query).getToolchest(); + } + }; + } + + @AfterClass + public static void tearDownClass() throws IOException + { + CLOSER.close(); + } + + @Before + public void setup() + { + segmentGenerator = new SegmentGenerator(); + httpClient = new TestHttpClient(objectMapper); + simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, httpClient); + cachingClusteredClient = new CachingClusteredClient( + toolChestWarehouse, + simpleServerView, + MapCache.create(0), + objectMapper, + new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0), + new CacheConfig(), + new DruidHttpClientConfig(), + QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED), + ForkJoinPool.commonPool(), + QueryStackTests.DEFAULT_NOOP_SCHEDULER + ); + servers = new ArrayList<>(); + } + + @After + public void tearDown() throws IOException + { + segmentGenerator.close(); + } + + private void addServer(DruidServer server, DataSegment dataSegment, QueryableIndex queryableIndex) + { + servers.add(server); + simpleServerView.addServer(server, dataSegment); + httpClient.addServerAndRunner(server, new SimpleServerManager(conglomerate, dataSegment, queryableIndex)); + } + + @Test + public void testNoRetry() + { + prepareCluster(10); + final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final RetryQueryRunner> queryRunner = createQueryRunner( + newRetryQueryRunnerConfig(1, false), + query, + () -> {} + ); + final Sequence> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext()); + final List> queryResult = sequence.toList(); + Assert.assertEquals(0, queryRunner.getTotalNumRetries()); + Assert.assertFalse(queryResult.isEmpty()); + Assert.assertEquals(expectedTimeseriesResult(10), queryResult); + } + + @Test + public void testRetryForMovedSegment() + { + prepareCluster(10); + final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final RetryQueryRunner> queryRunner = createQueryRunner( + newRetryQueryRunnerConfig(1, true), + query, + () -> { + // Let's move a segment + dropSegmentFromServerAndAddNewServerForSegment(servers.get(0)); + } + ); + final Sequence> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext()); + + final List> queryResult = sequence.toList(); + Assert.assertEquals(1, queryRunner.getTotalNumRetries()); + // Note that we dropped a segment from a server, but it's still announced in the server view. + // As a result, we may or may not get the full result, depending on which server receives the retry query. + // If we hit the same server, the query will return an incomplete result.
+ Assert.assertTrue(queryResult.size() == 9 || queryResult.size() == 10); + Assert.assertEquals(expectedTimeseriesResult(queryResult.size()), queryResult); + } + + @Test + public void testRetryUntilWeGetFullResult() + { + prepareCluster(10); + final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final RetryQueryRunner> queryRunner = createQueryRunner( + newRetryQueryRunnerConfig(100, false), // retry up to 100 + query, + () -> { + // Let's move a segment + dropSegmentFromServerAndAddNewServerForSegment(servers.get(0)); + } + ); + final Sequence> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext()); + + final List> queryResult = sequence.toList(); + Assert.assertTrue(0 < queryRunner.getTotalNumRetries()); + Assert.assertEquals(expectedTimeseriesResult(10), queryResult); + } + + @Test + public void testFailWithPartialResultsAfterRetry() + { + prepareCluster(10); + final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final RetryQueryRunner> queryRunner = createQueryRunner( + newRetryQueryRunnerConfig(1, false), + query, + () -> dropSegmentFromServer(servers.get(0)) + ); + final Sequence> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext()); + + expectedException.expect(SegmentMissingException.class); + expectedException.expectMessage("No results found for segments"); + try { + sequence.toList(); + } + finally { + Assert.assertEquals(1, queryRunner.getTotalNumRetries()); + } + } + + private void prepareCluster(int numServers) + { + for (int i = 0; i < numServers; i++) { + final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i); + addServer( + SimpleServerView.createServer(i + 1), + segment, + segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 10) + ); + } + } + + /** + * Drops a segment from the DruidServer. This method doesn't update the server view, but the server will stop + * serving queries for the dropped segment. + */ + private NonnullPair dropSegmentFromServer(DruidServer fromServer) + { + final SimpleServerManager serverManager = httpClient.getServerManager(fromServer); + Assert.assertNotNull(serverManager); + return serverManager.dropSegment(); + } + + /** + * Drops a segment from the DruidServer and updates the server view. + */ + private NonnullPair unannounceSegmentFromServer(DruidServer fromServer) + { + final NonnullPair pair = dropSegmentFromServer(fromServer); + simpleServerView.unannounceSegmentFromServer(fromServer, pair.lhs); + return pair; + } + + /** + * Drops a segment from the {@code fromServer} and creates a new server serving the dropped segment. + * This method updates the server view.
+ */ + private void dropSegmentFromServerAndAddNewServerForSegment(DruidServer fromServer) + { + final NonnullPair pair = unannounceSegmentFromServer(fromServer); + final DataSegment segmentToMove = pair.lhs; + final QueryableIndex queryableIndexToMove = pair.rhs; + addServer( + SimpleServerView.createServer(11), + segmentToMove, + queryableIndexToMove + ); + } + + private RetryQueryRunner createQueryRunner( + RetryQueryRunnerConfig retryQueryRunnerConfig, + Query query, + Runnable runnableAfterFirstAttempt + ) + { + final QueryRunner baseRunner = cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()); + return new RetryQueryRunner<>( + baseRunner, + cachingClusteredClient::getQueryRunnerForSegments, + retryQueryRunnerConfig, + objectMapper, + runnableAfterFirstAttempt + ); + } + + private static RetryQueryRunnerConfig newRetryQueryRunnerConfig(int numTries, boolean returnPartialResults) + { + return new RetryQueryRunnerConfig() + { + @Override + public int getNumTries() + { + return numTries; + } + + @Override + public boolean isReturnPartialResults() + { + return returnPartialResults; + } + }; + } + + private static Query> timeseriesQuery(Interval interval) + { + return Druids.newTimeseriesQueryBuilder() + .dataSource(DATASOURCE) + .intervals(ImmutableList.of(interval)) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("rows")) + .context( + ImmutableMap.of( + DirectDruidClient.QUERY_FAIL_TIME, + System.currentTimeMillis() + 10000 + ) + ) + .build() + .withId(UUID.randomUUID().toString()); + } + + private ResponseContext responseContext() + { + final ResponseContext responseContext = ConcurrentResponseContext.createEmpty(); + responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + return responseContext; + } + + private static List> expectedTimeseriesResult(int expectedNumResultRows) + { + return IntStream + .range(0, expectedNumResultRows) + .mapToObj(i -> new Result<>(DateTimes.of("2000-01-01"), new TimeseriesResultValue(ImmutableMap.of("rows", 10)))) + .collect(Collectors.toList()); + } + + private static DataSegment newSegment( + Interval interval, + int partitionId + ) + { + return DataSegment.builder() + .dataSource(DATASOURCE) + .interval(interval) + .version("1") + .shardSpec(new NumberedShardSpec(partitionId, 0)) + .size(10) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 4846e637502..2fe75008d93 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -100,6 +100,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; /** * Tests ClientQuerySegmentWalker. 
@@ -203,13 +204,14 @@ public class ClientQuerySegmentWalkerTest public void testTimeseriesOnTable() { final TimeseriesQuery query = - Druids.newTimeseriesQueryBuilder() - .dataSource(FOO) - .granularity(Granularities.ALL) - .intervals(Collections.singletonList(INTERVAL)) - .aggregators(new LongSumAggregatorFactory("sum", "n")) - .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) - .build(); + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(FOO) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build() + .withId(UUID.randomUUID().toString()); testQuery( query, @@ -227,23 +229,25 @@ public class ClientQuerySegmentWalkerTest public void testTimeseriesOnAutomaticGlobalTable() { final TimeseriesQuery query = - Druids.newTimeseriesQueryBuilder() - .dataSource(GLOBAL) - .granularity(Granularities.ALL) - .intervals(Collections.singletonList(INTERVAL)) - .aggregators(new LongSumAggregatorFactory("sum", "n")) - .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) - .build(); + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(GLOBAL) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build() + .withId("queryId"); // expect global/joinable datasource to be automatically translated into a GlobalTableDataSource final TimeseriesQuery expectedClusterQuery = - Druids.newTimeseriesQueryBuilder() - .dataSource(new GlobalTableDataSource(GLOBAL)) - .granularity(Granularities.ALL) - .intervals(Collections.singletonList(INTERVAL)) - .aggregators(new LongSumAggregatorFactory("sum", "n")) - .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) - .build(); + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(new GlobalTableDataSource(GLOBAL)) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build() + .withId("queryId"); testQuery( query, @@ -261,12 +265,13 @@ public class ClientQuerySegmentWalkerTest public void testTimeseriesOnInline() { final TimeseriesQuery query = - Druids.newTimeseriesQueryBuilder() - .dataSource(FOO_INLINE) - .granularity(Granularities.ALL) - .intervals(Collections.singletonList(INTERVAL)) - .aggregators(new LongSumAggregatorFactory("sum", "n")) - .build(); + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(FOO_INLINE) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .build() + .withId(UUID.randomUUID().toString()); testQuery( query, @@ -292,12 +297,13 @@ public class ClientQuerySegmentWalkerTest .build(); final TimeseriesQuery query = - Druids.newTimeseriesQueryBuilder() - .dataSource(new QueryDataSource(subquery)) - .granularity(Granularities.ALL) - .intervals(Intervals.ONLY_ETERNITY) - .aggregators(new CountAggregatorFactory("cnt")) - .build(); + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .aggregators(new CountAggregatorFactory("cnt")) + .build() + 
.withId(UUID.randomUUID().toString()); testQuery( query, @@ -327,20 +333,22 @@ public class ClientQuerySegmentWalkerTest public void testGroupByOnGroupByOnTable() { final GroupByQuery subquery = - GroupByQuery.builder() - .setDataSource(FOO) - .setGranularity(Granularities.ALL) - .setInterval(Collections.singletonList(INTERVAL)) - .setDimensions(DefaultDimensionSpec.of("s")) - .build(); + (GroupByQuery) GroupByQuery.builder() + .setDataSource(FOO) + .setGranularity(Granularities.ALL) + .setInterval(Collections.singletonList(INTERVAL)) + .setDimensions(DefaultDimensionSpec.of("s")) + .build() + .withId("queryId"); final GroupByQuery query = - GroupByQuery.builder() - .setDataSource(new QueryDataSource(subquery)) - .setGranularity(Granularities.ALL) - .setInterval(Intervals.ONLY_ETERNITY) - .setAggregatorSpecs(new CountAggregatorFactory("cnt")) - .build(); + (GroupByQuery) GroupByQuery.builder() + .setDataSource(new QueryDataSource(subquery)) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setAggregatorSpecs(new CountAggregatorFactory("cnt")) + .build() + .withId("queryId"); testQuery( query, @@ -359,20 +367,21 @@ public class ClientQuerySegmentWalkerTest public void testGroupByOnUnionOfTwoTables() { final GroupByQuery query = - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(FOO), - new TableDataSource(BAR) - ) - ) - ) - .setGranularity(Granularities.ALL) - .setInterval(Intervals.ONLY_ETERNITY) - .setDimensions(DefaultDimensionSpec.of("s")) - .setAggregatorSpecs(new CountAggregatorFactory("cnt")) - .build(); + (GroupByQuery) GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(FOO), + new TableDataSource(BAR) + ) + ) + ) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s")) + .setAggregatorSpecs(new CountAggregatorFactory("cnt")) + .build() + .withId(UUID.randomUUID().toString()); testQuery( query, @@ -411,22 +420,23 @@ public class ClientQuerySegmentWalkerTest .build(); final GroupByQuery query = - GroupByQuery.builder() - .setDataSource( - JoinDataSource.create( - new TableDataSource(FOO), - new QueryDataSource(subquery), - "j.", - "\"j.s\" == \"s\"", - JoinType.INNER, - ExprMacroTable.nil() - ) - ) - .setGranularity(Granularities.ALL) - .setInterval(Intervals.ONLY_ETERNITY) - .setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s")) - .setAggregatorSpecs(new CountAggregatorFactory("cnt")) - .build(); + (GroupByQuery) GroupByQuery.builder() + .setDataSource( + JoinDataSource.create( + new TableDataSource(FOO), + new QueryDataSource(subquery), + "j.", + "\"j.s\" == \"s\"", + JoinType.INNER, + ExprMacroTable.nil() + ) + ) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s")) + .setAggregatorSpecs(new CountAggregatorFactory("cnt")) + .build() + .withId(UUID.randomUUID().toString()); testQuery( query, @@ -471,13 +481,14 @@ public class ClientQuerySegmentWalkerTest .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .build(); final GroupByQuery query = - GroupByQuery.builder() - .setDataSource(new QueryDataSource(subquery)) - .setGranularity(Granularities.ALL) - .setInterval(Intervals.ONLY_ETERNITY) - .setDimensions(DefaultDimensionSpec.of("s")) - .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n")) - .build(); + (GroupByQuery) 
GroupByQuery.builder() + .setDataSource(new QueryDataSource(subquery)) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s")) + .setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n")) + .build() + .withId(UUID.randomUUID().toString()); testQuery( query, @@ -525,14 +536,15 @@ public class ClientQuerySegmentWalkerTest .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .build(); final TopNQuery query = - new TopNQueryBuilder().dataSource(new QueryDataSource(subquery)) - .granularity(Granularities.ALL) - .intervals(Intervals.ONLY_ETERNITY) - .dimension(DefaultDimensionSpec.of("s")) - .metric("sum_n") - .threshold(100) - .aggregators(new LongSumAggregatorFactory("sum_n", "n")) - .build(); + (TopNQuery) new TopNQueryBuilder().dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .dimension(DefaultDimensionSpec.of("s")) + .metric("sum_n") + .threshold(100) + .aggregators(new LongSumAggregatorFactory("sum_n", "n")) + .build() + .withId(UUID.randomUUID().toString()); testQuery( query, @@ -570,22 +582,23 @@ public class ClientQuerySegmentWalkerTest public void testJoinOnTableErrorCantInlineTable() { final GroupByQuery query = - GroupByQuery.builder() - .setDataSource( - JoinDataSource.create( - new TableDataSource(FOO), - new TableDataSource(BAR), - "j.", - "\"j.s\" == \"s\"", - JoinType.INNER, - ExprMacroTable.nil() - ) - ) - .setGranularity(Granularities.ALL) - .setInterval(Intervals.ONLY_ETERNITY) - .setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s")) - .setAggregatorSpecs(new CountAggregatorFactory("cnt")) - .build(); + (GroupByQuery) GroupByQuery.builder() + .setDataSource( + JoinDataSource.create( + new TableDataSource(FOO), + new TableDataSource(BAR), + "j.", + "\"j.s\" == \"s\"", + JoinType.INNER, + ExprMacroTable.nil() + ) + ) + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ONLY_ETERNITY) + .setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s")) + .setAggregatorSpecs(new CountAggregatorFactory("cnt")) + .build() + .withId(UUID.randomUUID().toString()); expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Cannot handle subquery structure for dataSource"); @@ -607,12 +620,13 @@ public class ClientQuerySegmentWalkerTest .build(); final TimeseriesQuery query = - Druids.newTimeseriesQueryBuilder() - .dataSource(new QueryDataSource(subquery)) - .granularity(Granularities.ALL) - .intervals(Intervals.ONLY_ETERNITY) - .aggregators(new CountAggregatorFactory("cnt")) - .build(); + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .aggregators(new CountAggregatorFactory("cnt")) + .build() + .withId(UUID.randomUUID().toString()); expectedException.expect(ResourceLimitExceededException.class); expectedException.expectMessage("Subquery generated results beyond maximum[2]"); @@ -741,8 +755,7 @@ public class ClientQuerySegmentWalkerTest { issuedQueries.clear(); - final Sequence resultSequence = - QueryPlus.wrap(query).run(walker, ResponseContext.createEmpty()); + final Sequence resultSequence = QueryPlus.wrap(query).run(walker, ResponseContext.createEmpty()); final List arrays = conglomerate.findFactory(query).getToolchest().resultsAsArrays(query, resultSequence).toList(); diff --git 
a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 8a468d2edc3..867e3e208bb 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -178,10 +178,57 @@ public class QueryStackTests ); } + public static DruidProcessingConfig getProcessingConfig(boolean useParallelMergePoolConfigured) + { + return new DruidProcessingConfig() + { + @Override + public String getFormatString() + { + return null; + } + + @Override + public int intermediateComputeSizeBytes() + { + return COMPUTE_BUFFER_SIZE; + } + + @Override + public int getNumThreads() + { + // Only use 1 thread for tests. + return 1; + } + + @Override + public int getNumMergeBuffers() + { + // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby. + // Two buffers for the broker and one for the queryable. + return 3; + } + + @Override + public boolean useParallelMergePoolConfigured() + { + return useParallelMergePoolConfigured; + } + }; + } + /** * Returns a new {@link QueryRunnerFactoryConglomerate}. Adds relevant closeables to the passed-in {@link Closer}. */ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer) + { + return createQueryRunnerFactoryConglomerate(closer, true); + } + + public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( + final Closer closer, + final boolean useParallelMergePoolConfigured + ) { final CloseableStupidPool stupidPool = new CloseableStupidPool<>( "TopNQueryRunnerFactory-bufferPool", @@ -201,35 +248,7 @@ public class QueryStackTests return GroupByStrategySelector.STRATEGY_V2; } }, - new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return null; - } - - @Override - public int intermediateComputeSizeBytes() - { - return COMPUTE_BUFFER_SIZE; - } - - @Override - public int getNumThreads() - { - // Only use 1 thread for tests. - return 1; - } - - @Override - public int getNumMergeBuffers() - { - // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby. - // Two buffers for the broker and one for the queryable. 
- return 3; - } - } + getProcessingConfig(useParallelMergePoolConfigured) ); final GroupByQueryRunnerFactory groupByQueryRunnerFactory = factoryCloserPair.lhs; diff --git a/server/src/test/java/org/apache/druid/server/RequestLogLineTest.java b/server/src/test/java/org/apache/druid/server/RequestLogLineTest.java index 37ec655fff7..2230a5ad8e3 100644 --- a/server/src/test/java/org/apache/druid/server/RequestLogLineTest.java +++ b/server/src/test/java/org/apache/druid/server/RequestLogLineTest.java @@ -20,8 +20,8 @@ package org.apache.druid.server; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; @@ -78,14 +78,14 @@ public class RequestLogLineTest new QueryStats(ImmutableMap.of()) ); Assert.assertEquals("", requestLogLine.getRemoteAddr()); - requestLogLine.getNativeQueryLine(new ObjectMapper()); // call should not throw exception + requestLogLine.getNativeQueryLine(new DefaultObjectMapper()); // call should not throw exception requestLogLine = RequestLogLine.forSql( "", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()) ); Assert.assertEquals("", requestLogLine.getRemoteAddr()); Assert.assertEquals(ImmutableMap.of(), requestLogLine.getSqlQueryContext()); - requestLogLine.getSqlQueryLine(new ObjectMapper()); // call should not throw exception + requestLogLine.getSqlQueryLine(new DefaultObjectMapper()); // call should not throw exception } } diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index 715dd0fff3e..326a0ca3df9 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.guava.LazySequence; @@ -39,6 +40,7 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -60,6 +62,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -164,6 +167,11 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker // the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects // to actually serve the queries return (theQuery, responseContext) -> { + responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new 
ConcurrentHashMap<>()); + responseContext.add( + Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, + new NonnullPair<>(theQuery.getQuery().getMostSpecificId(), 0) + ); if (scheduler != null) { Set segments = new HashSet<>(); specs.forEach(spec -> segments.add(new SegmentServerSelector(spec))); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java index dfc98e5cc3c..82674ab5cf9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java @@ -19,16 +19,13 @@ package org.apache.druid.server.coordinator; -import org.apache.druid.java.util.common.DateTimes; - public class CoordinatorRuntimeParamsTestHelpers { public static DruidCoordinatorRuntimeParams.Builder newBuilder() { return DruidCoordinatorRuntimeParams .newBuilder() - .withStartTimeNanos(System.nanoTime()) - .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")); + .withStartTimeNanos(System.nanoTime()); } public static DruidCoordinatorRuntimeParams.Builder newBuilder(DruidCluster druidCluster) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index e138111c76e..f3a54b41d37 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -1062,7 +1062,6 @@ public class RunRulesTest .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withBalancerStrategy(balancerStrategy) - .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) .build();
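For orientation, here is a minimal sketch (not part of the patch) of how the new test fixtures above are intended to compose: a server created with SimpleServerView.createServer is announced in the SimpleServerView and backed by a SimpleServerManager registered with the TestHttpClient, and dropping the segment from the manager without unannouncing it is what makes RetryQueryRunner retry. The helper name announceThenDropSegment is invented for illustration; the ObjectMapper, QueryToolChestWarehouse, QueryRunnerFactoryConglomerate, and the generated DataSegment/QueryableIndex pair are assumed to be built as in RetryQueryRunnerTest.setup().

// Illustrative sketch only; mirrors the wiring in RetryQueryRunnerTest#setup, #addServer, and #dropSegmentFromServer.
static NonnullPair<DataSegment, QueryableIndex> announceThenDropSegment(
    ObjectMapper objectMapper,
    QueryToolChestWarehouse warehouse,
    QueryRunnerFactoryConglomerate conglomerate,
    DataSegment segment,
    QueryableIndex index
)
{
  final TestHttpClient httpClient = new TestHttpClient(objectMapper);
  final SimpleServerView serverView = new SimpleServerView(warehouse, objectMapper, httpClient);

  // "server_1" on 127.0.0.1: a historical in the "default" tier.
  final DruidServer server = SimpleServerView.createServer(1);
  // Announce the segment in the broker view and register a runner that can actually serve it.
  serverView.addServer(server, segment);
  httpClient.addServerAndRunner(server, new SimpleServerManager(conglomerate, segment, index));

  // Drop the segment on the data node without unannouncing it from the view: the broker still routes
  // the query here, the server reports the segment as missing, and RetryQueryRunner issues a retry.
  return httpClient.getServerManager(server).dropSegment();
}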