Fix RetryQueryRunner to actually do the job (#10082)

* Fix RetryQueryRunner to actually do the job

* more javadoc

* fix test and checkstyle

* don't combine for testing

* address comments

* fix unit tests

* always initialize response context in cachingClusteredClient

* fix subquery

* address comments

* fix test

* query id for builders

* make queryId optional in the builders and ClusterQueryResult

* fix test

* suppress tests and unused methods

* exclude groupBy builder

* fix jacoco exclusion

* add tests for builders

* address comments

* don't truncate
This commit is contained in:
Jihoon Son 2020-07-01 14:02:21 -07:00 committed by GitHub
parent 7bb7489afc
commit 657f8ee80f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 2070 additions and 718 deletions

View File

@ -21,7 +21,6 @@ package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -42,6 +41,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;

View File

@ -20,7 +20,6 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -39,6 +38,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;

View File

@ -20,7 +20,6 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -41,6 +40,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;

View File

@ -27,7 +27,6 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
@ -104,6 +103,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;

View File

@ -22,7 +22,6 @@ package org.apache.druid.benchmark.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
@ -32,6 +31,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;

View File

@ -21,7 +21,6 @@ package org.apache.druid.benchmark.query;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -38,6 +37,7 @@ import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;

View File

@ -1277,6 +1277,9 @@
<exclude>org/apache/druid/benchmark/**/*</exclude> <!-- benchmarks -->
<exclude>org/apache/druid/**/*Benchmark*</exclude> <!-- benchmarks -->
<exclude>org/apache/druid/testing/**/*</exclude> <!-- integration-tests -->
<!-- Exceptions -->
<exclude>org/apache/druid/query/TruncatedResponseContextException.class</exclude>
</excludes>
</configuration>
<executions>

View File

@ -199,7 +199,7 @@ public abstract class BaseQuery<T> implements Query<T>
return computeOverriddenContext(getContext(), overrides);
}
protected static Map<String, Object> computeOverriddenContext(
public static Map<String, Object> computeOverriddenContext(
final Map<String, Object> context,
final Map<String, Object> overrides
)
@ -247,7 +247,7 @@ public abstract class BaseQuery<T> implements Query<T>
}
@Override
public Query withId(String id)
public Query<T> withId(String id)
{
return withOverriddenContext(ImmutableMap.of(QUERY_ID, id));
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -59,6 +60,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
*/
@ -259,7 +261,18 @@ public class Druids
public TimeseriesQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
public TimeseriesQueryBuilder randomQueryId()
{
return queryId(UUID.randomUUID().toString());
}
public TimeseriesQueryBuilder queryId(String queryId)
{
context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId));
return this;
}
@ -466,7 +479,18 @@ public class Druids
public SearchQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
public SearchQueryBuilder randomQueryId()
{
return queryId(UUID.randomUUID().toString());
}
public SearchQueryBuilder queryId(String queryId)
{
context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId));
return this;
}
}
@ -572,7 +596,18 @@ public class Druids
public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
public TimeBoundaryQueryBuilder randomQueryId()
{
return queryId(UUID.randomUUID().toString());
}
public TimeBoundaryQueryBuilder queryId(String queryId)
{
context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId));
return this;
}
}
@ -721,7 +756,7 @@ public class Druids
public SegmentMetadataQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
}
@ -839,7 +874,7 @@ public class Druids
public ScanQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
@ -967,7 +1002,7 @@ public class Druids
public DataSourceMetadataQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
@ -45,6 +46,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ExtensionPoint
@ -136,6 +138,11 @@ public interface Query<T>
*/
Query<T> withSubQueryId(String subQueryId);
default Query<T> withDefaultSubQueryId()
{
return withSubQueryId(UUID.randomUUID().toString());
}
/**
* Returns the subQueryId of this query. This is set by ClientQuerySegmentWalker (the entry point for the Broker's
* query stack) on any subqueries that it issues. It is null for the main query.
@ -154,6 +161,17 @@ public interface Query<T>
return null;
}
/**
* Returns a most specific ID of this query; if it is a subquery, this will return its subquery ID.
* If it is a regular query without subqueries, this will return its query ID.
* This method should be called after the relevant ID is assigned using {@link #withId} or {@link #withSubQueryId}.
*/
default String getMostSpecificId()
{
final String subqueryId = getSubQueryId();
return subqueryId == null ? Preconditions.checkNotNull(getId(), "queryId") : subqueryId;
}
Query<T> withDataSource(DataSource dataSource);
default Query<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)

View File

@ -53,6 +53,7 @@ public class QueryContexts
public static final String JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = "enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize";
public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
public static final String SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT_KEY = "shouldFailOnTruncatedResponseContext";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@ -70,6 +71,7 @@ public class QueryContexts
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000;
public static final boolean DEFAULT_USE_FILTER_CNF = false;
public static final boolean DEFAULT_SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT = false;
@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@ -344,6 +346,19 @@ public class QueryContexts
return defaultTimeout;
}
public static <T> Query<T> setFailOnTruncatedResponseContext(Query<T> query)
{
return query.withOverriddenContext(ImmutableMap.of(SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT_KEY, true));
}
public static <T> boolean shouldFailOnTruncatedResponseContext(Query<T> query)
{
return query.getContextBoolean(
SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT_KEY,
DEFAULT_SHOULD_FAIL_ON_TRUNCATED_RESPONSE_CONTEXT
);
}
static <T> long parseLong(Query<T> query, String key, long defaultValue)
{
final Object val = query.getContextValue(key);

View File

@ -50,6 +50,7 @@ public class QueryInterruptedException extends QueryException
public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded";
public static final String UNAUTHORIZED = "Unauthorized request.";
public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
public static final String TRUNCATED_RESPONSE_CONTEXT = "Truncated response context";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";
@JsonCreator
@ -105,6 +106,8 @@ public class QueryInterruptedException extends QueryException
return RESOURCE_LIMIT_EXCEEDED;
} else if (e instanceof UnsupportedOperationException) {
return UNSUPPORTED_OPERATION;
} else if (e instanceof TruncatedResponseContextException) {
return TRUNCATED_RESPONSE_CONTEXT;
} else {
return UNKNOWN_EXCEPTION;
}

View File

@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.SegmentMissingException;
import java.util.ArrayList;
import java.util.List;
public class RetryQueryRunner<T> implements QueryRunner<T>
{
private static final EmittingLogger log = new EmittingLogger(RetryQueryRunner.class);
private final QueryRunner<T> baseRunner;
private final RetryQueryRunnerConfig config;
private final ObjectMapper jsonMapper;
public RetryQueryRunner(
QueryRunner<T> baseRunner,
RetryQueryRunnerConfig config,
ObjectMapper jsonMapper
)
{
this.baseRunner = baseRunner;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext context)
{
final List<Sequence<T>> listOfSequences = new ArrayList<>();
listOfSequences.add(baseRunner.run(queryPlus, context));
return new YieldingSequenceBase<T>()
{
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
List<SegmentDescriptor> missingSegments = getMissingSegments(context);
if (!missingSegments.isEmpty()) {
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
final QueryPlus<T> retryQueryPlus = queryPlus.withQuery(
Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments)
);
Sequence<T> retrySequence = baseRunner.run(retryQueryPlus, context);
listOfSequences.add(retrySequence);
missingSegments = getMissingSegments(context);
if (missingSegments.isEmpty()) {
break;
}
}
final List<SegmentDescriptor> finalMissingSegs = getMissingSegments(context);
if (!config.isReturnPartialResults() && !finalMissingSegs.isEmpty()) {
throw new SegmentMissingException("No results found for segments[%s]", finalMissingSegs);
}
return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), Sequences.simple(listOfSequences))
.toYielder(initValue, accumulator);
} else {
return Iterables.getOnlyElement(listOfSequences).toYielder(initValue, accumulator);
}
}
};
}
private List<SegmentDescriptor> getMissingSegments(final ResponseContext context)
{
final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS);
if (maybeMissingSegments == null) {
return new ArrayList<>();
}
return jsonMapper.convertValue(
maybeMissingSegments,
new TypeReference<List<SegmentDescriptor>>()
{
}
);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import org.apache.druid.java.util.common.StringUtils;
/**
* This exception is thrown when {@link org.apache.druid.query.context.ResponseContext} is truncated after serialization
* in historicals or realtime tasks. The serialized response context can be truncated if its size is larger than
* {@code QueryResource#RESPONSE_CTX_HEADER_LEN_LIMIT}.
*
* @see org.apache.druid.query.context.ResponseContext#serializeWith
* @see QueryContexts#shouldFailOnTruncatedResponseContext
*/
public class TruncatedResponseContextException extends RuntimeException
{
public TruncatedResponseContextException(String message, Object... arguments)
{
super(StringUtils.nonStrictFormat(message, arguments));
}
}

View File

@ -55,7 +55,12 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
public Sequence<T> apply(DataSource singleSource)
{
return baseRunner.run(
queryPlus.withQuery(query.withDataSource(singleSource)),
queryPlus.withQuery(
query.withDataSource(singleSource)
// assign the subqueryId. this will be used to validate that every query servers
// have responded per subquery in RetryQueryRunner
.withDefaultSubQueryId()
),
responseContext
);
}

View File

@ -28,10 +28,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -40,6 +42,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
/**
@ -112,6 +115,30 @@ public abstract class ResponseContext
"uncoveredIntervalsOverflowed",
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
),
/**
* Map of most relevant query ID to remaining number of responses from query nodes.
* The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query nodes,
* and is updated whenever they respond (@code DirectDruidClient). {@code RetryQueryRunner} uses this value to
* check if the {@link #MISSING_SEGMENTS} is valid.
*
* Currently, the broker doesn't run subqueries in parallel, the remaining number of responses will be updated
* one by one per subquery. However, since it can be parallelized to run subqueries simultaneously, we store them
* in a ConcurrentHashMap.
*
* @see org.apache.druid.query.Query#getMostSpecificId
*/
REMAINING_RESPONSES_FROM_QUERY_SERVERS(
"remainingResponsesFromQueryServers",
(totalRemainingPerId, idAndNumResponses) -> {
final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
map.compute(
pair.lhs,
(id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
);
return map;
}
),
/**
* Lists missing segments.
*/
@ -335,7 +362,7 @@ public abstract class ResponseContext
{
final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
if (fullSerializedString.length() <= maxCharsNumber) {
return new SerializationResult(fullSerializedString, fullSerializedString);
return new SerializationResult(null, fullSerializedString);
} else {
// Indicates that the context is truncated during serialization.
add(Key.TRUNCATED, true);
@ -411,18 +438,22 @@ public abstract class ResponseContext
*/
public static class SerializationResult
{
@Nullable
private final String truncatedResult;
private final String fullResult;
SerializationResult(String truncatedResult, String fullResult)
SerializationResult(@Nullable String truncatedResult, String fullResult)
{
this.truncatedResult = truncatedResult;
this.fullResult = fullResult;
}
public String getTruncatedResult()
/**
* Returns the truncated result if it exists otherwise returns the full result.
*/
public String getResult()
{
return truncatedResult;
return isTruncated() ? truncatedResult : fullResult;
}
public String getFullResult()
@ -430,9 +461,9 @@ public abstract class ResponseContext
return fullResult;
}
public Boolean isReduced()
public boolean isTruncated()
{
return !truncatedResult.equals(fullResult);
return truncatedResult != null;
}
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
@ -78,6 +79,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
@ -867,7 +869,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
private List<PostAggregator> postAggregatorSpecs;
@Nullable
private HavingSpec havingSpec;
@Nullable
private Map<String, Object> context;
@Nullable
@ -1115,6 +1117,17 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return this;
}
public Builder randomQueryId()
{
return queryId(UUID.randomUUID().toString());
}
public Builder queryId(String queryId)
{
context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId));
return this;
}
public Builder overrideContext(Map<String, Object> contextOverride)
{
this.context = computeOverriddenContext(context, contextOverride);

View File

@ -19,9 +19,11 @@
package org.apache.druid.query.topn;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -43,6 +45,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
* A Builder for TopNQuery.
@ -276,8 +279,18 @@ public class TopNQueryBuilder
public TopNQueryBuilder context(Map<String, Object> c)
{
context = c;
this.context = c;
return this;
}
public TopNQueryBuilder randomQueryId()
{
return queryId(UUID.randomUUID().toString());
}
public TopNQueryBuilder queryId(String queryId)
{
context = BaseQuery.computeOverriddenContext(context, ImmutableMap.of(BaseQuery.QUERY_ID, queryId));
return this;
}
}

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids.SearchQueryBuilder;
import org.apache.druid.query.Druids.TimeBoundaryQueryBuilder;
import org.apache.druid.query.Druids.TimeseriesQueryBuilder;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
@RunWith(Enclosed.class)
public class DruidsTest
{
private static final String DATASOURCE = "datasource";
private static final QuerySegmentSpec QUERY_SEGMENT_SPEC = new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0),
new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1)
)
);
public static class TimeseriesQueryBuilderTest
{
private TimeseriesQueryBuilder builder;
@Before
public void setup()
{
builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATASOURCE)
.intervals(QUERY_SEGMENT_SPEC)
.granularity(Granularities.ALL);
}
@Test
public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId()
{
final TimeseriesQuery query = builder
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext()
{
final TimeseriesQuery query = builder
.context(ImmutableMap.of("my", "context"))
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext()
{
final TimeseriesQuery query = builder
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId"))
.queryId("realQueryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
@Test
public void testContextAfterSettingQueryIdReturnContextWithoutQueryId()
{
final TimeseriesQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context"))
.build();
Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext());
}
@Test
public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId()
{
final TimeseriesQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId"))
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
}
public static class SearchQueryBuilderTest
{
private SearchQueryBuilder builder;
@Before
public void setup()
{
builder = Druids.newSearchQueryBuilder()
.dataSource(DATASOURCE)
.intervals(QUERY_SEGMENT_SPEC)
.granularity(Granularities.ALL);
}
@Test
public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId()
{
final SearchQuery query = builder
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext()
{
final SearchQuery query = builder
.context(ImmutableMap.of("my", "context"))
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext()
{
final SearchQuery query = builder
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId"))
.queryId("realQueryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
@Test
public void testContextAfterSettingQueryIdReturnContextWithoutQueryId()
{
final SearchQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context"))
.build();
Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext());
}
@Test
public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId()
{
final SearchQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId"))
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
}
public static class TimeBoundaryBuilderTest
{
private TimeBoundaryQueryBuilder builder;
@Before
public void setup()
{
builder = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATASOURCE)
.intervals(QUERY_SEGMENT_SPEC);
}
@Test
public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId()
{
final TimeBoundaryQuery query = builder
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext()
{
final TimeBoundaryQuery query = builder
.context(ImmutableMap.of("my", "context"))
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext()
{
final TimeBoundaryQuery query = builder
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId"))
.queryId("realQueryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
@Test
public void testContextAfterSettingQueryIdReturnContextWithoutQueryId()
{
final TimeBoundaryQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context"))
.build();
Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext());
}
@Test
public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId()
{
final TimeBoundaryQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId"))
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
}
}

View File

@ -107,4 +107,18 @@ public class QueryContextsTest
QueryContexts.withMaxScatterGatherBytes(query, 100);
}
@Test
public void testSetFailOnTruncatedResponseContext()
{
final Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
null
);
Assert.assertFalse(QueryContexts.shouldFailOnTruncatedResponseContext(query));
final Query<?> queryWithFlag = QueryContexts.setFailOnTruncatedResponseContext(query);
Assert.assertTrue(QueryContexts.shouldFailOnTruncatedResponseContext(queryWithFlag));
}
}

View File

@ -1,350 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
public class RetryQueryRunnerTest
{
private static class TestRetryQueryRunnerConfig extends RetryQueryRunnerConfig
{
private int numTries;
private boolean returnPartialResults;
public TestRetryQueryRunnerConfig(int numTries, boolean returnPartialResults)
{
this.numTries = numTries;
this.returnPartialResults = returnPartialResults;
}
@Override
public int getNumTries()
{
return numTries;
}
@Override
public boolean isReturnPartialResults()
{
return returnPartialResults;
}
}
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.DAY_GRAN)
.intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory(
"idx",
"index"
),
QueryRunnerTestHelper.QUALITY_UNIQUES
)
)
.build();
@Test
public void testRunWithMissingSegments()
{
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(QueryPlus queryPlus, ResponseContext context)
{
context.add(
ResponseContext.Key.MISSING_SEGMENTS,
Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1))
);
return Sequences.empty();
}
},
new RetryQueryRunnerConfig()
{
@Override
public int getNumTries()
{
return 0;
}
@Override
public boolean isReturnPartialResults()
{
return true;
}
},
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = runner.run(QueryPlus.wrap(query), context).toList();
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 1
);
Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
}
@Test
public void testRetry()
{
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0);
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
ResponseContext context
)
{
if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 0) {
context.add(
ResponseContext.Key.MISSING_SEGMENTS,
Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1))
);
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1);
return Sequences.empty();
} else {
return Sequences.simple(
Collections.singletonList(
new Result<>(
DateTimes.nowUtc(),
new TimeseriesResultValue(
new HashMap<>()
)
)
)
);
}
}
},
new TestRetryQueryRunnerConfig(1, true),
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = runner.run(QueryPlus.wrap(query), context).toList();
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0
);
}
@Test
public void testRetryMultiple()
{
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0);
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
ResponseContext context
)
{
if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) < 3) {
context.add(
ResponseContext.Key.MISSING_SEGMENTS,
Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1))
);
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, (int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) + 1);
return Sequences.empty();
} else {
return Sequences.simple(
Collections.singletonList(
new Result<>(
DateTimes.nowUtc(),
new TimeseriesResultValue(
new HashMap<>()
)
)
)
);
}
}
},
new TestRetryQueryRunnerConfig(4, true),
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = runner.run(QueryPlus.wrap(query), context).toList();
Assert.assertTrue("Should return a list with one element", ((List) actualResults).size() == 1);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0
);
}
@Test(expected = SegmentMissingException.class)
public void testException()
{
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
ResponseContext context
)
{
context.add(
ResponseContext.Key.MISSING_SEGMENTS,
Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1))
);
return Sequences.empty();
}
},
new TestRetryQueryRunnerConfig(1, false),
jsonMapper
);
runner.run(QueryPlus.wrap(query), context).toList();
Assert.assertTrue(
"Should have one entry in the list of missing segments",
((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 1
);
}
@Test
public void testNoDuplicateRetry()
{
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 0);
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
RetryQueryRunner<Result<TimeseriesResultValue>> runner = new RetryQueryRunner<>(
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
ResponseContext context
)
{
final Query<Result<TimeseriesResultValue>> query = queryPlus.getQuery();
if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 0) {
// assume 2 missing segments at first run
context.add(
ResponseContext.Key.MISSING_SEGMENTS,
Arrays.asList(
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 1),
new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2)
)
);
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1);
return Sequences.simple(
Collections.singletonList(
new Result<>(
DateTimes.nowUtc(),
new TimeseriesResultValue(
new HashMap<>()
)
)
)
);
} else if ((int) context.get(ResponseContext.Key.NUM_SCANNED_ROWS) == 1) {
// this is first retry
Assert.assertTrue("Should retry with 2 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 2);
// assume only left 1 missing at first retry
context.add(
ResponseContext.Key.MISSING_SEGMENTS,
Collections.singletonList(new SegmentDescriptor(Intervals.utc(178888, 1999999), "test", 2))
);
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 2);
return Sequences.simple(
Collections.singletonList(
new Result<>(
DateTimes.nowUtc(),
new TimeseriesResultValue(
new HashMap<>()
)
)
)
);
} else {
// this is second retry
Assert.assertTrue("Should retry with 1 missing segments", ((MultipleSpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptors().size() == 1);
// assume no more missing at second retry
context.put(ResponseContext.Key.NUM_SCANNED_ROWS, 3);
return Sequences.simple(
Collections.singletonList(
new Result<>(
DateTimes.nowUtc(),
new TimeseriesResultValue(
new HashMap<>()
)
)
)
);
}
}
},
new TestRetryQueryRunnerConfig(2, false),
jsonMapper
);
Iterable<Result<TimeseriesResultValue>> actualResults = runner.run(QueryPlus.wrap(query), context).toList();
Assert.assertTrue("Should return a list with 3 elements", ((List) actualResults).size() == 3);
Assert.assertTrue(
"Should have nothing in missingSegment list",
((List) context.get(ResponseContext.Key.MISSING_SEGMENTS)).size() == 0
);
}
}

View File

@ -23,7 +23,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext.Key;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -33,6 +35,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
public class ResponseContextTest
@ -133,6 +136,18 @@ public class ResponseContextTest
((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray()
);
final String queryId = "queryId";
final String queryId2 = "queryId2";
ctx.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, 3));
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId2, 4));
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -1));
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -2));
Assert.assertEquals(
ImmutableMap.of(queryId, 0, queryId2, 4),
ctx.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS)
);
final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0);
ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01));
Assert.assertArrayEquals(
@ -214,14 +229,14 @@ public class ResponseContextTest
final DefaultObjectMapper mapper = new DefaultObjectMapper();
Assert.assertEquals(
mapper.writeValueAsString(ImmutableMap.of("ETag", "string-value")),
ctx1.serializeWith(mapper, Integer.MAX_VALUE).getTruncatedResult()
ctx1.serializeWith(mapper, Integer.MAX_VALUE).getResult()
);
final ResponseContext ctx2 = ResponseContext.createEmpty();
ctx2.add(ResponseContext.Key.NUM_SCANNED_ROWS, 100);
Assert.assertEquals(
mapper.writeValueAsString(ImmutableMap.of("count", 100)),
ctx2.serializeWith(mapper, Integer.MAX_VALUE).getTruncatedResult()
ctx2.serializeWith(mapper, Integer.MAX_VALUE).getResult()
);
}
@ -234,7 +249,7 @@ public class ResponseContextTest
final DefaultObjectMapper objectMapper = new DefaultObjectMapper();
final String fullString = objectMapper.writeValueAsString(ctx.getDelegate());
final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE);
Assert.assertEquals(fullString, res1.getTruncatedResult());
Assert.assertEquals(fullString, res1.getResult());
final ResponseContext ctxCopy = ResponseContext.createEmpty();
ctxCopy.merge(ctx);
final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 30);
@ -242,7 +257,7 @@ public class ResponseContextTest
ctxCopy.put(ResponseContext.Key.TRUNCATED, true);
Assert.assertEquals(
ctxCopy.getDelegate(),
ResponseContext.deserialize(res2.getTruncatedResult(), objectMapper).getDelegate()
ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate()
);
}
@ -262,7 +277,7 @@ public class ResponseContextTest
final DefaultObjectMapper objectMapper = new DefaultObjectMapper();
final String fullString = objectMapper.writeValueAsString(ctx.getDelegate());
final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE);
Assert.assertEquals(fullString, res1.getTruncatedResult());
Assert.assertEquals(fullString, res1.getResult());
final ResponseContext ctxCopy = ResponseContext.createEmpty();
ctxCopy.merge(ctx);
final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 70);
@ -271,7 +286,7 @@ public class ResponseContextTest
ctxCopy.put(ResponseContext.Key.TRUNCATED, true);
Assert.assertEquals(
ctxCopy.getDelegate(),
ResponseContext.deserialize(res2.getTruncatedResult(), objectMapper).getDelegate()
ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate()
);
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery.Builder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class GroupByQueryBuilderTest
{
private Builder builder;
@Before
public void setup()
{
builder = new Builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.setQuerySegmentSpec(QueryRunnerTestHelper.EMPTY_INTERVAL)
.setDimensions(
new DefaultDimensionSpec(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.MARKET_DIMENSION)
)
.setAggregatorSpecs(new DoubleMaxAggregatorFactory("index", "index"));
}
@Test
public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId()
{
final GroupByQuery query = builder
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext()
{
final GroupByQuery query = builder
.setContext(ImmutableMap.of("my", "context"))
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext()
{
final GroupByQuery query = builder
.setContext(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId"))
.queryId("realQueryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
@Test
public void testContextAfterSettingQueryIdReturnContextWithoutQueryId()
{
final GroupByQuery query = builder
.queryId("queryId")
.setContext(ImmutableMap.of("my", "context"))
.build();
Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext());
}
@Test
public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId()
{
final GroupByQuery query = builder
.queryId("queryId")
.setContext(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId"))
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.topn;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TopNQueryBuilderTest
{
private TopNQueryBuilder builder;
@Before
public void setup()
{
builder = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
.metric(QueryRunnerTestHelper.INDEX_METRIC)
.intervals(QueryRunnerTestHelper.EMPTY_INTERVAL)
.threshold(4)
.aggregators(new DoubleMaxAggregatorFactory("index", "index"));
}
@Test
public void testQueryIdWhenContextInBuilderIsNullReturnContextContainingQueryId()
{
final TopNQuery query = builder
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithoutQueryIdReturnMergedContext()
{
final TopNQuery query = builder
.context(ImmutableMap.of("my", "context"))
.queryId("queryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "queryId", "my", "context"), query.getContext());
}
@Test
public void testQueryIdWhenBuilderHasNonnullContextWithQueryIdReturnMergedContext()
{
final TopNQuery query = builder
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "queryId"))
.queryId("realQueryId")
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
@Test
public void testContextAfterSettingQueryIdReturnContextWithoutQueryId()
{
final TopNQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context"))
.build();
Assert.assertEquals(ImmutableMap.of("my", "context"), query.getContext());
}
@Test
public void testContextContainingQueryIdAfterSettingQueryIdOverwriteQueryId()
{
final TopNQuery query = builder
.queryId("queryId")
.context(ImmutableMap.of("my", "context", BaseQuery.QUERY_ID, "realQueryId"))
.build();
Assert.assertEquals(ImmutableMap.of(BaseQuery.QUERY_ID, "realQueryId", "my", "context"), query.getContext());
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark.datagen;
package org.apache.druid.segment.generator;
import com.google.common.hash.Hashing;
import org.apache.druid.common.config.NullHandling;
@ -35,8 +35,6 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexIndexableAdapter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;

View File

@ -40,6 +40,7 @@ import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -65,6 +66,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
@ -188,7 +190,21 @@ public class CachingClusteredClient implements QuerySegmentWalker
final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
)
{
return new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers);
return result.sequence;
}
private static <T> void initializeNumRemainingResponsesInResponseContext(
final Query<T> query,
final ResponseContext responseContext,
final int numQueryServers
)
{
responseContext.add(
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
new NonnullPair<>(query.getMostSpecificId(), numQueryServers)
);
}
@Override
@ -221,6 +237,18 @@ public class CachingClusteredClient implements QuerySegmentWalker
};
}
private static class ClusterQueryResult<T>
{
private final Sequence<T> sequence;
private final int numQueryServers;
private ClusterQueryResult(Sequence<T> sequence, int numQueryServers)
{
this.sequence = sequence;
this.numQueryServers = numQueryServers;
}
}
/**
* This class essentially encapsulates the major part of the logic of {@link CachingClusteredClient}. It's state and
* methods couldn't belong to {@link CachingClusteredClient} itself, because they depend on the specific query object
@ -283,13 +311,23 @@ public class CachingClusteredClient implements QuerySegmentWalker
return contextBuilder.build();
}
Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
/**
* Builds a query distribution and merge plan.
*
* This method returns an empty sequence if the query datasource is unknown or there is matching result-level cache.
* Otherwise, it creates a sequence merging sequences from the regular broker cache and remote servers. If parallel
* merge is enabled, it can merge and *combine* the underlying sequences in parallel.
*
* @return a pair of a sequence merging results from remote query servers and the number of remote servers
* participating in query processing.
*/
ClusterQueryResult<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
{
final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline(
dataSourceAnalysis
);
if (!maybeTimeline.isPresent()) {
return Sequences.empty();
return new ClusterQueryResult<>(Sequences.empty(), 0);
}
final TimelineLookup<String, ServerSelector> timeline = timelineConverter.apply(maybeTimeline.get());
@ -306,7 +344,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
@Nullable
final String currentEtag = computeCurrentEtag(segmentServers, queryCacheKey);
if (currentEtag != null && currentEtag.equals(prevEtag)) {
return Sequences.empty();
return new ClusterQueryResult<>(Sequences.empty(), 0);
}
}
@ -324,7 +362,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
return merge(sequencesByInterval);
});
return scheduler.run(query, mergedResultSequence);
return new ClusterQueryResult<>(scheduler.run(query, mergedResultSequence), segmentsByServer.size());
}
private Sequence<T> merge(List<Sequence<T>> sequencesByInterval)
@ -613,6 +651,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
}
/**
* Create sequences that reads from remote query servers (historicals and tasks). Note that the broker will
* hold an HTTP connection per server after this method is called.
*/
private void addSequencesFromServer(
final List<Sequence<T>> listOfSequences,
final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer

View File

@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -53,6 +54,7 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.server.QueryResource;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@ -69,6 +71,7 @@ import java.io.SequenceInputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@ -86,6 +89,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public static final String QUERY_FAIL_TIME = "queryFailTime";
private static final Logger log = new Logger(DirectDruidClient.class);
private static final int VAL_TO_REDUCE_REMAINING_RESPONSES = -1;
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
@ -105,12 +109,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public static void removeMagicResponseContextFields(ResponseContext responseContext)
{
responseContext.remove(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED);
responseContext.remove(ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
}
public static ResponseContext makeResponseContextForQuery()
{
final ResponseContext responseContext = ConcurrentResponseContext.createEmpty();
responseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
return responseContext;
}
@ -231,7 +237,17 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final boolean continueReading;
try {
log.trace(
"Got a response from [%s] for query ID[%s], subquery ID[%s]",
url,
query.getId(),
query.getSubQueryId()
);
final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
context.add(
ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
new NonnullPair<>(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES)
);
// context may be null in case of error or query timeout
if (responseContext != null) {
context.merge(ResponseContext.deserialize(responseContext, objectMapper));
@ -438,11 +454,17 @@ public class DirectDruidClient<T> implements QueryRunner<T>
throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
}
// Some logics in brokers such as retry on missing segments rely on the response context,
// and thus truncated response contexts are not allowed.
final Query<T> queryToSend = QueryContexts.setFailOnTruncatedResponseContext(
QueryContexts.withTimeout(query, timeLeft)
);
future = httpClient.go(
new Request(
HttpMethod.POST,
new URL(url)
).setContent(objectMapper.writeValueAsBytes(QueryContexts.withTimeout(query, timeLeft)))
).setContent(objectMapper.writeValueAsBytes(queryToSend))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON

View File

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.segment.SegmentMissingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
public class RetryQueryRunner<T> implements QueryRunner<T>
{
private static final Logger LOG = new Logger(RetryQueryRunner.class);
private final QueryRunner<T> baseRunner;
private final BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> retryRunnerCreateFn;
private final RetryQueryRunnerConfig config;
private final ObjectMapper jsonMapper;
/**
* Runnable executed after the broker creates query distribution tree for the first attempt. This is only
* for testing and must not be used in production code.
*/
private final Runnable runnableAfterFirstAttempt;
private int totalNumRetries;
public RetryQueryRunner(
QueryRunner<T> baseRunner,
BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> retryRunnerCreateFn,
RetryQueryRunnerConfig config,
ObjectMapper jsonMapper
)
{
this(baseRunner, retryRunnerCreateFn, config, jsonMapper, () -> {});
}
/**
* Constructor only for testing.
*/
@VisibleForTesting
RetryQueryRunner(
QueryRunner<T> baseRunner,
BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> retryRunnerCreateFn,
RetryQueryRunnerConfig config,
ObjectMapper jsonMapper,
Runnable runnableAfterFirstAttempt
)
{
this.baseRunner = baseRunner;
this.retryRunnerCreateFn = retryRunnerCreateFn;
this.config = config;
this.jsonMapper = jsonMapper;
this.runnableAfterFirstAttempt = runnableAfterFirstAttempt;
}
@VisibleForTesting
int getTotalNumRetries()
{
return totalNumRetries;
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext context)
{
return new YieldingSequenceBase<T>()
{
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
final Sequence<Sequence<T>> retryingSequence = new BaseSequence<>(
new IteratorMaker<Sequence<T>, RetryingSequenceIterator>()
{
@Override
public RetryingSequenceIterator make()
{
return new RetryingSequenceIterator(queryPlus, context, baseRunner, runnableAfterFirstAttempt);
}
@Override
public void cleanup(RetryingSequenceIterator iterFromMake)
{
totalNumRetries = iterFromMake.retryCount;
}
}
);
return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), retryingSequence)
.toYielder(initValue, accumulator);
}
};
}
private List<SegmentDescriptor> getMissingSegments(QueryPlus<T> queryPlus, final ResponseContext context)
{
// Sanity check before retrieving missingSegments from responseContext.
// The missingSegments in the responseContext is only valid when all servers have responded to the broker.
// The remainingResponses MUST be not null but 0 in the responseContext at this point.
final ConcurrentHashMap<String, Integer> idToRemainingResponses =
(ConcurrentHashMap<String, Integer>) Preconditions.checkNotNull(
context.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS),
"%s in responseContext",
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
);
final int remainingResponses = Preconditions.checkNotNull(
idToRemainingResponses.get(queryPlus.getQuery().getMostSpecificId()),
"Number of remaining responses for query[%s]",
queryPlus.getQuery().getMostSpecificId()
);
if (remainingResponses > 0) {
throw new ISE("Failed to check missing segments due to missing responses from [%d] servers", remainingResponses);
}
final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS);
if (maybeMissingSegments == null) {
return Collections.emptyList();
}
return jsonMapper.convertValue(
maybeMissingSegments,
new TypeReference<List<SegmentDescriptor>>()
{
}
);
}
/**
* A lazy iterator populating {@link Sequence} by retrying the query. The first returned sequence is always the base
* sequence from the baseQueryRunner. Subsequent sequences are created dynamically whenever it retries the query. All
* the sequences populated by this iterator will be merged (not combined) with the base sequence.
*
* The design of this iterator depends on how {@link MergeSequence} works; the MergeSequence pops an item from
* each underlying sequence and pushes them to a {@link java.util.PriorityQueue}. Whenever it pops from the queue,
* it pushes a new item from the sequence where the returned item was originally from. Since the first returned
* sequence from this iterator is always the base sequence, the MergeSequence will call {@link Sequence#toYielder}
* on the base sequence first which in turn initializing query distribution tree. Once this tree is built, the query
* servers (historicals and realtime tasks) will lock all segments to read and report missing segments to the broker.
* If there are missing segments reported, this iterator will rewrite the query with those reported segments and
* reissue the rewritten query.
*
* @see org.apache.druid.client.CachingClusteredClient
* @see org.apache.druid.client.DirectDruidClient
*/
private class RetryingSequenceIterator implements Iterator<Sequence<T>>
{
private final QueryPlus<T> queryPlus;
private final ResponseContext context;
private final QueryRunner<T> baseQueryRunner;
private final Runnable runnableAfterFirstAttempt;
private boolean first = true;
private Sequence<T> sequence = null;
private int retryCount = 0;
private RetryingSequenceIterator(
QueryPlus<T> queryPlus,
ResponseContext context,
QueryRunner<T> baseQueryRunner,
Runnable runnableAfterFirstAttempt
)
{
this.queryPlus = queryPlus;
this.context = context;
this.baseQueryRunner = baseQueryRunner;
this.runnableAfterFirstAttempt = runnableAfterFirstAttempt;
}
@Override
public boolean hasNext()
{
if (first) {
sequence = baseQueryRunner.run(queryPlus, context);
// runnableAfterFirstAttempt is only for testing, it must be no-op for production code.
runnableAfterFirstAttempt.run();
first = false;
return true;
} else if (sequence != null) {
return true;
} else {
final List<SegmentDescriptor> missingSegments = getMissingSegments(queryPlus, context);
if (missingSegments.isEmpty()) {
return false;
} else if (retryCount >= config.getNumTries()) {
if (!config.isReturnPartialResults()) {
throw new SegmentMissingException("No results found for segments[%s]", missingSegments);
} else {
return false;
}
} else {
retryCount++;
LOG.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), retryCount);
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
final QueryPlus<T> retryQueryPlus = queryPlus.withQuery(
Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments)
);
sequence = retryRunnerCreateFn.apply(retryQueryPlus.getQuery(), missingSegments).run(retryQueryPlus, context);
return true;
}
}
}
@Override
public Sequence<T> next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final Sequence<T> next = sequence;
sequence = null;
return next;
}
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
@ -61,7 +62,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Stack;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -329,7 +329,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
} else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) {
// Subquery needs to be inlined. Assign it a subquery id and run it.
final Query subQueryWithId = subQuery.withSubQueryId(UUID.randomUUID().toString());
final Query subQueryWithId = subQuery.withDefaultSubQueryId();
final Sequence<?> queryResults;
@ -337,7 +337,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
queryResults = Sequences.empty();
} else {
final QueryRunner subqueryRunner = subQueryWithId.getRunner(this);
queryResults = subqueryRunner.run(QueryPlus.wrap(subQueryWithId));
queryResults = subqueryRunner.run(
QueryPlus.wrap(subQueryWithId),
DirectDruidClient.makeResponseContextForQuery()
);
}
return toInlineDataSource(
@ -396,6 +399,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
serverConfig,
new RetryQueryRunner<>(
baseClusterRunner,
clusterClient::getQueryRunnerForSegments,
retryConfig,
objectMapper
)

View File

@ -45,6 +45,7 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.TruncatedResponseContextException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.security.Access;
@ -284,16 +285,28 @@ public class QueryResource implements QueryCountStatsProvider
jsonMapper,
RESPONSE_CTX_HEADER_LEN_LIMIT
);
if (serializationResult.isReduced()) {
log.info(
"Response Context truncated for id [%s] . Full context is [%s].",
if (serializationResult.isTruncated()) {
final String logToPrint = StringUtils.format(
"Response Context truncated for id [%s]. Full context is [%s].",
queryId,
serializationResult.getFullResult()
);
if (QueryContexts.shouldFailOnTruncatedResponseContext(query)) {
log.error(logToPrint);
throw new QueryInterruptedException(
new TruncatedResponseContextException(
"Serialized response context exceeds the max size[%s]",
RESPONSE_CTX_HEADER_LEN_LIMIT
)
);
} else {
log.warn(logToPrint);
}
}
return responseBuilder
.header(HEADER_RESPONSE_CONTEXT, serializationResult.getTruncatedResult())
.header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult())
.build();
}
catch (Exception e) {

View File

@ -66,6 +66,11 @@ public class SetAndVerifyContextQueryRunner<T> implements QueryRunner<T>
),
serverConfig.getMaxQueryTimeout()
);
return newQuery.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, this.startTimeMillis + QueryContexts.getTimeout(newQuery)));
return newQuery.withOverriddenContext(
ImmutableMap.of(
DirectDruidClient.QUERY_FAIL_TIME,
this.startTimeMillis + QueryContexts.getTimeout(newQuery)
)
);
}
}

View File

@ -250,8 +250,15 @@ public class ServerManager implements QuerySegmentWalker
final AtomicLong cpuTimeAccumulator
)
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
SegmentId segmentId = segment.getId();
final SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
final SegmentId segmentId = segment.getId();
final Interval segmentInterval = segment.getDataInterval();
// ReferenceCountingSegment can return null for ID or interval if it's already closed.
// Here, we check one more time if the segment is closed.
// If the segment is closed after this line, ReferenceCountingSegmentQueryRunner will handle and do the right thing.
if (segmentId == null || segmentInterval == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(segmentDescriptor);
}
String segmentIdString = segmentId.toString();
MetricsEmittingQueryRunner<T> metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>(
@ -275,7 +282,7 @@ public class ServerManager implements QuerySegmentWalker
BySegmentQueryRunner<T> bySegmentQueryRunner = new BySegmentQueryRunner<>(
segmentId,
segment.getDataInterval().getStart(),
segmentInterval.getStart(),
cachingQueryRunner
);

View File

@ -766,7 +766,6 @@ public class DruidCoordinator
.withDruidCluster(cluster)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
.build();
}

View File

@ -23,12 +23,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
@ -70,7 +68,6 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorCompactionConfig coordinatorCompactionConfig;
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategy balancerStrategy;
private final Set<String> broadcastDatasources;
@ -87,7 +84,6 @@ public class DruidCoordinatorRuntimeParams
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategy balancerStrategy,
Set<String> broadcastDatasources
)
@ -104,7 +100,6 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.balancerStrategy = balancerStrategy;
this.broadcastDatasources = broadcastDatasources;
}
@ -175,11 +170,6 @@ public class DruidCoordinatorRuntimeParams
return stats;
}
public DateTime getBalancerReferenceTimestamp()
{
return balancerReferenceTimestamp;
}
public BalancerStrategy getBalancerStrategy()
{
return balancerStrategy;
@ -225,7 +215,6 @@ public class DruidCoordinatorRuntimeParams
coordinatorDynamicConfig,
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
balancerStrategy,
broadcastDatasources
);
@ -246,7 +235,6 @@ public class DruidCoordinatorRuntimeParams
coordinatorDynamicConfig,
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
balancerStrategy,
broadcastDatasources
);
@ -266,7 +254,6 @@ public class DruidCoordinatorRuntimeParams
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorCompactionConfig coordinatorCompactionConfig;
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategy balancerStrategy;
private Set<String> broadcastDatasources;
@ -284,7 +271,6 @@ public class DruidCoordinatorRuntimeParams
this.stats = new CoordinatorStats();
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
this.balancerReferenceTimestamp = DateTimes.nowUtc();
this.broadcastDatasources = new HashSet<>();
}
@ -301,7 +287,6 @@ public class DruidCoordinatorRuntimeParams
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategy balancerStrategy,
Set<String> broadcastDatasources
)
@ -318,7 +303,6 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.balancerStrategy = balancerStrategy;
this.broadcastDatasources = broadcastDatasources;
}
@ -339,7 +323,6 @@ public class DruidCoordinatorRuntimeParams
coordinatorDynamicConfig,
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
balancerStrategy,
broadcastDatasources
);
@ -442,12 +425,6 @@ public class DruidCoordinatorRuntimeParams
return this;
}
public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestamp)
{
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
return this;
}
public Builder withBalancerStrategy(BalancerStrategy balancerStrategy)
{
this.balancerStrategy = balancerStrategy;

View File

@ -342,8 +342,9 @@ public class CachingClusteredClientFunctionalityTest
final ResponseContext responseContext
)
{
return client.getQueryRunnerForIntervals(query, query.getIntervals()).run(
QueryPlus.wrap(query),
final Query<T> theQuery = query.withId("queryId");
return client.getQueryRunnerForIntervals(theQuery, theQuery.getIntervals()).run(
QueryPlus.wrap(theQuery),
responseContext
);
}

View File

@ -86,6 +86,7 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.BoundDimFilter;
@ -156,6 +157,7 @@ import java.util.Optional;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
@ -431,7 +433,8 @@ public class CachingClusteredClientTest
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.context(CONTEXT)
.randomQueryId();
QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
@ -472,7 +475,7 @@ public class CachingClusteredClientTest
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000),
Intervals.of("2011-01-02/2011-01-03"), makeTimeResults(DateTimes.of("2011-01-02"), 30, 6000),
Intervals.of("2011-01-04/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-04"), 23, 85312),
@ -500,6 +503,7 @@ public class CachingClusteredClientTest
TimeseriesQuery query = builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeRenamedTimeResults(
@ -536,9 +540,10 @@ public class CachingClusteredClientTest
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT)
.randomQueryId()
.build();
final ResponseContext context = ResponseContext.createEmpty();
final ResponseContext context = initializeResponseContext();
final Cache cache = EasyMock.createStrictMock(Cache.class);
final Capture<Iterable<Cache.NamedKey>> cacheKeyCapture = EasyMock.newCapture();
EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture)))
@ -594,7 +599,7 @@ public class CachingClusteredClientTest
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-05/2011-01-10"),
makeTimeResults(
DateTimes.of("2011-01-05T02"), 80, 100,
@ -617,6 +622,7 @@ public class CachingClusteredClientTest
.intervals("2011-01-05/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeRenamedTimeResults(
@ -652,7 +658,7 @@ public class CachingClusteredClientTest
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-11-04/2011-11-08"),
makeTimeResults(
new DateTime("2011-11-04", TIMEZONE), 50, 5000,
@ -665,6 +671,7 @@ public class CachingClusteredClientTest
.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeRenamedTimeResults(
@ -700,7 +707,7 @@ public class CachingClusteredClientTest
"useCache", "false",
"populateCache", "true"
)
).build(),
).randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000)
);
@ -719,7 +726,7 @@ public class CachingClusteredClientTest
"useCache", "false",
"populateCache", "false"
)
).build(),
).randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000)
);
@ -736,7 +743,7 @@ public class CachingClusteredClientTest
"useCache", "true",
"populateCache", "false"
)
).build(),
).randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000)
);
@ -768,7 +775,7 @@ public class CachingClusteredClientTest
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTopNResultsWithoutRename(DateTimes.of("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998),
@ -798,6 +805,7 @@ public class CachingClusteredClientTest
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(DIFF_ORDER_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeRenamedTopNResults(
@ -841,7 +849,7 @@ public class CachingClusteredClientTest
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-11-04/2011-11-08"),
makeTopNResultsWithoutRename(
new DateTime("2011-11-04", TIMEZONE), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
@ -855,6 +863,7 @@ public class CachingClusteredClientTest
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(DIFF_ORDER_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeRenamedTopNResults(
@ -908,6 +917,7 @@ public class CachingClusteredClientTest
.metric("b")
.threshold(3)
.aggregators(new CountAggregatorFactory("b"))
.randomQueryId()
.build(),
sequences
)
@ -942,7 +952,7 @@ public class CachingClusteredClientTest
);
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTopNResultsWithoutRename(),
@ -973,6 +983,7 @@ public class CachingClusteredClientTest
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(DIFF_ORDER_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeRenamedTopNResults(
@ -1013,7 +1024,7 @@ public class CachingClusteredClientTest
testQueryCaching(
runner,
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTopNResultsWithoutRename(),
@ -1044,6 +1055,7 @@ public class CachingClusteredClientTest
.metric("avg_imps_per_row_double")
.aggregators(AGGS)
.postAggregators(DIFF_ORDER_POST_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeTopNResultsWithoutRename(
@ -1077,7 +1089,7 @@ public class CachingClusteredClientTest
testQueryCaching(
getDefaultQueryRunner(),
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"),
makeSearchResults(TOP_DIM, DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4),
@ -1126,7 +1138,7 @@ public class CachingClusteredClientTest
DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4,
DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4
),
runner.run(QueryPlus.wrap(builder.intervals("2011-01-01/2011-01-10").build()))
runner.run(QueryPlus.wrap(builder.randomQueryId().intervals("2011-01-01/2011-01-10").build()))
);
}
@ -1145,7 +1157,7 @@ public class CachingClusteredClientTest
testQueryCaching(
getDefaultQueryRunner(),
builder.build(),
builder.randomQueryId().build(),
Intervals.of("2011-01-01/2011-01-02"),
makeSearchResults(TOP_DIM, DateTimes.of("2011-01-01"), "how", 1, "howdy", 2, "howwwwww", 3, "howwy", 4),
@ -1178,7 +1190,7 @@ public class CachingClusteredClientTest
new SearchQueryQueryToolChest(new SearchQueryConfig())
);
ResponseContext context = ResponseContext.createEmpty();
ResponseContext context = initializeResponseContext();
TestHelper.assertExpectedResults(
makeSearchResults(
TOP_DIM,
@ -1195,11 +1207,12 @@ public class CachingClusteredClientTest
DateTimes.of("2011-01-09"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4,
DateTimes.of("2011-01-09T01"), "how6", 1, "howdy6", 2, "howwwwww6", 3, "howww6", 4
),
runner.run(QueryPlus.wrap(builder.intervals("2011-01-01/2011-01-10").build()), context)
runner.run(QueryPlus.wrap(builder.randomQueryId().intervals("2011-01-01/2011-01-10").build()), context)
);
SearchQuery query = builder
.intervals("2011-01-01/2011-01-10")
.dimensions(new DefaultDimensionSpec(TOP_DIM, "new_dim"))
.randomQueryId()
.build();
TestHelper.assertExpectedResults(
makeSearchResults(
@ -1244,7 +1257,7 @@ public class CachingClusteredClientTest
collector.add(hashFn.hashString("abc123", StandardCharsets.UTF_8).asBytes());
collector.add(hashFn.hashString("123abc", StandardCharsets.UTF_8).asBytes());
final GroupByQuery query = builder.build();
final GroupByQuery query = builder.randomQueryId().build();
testQueryCaching(
getDefaultQueryRunner(),
@ -1322,7 +1335,7 @@ public class CachingClusteredClientTest
DateTimes.of("2011-01-09T01"),
ImmutableMap.of("a", "g", "rows", 7, "imps", 7, "impers", 7, "uniques", collector)
),
runner.run(QueryPlus.wrap(builder.setInterval("2011-01-05/2011-01-10").build())),
runner.run(QueryPlus.wrap(builder.randomQueryId().setInterval("2011-01-05/2011-01-10").build())),
""
);
}
@ -1336,6 +1349,7 @@ public class CachingClusteredClientTest
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.randomQueryId()
.build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("2011-01-01"), DateTimes.of("2011-01-01"), DateTimes.of("2011-01-02")),
@ -1357,6 +1371,7 @@ public class CachingClusteredClientTest
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MAX_TIME)
.randomQueryId()
.build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-02")),
@ -1375,6 +1390,7 @@ public class CachingClusteredClientTest
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MIN_TIME)
.randomQueryId()
.build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("2011-01-01"), DateTimes.of("2011-01-01"), null),
@ -1436,7 +1452,7 @@ public class CachingClusteredClientTest
testQueryCachingWithFilter(
runner,
3,
builder.build(),
builder.randomQueryId().build(),
expectedResult,
Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000),
Intervals.of("2011-01-01/2011-01-05"), makeTimeResults(DateTimes.of("2011-01-02"), 10, 1252),
@ -1477,7 +1493,7 @@ public class CachingClusteredClientTest
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS);
TimeseriesQuery query = builder.build();
TimeseriesQuery query = builder.randomQueryId().build();
final Interval interval1 = Intervals.of("2011-01-06/2011-01-07");
final Interval interval2 = Intervals.of("2011-01-07/2011-01-08");
@ -1768,7 +1784,6 @@ public class CachingClusteredClientTest
Object... args // does this assume query intervals must be ordered?
)
{
final List<Interval> queryIntervals = Lists.newArrayListWithCapacity(args.length / 2);
final List<List<Iterable<Result<Object>>>> expectedResults = Lists.newArrayListWithCapacity(queryIntervals.size());
@ -1922,7 +1937,8 @@ public class CachingClusteredClientTest
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(actualQueryInterval))
)
)
),
initializeResponseContext()
)
);
if (queryCompletedCallback != null) {
@ -2739,6 +2755,7 @@ public class CachingClusteredClientTest
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.randomQueryId()
.build(),
Intervals.of("1970-01-01/1970-01-02"),
makeTimeBoundaryResult(DateTimes.of("1970-01-01"), DateTimes.of("1970-01-01"), DateTimes.of("1970-01-02")),
@ -2760,6 +2777,7 @@ public class CachingClusteredClientTest
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MAX_TIME)
.randomQueryId()
.build(),
Intervals.of("1970-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-02")),
@ -2778,6 +2796,7 @@ public class CachingClusteredClientTest
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MIN_TIME)
.randomQueryId()
.build(),
Intervals.of("1970-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("1970-01-01"), DateTimes.of("1970-01-01"), null),
@ -2804,7 +2823,7 @@ public class CachingClusteredClientTest
.setAggregatorSpecs(AGGS)
.setContext(CONTEXT);
final GroupByQuery query1 = builder.build();
final GroupByQuery query1 = builder.randomQueryId().build();
testQueryCaching(
getDefaultQueryRunner(),
query1,
@ -2847,7 +2866,7 @@ public class CachingClusteredClientTest
getDefaultQueryRunner(),
WAREHOUSE.getToolChest(query1)
);
ResponseContext context = ResponseContext.createEmpty();
final ResponseContext context = initializeResponseContext();
TestHelper.assertExpectedObjects(
makeGroupByResults(
query1,
@ -2862,13 +2881,14 @@ public class CachingClusteredClientTest
DateTimes.of("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7),
DateTimes.of("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
),
runner.run(QueryPlus.wrap(builder.setInterval("2011-01-05/2011-01-10").build()), context),
runner.run(QueryPlus.wrap(builder.randomQueryId().setInterval("2011-01-05/2011-01-10").build()), context),
""
);
final GroupByQuery query2 = builder
.setInterval("2011-01-05/2011-01-10").setDimensions(new DefaultDimensionSpec("a", "output2"))
.setAggregatorSpecs(RENAMED_AGGS)
.randomQueryId()
.build();
TestHelper.assertExpectedObjects(
makeGroupByResults(
@ -2918,10 +2938,11 @@ public class CachingClusteredClientTest
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(interval)))
.context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0="))
.randomQueryId()
.build();
ResponseContext responseContext = ResponseContext.createEmpty();
final ResponseContext responseContext = initializeResponseContext();
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.Key.ETAG));
@ -2958,16 +2979,18 @@ public class CachingClusteredClientTest
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
.context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0="))
.randomQueryId()
.build();
final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval2)))
.context(ImmutableMap.of("If-None-Match", "aVJV29CJY93rszVW/QBy0arWZo0="))
.randomQueryId()
.build();
final ResponseContext responseContext = ResponseContext.createEmpty();
final ResponseContext responseContext = initializeResponseContext();
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
final Object etag1 = responseContext.get(ResponseContext.Key.ETAG);
@ -2989,4 +3012,11 @@ public class CachingClusteredClientTest
}
};
}
private static ResponseContext initializeResponseContext()
{
final ResponseContext context = ResponseContext.createEmpty();
context.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
return context;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.timeline.SegmentId;
/**
* A simple query runner for testing that can process only one segment.
*/
public class SimpleQueryRunner implements QueryRunner<Object>
{
private final QueryRunnerFactoryConglomerate conglomerate;
private final QueryableIndexSegment segment;
public SimpleQueryRunner(
QueryRunnerFactoryConglomerate conglomerate,
SegmentId segmentId,
QueryableIndex queryableIndex
)
{
this.conglomerate = conglomerate;
this.segment = new QueryableIndexSegment(queryableIndex, segmentId);
}
@Override
public Sequence<Object> run(QueryPlus<Object> queryPlus, ResponseContext responseContext)
{
final QueryRunnerFactory factory = conglomerate.findFactory(queryPlus.getQuery());
//noinspection unchecked
return factory.getToolchest().preMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
new BySegmentQueryRunner<>(
segment.getId(),
segment.getDataInterval().getStart(),
factory.createRunner(segment)
),
factory.getToolchest()
)
).run(queryPlus, responseContext);
}
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
/**
* A simple broker server view for testing which you can manually update the server view.
*/
public class SimpleServerView implements TimelineServerView
{
private static final QueryWatcher NOOP_QUERY_WATCHER = (query, future) -> {};
private final TierSelectorStrategy tierSelectorStrategy = new HighestPriorityTierSelectorStrategy(
new RandomServerSelectorStrategy()
);
// server -> queryRunner
private final Map<DruidServer, QueryableDruidServer> servers = new HashMap<>();
// segmentId -> serverSelector
private final Map<String, ServerSelector> selectors = new HashMap<>();
// dataSource -> version -> serverSelector
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<>();
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
public SimpleServerView(
QueryToolChestWarehouse warehouse,
ObjectMapper objectMapper,
HttpClient httpClient
)
{
this.warehouse = warehouse;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
}
public void addServer(DruidServer server, DataSegment dataSegment)
{
servers.put(
server,
new QueryableDruidServer<>(
server,
new DirectDruidClient<>(
warehouse,
NOOP_QUERY_WATCHER,
objectMapper,
httpClient,
server.getScheme(),
server.getHost(),
new NoopServiceEmitter()
)
)
);
addSegmentToServer(server, dataSegment);
}
public void removeServer(DruidServer server)
{
servers.remove(server);
}
public void unannounceSegmentFromServer(DruidServer server, DataSegment segment)
{
final QueryableDruidServer queryableDruidServer = servers.get(server);
if (queryableDruidServer == null) {
throw new ISE("Unknown server [%s]", server);
}
final ServerSelector selector = selectors.get(segment.getId().toString());
if (selector == null) {
throw new ISE("Unknown segment [%s]", segment.getId());
}
if (!selector.removeServer(queryableDruidServer)) {
throw new ISE("Failed to remove segment[%s] from server[%s]", segment.getId(), server);
}
final VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
throw new ISE("Unknown datasource [%s]", segment.getDataSource());
}
timeline.remove(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
}
private void addSegmentToServer(DruidServer server, DataSegment segment)
{
final ServerSelector selector = selectors.computeIfAbsent(
segment.getId().toString(),
k -> new ServerSelector(segment, tierSelectorStrategy)
);
selector.addServerAndUpdateSegment(servers.get(server), segment);
timelines.computeIfAbsent(segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural()))
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
}
@Override
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
{
return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName()));
}
@Override
public List<ImmutableDruidServer> getDruidServers()
{
return Collections.emptyList();
}
@Override
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
{
final QueryableDruidServer queryableDruidServer = Preconditions.checkNotNull(servers.get(server), "server");
return (QueryRunner<T>) queryableDruidServer.getQueryRunner();
}
@Override
public void registerTimelineCallback(Executor exec, TimelineCallback callback)
{
// do nothing
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
// do nothing
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
// do nothing
}
public static DruidServer createServer(int nameSuiffix)
{
return new DruidServer(
"server_" + nameSuiffix,
"127.0.0." + nameSuiffix,
null,
Long.MAX_VALUE,
ServerType.HISTORICAL,
"default",
0
);
}
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler.TrafficCop;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryResource;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
/**
* An HTTP client for testing which emulates querying data nodes (historicals or realtime tasks).
*/
public class TestHttpClient implements HttpClient
{
private static final TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L;
private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024;
private final Map<URL, SimpleServerManager> servers = new HashMap<>();
private final ObjectMapper objectMapper;
public TestHttpClient(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}
public void addServerAndRunner(DruidServer server, SimpleServerManager serverManager)
{
servers.put(computeUrl(server), serverManager);
}
@Nullable
public SimpleServerManager getServerManager(DruidServer server)
{
return servers.get(computeUrl(server));
}
public Map<URL, SimpleServerManager> getServers()
{
return servers;
}
private static URL computeUrl(DruidServer server)
{
try {
return new URL(StringUtils.format("%s://%s/druid/v2/", server.getScheme(), server.getHost()));
}
catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,
HttpResponseHandler<Intermediate, Final> handler
)
{
throw new UnsupportedOperationException();
}
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,
HttpResponseHandler<Intermediate, Final> handler,
Duration readTimeout
)
{
try {
final Query query = objectMapper.readValue(request.getContent().array(), Query.class);
final QueryRunner queryRunner = servers.get(request.getUrl()).getQueryRunner();
if (queryRunner == null) {
throw new ISE("Can't find queryRunner for url[%s]", request.getUrl());
}
final ResponseContext responseContext = ResponseContext.createEmpty();
final Sequence sequence = queryRunner.run(QueryPlus.wrap(query), responseContext);
final byte[] serializedContent;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
objectMapper.writeValue(baos, sequence);
serializedContent = baos.toByteArray();
}
final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
objectMapper,
RESPONSE_CTX_HEADER_LEN_LIMIT
);
final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().add(QueryResource.HEADER_RESPONSE_CONTEXT, serializationResult.getResult());
response.setContent(
HeapChannelBufferFactory.getInstance().getBuffer(serializedContent, 0, serializedContent.length)
);
final ClientResponse<Intermediate> intermClientResponse = handler.handleResponse(response, NOOP_TRAFFIC_COP);
final ClientResponse<Final> finalClientResponse = handler.done(intermClientResponse);
return Futures.immediateFuture(finalClientResponse.getObj());
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* A simple server manager for testing which you can manually drop a segment. Currently used for
* testing {@link org.apache.druid.query.RetryQueryRunner}.
*/
public static class SimpleServerManager
{
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegment segment;
private final QueryableIndex queryableIndex;
private boolean isSegmentDropped = false;
public SimpleServerManager(
QueryRunnerFactoryConglomerate conglomerate,
DataSegment segment,
QueryableIndex queryableIndex
)
{
this.conglomerate = conglomerate;
this.segment = segment;
this.queryableIndex = queryableIndex;
}
private QueryRunner getQueryRunner()
{
if (isSegmentDropped) {
return new ReportTimelineMissingSegmentQueryRunner(
new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getId().getPartitionNum())
);
} else {
return new SimpleQueryRunner(conglomerate, segment.getId(), queryableIndex);
}
}
public NonnullPair<DataSegment, QueryableIndex> dropSegment()
{
this.isSegmentDropped = true;
return new NonnullPair<>(segment, queryableIndex);
}
}
}

View File

@ -0,0 +1,358 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.SimpleServerView;
import org.apache.druid.client.TestHttpClient;
import org.apache.druid.client.TestHttpClient.SimpleServerManager;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RetryQueryRunnerTest
{
private static final Closer CLOSER = Closer.create();
private static final String DATASOURCE = "datasource";
private static final GeneratorSchemaInfo SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private final QueryToolChestWarehouse toolChestWarehouse;
private final QueryRunnerFactoryConglomerate conglomerate;
private SegmentGenerator segmentGenerator;
private TestHttpClient httpClient;
private SimpleServerView simpleServerView;
private CachingClusteredClient cachingClusteredClient;
private List<DruidServer> servers;
public RetryQueryRunnerTest()
{
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, USE_PARALLEL_MERGE_POOL_CONFIGURED);
toolChestWarehouse = new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return conglomerate.findFactory(query).getToolchest();
}
};
}
@AfterClass
public static void tearDownClass() throws IOException
{
CLOSER.close();
}
@Before
public void setup()
{
segmentGenerator = new SegmentGenerator();
httpClient = new TestHttpClient(objectMapper);
simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, httpClient);
cachingClusteredClient = new CachingClusteredClient(
toolChestWarehouse,
simpleServerView,
MapCache.create(0),
objectMapper,
new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig(),
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
servers = new ArrayList<>();
}
@After
public void tearDown() throws IOException
{
segmentGenerator.close();
}
private void addServer(DruidServer server, DataSegment dataSegment, QueryableIndex queryableIndex)
{
servers.add(server);
simpleServerView.addServer(server, dataSegment);
httpClient.addServerAndRunner(server, new SimpleServerManager(conglomerate, dataSegment, queryableIndex));
}
@Test
public void testNoRetry()
{
prepareCluster(10);
final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(1, false),
query,
() -> {}
);
final Sequence<Result<TimeseriesResultValue>> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext());
final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
Assert.assertEquals(0, queryRunner.getTotalNumRetries());
Assert.assertFalse(queryResult.isEmpty());
Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
}
@Test
public void testRetryForMovedSegment()
{
prepareCluster(10);
final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(1, true),
query,
() -> {
// Let's move a segment
dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
}
);
final Sequence<Result<TimeseriesResultValue>> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext());
final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
Assert.assertEquals(1, queryRunner.getTotalNumRetries());
// Note that we dropped a segment from a server, but it's still announced in the server view.
// As a result, we may get the full result or not depending on what server will get the retry query.
// If we hit the same server, the query will return incomplete result.
Assert.assertTrue(queryResult.size() == 9 || queryResult.size() == 10);
Assert.assertEquals(expectedTimeseriesResult(queryResult.size()), queryResult);
}
@Test
public void testRetryUntilWeGetFullResult()
{
prepareCluster(10);
final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(100, false), // retry up to 100
query,
() -> {
// Let's move a segment
dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
}
);
final Sequence<Result<TimeseriesResultValue>> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext());
final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
Assert.assertTrue(0 < queryRunner.getTotalNumRetries());
Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
}
@Test
public void testFailWithPartialResultsAfterRetry()
{
prepareCluster(10);
final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(1, false),
query,
() -> dropSegmentFromServer(servers.get(0))
);
final Sequence<Result<TimeseriesResultValue>> sequence = queryRunner.run(QueryPlus.wrap(query), responseContext());
expectedException.expect(SegmentMissingException.class);
expectedException.expectMessage("No results found for segments");
try {
sequence.toList();
}
finally {
Assert.assertEquals(1, queryRunner.getTotalNumRetries());
}
}
private void prepareCluster(int numServers)
{
for (int i = 0; i < numServers; i++) {
final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i);
addServer(
SimpleServerView.createServer(i + 1),
segment,
segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 10)
);
}
}
/**
* Drops a segment from the DruidServer. This method doesn't update the server view, but the server will stop
* serving queries for the dropped segment.
*/
private NonnullPair<DataSegment, QueryableIndex> dropSegmentFromServer(DruidServer fromServer)
{
final SimpleServerManager serverManager = httpClient.getServerManager(fromServer);
Assert.assertNotNull(serverManager);
return serverManager.dropSegment();
}
/**
* Drops a segment from the DruidServer and update the server view.
*/
private NonnullPair<DataSegment, QueryableIndex> unannounceSegmentFromServer(DruidServer fromServer)
{
final NonnullPair<DataSegment, QueryableIndex> pair = dropSegmentFromServer(fromServer);
simpleServerView.unannounceSegmentFromServer(fromServer, pair.lhs);
return pair;
}
/**
* Drops a segment from the {@code fromServer} and creates a new server serving the dropped segment.
* This method updates the server view.
*/
private void dropSegmentFromServerAndAddNewServerForSegment(DruidServer fromServer)
{
final NonnullPair<DataSegment, QueryableIndex> pair = unannounceSegmentFromServer(fromServer);
final DataSegment segmentToMove = pair.lhs;
final QueryableIndex queryableIndexToMove = pair.rhs;
addServer(
SimpleServerView.createServer(11),
segmentToMove,
queryableIndexToMove
);
}
private <T> RetryQueryRunner<T> createQueryRunner(
RetryQueryRunnerConfig retryQueryRunnerConfig,
Query<T> query,
Runnable runnableAfterFirstAttempt
)
{
final QueryRunner<T> baseRunner = cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals());
return new RetryQueryRunner<>(
baseRunner,
cachingClusteredClient::getQueryRunnerForSegments,
retryQueryRunnerConfig,
objectMapper,
runnableAfterFirstAttempt
);
}
private static RetryQueryRunnerConfig newRetryQueryRunnerConfig(int numTries, boolean returnPartialResults)
{
return new RetryQueryRunnerConfig()
{
@Override
public int getNumTries()
{
return numTries;
}
@Override
public boolean isReturnPartialResults()
{
return returnPartialResults;
}
};
}
private static Query<Result<TimeseriesResultValue>> timeseriesQuery(Interval interval)
{
return Druids.newTimeseriesQueryBuilder()
.dataSource(DATASOURCE)
.intervals(ImmutableList.of(interval))
.granularity(Granularities.DAY)
.aggregators(new CountAggregatorFactory("rows"))
.context(
ImmutableMap.of(
DirectDruidClient.QUERY_FAIL_TIME,
System.currentTimeMillis() + 10000
)
)
.build()
.withId(UUID.randomUUID().toString());
}
private ResponseContext responseContext()
{
final ResponseContext responseContext = ConcurrentResponseContext.createEmpty();
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
return responseContext;
}
private static List<Result<TimeseriesResultValue>> expectedTimeseriesResult(int expectedNumResultRows)
{
return IntStream
.range(0, expectedNumResultRows)
.mapToObj(i -> new Result<>(DateTimes.of("2000-01-01"), new TimeseriesResultValue(ImmutableMap.of("rows", 10))))
.collect(Collectors.toList());
}
private static DataSegment newSegment(
Interval interval,
int partitionId
)
{
return DataSegment.builder()
.dataSource(DATASOURCE)
.interval(interval)
.version("1")
.shardSpec(new NumberedShardSpec(partitionId, 0))
.size(10)
.build();
}
}

View File

@ -100,6 +100,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
/**
* Tests ClientQuerySegmentWalker.
@ -203,13 +204,14 @@ public class ClientQuerySegmentWalkerTest
public void testTimeseriesOnTable()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(FOO)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build();
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(FOO)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -227,23 +229,25 @@ public class ClientQuerySegmentWalkerTest
public void testTimeseriesOnAutomaticGlobalTable()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(GLOBAL)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build();
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(GLOBAL)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build()
.withId("queryId");
// expect global/joinable datasource to be automatically translated into a GlobalTableDataSource
final TimeseriesQuery expectedClusterQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource(new GlobalTableDataSource(GLOBAL))
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build();
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new GlobalTableDataSource(GLOBAL))
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build()
.withId("queryId");
testQuery(
query,
@ -261,12 +265,13 @@ public class ClientQuerySegmentWalkerTest
public void testTimeseriesOnInline()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(FOO_INLINE)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.build();
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(FOO_INLINE)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -292,12 +297,13 @@ public class ClientQuerySegmentWalkerTest
.build();
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build();
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -327,20 +333,22 @@ public class ClientQuerySegmentWalkerTest
public void testGroupByOnGroupByOnTable()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();
(GroupByQuery) GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build()
.withId("queryId");
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(new QueryDataSource(subquery))
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
(GroupByQuery) GroupByQuery.builder()
.setDataSource(new QueryDataSource(subquery))
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build()
.withId("queryId");
testQuery(
query,
@ -359,20 +367,21 @@ public class ClientQuerySegmentWalkerTest
public void testGroupByOnUnionOfTwoTables()
{
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(FOO),
new TableDataSource(BAR)
)
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
(GroupByQuery) GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(FOO),
new TableDataSource(BAR)
)
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -411,22 +420,23 @@ public class ClientQuerySegmentWalkerTest
.build();
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
new TableDataSource(FOO),
new QueryDataSource(subquery),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
(GroupByQuery) GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
new TableDataSource(FOO),
new QueryDataSource(subquery),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -471,13 +481,14 @@ public class ClientQuerySegmentWalkerTest
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(new QueryDataSource(subquery))
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n"))
.build();
(GroupByQuery) GroupByQuery.builder()
.setDataSource(new QueryDataSource(subquery))
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new LongSumAggregatorFactory("sum_n", "n"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -525,14 +536,15 @@ public class ClientQuerySegmentWalkerTest
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final TopNQuery query =
new TopNQueryBuilder().dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.dimension(DefaultDimensionSpec.of("s"))
.metric("sum_n")
.threshold(100)
.aggregators(new LongSumAggregatorFactory("sum_n", "n"))
.build();
(TopNQuery) new TopNQueryBuilder().dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.dimension(DefaultDimensionSpec.of("s"))
.metric("sum_n")
.threshold(100)
.aggregators(new LongSumAggregatorFactory("sum_n", "n"))
.build()
.withId(UUID.randomUUID().toString());
testQuery(
query,
@ -570,22 +582,23 @@ public class ClientQuerySegmentWalkerTest
public void testJoinOnTableErrorCantInlineTable()
{
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
new TableDataSource(FOO),
new TableDataSource(BAR),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
(GroupByQuery) GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
new TableDataSource(FOO),
new TableDataSource(BAR),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Cannot handle subquery structure for dataSource");
@ -607,12 +620,13 @@ public class ClientQuerySegmentWalkerTest
.build();
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build();
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build()
.withId(UUID.randomUUID().toString());
expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Subquery generated results beyond maximum[2]");
@ -741,8 +755,7 @@ public class ClientQuerySegmentWalkerTest
{
issuedQueries.clear();
final Sequence<T> resultSequence =
QueryPlus.wrap(query).run(walker, ResponseContext.createEmpty());
final Sequence<T> resultSequence = QueryPlus.wrap(query).run(walker, ResponseContext.createEmpty());
final List<Object[]> arrays =
conglomerate.findFactory(query).getToolchest().resultsAsArrays(query, resultSequence).toList();

View File

@ -178,10 +178,57 @@ public class QueryStackTests
);
}
public static DruidProcessingConfig getProcessingConfig(boolean useParallelMergePoolConfigured)
{
return new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return COMPUTE_BUFFER_SIZE;
}
@Override
public int getNumThreads()
{
// Only use 1 thread for tests.
return 1;
}
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable.
return 3;
}
@Override
public boolean useParallelMergePoolConfigured()
{
return useParallelMergePoolConfigured;
}
};
}
/**
* Returns a new {@link QueryRunnerFactoryConglomerate}. Adds relevant closeables to the passed-in {@link Closer}.
*/
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer)
{
return createQueryRunnerFactoryConglomerate(closer, true);
}
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
final Closer closer,
final boolean useParallelMergePoolConfigured
)
{
final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
@ -201,35 +248,7 @@ public class QueryStackTests
return GroupByStrategySelector.STRATEGY_V2;
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return COMPUTE_BUFFER_SIZE;
}
@Override
public int getNumThreads()
{
// Only use 1 thread for tests.
return 1;
}
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable.
return 3;
}
}
getProcessingConfig(useParallelMergePoolConfigured)
);
final GroupByQueryRunnerFactory groupByQueryRunnerFactory = factoryCloserPair.lhs;

View File

@ -20,8 +20,8 @@
package org.apache.druid.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
@ -78,14 +78,14 @@ public class RequestLogLineTest
new QueryStats(ImmutableMap.of())
);
Assert.assertEquals("", requestLogLine.getRemoteAddr());
requestLogLine.getNativeQueryLine(new ObjectMapper()); // call should not throw exception
requestLogLine.getNativeQueryLine(new DefaultObjectMapper()); // call should not throw exception
requestLogLine = RequestLogLine.forSql(
"", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of())
);
Assert.assertEquals("", requestLogLine.getRemoteAddr());
Assert.assertEquals(ImmutableMap.<String, Object>of(), requestLogLine.getSqlQueryContext());
requestLogLine.getSqlQueryLine(new ObjectMapper()); // call should not throw exception
requestLogLine.getSqlQueryLine(new DefaultObjectMapper()); // call should not throw exception
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
@ -39,6 +40,7 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@ -60,6 +62,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -164,6 +167,11 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
// the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects
// to actually serve the queries
return (theQuery, responseContext) -> {
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
responseContext.add(
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
new NonnullPair<>(theQuery.getQuery().getMostSpecificId(), 0)
);
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));

View File

@ -19,16 +19,13 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.java.util.common.DateTimes;
public class CoordinatorRuntimeParamsTestHelpers
{
public static DruidCoordinatorRuntimeParams.Builder newBuilder()
{
return DruidCoordinatorRuntimeParams
.newBuilder()
.withStartTimeNanos(System.nanoTime())
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"));
.withStartTimeNanos(System.nanoTime());
}
public static DruidCoordinatorRuntimeParams.Builder newBuilder(DruidCluster druidCluster)

View File

@ -1062,7 +1062,6 @@ public class RunRulesTest
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.build();