mirror of https://github.com/apache/druid.git
Refactor ResponseContext (#11828)
* Refactor ResponseContext Fixes a number of issues in preparation for request trailers and the query profile. * Converts keys from an enum to classes for smaller code * Wraps stored values in functions for easier capture for other uses * Reworks the "header squeezer" to handle types other than arrays. * Uses metadata for visibility, and ability to compress, to replace ad-hoc code. * Cleans up JSON serialization for the response context. * Other miscellaneous cleanup. * Handle unknown keys in deserialization Also, make "Visibility" into a boolean. * Revised comment * Renamd variable
This commit is contained in:
parent
44b2fb71ab
commit
34a3d45737
|
@ -24,3 +24,4 @@ README
|
|||
**/.pmd
|
||||
**/.pmdruleset.xml
|
||||
.java-version
|
||||
integration-tests/gen-scripts/
|
||||
|
|
|
@ -123,8 +123,7 @@ public class MovingAverageQueryRunner implements QueryRunner<Row>
|
|||
|
||||
ResponseContext gbqResponseContext = ResponseContext.createEmpty();
|
||||
gbqResponseContext.merge(responseContext);
|
||||
gbqResponseContext.put(
|
||||
ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS,
|
||||
gbqResponseContext.putQueryFailDeadlineMs(
|
||||
System.currentTimeMillis() + QueryContexts.getTimeout(gbq)
|
||||
);
|
||||
|
||||
|
@ -164,8 +163,7 @@ public class MovingAverageQueryRunner implements QueryRunner<Row>
|
|||
);
|
||||
ResponseContext tsqResponseContext = ResponseContext.createEmpty();
|
||||
tsqResponseContext.merge(responseContext);
|
||||
tsqResponseContext.put(
|
||||
ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS,
|
||||
tsqResponseContext.putQueryFailDeadlineMs(
|
||||
System.currentTimeMillis() + QueryContexts.getTimeout(tsq)
|
||||
);
|
||||
|
||||
|
|
|
@ -31,15 +31,20 @@ import org.apache.druid.java.util.common.DateTimes;
|
|||
import org.apache.druid.java.util.common.guava.Accumulator;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Yielder;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContextDeserializer;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteOrder;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class DruidDefaultSerializersModule extends SimpleModule
|
||||
{
|
||||
@SuppressWarnings("rawtypes")
|
||||
public DruidDefaultSerializersModule()
|
||||
{
|
||||
super("Druid default serializers");
|
||||
|
@ -78,6 +83,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
|
|||
Sequence.class,
|
||||
new JsonSerializer<Sequence>()
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void serialize(Sequence value, final JsonGenerator jgen, SerializerProvider provider)
|
||||
throws IOException
|
||||
|
@ -108,6 +114,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
|
|||
Yielder.class,
|
||||
new JsonSerializer<Yielder>()
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
|
||||
throws IOException
|
||||
|
@ -142,5 +149,6 @@ public class DruidDefaultSerializersModule extends SimpleModule
|
|||
}
|
||||
}
|
||||
);
|
||||
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,6 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
|
|||
this.report = report;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
|
||||
{
|
||||
|
@ -88,7 +87,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
|
|||
if (report) {
|
||||
final long cpuTimeNs = cpuTimeAccumulator.get();
|
||||
if (cpuTimeNs > 0) {
|
||||
responseContext.add(ResponseContext.Key.CPU_CONSUMED_NANOS, cpuTimeNs);
|
||||
responseContext.addCpuNanos(cpuTimeNs);
|
||||
queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ public class ReportTimelineMissingSegmentQueryRunner<T> implements QueryRunner<T
|
|||
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
|
||||
{
|
||||
LOG.debug("Reporting a missing segments[%s] for query[%s]", descriptors, queryPlus.getQuery().getId());
|
||||
responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, descriptors);
|
||||
responseContext.addMissingSegments(descriptors);
|
||||
return Sequences.empty();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,10 +35,10 @@ public class ConcurrentResponseContext extends ResponseContext
|
|||
return new ConcurrentResponseContext();
|
||||
}
|
||||
|
||||
private final ConcurrentHashMap<BaseKey, Object> delegate = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<Key, Object> delegate = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected Map<BaseKey, Object> getDelegate()
|
||||
protected Map<Key, Object> getDelegate()
|
||||
{
|
||||
return delegate;
|
||||
}
|
||||
|
|
|
@ -35,10 +35,10 @@ public class DefaultResponseContext extends ResponseContext
|
|||
return new DefaultResponseContext();
|
||||
}
|
||||
|
||||
private final HashMap<BaseKey, Object> delegate = new HashMap<>();
|
||||
private final HashMap<Key, Object> delegate = new HashMap<>();
|
||||
|
||||
@Override
|
||||
protected Map<BaseKey, Object> getDelegate()
|
||||
protected Map<Key, Object> getDelegate()
|
||||
{
|
||||
return delegate;
|
||||
}
|
||||
|
|
|
@ -19,37 +19,79 @@
|
|||
|
||||
package org.apache.druid.query.context;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.guice.annotations.ExtensionPoint;
|
||||
import org.apache.druid.guice.annotations.PublicApi;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.NonnullPair;
|
||||
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
||||
import org.apache.druid.query.SegmentDescriptor;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
|
||||
* The context is also transferred between Druid nodes with all the data it contains.
|
||||
* <p>
|
||||
* The response context consists of a set of key/value pairs. Keys are those defined in
|
||||
* the {@code Keys} registry. Keys are indexed by key instance, not by name. The
|
||||
* key defines the type of the associated value, including logic to merge values and
|
||||
* to deserialize JSON values for that key.
|
||||
*
|
||||
* <h4>Structure</h4>
|
||||
* The context has evolved to perform multiple tasks. First, it holds two kinds
|
||||
* of information:
|
||||
* <ul>
|
||||
* <li>Information to be returned in the query response header.
|
||||
* These are values tagged as being in the header.</li>
|
||||
* <li>Values passed within a single server. These are tagged with
|
||||
* not being in the header.</li>
|
||||
* </ul>
|
||||
* Second, it performs multiple tasks:
|
||||
* <ul>
|
||||
* <li>Registers the keys to be used in the header. But, since it also holds
|
||||
* internal information, the internal information also needs keys, though the
|
||||
* corresponding values are never serialized.</li>
|
||||
* <li>Gathers information for the query as a whole.</li>
|
||||
* <li>Merges information back up the query tree: from multiple segments,
|
||||
* from multiple servers, etc.</li>
|
||||
* <li>Manages headers size by dropping fields when the header would get too
|
||||
* large.</li>
|
||||
* </ul>
|
||||
*
|
||||
* A result is that the information the context, when inspected by a calling
|
||||
* query, may be incomplete if some of it was previously dropped by the
|
||||
* called query.
|
||||
*
|
||||
* <h4>API</h4>
|
||||
*
|
||||
* The query profile needs to obtain the full, untruncated information. To do this
|
||||
* it piggy-backs on the set operations to obtain the full value. To ensure this
|
||||
* is possible, code that works with standard values should call the set (or add)
|
||||
* functions provided which will do the needed map update.
|
||||
*/
|
||||
@PublicApi
|
||||
public abstract class ResponseContext
|
||||
|
@ -58,65 +100,243 @@ public abstract class ResponseContext
|
|||
* The base interface of a response context key.
|
||||
* Should be implemented by every context key.
|
||||
*/
|
||||
public interface BaseKey
|
||||
@ExtensionPoint
|
||||
public interface Key
|
||||
{
|
||||
@JsonValue
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* Merge function associated with a key: Object (Object oldValue, Object newValue)
|
||||
* Whether to return the key, value pair in the response header.
|
||||
* If false, the value is for internal use only.
|
||||
*/
|
||||
BiFunction<Object, Object, Object> getMergeFunction();
|
||||
boolean includeInHeader();
|
||||
|
||||
/**
|
||||
* Reads a value of this key from a JSON stream. Used by {@link ResponseContextDeserializer}.
|
||||
*/
|
||||
Object readValue(JsonParser jp);
|
||||
|
||||
/**
|
||||
* Merges two values of type T.
|
||||
*
|
||||
* This method may modify "oldValue" but must not modify "newValue".
|
||||
*/
|
||||
Object mergeValues(Object oldValue, Object newValue);
|
||||
|
||||
/**
|
||||
* Returns true if this key can be removed to reduce header size when the
|
||||
* header would otherwise be too large.
|
||||
*/
|
||||
@JsonIgnore
|
||||
boolean canDrop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Keys associated with objects in the context.
|
||||
* Abstract key class which provides most functionality except the
|
||||
* type-specific merge logic. Parsing is provided by an associated
|
||||
* parse function.
|
||||
*/
|
||||
public abstract static class AbstractKey implements Key
|
||||
{
|
||||
private final String name;
|
||||
private final boolean inHeader;
|
||||
private final boolean canDrop;
|
||||
private final Function<JsonParser, Object> parseFunction;
|
||||
|
||||
AbstractKey(String name, boolean inHeader, boolean canDrop, Class<?> serializedClass)
|
||||
{
|
||||
this.name = name;
|
||||
this.inHeader = inHeader;
|
||||
this.canDrop = canDrop;
|
||||
this.parseFunction = jp -> {
|
||||
try {
|
||||
return jp.readValueAs(serializedClass);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
AbstractKey(String name, boolean inHeader, boolean canDrop, TypeReference<?> serializedTypeReference)
|
||||
{
|
||||
this.name = name;
|
||||
this.inHeader = inHeader;
|
||||
this.canDrop = canDrop;
|
||||
this.parseFunction = jp -> {
|
||||
try {
|
||||
return jp.readValueAs(serializedTypeReference);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean includeInHeader()
|
||||
{
|
||||
return inHeader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canDrop()
|
||||
{
|
||||
return canDrop;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object readValue(JsonParser jp)
|
||||
{
|
||||
return parseFunction.apply(jp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* String valued attribute that holds the latest value assigned.
|
||||
*/
|
||||
public static class StringKey extends AbstractKey
|
||||
{
|
||||
StringKey(String name, boolean inHeader, boolean canDrop)
|
||||
{
|
||||
super(name, inHeader, canDrop, String.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
return newValue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Boolean valued attribute with the semantics that once the flag is
|
||||
* set true, it stays true.
|
||||
*/
|
||||
public static class BooleanKey extends AbstractKey
|
||||
{
|
||||
BooleanKey(String name, boolean inHeader)
|
||||
{
|
||||
super(name, inHeader, false, Boolean.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
return (boolean) oldValue || (boolean) newValue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Long valued attribute that holds the latest value assigned.
|
||||
*/
|
||||
public static class LongKey extends AbstractKey
|
||||
{
|
||||
LongKey(String name, boolean inHeader)
|
||||
{
|
||||
super(name, inHeader, false, Long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
return newValue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Long valued attribute that holds the accumulation of values assigned.
|
||||
*/
|
||||
public static class CounterKey extends AbstractKey
|
||||
{
|
||||
CounterKey(String name, boolean inHeader)
|
||||
{
|
||||
super(name, inHeader, false, Long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
if (oldValue == null) {
|
||||
return newValue;
|
||||
}
|
||||
if (newValue == null) {
|
||||
return oldValue;
|
||||
}
|
||||
return (Long) oldValue + (Long) newValue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Global registry of response context keys. Also defines the standard keys
|
||||
* associated with objects in the context.
|
||||
* <p>
|
||||
* If it's necessary to have some new keys in the context then they might be listed in a separate enum:
|
||||
* If it's necessary to add new keys in the context then they should be listed
|
||||
* in a separate class:
|
||||
* <pre>{@code
|
||||
* public enum ExtensionResponseContextKey implements BaseKey
|
||||
* public class SomeClass
|
||||
* {
|
||||
* EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
|
||||
* static final Key EXTENSION_KEY_1 = new StringKey(
|
||||
* "extension_key_1", Visibility.HEADER_AND_TRAILER, true),
|
||||
* static final Key EXTENSION_KEY_2 = new CounterKey(
|
||||
* "extension_key_2", Visibility.None);
|
||||
*
|
||||
* static {
|
||||
* for (BaseKey key : values()) ResponseContext.Key.registerKey(key);
|
||||
* }
|
||||
*
|
||||
* private final String name;
|
||||
* private final BiFunction<Object, Object, Object> mergeFunction;
|
||||
*
|
||||
* ExtensionResponseContextKey(String name)
|
||||
* {
|
||||
* this.name = name;
|
||||
* this.mergeFunction = (oldValue, newValue) -> newValue;
|
||||
* }
|
||||
*
|
||||
* @Override public String getName() { return name; }
|
||||
*
|
||||
* @Override public BiFunction<Object, Object, Object> getMergeFunction() { return mergeFunction; }
|
||||
* Keys.instance().registerKeys(new Key[] {
|
||||
* EXTENSION_KEY_1,
|
||||
* EXTENSION_KEY_2
|
||||
* });
|
||||
* }
|
||||
* }</pre>
|
||||
* Make sure all extension enum values added with {@link Key#registerKey} method.
|
||||
* Make sure all extension keys are added with the {@link #registerKey(Key)} or
|
||||
* {@link #registerKeys(Key[])} methods.
|
||||
* <p>
|
||||
* Create custom keys in one of two ways. As shown above, predefined key types
|
||||
* exist for common values. Custom values can be created as shown in the code
|
||||
* for this class.
|
||||
*/
|
||||
public enum Key implements BaseKey
|
||||
public static class Keys
|
||||
{
|
||||
/**
|
||||
* Lists intervals for which NO segment is present.
|
||||
*/
|
||||
UNCOVERED_INTERVALS(
|
||||
public static final Key UNCOVERED_INTERVALS = new AbstractKey(
|
||||
"uncoveredIntervals",
|
||||
(oldValue, newValue) -> {
|
||||
final ArrayList<Interval> result = new ArrayList<Interval>((List) oldValue);
|
||||
result.addAll((List) newValue);
|
||||
true, true,
|
||||
new TypeReference<List<Interval>>()
|
||||
{
|
||||
})
|
||||
{
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
final List<Interval> result = new ArrayList<Interval>((List<Interval>) oldValue);
|
||||
result.addAll((List<Interval>) newValue);
|
||||
return result;
|
||||
}
|
||||
),
|
||||
};
|
||||
|
||||
/**
|
||||
* Indicates if the number of uncovered intervals exceeded the limit (true/false).
|
||||
*/
|
||||
UNCOVERED_INTERVALS_OVERFLOWED(
|
||||
public static final Key UNCOVERED_INTERVALS_OVERFLOWED = new BooleanKey(
|
||||
"uncoveredIntervalsOverflowed",
|
||||
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
|
||||
),
|
||||
true);
|
||||
|
||||
/**
|
||||
* Map of most relevant query ID to remaining number of responses from query nodes.
|
||||
* The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query nodes,
|
||||
|
@ -129,156 +349,213 @@ public abstract class ResponseContext
|
|||
*
|
||||
* @see org.apache.druid.query.Query#getMostSpecificId
|
||||
*/
|
||||
REMAINING_RESPONSES_FROM_QUERY_SERVERS(
|
||||
public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new AbstractKey(
|
||||
"remainingResponsesFromQueryServers",
|
||||
(totalRemainingPerId, idAndNumResponses) -> {
|
||||
false, true,
|
||||
Object.class)
|
||||
{
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Object mergeValues(Object totalRemainingPerId, Object idAndNumResponses)
|
||||
{
|
||||
final ConcurrentHashMap<String, Integer> map = (ConcurrentHashMap<String, Integer>) totalRemainingPerId;
|
||||
final NonnullPair<String, Integer> pair = (NonnullPair<String, Integer>) idAndNumResponses;
|
||||
map.compute(
|
||||
pair.lhs,
|
||||
(id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs
|
||||
);
|
||||
(id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs);
|
||||
return map;
|
||||
}
|
||||
),
|
||||
};
|
||||
|
||||
/**
|
||||
* Lists missing segments.
|
||||
*/
|
||||
MISSING_SEGMENTS(
|
||||
public static final Key MISSING_SEGMENTS = new AbstractKey(
|
||||
"missingSegments",
|
||||
(oldValue, newValue) -> {
|
||||
final ArrayList<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List) oldValue);
|
||||
result.addAll((List) newValue);
|
||||
true, true,
|
||||
new TypeReference<List<SegmentDescriptor>>() {})
|
||||
{
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
final List<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List<SegmentDescriptor>) oldValue);
|
||||
result.addAll((List<SegmentDescriptor>) newValue);
|
||||
return result;
|
||||
}
|
||||
),
|
||||
};
|
||||
|
||||
/**
|
||||
* Entity tag. A part of HTTP cache validation mechanism.
|
||||
* Is being removed from the context before sending and used as a separate HTTP header.
|
||||
*/
|
||||
ETAG("ETag"),
|
||||
/**
|
||||
* Query fail time (current time + timeout).
|
||||
* It is not updated continuously as {@link Key#TIMEOUT_AT}.
|
||||
*/
|
||||
QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"),
|
||||
public static final Key ETAG = new StringKey("ETag", false, true);
|
||||
|
||||
/**
|
||||
* Query total bytes gathered.
|
||||
*/
|
||||
QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"),
|
||||
public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
|
||||
"queryTotalBytesGathered",
|
||||
false, false,
|
||||
new TypeReference<AtomicLong>() {})
|
||||
{
|
||||
@Override
|
||||
public Object mergeValues(Object oldValue, Object newValue)
|
||||
{
|
||||
return ((AtomicLong) newValue).addAndGet(((AtomicLong) newValue).get());
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Query fail time (current time + timeout).
|
||||
* It is not updated continuously as {@link Keys#TIMEOUT_AT}.
|
||||
*/
|
||||
public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
|
||||
"queryFailTime",
|
||||
false);
|
||||
|
||||
/**
|
||||
* This variable indicates when a running query should be expired,
|
||||
* and is effective only when 'timeout' of queryContext has a positive value.
|
||||
* Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine}
|
||||
* by reducing its value on the time of every scan iteration.
|
||||
*/
|
||||
TIMEOUT_AT("timeoutAt"),
|
||||
public static final Key TIMEOUT_AT = new LongKey(
|
||||
"timeoutAt",
|
||||
false);
|
||||
|
||||
/**
|
||||
* The number of scanned rows.
|
||||
* For backward compatibility the context key name still equals to "count".
|
||||
* The number of rows scanned by {@link org.apache.druid.query.scan.ScanQueryEngine}.
|
||||
*
|
||||
* Named "count" for backwards compatibility with older data servers that still send this, even though it's now
|
||||
* marked as internal.
|
||||
*/
|
||||
NUM_SCANNED_ROWS(
|
||||
public static final Key NUM_SCANNED_ROWS = new CounterKey(
|
||||
"count",
|
||||
(oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
|
||||
),
|
||||
false);
|
||||
|
||||
/**
|
||||
* The total CPU time for threads related to Sequence processing of the query.
|
||||
* Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes.
|
||||
* For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner}
|
||||
*/
|
||||
CPU_CONSUMED_NANOS(
|
||||
public static final Key CPU_CONSUMED_NANOS = new CounterKey(
|
||||
"cpuConsumed",
|
||||
(oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
|
||||
),
|
||||
false);
|
||||
|
||||
/**
|
||||
* Indicates if a {@link ResponseContext} was truncated during serialization.
|
||||
*/
|
||||
TRUNCATED(
|
||||
public static final Key TRUNCATED = new BooleanKey(
|
||||
"truncated",
|
||||
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
|
||||
);
|
||||
false);
|
||||
|
||||
/**
|
||||
* One and only global list of keys. This is a semi-constant: it is mutable
|
||||
* at start-up time, but then is not thread-safe, and must remain unchanged
|
||||
* for the duration of the server run.
|
||||
*/
|
||||
public static final Keys INSTANCE = new Keys();
|
||||
|
||||
/**
|
||||
* ConcurrentSkipListMap is used to have the natural ordering of its keys.
|
||||
* Thread-safe structure is required since there is no guarantee that {@link #registerKey(BaseKey)}
|
||||
* Thread-safe structure is required since there is no guarantee that {@link #registerKey(Key)}
|
||||
* would be called only from class static blocks.
|
||||
*/
|
||||
private static final ConcurrentMap<String, BaseKey> REGISTERED_KEYS = new ConcurrentSkipListMap<>();
|
||||
private final ConcurrentMap<String, Key> registeredKeys = new ConcurrentSkipListMap<>();
|
||||
|
||||
static {
|
||||
for (BaseKey key : values()) {
|
||||
registerKey(key);
|
||||
instance().registerKeys(new Key[]
|
||||
{
|
||||
UNCOVERED_INTERVALS,
|
||||
UNCOVERED_INTERVALS_OVERFLOWED,
|
||||
REMAINING_RESPONSES_FROM_QUERY_SERVERS,
|
||||
MISSING_SEGMENTS,
|
||||
ETAG,
|
||||
QUERY_TOTAL_BYTES_GATHERED,
|
||||
QUERY_FAIL_DEADLINE_MILLIS,
|
||||
TIMEOUT_AT,
|
||||
NUM_SCANNED_ROWS,
|
||||
CPU_CONSUMED_NANOS,
|
||||
TRUNCATED,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Use {@link #instance()} to obtain the singleton instance of this class.
|
||||
*/
|
||||
private Keys()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the single, global key registry for this server.
|
||||
*/
|
||||
public static Keys instance()
|
||||
{
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Primary way of registering context keys.
|
||||
*
|
||||
* @throws IllegalArgumentException if the key has already been registered.
|
||||
*/
|
||||
public static void registerKey(BaseKey key)
|
||||
public void registerKey(Key key)
|
||||
{
|
||||
if (REGISTERED_KEYS.putIfAbsent(key.getName(), key) != null) {
|
||||
if (registeredKeys.putIfAbsent(key.getName(), key) != null) {
|
||||
throw new IAE("Key [%s] has already been registered as a context key", key.getName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a group of keys.
|
||||
*/
|
||||
public void registerKeys(Key[] keys)
|
||||
{
|
||||
for (Key key : keys) {
|
||||
registerKey(key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a registered key associated with the name {@param name}.
|
||||
*
|
||||
* @throws IllegalStateException if a corresponding key has not been registered.
|
||||
*/
|
||||
public static BaseKey keyOf(String name)
|
||||
public Key keyOf(String name)
|
||||
{
|
||||
BaseKey key = REGISTERED_KEYS.get(name);
|
||||
Key key = registeredKeys.get(name);
|
||||
if (key == null) {
|
||||
throw new ISE("Key [%s] has not yet been registered as a context key", name);
|
||||
throw new ISE("Key [%s] is not registered as a context key", name);
|
||||
}
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all keys registered via {@link Key#registerKey}.
|
||||
* Returns a registered key associated with the given name, or
|
||||
* {@code null} if the key is not registered. This form is for testing
|
||||
* and for deserialization when the existence of the key is suspect.
|
||||
*/
|
||||
public static Collection<BaseKey> getAllRegisteredKeys()
|
||||
public Key find(String name)
|
||||
{
|
||||
return Collections.unmodifiableCollection(REGISTERED_KEYS.values());
|
||||
return registeredKeys.get(name);
|
||||
}
|
||||
}
|
||||
|
||||
private final String name;
|
||||
protected abstract Map<Key, Object> getDelegate();
|
||||
|
||||
private final BiFunction<Object, Object, Object> mergeFunction;
|
||||
|
||||
Key(String name)
|
||||
public Map<String, Object> toMap()
|
||||
{
|
||||
this.name = name;
|
||||
this.mergeFunction = (oldValue, newValue) -> newValue;
|
||||
return CollectionUtils.mapKeys(getDelegate(), k -> k.getName());
|
||||
}
|
||||
|
||||
Key(String name, BiFunction<Object, Object, Object> mergeFunction)
|
||||
{
|
||||
this.name = name;
|
||||
this.mergeFunction = mergeFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiFunction<Object, Object, Object> getMergeFunction()
|
||||
{
|
||||
return mergeFunction;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Map<BaseKey, Object> getDelegate();
|
||||
|
||||
private static final Comparator<Map.Entry<String, JsonNode>> VALUE_LENGTH_REVERSED_COMPARATOR =
|
||||
Comparator.comparing((Map.Entry<String, JsonNode> e) -> e.getValue().toString().length()).reversed();
|
||||
|
||||
/**
|
||||
* Create an empty DefaultResponseContext instance
|
||||
*
|
||||
* @return empty DefaultResponseContext instance
|
||||
*/
|
||||
public static ResponseContext createEmpty()
|
||||
|
@ -286,40 +563,130 @@ public abstract class ResponseContext
|
|||
return DefaultResponseContext.createEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize fields for a query context. Not needed when merging.
|
||||
*/
|
||||
public void initialize()
|
||||
{
|
||||
putValue(Keys.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
|
||||
initializeRemainingResponses();
|
||||
}
|
||||
|
||||
public void initializeRemainingResponses()
|
||||
{
|
||||
putValue(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
|
||||
}
|
||||
|
||||
public void initializeMissingSegments()
|
||||
{
|
||||
putValue(Keys.MISSING_SEGMENTS, new ArrayList<>());
|
||||
}
|
||||
|
||||
public void initializeRowScanCount()
|
||||
{
|
||||
putValue(Keys.NUM_SCANNED_ROWS, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes a string into {@link ResponseContext} using given {@link ObjectMapper}.
|
||||
*
|
||||
* @throws IllegalStateException if one of the deserialized map keys has not been registered.
|
||||
*/
|
||||
public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException
|
||||
{
|
||||
final Map<String, Object> keyNameToObjects = objectMapper.readValue(
|
||||
responseContext,
|
||||
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||
);
|
||||
final ResponseContext context = ResponseContext.createEmpty();
|
||||
keyNameToObjects.forEach((keyName, value) -> {
|
||||
final BaseKey key = Key.keyOf(keyName);
|
||||
context.add(key, value);
|
||||
});
|
||||
return context;
|
||||
return objectMapper.readValue(responseContext, ResponseContext.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Associates the specified object with the specified key.
|
||||
* Associates the specified object with the specified extension key.
|
||||
*
|
||||
* @throws IllegalStateException if the key has not been registered.
|
||||
*/
|
||||
public Object put(BaseKey key, Object value)
|
||||
public Object put(Key key, Object value)
|
||||
{
|
||||
final BaseKey registeredKey = Key.keyOf(key.getName());
|
||||
return getDelegate().put(registeredKey, value);
|
||||
final Key registeredKey = Keys.instance().keyOf(key.getName());
|
||||
return putValue(registeredKey, value);
|
||||
}
|
||||
|
||||
public Object get(BaseKey key)
|
||||
public void putUncoveredIntervals(List<Interval> intervals, boolean overflowed)
|
||||
{
|
||||
putValue(Keys.UNCOVERED_INTERVALS, intervals);
|
||||
putValue(Keys.UNCOVERED_INTERVALS_OVERFLOWED, overflowed);
|
||||
}
|
||||
|
||||
public void putEntityTag(String eTag)
|
||||
{
|
||||
putValue(Keys.ETAG, eTag);
|
||||
}
|
||||
|
||||
public void putTimeoutTime(long time)
|
||||
{
|
||||
putValue(Keys.TIMEOUT_AT, time);
|
||||
}
|
||||
|
||||
public void putQueryFailDeadlineMs(long deadlineMs)
|
||||
{
|
||||
putValue(Keys.QUERY_FAIL_DEADLINE_MILLIS, deadlineMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Associates the specified object with the specified key. Assumes that
|
||||
* the key is validated.
|
||||
*/
|
||||
private Object putValue(Key key, Object value)
|
||||
{
|
||||
return getDelegate().put(key, value);
|
||||
}
|
||||
|
||||
public Object get(Key key)
|
||||
{
|
||||
return getDelegate().get(key);
|
||||
}
|
||||
|
||||
public Object remove(BaseKey key)
|
||||
@SuppressWarnings("unchecked")
|
||||
public ConcurrentHashMap<String, Integer> getRemainingResponses()
|
||||
{
|
||||
return (ConcurrentHashMap<String, Integer>) get(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<Interval> getUncoveredIntervals()
|
||||
{
|
||||
return (List<Interval>) get(Keys.UNCOVERED_INTERVALS);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<SegmentDescriptor> getMissingSegments()
|
||||
{
|
||||
return (List<SegmentDescriptor>) get(Keys.MISSING_SEGMENTS);
|
||||
}
|
||||
|
||||
public String getEntityTag()
|
||||
{
|
||||
return (String) get(Keys.ETAG);
|
||||
}
|
||||
|
||||
public AtomicLong getTotalBytes()
|
||||
{
|
||||
return (AtomicLong) get(Keys.QUERY_TOTAL_BYTES_GATHERED);
|
||||
}
|
||||
|
||||
public Long getTimeoutTime()
|
||||
{
|
||||
return (Long) get(Keys.TIMEOUT_AT);
|
||||
}
|
||||
|
||||
public Long getRowScanCount()
|
||||
{
|
||||
return (Long) get(Keys.NUM_SCANNED_ROWS);
|
||||
}
|
||||
|
||||
public Long getCpuNanos()
|
||||
{
|
||||
return (Long) get(Keys.CPU_CONSUMED_NANOS);
|
||||
}
|
||||
|
||||
public Object remove(Key key)
|
||||
{
|
||||
return getDelegate().remove(key);
|
||||
}
|
||||
|
@ -327,16 +694,44 @@ public abstract class ResponseContext
|
|||
/**
|
||||
* Adds (merges) a new value associated with a key to an old value.
|
||||
* See merge function of a context key for a specific implementation.
|
||||
*
|
||||
* @throws IllegalStateException if the key has not been registered.
|
||||
*/
|
||||
public Object add(BaseKey key, Object value)
|
||||
public Object add(Key key, Object value)
|
||||
{
|
||||
final BaseKey registeredKey = Key.keyOf(key.getName());
|
||||
return getDelegate().merge(registeredKey, value, key.getMergeFunction());
|
||||
final Key registeredKey = Keys.instance().keyOf(key.getName());
|
||||
return addValue(registeredKey, value);
|
||||
}
|
||||
|
||||
public void addRemainingResponse(String id, int count)
|
||||
{
|
||||
addValue(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
|
||||
new NonnullPair<>(id, count));
|
||||
}
|
||||
|
||||
public void addMissingSegments(List<SegmentDescriptor> descriptors)
|
||||
{
|
||||
addValue(Keys.MISSING_SEGMENTS, descriptors);
|
||||
}
|
||||
|
||||
public void addRowScanCount(long count)
|
||||
{
|
||||
addValue(Keys.NUM_SCANNED_ROWS, count);
|
||||
}
|
||||
|
||||
public void addCpuNanos(long ns)
|
||||
{
|
||||
addValue(Keys.CPU_CONSUMED_NANOS, ns);
|
||||
}
|
||||
|
||||
private Object addValue(Key key, Object value)
|
||||
{
|
||||
return getDelegate().merge(key, value, key::mergeValues);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges a response context into the current.
|
||||
*
|
||||
* @throws IllegalStateException If a key of the {@code responseContext} has not been registered.
|
||||
*/
|
||||
public void merge(ResponseContext responseContext)
|
||||
|
@ -351,87 +746,101 @@ public abstract class ResponseContext
|
|||
/**
|
||||
* Serializes the context given that the resulting string length is less than the provided limit.
|
||||
* This method removes some elements from context collections if it's needed to satisfy the limit.
|
||||
* There is no explicit priorities of keys which values are being truncated because for now there are only
|
||||
* two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS}
|
||||
* and {@link Key#MISSING_SEGMENTS}) and their values are arrays.
|
||||
* Thus current implementation considers these arrays as equal prioritized and starts removing elements from
|
||||
* There is no explicit priorities of keys which values are being truncated.
|
||||
* Any kind of key can be removed, the key's @{code canDrop()} attribute indicates
|
||||
* which can be dropped. (The unit tests use a string key.)
|
||||
* Thus keys as equally prioritized and starts removing elements from
|
||||
* the array which serialized value length is the biggest.
|
||||
* The resulting string might be correctly deserialized to {@link ResponseContext}.
|
||||
* The resulting string will be correctly deserialized to {@link ResponseContext}.
|
||||
*/
|
||||
public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException
|
||||
public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber)
|
||||
throws JsonProcessingException
|
||||
{
|
||||
final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
|
||||
final Map<Key, Object> headerMap =
|
||||
getDelegate().entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getKey().includeInHeader())
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
Map.Entry::getValue
|
||||
)
|
||||
);
|
||||
|
||||
final String fullSerializedString = objectMapper.writeValueAsString(headerMap);
|
||||
if (fullSerializedString.length() <= maxCharsNumber) {
|
||||
return new SerializationResult(null, fullSerializedString);
|
||||
} else {
|
||||
// Indicates that the context is truncated during serialization.
|
||||
add(Key.TRUNCATED, true);
|
||||
final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate());
|
||||
final ArrayList<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
|
||||
sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
|
||||
}
|
||||
|
||||
int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
|
||||
// Indicates that the context is truncated during serialization.
|
||||
headerMap.put(Keys.TRUNCATED, true);
|
||||
// Account for the extra field just added: "truncated: true,
|
||||
// The length of ": true," is 7.
|
||||
needToRemoveCharsNumber += Keys.TRUNCATED.getName().length() + 7;
|
||||
final ObjectNode contextJsonNode = objectMapper.valueToTree(headerMap);
|
||||
final List<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
|
||||
sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
|
||||
// The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
|
||||
for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
|
||||
final String fieldName = e.getKey();
|
||||
if (!Keys.instance().keyOf(fieldName).canDrop()) {
|
||||
continue;
|
||||
}
|
||||
final JsonNode node = e.getValue();
|
||||
if (node.isArray()) {
|
||||
if (needToRemoveCharsNumber >= node.toString().length()) {
|
||||
// We need to remove more chars than the field's length so removing it completely
|
||||
int removeLength = fieldName.length() + node.toString().length();
|
||||
if (removeLength < needToRemoveCharsNumber || !node.isArray()) {
|
||||
// Remove the field
|
||||
contextJsonNode.remove(fieldName);
|
||||
// Since the field is completely removed (name + value) we need to do a recalculation
|
||||
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
|
||||
needToRemoveCharsNumber -= removeLength;
|
||||
} else {
|
||||
final ArrayNode arrayNode = (ArrayNode) node;
|
||||
needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
|
||||
int removed = removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
|
||||
if (arrayNode.size() == 0) {
|
||||
// The field is empty, removing it because an empty array field may be misleading
|
||||
// The field is now empty, removing it because an empty array field may be misleading
|
||||
// for the recipients of the truncated response context.
|
||||
contextJsonNode.remove(fieldName);
|
||||
// Since the field is completely removed (name + value) we need to do a recalculation
|
||||
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
|
||||
}
|
||||
} // node is not an array
|
||||
needToRemoveCharsNumber -= removeLength;
|
||||
} else {
|
||||
// A context should not contain nulls so we completely remove the field.
|
||||
contextJsonNode.remove(fieldName);
|
||||
// Since the field is completely removed (name + value) we need to do a recalculation
|
||||
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
|
||||
needToRemoveCharsNumber -= removed;
|
||||
}
|
||||
}
|
||||
|
||||
if (needToRemoveCharsNumber <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return new SerializationResult(contextJsonNode.toString(), fullSerializedString);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes {@code node}'s elements which total length of serialized values is greater or equal to the passed limit.
|
||||
* If it is impossible to satisfy the limit the method removes all {@code node}'s elements.
|
||||
* On every iteration it removes exactly half of the remained elements to reduce the overall complexity.
|
||||
*
|
||||
* @param node {@link ArrayNode} which elements are being removed.
|
||||
* @param needToRemoveCharsNumber the number of chars need to be removed.
|
||||
* @param target the number of chars need to be removed.
|
||||
*
|
||||
* @return the number of removed chars.
|
||||
*/
|
||||
private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber)
|
||||
private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int target)
|
||||
{
|
||||
int removedCharsNumber = 0;
|
||||
while (node.size() > 0 && needToRemoveCharsNumber > removedCharsNumber) {
|
||||
final int lengthBeforeRemove = node.toString().length();
|
||||
int nodeLen = node.toString().length();
|
||||
final int startLen = nodeLen;
|
||||
while (node.size() > 0 && target > startLen - nodeLen) {
|
||||
// Reducing complexity by removing half of array's elements
|
||||
final int removeUntil = node.size() / 2;
|
||||
for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) {
|
||||
node.remove(removeAt);
|
||||
}
|
||||
final int lengthAfterRemove = node.toString().length();
|
||||
removedCharsNumber += lengthBeforeRemove - lengthAfterRemove;
|
||||
nodeLen = node.toString().length();
|
||||
}
|
||||
return removedCharsNumber;
|
||||
return startLen - nodeLen;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialization result of {@link ResponseContext}.
|
||||
* Response context might be serialized using max legth limit, in this case the context might be reduced
|
||||
* Response context might be serialized using max length limit, in this case the context might be reduced
|
||||
* by removing max-length fields one by one unless serialization result length is less than the limit.
|
||||
* This structure has a reduced serialization result along with full result and boolean property
|
||||
* indicating if some fields were removed from the context.
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.context;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Deserialize a response context. The response context is created for single-thread use.
|
||||
* (That is, it is non-concurrent.) Clients of this code should convert the
|
||||
* context to concurrent if it will be used across threads.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class ResponseContextDeserializer extends StdDeserializer<ResponseContext>
|
||||
{
|
||||
public ResponseContextDeserializer()
|
||||
{
|
||||
super(ResponseContext.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseContext deserialize(
|
||||
final JsonParser jp,
|
||||
final DeserializationContext ctxt
|
||||
) throws IOException
|
||||
{
|
||||
if (jp.currentToken() != JsonToken.START_OBJECT) {
|
||||
throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.START_OBJECT, null);
|
||||
}
|
||||
|
||||
final ResponseContext retVal = ResponseContext.createEmpty();
|
||||
|
||||
jp.nextToken();
|
||||
|
||||
ResponseContext.Keys keys = ResponseContext.Keys.instance();
|
||||
while (jp.currentToken() == JsonToken.FIELD_NAME) {
|
||||
// Get the key. Since this is a deserialization, the sender may
|
||||
// be a different version of Druid with a different set of keys.
|
||||
// Ignore any keys which the sender knows about but this node
|
||||
// does not know about.
|
||||
final ResponseContext.Key key = keys.find(jp.getText());
|
||||
|
||||
jp.nextToken();
|
||||
if (key == null) {
|
||||
skipValue(jp, jp.getText());
|
||||
} else {
|
||||
retVal.add(key, key.readValue(jp));
|
||||
}
|
||||
|
||||
jp.nextToken();
|
||||
}
|
||||
|
||||
if (jp.currentToken() != JsonToken.END_OBJECT) {
|
||||
throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.END_OBJECT, null);
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip over a single JSON value: scalar or composite.
|
||||
*/
|
||||
private void skipValue(final JsonParser jp, String key) throws IOException
|
||||
{
|
||||
final JsonToken token = jp.currentToken();
|
||||
switch (token) {
|
||||
case START_OBJECT:
|
||||
skipTo(jp, JsonToken.END_OBJECT);
|
||||
break;
|
||||
case START_ARRAY:
|
||||
skipTo(jp, JsonToken.END_ARRAY);
|
||||
break;
|
||||
default:
|
||||
if (token.isScalarValue()) {
|
||||
return;
|
||||
}
|
||||
throw new JsonMappingException(jp, "Invalid JSON inside unknown key: " + key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Freewheel over the contents of a structured object, including any
|
||||
* nested structured objects, until the given end token.
|
||||
*/
|
||||
private void skipTo(final JsonParser jp, JsonToken end) throws IOException
|
||||
{
|
||||
while (true) {
|
||||
jp.nextToken();
|
||||
final JsonToken token = jp.currentToken();
|
||||
if (token == null) {
|
||||
throw new JsonMappingException(jp, "Premature EOF");
|
||||
}
|
||||
switch (token) {
|
||||
case START_OBJECT:
|
||||
skipTo(jp, JsonToken.END_OBJECT);
|
||||
break;
|
||||
case START_ARRAY:
|
||||
skipTo(jp, JsonToken.END_ARRAY);
|
||||
break;
|
||||
default:
|
||||
if (token == end) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -67,15 +67,12 @@ public class ScanQueryEngine
|
|||
// "legacy" should be non-null due to toolChest.mergeResults
|
||||
final boolean legacy = Preconditions.checkNotNull(query.isLegacy(), "Expected non-null 'legacy' parameter");
|
||||
|
||||
final Object numScannedRows = responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS);
|
||||
if (numScannedRows != null) {
|
||||
long count = (long) numScannedRows;
|
||||
if (count >= query.getScanRowsLimit() && query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
|
||||
final Long numScannedRows = responseContext.getRowScanCount();
|
||||
if (numScannedRows != null && numScannedRows >= query.getScanRowsLimit() && query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
|
||||
return Sequences.empty();
|
||||
}
|
||||
}
|
||||
final boolean hasTimeout = QueryContexts.hasTimeout(query);
|
||||
final long timeoutAt = (long) responseContext.get(ResponseContext.Key.TIMEOUT_AT);
|
||||
final Long timeoutAt = responseContext.getTimeoutTime();
|
||||
final long start = System.currentTimeMillis();
|
||||
final StorageAdapter adapter = segment.asStorageAdapter();
|
||||
|
||||
|
@ -122,7 +119,8 @@ public class ScanQueryEngine
|
|||
|
||||
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
|
||||
|
||||
responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, 0L);
|
||||
// If the row count is not set, set it to 0, else do nothing.
|
||||
responseContext.addRowScanCount(0);
|
||||
final long limit = calculateRemainingScanRowsLimit(query, responseContext);
|
||||
return Sequences.concat(
|
||||
adapter
|
||||
|
@ -186,10 +184,9 @@ public class ScanQueryEngine
|
|||
} else {
|
||||
throw new UOE("resultFormat[%s] is not supported", resultFormat.toString());
|
||||
}
|
||||
responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, offset - lastOffset);
|
||||
responseContext.addRowScanCount(offset - lastOffset);
|
||||
if (hasTimeout) {
|
||||
responseContext.put(
|
||||
ResponseContext.Key.TIMEOUT_AT,
|
||||
responseContext.putTimeoutTime(
|
||||
timeoutAt - (System.currentTimeMillis() - start)
|
||||
);
|
||||
}
|
||||
|
@ -262,7 +259,7 @@ public class ScanQueryEngine
|
|||
private long calculateRemainingScanRowsLimit(ScanQuery query, ResponseContext responseContext)
|
||||
{
|
||||
if (query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
|
||||
return query.getScanRowsLimit() - (long) responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS);
|
||||
return query.getScanRowsLimit() - (Long) responseContext.getRowScanCount();
|
||||
}
|
||||
return query.getScanRowsLimit();
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
|||
final Iterable<QueryRunner<ScanResultValue>> queryRunners
|
||||
)
|
||||
{
|
||||
// in single thread and in jetty thread instead of processing thread
|
||||
// in single thread and in Jetty thread instead of processing thread
|
||||
return (queryPlus, responseContext) -> {
|
||||
ScanQuery query = (ScanQuery) queryPlus.getQuery();
|
||||
ScanQuery.verifyOrderByForNativeExecution(query);
|
||||
|
@ -95,7 +95,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
|||
// Note: this variable is effective only when queryContext has a timeout.
|
||||
// See the comment of ResponseContext.Key.TIMEOUT_AT.
|
||||
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
|
||||
responseContext.put(ResponseContext.Key.TIMEOUT_AT, timeoutAt);
|
||||
responseContext.putTimeoutTime(timeoutAt);
|
||||
|
||||
if (query.getTimeOrder().equals(ScanQuery.Order.NONE)) {
|
||||
// Use normal strategy
|
||||
|
@ -369,9 +369,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
|||
ScanQuery.verifyOrderByForNativeExecution((ScanQuery) query);
|
||||
|
||||
// it happens in unit tests
|
||||
final Number timeoutAt = (Number) responseContext.get(ResponseContext.Key.TIMEOUT_AT);
|
||||
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
|
||||
responseContext.put(ResponseContext.Key.TIMEOUT_AT, JodaUtils.MAX_INSTANT);
|
||||
final Long timeoutAt = responseContext.getTimeoutTime();
|
||||
if (timeoutAt == null || timeoutAt == 0L) {
|
||||
responseContext.putTimeoutTime(JodaUtils.MAX_INSTANT);
|
||||
}
|
||||
return engine.process((ScanQuery) query, segment, responseContext);
|
||||
}
|
||||
|
|
|
@ -157,8 +157,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
|
|||
|
||||
private void appendMissingSegment(ResponseContext responseContext)
|
||||
{
|
||||
responseContext.add(
|
||||
ResponseContext.Key.MISSING_SEGMENTS,
|
||||
responseContext.addMissingSegments(
|
||||
Collections.singletonList(specificSpec.getDescriptor())
|
||||
);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
|
|||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.query.context.DefaultResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.filter.DimFilter;
|
||||
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
|
||||
import org.apache.druid.query.spec.QuerySegmentSpec;
|
||||
|
@ -47,8 +46,8 @@ public class ReportTimelineMissingSegmentQueryRunnerTest
|
|||
= new ReportTimelineMissingSegmentQueryRunner<>(missingSegment);
|
||||
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
runner.run(QueryPlus.wrap(new TestQuery()), responseContext);
|
||||
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertEquals(Collections.singletonList(missingSegment), responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertNotNull(responseContext.getMissingSegments());
|
||||
Assert.assertEquals(Collections.singletonList(missingSegment), responseContext.getMissingSegments());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -63,8 +62,8 @@ public class ReportTimelineMissingSegmentQueryRunnerTest
|
|||
= new ReportTimelineMissingSegmentQueryRunner<>(missingSegments);
|
||||
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
runner.run(QueryPlus.wrap(new TestQuery()), responseContext);
|
||||
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertEquals(missingSegments, responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertNotNull(responseContext.getMissingSegments());
|
||||
Assert.assertEquals(missingSegments, responseContext.getMissingSegments());
|
||||
}
|
||||
|
||||
private static class TestQuery extends BaseQuery<Object>
|
||||
|
|
|
@ -20,12 +20,17 @@
|
|||
package org.apache.druid.query.context;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.NonnullPair;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.query.SegmentDescriptor;
|
||||
import org.apache.druid.query.context.ResponseContext.CounterKey;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.context.ResponseContext.Keys;
|
||||
import org.apache.druid.query.context.ResponseContext.StringKey;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -33,177 +38,171 @@ import org.junit.Test;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class ResponseContextTest
|
||||
{
|
||||
|
||||
enum ExtensionResponseContextKey implements ResponseContext.BaseKey
|
||||
{
|
||||
EXTENSION_KEY_1("extension_key_1"),
|
||||
EXTENSION_KEY_2("extension_key_2", (oldValue, newValue) -> (long) oldValue + (long) newValue);
|
||||
// Droppable header key
|
||||
static final Key EXTN_STRING_KEY = new StringKey(
|
||||
"extn_string_key", true, true);
|
||||
// Non-droppable header key
|
||||
static final Key EXTN_COUNTER_KEY = new CounterKey(
|
||||
"extn_counter_key", true);
|
||||
|
||||
static {
|
||||
for (ResponseContext.BaseKey key : values()) {
|
||||
ResponseContext.Key.registerKey(key);
|
||||
}
|
||||
Keys.instance().registerKeys(new Key[] {
|
||||
EXTN_STRING_KEY,
|
||||
EXTN_COUNTER_KEY
|
||||
});
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final BiFunction<Object, Object, Object> mergeFunction;
|
||||
|
||||
ExtensionResponseContextKey(String name)
|
||||
{
|
||||
this.name = name;
|
||||
this.mergeFunction = (oldValue, newValue) -> newValue;
|
||||
}
|
||||
|
||||
ExtensionResponseContextKey(String name, BiFunction<Object, Object, Object> mergeFunction)
|
||||
{
|
||||
this.name = name;
|
||||
this.mergeFunction = mergeFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiFunction<Object, Object, Object> getMergeFunction()
|
||||
{
|
||||
return mergeFunction;
|
||||
}
|
||||
}
|
||||
|
||||
private final ResponseContext.BaseKey nonregisteredKey = new ResponseContext.BaseKey()
|
||||
{
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return "non-registered-key";
|
||||
}
|
||||
|
||||
@Override
|
||||
public BiFunction<Object, Object, Object> getMergeFunction()
|
||||
{
|
||||
return (Object a, Object b) -> a;
|
||||
}
|
||||
};
|
||||
static final Key UNREGISTERED_KEY = new StringKey(
|
||||
"unregistered-key", true, true);
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void putISETest()
|
||||
{
|
||||
ResponseContext.createEmpty().put(nonregisteredKey, new Object());
|
||||
ResponseContext.createEmpty().put(UNREGISTERED_KEY, new Object());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void addISETest()
|
||||
{
|
||||
ResponseContext.createEmpty().add(nonregisteredKey, new Object());
|
||||
ResponseContext.createEmpty().add(UNREGISTERED_KEY, new Object());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void registerKeyIAETest()
|
||||
{
|
||||
ResponseContext.Key.registerKey(ResponseContext.Key.NUM_SCANNED_ROWS);
|
||||
Keys.INSTANCE.registerKey(Keys.NUM_SCANNED_ROWS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeValueTest()
|
||||
public void mergeETagTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.add(ResponseContext.Key.ETAG, "dummy-etag");
|
||||
Assert.assertEquals("dummy-etag", ctx.get(ResponseContext.Key.ETAG));
|
||||
ctx.add(ResponseContext.Key.ETAG, "new-dummy-etag");
|
||||
Assert.assertEquals("new-dummy-etag", ctx.get(ResponseContext.Key.ETAG));
|
||||
ctx.putEntityTag("dummy-etag");
|
||||
Assert.assertEquals("dummy-etag", ctx.getEntityTag());
|
||||
ctx.putEntityTag("new-dummy-etag");
|
||||
Assert.assertEquals("new-dummy-etag", ctx.getEntityTag());
|
||||
}
|
||||
|
||||
final Interval interval01 = Intervals.of("2019-01-01/P1D");
|
||||
ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01));
|
||||
Assert.assertArrayEquals(
|
||||
Collections.singletonList(interval01).toArray(),
|
||||
((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray()
|
||||
);
|
||||
final Interval interval12 = Intervals.of("2019-01-02/P1D");
|
||||
final Interval interval23 = Intervals.of("2019-01-03/P1D");
|
||||
ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(interval12, interval23));
|
||||
Assert.assertArrayEquals(
|
||||
Arrays.asList(interval01, interval12, interval23).toArray(),
|
||||
((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray()
|
||||
);
|
||||
private static final Interval INTERVAL_01 = Intervals.of("2019-01-01/P1D");
|
||||
private static final Interval INTERVAL_12 = Intervals.of("2019-01-02/P1D");
|
||||
private static final Interval INTERVAL_23 = Intervals.of("2019-01-03/P1D");
|
||||
|
||||
@Test
|
||||
public void mergeUncoveredIntervalsTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.putUncoveredIntervals(Collections.singletonList(INTERVAL_01), false);
|
||||
Assert.assertArrayEquals(
|
||||
Collections.singletonList(INTERVAL_01).toArray(),
|
||||
ctx.getUncoveredIntervals().toArray()
|
||||
);
|
||||
ctx.add(Keys.UNCOVERED_INTERVALS, Arrays.asList(INTERVAL_12, INTERVAL_23));
|
||||
Assert.assertArrayEquals(
|
||||
Arrays.asList(INTERVAL_01, INTERVAL_12, INTERVAL_23).toArray(),
|
||||
ctx.getUncoveredIntervals().toArray()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeRemainingResponseTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
final String queryId = "queryId";
|
||||
final String queryId2 = "queryId2";
|
||||
ctx.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
|
||||
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, 3));
|
||||
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId2, 4));
|
||||
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -1));
|
||||
ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -2));
|
||||
ctx.initialize();
|
||||
ctx.addRemainingResponse(queryId, 3);
|
||||
ctx.addRemainingResponse(queryId2, 4);
|
||||
ctx.addRemainingResponse(queryId, -1);
|
||||
ctx.addRemainingResponse(queryId, -2);
|
||||
Assert.assertEquals(
|
||||
ImmutableMap.of(queryId, 0, queryId2, 4),
|
||||
ctx.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS)
|
||||
ctx.get(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS)
|
||||
);
|
||||
}
|
||||
|
||||
final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0);
|
||||
ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01));
|
||||
@Test
|
||||
public void mergeMissingSegmentsTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
final SegmentDescriptor sd01 = new SegmentDescriptor(INTERVAL_01, "01", 0);
|
||||
ctx.addMissingSegments(Collections.singletonList(sd01));
|
||||
Assert.assertArrayEquals(
|
||||
Collections.singletonList(sd01).toArray(),
|
||||
((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray()
|
||||
ctx.getMissingSegments().toArray()
|
||||
);
|
||||
final SegmentDescriptor sd12 = new SegmentDescriptor(interval12, "12", 1);
|
||||
final SegmentDescriptor sd23 = new SegmentDescriptor(interval23, "23", 2);
|
||||
ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Arrays.asList(sd12, sd23));
|
||||
final SegmentDescriptor sd12 = new SegmentDescriptor(INTERVAL_12, "12", 1);
|
||||
final SegmentDescriptor sd23 = new SegmentDescriptor(INTERVAL_23, "23", 2);
|
||||
ctx.addMissingSegments(Arrays.asList(sd12, sd23));
|
||||
Assert.assertArrayEquals(
|
||||
Arrays.asList(sd01, sd12, sd23).toArray(),
|
||||
((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray()
|
||||
ctx.getMissingSegments().toArray()
|
||||
);
|
||||
}
|
||||
|
||||
ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 0L);
|
||||
Assert.assertEquals(0L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS));
|
||||
ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 1L);
|
||||
Assert.assertEquals(1L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS));
|
||||
ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 3L);
|
||||
Assert.assertEquals(4L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS));
|
||||
@Test
|
||||
public void initScannedRowsTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
Assert.assertNull(ctx.getRowScanCount());
|
||||
ctx.initializeRowScanCount();
|
||||
Assert.assertEquals((Long) 0L, ctx.getRowScanCount());
|
||||
}
|
||||
|
||||
ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false);
|
||||
Assert.assertEquals(false, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, true);
|
||||
Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false);
|
||||
Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
@Test
|
||||
public void mergeScannedRowsTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
Assert.assertNull(ctx.getRowScanCount());
|
||||
ctx.addRowScanCount(0L);
|
||||
Assert.assertEquals((Long) 0L, ctx.getRowScanCount());
|
||||
ctx.addRowScanCount(1L);
|
||||
Assert.assertEquals((Long) 1L, ctx.getRowScanCount());
|
||||
ctx.addRowScanCount(3L);
|
||||
Assert.assertEquals((Long) 4L, ctx.getRowScanCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeUncoveredIntervalsOverflowedTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.add(Keys.UNCOVERED_INTERVALS_OVERFLOWED, false);
|
||||
Assert.assertEquals(false, ctx.get(Keys.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
ctx.add(Keys.UNCOVERED_INTERVALS_OVERFLOWED, true);
|
||||
Assert.assertEquals(true, ctx.get(Keys.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
ctx.add(Keys.UNCOVERED_INTERVALS_OVERFLOWED, false);
|
||||
Assert.assertEquals(true, ctx.get(Keys.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeResponseContextTest()
|
||||
{
|
||||
final ResponseContext ctx1 = ResponseContext.createEmpty();
|
||||
ctx1.put(ResponseContext.Key.ETAG, "dummy-etag-1");
|
||||
final Interval interval01 = Intervals.of("2019-01-01/P1D");
|
||||
ctx1.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01));
|
||||
ctx1.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1L);
|
||||
ctx1.putEntityTag("dummy-etag-1");
|
||||
ctx1.putUncoveredIntervals(Collections.singletonList(INTERVAL_01), false);
|
||||
ctx1.addRowScanCount(1L);
|
||||
|
||||
final ResponseContext ctx2 = ResponseContext.createEmpty();
|
||||
ctx2.put(ResponseContext.Key.ETAG, "dummy-etag-2");
|
||||
final Interval interval12 = Intervals.of("2019-01-02/P1D");
|
||||
ctx2.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval12));
|
||||
final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0);
|
||||
ctx2.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01));
|
||||
ctx2.put(ResponseContext.Key.NUM_SCANNED_ROWS, 2L);
|
||||
ctx2.putEntityTag("dummy-etag-2");
|
||||
ctx2.putUncoveredIntervals(Collections.singletonList(INTERVAL_12), false);
|
||||
final SegmentDescriptor sd01 = new SegmentDescriptor(INTERVAL_01, "01", 0);
|
||||
ctx2.addMissingSegments(Collections.singletonList(sd01));
|
||||
ctx2.addRowScanCount(2L);
|
||||
|
||||
ctx1.merge(ctx2);
|
||||
Assert.assertEquals("dummy-etag-2", ctx1.get(ResponseContext.Key.ETAG));
|
||||
Assert.assertEquals(3L, ctx1.get(ResponseContext.Key.NUM_SCANNED_ROWS));
|
||||
Assert.assertEquals("dummy-etag-2", ctx1.getEntityTag());
|
||||
Assert.assertEquals((Long) 3L, ctx1.getRowScanCount());
|
||||
Assert.assertArrayEquals(
|
||||
Arrays.asList(interval01, interval12).toArray(),
|
||||
((List) ctx1.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray()
|
||||
Arrays.asList(INTERVAL_01, INTERVAL_12).toArray(),
|
||||
ctx1.getUncoveredIntervals().toArray()
|
||||
);
|
||||
Assert.assertArrayEquals(
|
||||
Collections.singletonList(sd01).toArray(),
|
||||
((List) ctx1.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray()
|
||||
ctx1.getMissingSegments().toArray()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -213,9 +212,9 @@ public class ResponseContextTest
|
|||
final ResponseContext ctx = new ResponseContext()
|
||||
{
|
||||
@Override
|
||||
protected Map<BaseKey, Object> getDelegate()
|
||||
protected Map<Key, Object> getDelegate()
|
||||
{
|
||||
return ImmutableMap.of(nonregisteredKey, "non-registered-key");
|
||||
return ImmutableMap.of(UNREGISTERED_KEY, "non-registered-key");
|
||||
}
|
||||
};
|
||||
ResponseContext.createEmpty().merge(ctx);
|
||||
|
@ -225,68 +224,116 @@ public class ResponseContextTest
|
|||
public void serializeWithCorrectnessTest() throws JsonProcessingException
|
||||
{
|
||||
final ResponseContext ctx1 = ResponseContext.createEmpty();
|
||||
ctx1.add(ResponseContext.Key.ETAG, "string-value");
|
||||
ctx1.add(EXTN_STRING_KEY, "string-value");
|
||||
final DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
Assert.assertEquals(
|
||||
mapper.writeValueAsString(ImmutableMap.of("ETag", "string-value")),
|
||||
ctx1.serializeWith(mapper, Integer.MAX_VALUE).getResult()
|
||||
);
|
||||
mapper.writeValueAsString(ImmutableMap.of(
|
||||
EXTN_STRING_KEY.getName(),
|
||||
"string-value")),
|
||||
ctx1.serializeWith(mapper, Integer.MAX_VALUE).getResult());
|
||||
|
||||
final ResponseContext ctx2 = ResponseContext.createEmpty();
|
||||
ctx2.add(ResponseContext.Key.NUM_SCANNED_ROWS, 100);
|
||||
// Add two non-header fields, and one that will be in the header
|
||||
ctx2.putEntityTag("not in header");
|
||||
ctx2.addCpuNanos(100);
|
||||
ctx2.add(EXTN_COUNTER_KEY, 100);
|
||||
Assert.assertEquals(
|
||||
mapper.writeValueAsString(ImmutableMap.of("count", 100)),
|
||||
ctx2.serializeWith(mapper, Integer.MAX_VALUE).getResult()
|
||||
);
|
||||
mapper.writeValueAsString(ImmutableMap.of(
|
||||
EXTN_COUNTER_KEY.getName(), 100)),
|
||||
ctx2.serializeWith(mapper, Integer.MAX_VALUE).getResult());
|
||||
}
|
||||
|
||||
private Map<ResponseContext.Key, Object> deserializeContext(String input, ObjectMapper mapper) throws IOException
|
||||
{
|
||||
return ResponseContext.deserialize(input, mapper).getDelegate();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void serializeWithTruncateValueTest() throws IOException
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100);
|
||||
ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result");
|
||||
ctx.put(EXTN_COUNTER_KEY, 100L);
|
||||
ctx.put(EXTN_STRING_KEY, "long-string-that-is-supposed-to-be-removed-from-result");
|
||||
final DefaultObjectMapper objectMapper = new DefaultObjectMapper();
|
||||
final String fullString = objectMapper.writeValueAsString(ctx.getDelegate());
|
||||
final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE);
|
||||
Assert.assertEquals(fullString, res1.getResult());
|
||||
Assert.assertEquals(ctx.getDelegate(), deserializeContext(res1.getResult(), objectMapper));
|
||||
final ResponseContext ctxCopy = ResponseContext.createEmpty();
|
||||
ctxCopy.merge(ctx);
|
||||
final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 30);
|
||||
ctxCopy.remove(ResponseContext.Key.ETAG);
|
||||
ctxCopy.put(ResponseContext.Key.TRUNCATED, true);
|
||||
final int target = EXTN_COUNTER_KEY.getName().length() + 3 +
|
||||
Keys.TRUNCATED.getName().length() + 5 +
|
||||
15; // Fudge factor for quotes, separators, etc.
|
||||
final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, target);
|
||||
ctxCopy.remove(EXTN_STRING_KEY);
|
||||
ctxCopy.put(Keys.TRUNCATED, true);
|
||||
Assert.assertEquals(
|
||||
ctxCopy.getDelegate(),
|
||||
ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate()
|
||||
deserializeContext(res2.getResult(), objectMapper)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the case in which the sender knows about a key that the
|
||||
* receiver does not know about. The receiver will silently ignore
|
||||
* such keys.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void deserializeWithUnknownKeyTest() throws IOException
|
||||
{
|
||||
Map<String, Object> bogus = new HashMap<>();
|
||||
bogus.put(Keys.ETAG.getName(), "eTag");
|
||||
bogus.put("scalar", "doomed");
|
||||
bogus.put("array", new String[]{"foo", "bar"});
|
||||
Map<String, Object> objValue = new HashMap<>();
|
||||
objValue.put("array", new String[]{"foo", "bar"});
|
||||
bogus.put("obj", objValue);
|
||||
bogus.put("null", null);
|
||||
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
String serialized = mapper.writeValueAsString(bogus);
|
||||
ResponseContext ctx = ResponseContext.deserialize(serialized, mapper);
|
||||
Assert.assertEquals(1, ctx.getDelegate().size());
|
||||
Assert.assertEquals("eTag", ctx.get(Keys.ETAG));
|
||||
}
|
||||
|
||||
// Interval value for the test. Must match the deserialized value.
|
||||
private static Interval interval(int n)
|
||||
{
|
||||
return Intervals.of(StringUtils.format("2021-01-%02d/PT1M", n));
|
||||
}
|
||||
|
||||
// Length of above with quotes and comma.
|
||||
private static final int INTERVAL_LEN = 52;
|
||||
|
||||
@Test
|
||||
public void serializeWithTruncateArrayTest() throws IOException
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100);
|
||||
ctx.put(
|
||||
ResponseContext.Key.UNCOVERED_INTERVALS,
|
||||
Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
|
||||
Keys.UNCOVERED_INTERVALS,
|
||||
Arrays.asList(interval(1), interval(2), interval(3), interval(4),
|
||||
interval(5), interval(6))
|
||||
);
|
||||
// This value should be longer than the above so it is fully removed
|
||||
// before we truncate the above.
|
||||
ctx.put(
|
||||
ResponseContext.Key.MISSING_SEGMENTS,
|
||||
Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
|
||||
EXTN_STRING_KEY,
|
||||
Strings.repeat("x", INTERVAL_LEN * 7)
|
||||
);
|
||||
final DefaultObjectMapper objectMapper = new DefaultObjectMapper();
|
||||
final String fullString = objectMapper.writeValueAsString(ctx.getDelegate());
|
||||
final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE);
|
||||
Assert.assertEquals(fullString, res1.getResult());
|
||||
final int maxLen = INTERVAL_LEN * 4 + Keys.UNCOVERED_INTERVALS.getName().length() + 4 +
|
||||
Keys.TRUNCATED.getName().length() + 6;
|
||||
final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, maxLen);
|
||||
final ResponseContext ctxCopy = ResponseContext.createEmpty();
|
||||
ctxCopy.merge(ctx);
|
||||
final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 70);
|
||||
ctxCopy.put(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(0, 1, 2, 3, 4));
|
||||
ctxCopy.remove(ResponseContext.Key.MISSING_SEGMENTS);
|
||||
ctxCopy.put(ResponseContext.Key.TRUNCATED, true);
|
||||
// The resulting key array length will be half the start
|
||||
// length.
|
||||
ctxCopy.put(Keys.UNCOVERED_INTERVALS, Arrays.asList(interval(1), interval(2), interval(3)));
|
||||
ctxCopy.put(Keys.TRUNCATED, true);
|
||||
Assert.assertEquals(
|
||||
ctxCopy.getDelegate(),
|
||||
ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate()
|
||||
deserializeContext(res2.getResult(), objectMapper)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -297,62 +344,45 @@ public class ResponseContextTest
|
|||
final ResponseContext ctx = ResponseContext.deserialize(
|
||||
mapper.writeValueAsString(
|
||||
ImmutableMap.of(
|
||||
"ETag", "string-value",
|
||||
"count", 100L,
|
||||
"cpuConsumed", 100000L
|
||||
Keys.ETAG.getName(), "string-value",
|
||||
Keys.NUM_SCANNED_ROWS.getName(), 100L,
|
||||
Keys.CPU_CONSUMED_NANOS.getName(), 100000L
|
||||
)
|
||||
),
|
||||
mapper
|
||||
);
|
||||
Assert.assertEquals("string-value", ctx.get(ResponseContext.Key.ETAG));
|
||||
Assert.assertEquals(100, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS));
|
||||
Assert.assertEquals(100000, ctx.get(ResponseContext.Key.CPU_CONSUMED_NANOS));
|
||||
ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 10L);
|
||||
Assert.assertEquals(110L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS));
|
||||
ctx.add(ResponseContext.Key.CPU_CONSUMED_NANOS, 100);
|
||||
Assert.assertEquals(100100L, ctx.get(ResponseContext.Key.CPU_CONSUMED_NANOS));
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void deserializeISETest() throws IOException
|
||||
{
|
||||
final DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
ResponseContext.deserialize(
|
||||
mapper.writeValueAsString(ImmutableMap.of("ETag_unexpected", "string-value")),
|
||||
mapper
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void extensionEnumIntegrityTest()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
ExtensionResponseContextKey.EXTENSION_KEY_1,
|
||||
ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_1.getName())
|
||||
);
|
||||
Assert.assertEquals(
|
||||
ExtensionResponseContextKey.EXTENSION_KEY_2,
|
||||
ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_2.getName())
|
||||
);
|
||||
for (ResponseContext.BaseKey key : ExtensionResponseContextKey.values()) {
|
||||
Assert.assertTrue(ResponseContext.Key.getAllRegisteredKeys().contains(key));
|
||||
}
|
||||
Assert.assertEquals("string-value", ctx.getEntityTag());
|
||||
Assert.assertEquals((Long) 100L, ctx.getRowScanCount());
|
||||
Assert.assertEquals((Long) 100000L, ctx.getCpuNanos());
|
||||
ctx.addRowScanCount(10L);
|
||||
Assert.assertEquals((Long) 110L, ctx.getRowScanCount());
|
||||
ctx.addCpuNanos(100L);
|
||||
Assert.assertEquals((Long) 100100L, ctx.getCpuNanos());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void extensionEnumMergeTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.add(ResponseContext.Key.ETAG, "etag");
|
||||
ctx.add(ExtensionResponseContextKey.EXTENSION_KEY_1, "string-value");
|
||||
ctx.add(ExtensionResponseContextKey.EXTENSION_KEY_2, 2L);
|
||||
ctx.putEntityTag("etag");
|
||||
ctx.add(EXTN_STRING_KEY, "string-value");
|
||||
ctx.add(EXTN_COUNTER_KEY, 2L);
|
||||
final ResponseContext ctxFinal = ResponseContext.createEmpty();
|
||||
ctxFinal.add(ResponseContext.Key.ETAG, "old-etag");
|
||||
ctxFinal.add(ExtensionResponseContextKey.EXTENSION_KEY_1, "old-string-value");
|
||||
ctxFinal.add(ExtensionResponseContextKey.EXTENSION_KEY_2, 1L);
|
||||
ctxFinal.putEntityTag("old-etag");
|
||||
ctxFinal.add(EXTN_STRING_KEY, "old-string-value");
|
||||
ctxFinal.add(EXTN_COUNTER_KEY, 1L);
|
||||
ctxFinal.merge(ctx);
|
||||
Assert.assertEquals("etag", ctxFinal.get(ResponseContext.Key.ETAG));
|
||||
Assert.assertEquals("string-value", ctxFinal.get(ExtensionResponseContextKey.EXTENSION_KEY_1));
|
||||
Assert.assertEquals(1L + 2L, ctxFinal.get(ExtensionResponseContextKey.EXTENSION_KEY_2));
|
||||
Assert.assertEquals("etag", ctxFinal.getEntityTag());
|
||||
Assert.assertEquals("string-value", ctxFinal.get(EXTN_STRING_KEY));
|
||||
Assert.assertEquals(1L + 2L, ctxFinal.get(EXTN_COUNTER_KEY));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toMapTest()
|
||||
{
|
||||
final ResponseContext ctx = ResponseContext.createEmpty();
|
||||
ctx.putEntityTag("etag");
|
||||
Map<String, Object> map = ctx.toMap();
|
||||
Assert.assertEquals(map.get(ResponseContext.Keys.ETAG.getName()), "etag");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,7 +50,6 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -140,7 +139,7 @@ public class DataSourceMetadataQueryTest
|
|||
.dataSource("testing")
|
||||
.build();
|
||||
ResponseContext context = ConcurrentResponseContext.createEmpty();
|
||||
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
|
||||
context.initializeMissingSegments();
|
||||
Iterable<Result<DataSourceMetadataResultValue>> results =
|
||||
runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
|
||||
DataSourceMetadataResultValue val = results.iterator().next().getValue();
|
||||
|
|
|
@ -913,7 +913,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
|
|||
.context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
|
||||
.build();
|
||||
ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
responseContext.add(ResponseContext.Key.TIMEOUT_AT, System.currentTimeMillis());
|
||||
responseContext.putTimeoutTime(System.currentTimeMillis());
|
||||
try {
|
||||
runner.run(QueryPlus.wrap(query), responseContext).toList();
|
||||
}
|
||||
|
|
|
@ -197,17 +197,11 @@ public class SpecificSegmentQueryRunnerTest
|
|||
private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, ResponseContext responseContext)
|
||||
throws IOException
|
||||
{
|
||||
Object missingSegments = responseContext.get(ResponseContext.Key.MISSING_SEGMENTS);
|
||||
|
||||
List<SegmentDescriptor> missingSegments = responseContext.getMissingSegments();
|
||||
Assert.assertTrue(missingSegments != null);
|
||||
Assert.assertTrue(missingSegments instanceof List);
|
||||
|
||||
Object segmentDesc = ((List) missingSegments).get(0);
|
||||
|
||||
Assert.assertTrue(segmentDesc instanceof SegmentDescriptor);
|
||||
|
||||
SegmentDescriptor segmentDesc = missingSegments.get(0);
|
||||
SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class);
|
||||
|
||||
Assert.assertEquals(descriptor, newDesc);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -225,7 +225,7 @@ public class TimeBoundaryQueryRunnerTest
|
|||
.bound(TimeBoundaryQuery.MAX_TIME)
|
||||
.build();
|
||||
ResponseContext context = ConcurrentResponseContext.createEmpty();
|
||||
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
|
||||
context.initializeMissingSegments();
|
||||
Iterable<Result<TimeBoundaryResultValue>> results = runner.run(QueryPlus.wrap(timeBoundaryQuery), context).toList();
|
||||
TimeBoundaryResultValue val = results.iterator().next().getValue();
|
||||
DateTime minTime = val.getMinTime();
|
||||
|
@ -244,7 +244,7 @@ public class TimeBoundaryQueryRunnerTest
|
|||
.bound(TimeBoundaryQuery.MIN_TIME)
|
||||
.build();
|
||||
ResponseContext context = ConcurrentResponseContext.createEmpty();
|
||||
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
|
||||
context.initializeMissingSegments();
|
||||
Iterable<Result<TimeBoundaryResultValue>> results = runner.run(QueryPlus.wrap(timeBoundaryQuery), context).toList();
|
||||
TimeBoundaryResultValue val = results.iterator().next().getValue();
|
||||
DateTime minTime = val.getMinTime();
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.druid.guice.annotations.Merging;
|
|||
import org.apache.druid.guice.annotations.Smile;
|
||||
import org.apache.druid.guice.http.DruidHttpClientConfig;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.NonnullPair;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -72,7 +71,6 @@ import org.apache.druid.query.Result;
|
|||
import org.apache.druid.query.SegmentDescriptor;
|
||||
import org.apache.druid.query.aggregation.MetricManipulatorFns;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.filter.DimFilterUtils;
|
||||
import org.apache.druid.query.planning.DataSourceAnalysis;
|
||||
import org.apache.druid.query.spec.QuerySegmentSpec;
|
||||
|
@ -218,10 +216,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
|
|||
final int numQueryServers
|
||||
)
|
||||
{
|
||||
responseContext.add(
|
||||
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
|
||||
new NonnullPair<>(query.getMostSpecificId(), numQueryServers)
|
||||
);
|
||||
responseContext.addRemainingResponse(query.getMostSpecificId(), numQueryServers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -361,7 +356,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
|
|||
@Nullable
|
||||
final String currentEtag = cacheKeyManager.computeResultLevelCachingEtag(segmentServers, queryCacheKey);
|
||||
if (null != currentEtag) {
|
||||
responseContext.put(Key.ETAG, currentEtag);
|
||||
responseContext.putEntityTag(currentEtag);
|
||||
}
|
||||
if (currentEtag != null && currentEtag.equals(prevEtag)) {
|
||||
return new ClusterQueryResult<>(Sequences.empty(), 0);
|
||||
|
@ -499,8 +494,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
|
|||
// Which is not necessarily an indication that the data doesn't exist or is
|
||||
// incomplete. The data could exist and just not be loaded yet. In either
|
||||
// case, though, this query will not include any data from the identified intervals.
|
||||
responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS, uncoveredIntervals);
|
||||
responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed);
|
||||
responseContext.putUncoveredIntervals(uncoveredIntervals, uncoveredIntervalsOverflowed);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.druid.java.util.common.NonnullPair;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -55,7 +54,6 @@ import org.apache.druid.query.ResourceLimitExceededException;
|
|||
import org.apache.druid.query.aggregation.MetricManipulatorFns;
|
||||
import org.apache.druid.query.context.ConcurrentResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.server.QueryResource;
|
||||
import org.apache.druid.utils.CloseableUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
@ -67,13 +65,13 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
|
|||
import org.joda.time.Duration;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.SequenceInputStream;
|
||||
import java.net.URL;
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -110,15 +108,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
*/
|
||||
public static void removeMagicResponseContextFields(ResponseContext responseContext)
|
||||
{
|
||||
responseContext.remove(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED);
|
||||
responseContext.remove(ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
|
||||
responseContext.remove(ResponseContext.Keys.QUERY_TOTAL_BYTES_GATHERED);
|
||||
responseContext.remove(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
|
||||
}
|
||||
|
||||
public static ResponseContext makeResponseContextForQuery()
|
||||
public static ConcurrentResponseContext makeResponseContextForQuery()
|
||||
{
|
||||
final ResponseContext responseContext = ConcurrentResponseContext.createEmpty();
|
||||
responseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
|
||||
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
|
||||
final ConcurrentResponseContext responseContext = ConcurrentResponseContext.createEmpty();
|
||||
responseContext.initialize();
|
||||
return responseContext;
|
||||
}
|
||||
|
||||
|
@ -168,7 +165,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
final long requestStartTimeNs = System.nanoTime();
|
||||
final long timeoutAt = query.getContextValue(QUERY_FAIL_TIME);
|
||||
final long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
|
||||
final AtomicLong totalBytesGathered = (AtomicLong) context.get(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED);
|
||||
final AtomicLong totalBytesGathered = context.getTotalBytes();
|
||||
final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, 0);
|
||||
final boolean usingBackpressure = maxQueuedBytes > 0;
|
||||
|
||||
|
@ -246,10 +243,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
|||
query.getSubQueryId()
|
||||
);
|
||||
final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
|
||||
context.add(
|
||||
ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
|
||||
new NonnullPair<>(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES)
|
||||
);
|
||||
context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES);
|
||||
// context may be null in case of error or query timeout
|
||||
if (responseContext != null) {
|
||||
context.merge(ResponseContext.deserialize(responseContext, objectMapper));
|
||||
|
|
|
@ -97,7 +97,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
|
|||
QueryPlus.wrap(query),
|
||||
responseContext
|
||||
);
|
||||
String newResultSetId = (String) responseContext.get(ResponseContext.Key.ETAG);
|
||||
String newResultSetId = responseContext.getEntityTag();
|
||||
|
||||
if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) {
|
||||
log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId());
|
||||
|
|
|
@ -33,10 +33,9 @@ import org.apache.druid.java.util.common.guava.YieldingAccumulator;
|
|||
import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.context.ResponseContext.Keys;
|
||||
import org.apache.druid.segment.SegmentMissingException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -54,7 +53,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
private final ObjectMapper jsonMapper;
|
||||
|
||||
/**
|
||||
* Runnable executed after the broker creates query distribution tree for the first attempt. This is only
|
||||
* Runnable executed after the broker creates the query distribution tree for the first attempt. This is only
|
||||
* for testing and must not be used in production code.
|
||||
*/
|
||||
private final Runnable runnableAfterFirstAttempt;
|
||||
|
@ -142,10 +141,10 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
// The missingSegments in the responseContext is only valid when all servers have responded to the broker.
|
||||
// The remainingResponses MUST be not null but 0 in the responseContext at this point.
|
||||
final ConcurrentHashMap<String, Integer> idToRemainingResponses =
|
||||
(ConcurrentHashMap<String, Integer>) Preconditions.checkNotNull(
|
||||
context.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS),
|
||||
Preconditions.checkNotNull(
|
||||
context.getRemainingResponses(),
|
||||
"%s in responseContext",
|
||||
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
|
||||
Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
|
||||
);
|
||||
|
||||
final int remainingResponses = Preconditions.checkNotNull(
|
||||
|
@ -157,7 +156,11 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
throw new ISE("Failed to check missing segments due to missing responses from [%d] servers", remainingResponses);
|
||||
}
|
||||
|
||||
final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS);
|
||||
// TODO: the sender's response may contain a truncated list of missing segments.
|
||||
// Truncation is aggregated in the response context given as a parameter.
|
||||
// Check the getTruncated() value: if true, then the we don't know the full set of
|
||||
// missing segments.
|
||||
final List<SegmentDescriptor> maybeMissingSegments = context.getMissingSegments();
|
||||
if (maybeMissingSegments == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
@ -171,7 +174,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
|
||||
/**
|
||||
* A lazy iterator populating {@link Sequence} by retrying the query. The first returned sequence is always the base
|
||||
* A lazy iterator populating a {@link Sequence} by retrying the query. The first returned sequence is always the base
|
||||
* sequence from the baseQueryRunner. Subsequent sequences are created dynamically whenever it retries the query. All
|
||||
* the sequences populated by this iterator will be merged (not combined) with the base sequence.
|
||||
*
|
||||
|
@ -179,7 +182,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
* each underlying sequence and pushes them to a {@link java.util.PriorityQueue}. Whenever it pops from the queue,
|
||||
* it pushes a new item from the sequence where the returned item was originally from. Since the first returned
|
||||
* sequence from this iterator is always the base sequence, the MergeSequence will call {@link Sequence#toYielder}
|
||||
* on the base sequence first which in turn initializing query distribution tree. Once this tree is built, the query
|
||||
* on the base sequence first which in turn initializes the query distribution tree. Once this tree is built, the query
|
||||
* servers (historicals and realtime tasks) will lock all segments to read and report missing segments to the broker.
|
||||
* If there are missing segments reported, this iterator will rewrite the query with those reported segments and
|
||||
* reissue the rewritten query.
|
||||
|
@ -229,7 +232,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
|
|||
retryCount++;
|
||||
LOG.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), retryCount);
|
||||
|
||||
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
|
||||
context.initializeMissingSegments();
|
||||
final QueryPlus<T> retryQueryPlus = queryPlus.withQuery(
|
||||
Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments)
|
||||
);
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
|
|||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
|
||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.druid.query.QueryUnsupportedException;
|
|||
import org.apache.druid.query.ResourceLimitExceededException;
|
||||
import org.apache.druid.query.TruncatedResponseContextException;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Keys;
|
||||
import org.apache.druid.server.metrics.QueryCountStatsProvider;
|
||||
import org.apache.druid.server.security.Access;
|
||||
import org.apache.druid.server.security.AuthConfig;
|
||||
|
@ -78,6 +80,7 @@ import javax.ws.rs.core.MediaType;
|
|||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
@ -215,7 +218,7 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
final ResponseContext responseContext = queryResponse.getResponseContext();
|
||||
final String prevEtag = getPreviousEtag(req);
|
||||
|
||||
if (prevEtag != null && prevEtag.equals(responseContext.get(ResponseContext.Key.ETAG))) {
|
||||
if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) {
|
||||
queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
|
||||
successfulQueryCount.incrementAndGet();
|
||||
return Response.notModified().build();
|
||||
|
@ -274,16 +277,13 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
)
|
||||
.header("X-Druid-Query-Id", queryId);
|
||||
|
||||
Object entityTag = responseContext.remove(ResponseContext.Key.ETAG);
|
||||
if (entityTag != null) {
|
||||
responseBuilder.header(HEADER_ETAG, entityTag);
|
||||
}
|
||||
transferEntityTag(responseContext, responseBuilder);
|
||||
|
||||
DirectDruidClient.removeMagicResponseContextFields(responseContext);
|
||||
|
||||
//Limit the response-context header, see https://github.com/apache/druid/issues/2331
|
||||
//Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
|
||||
//and encodes the string using ASCII, so 1 char is = 1 byte
|
||||
// Limit the response-context header, see https://github.com/apache/druid/issues/2331
|
||||
// Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
|
||||
// and encodes the string using ASCII, so 1 char is = 1 byte
|
||||
final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
|
||||
jsonMapper,
|
||||
responseContextConfig.getMaxResponseContextHeaderSize()
|
||||
|
@ -557,4 +557,13 @@ public class QueryResource implements QueryCountStatsProvider
|
|||
{
|
||||
return timedOutQueryCount.get();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void transferEntityTag(ResponseContext context, Response.ResponseBuilder builder)
|
||||
{
|
||||
Object entityTag = context.remove(Keys.ETAG);
|
||||
if (entityTag != null) {
|
||||
builder.header(HEADER_ETAG, entityTag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ public class CachingClusteredClientFunctionalityTest
|
|||
|
||||
ResponseContext responseContext = ResponseContext.createEmpty();
|
||||
runQuery(client, builder.build(), responseContext);
|
||||
Assert.assertNull(responseContext.get(ResponseContext.Key.UNCOVERED_INTERVALS));
|
||||
Assert.assertNull(responseContext.getUncoveredIntervals());
|
||||
|
||||
builder.intervals("2015-01-01/2015-01-03");
|
||||
responseContext = ResponseContext.createEmpty();
|
||||
|
@ -177,8 +177,8 @@ public class CachingClusteredClientFunctionalityTest
|
|||
for (String interval : intervals) {
|
||||
expectedList.add(Intervals.of(interval));
|
||||
}
|
||||
Assert.assertEquals((Object) expectedList, context.get(ResponseContext.Key.UNCOVERED_INTERVALS));
|
||||
Assert.assertEquals(uncoveredIntervalsOverflowed, context.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
Assert.assertEquals((Object) expectedList, context.getUncoveredIntervals());
|
||||
Assert.assertEquals(uncoveredIntervalsOverflowed, context.get(ResponseContext.Keys.UNCOVERED_INTERVALS_OVERFLOWED));
|
||||
}
|
||||
|
||||
private void addToTimeline(Interval interval, String version)
|
||||
|
|
|
@ -88,7 +88,6 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
|
|||
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
|
||||
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.query.filter.AndDimFilter;
|
||||
import org.apache.druid.query.filter.BoundDimFilter;
|
||||
|
@ -168,7 +167,6 @@ import java.util.Random;
|
|||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
@ -3292,7 +3290,7 @@ public class CachingClusteredClientTest
|
|||
final ResponseContext responseContext = initializeResponseContext();
|
||||
|
||||
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
|
||||
Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.Key.ETAG));
|
||||
Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.getEntityTag());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -3340,9 +3338,9 @@ public class CachingClusteredClientTest
|
|||
final ResponseContext responseContext = initializeResponseContext();
|
||||
|
||||
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
|
||||
final Object etag1 = responseContext.get(ResponseContext.Key.ETAG);
|
||||
final String etag1 = responseContext.getEntityTag();
|
||||
getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext);
|
||||
final Object etag2 = responseContext.get(ResponseContext.Key.ETAG);
|
||||
final String etag2 = responseContext.getEntityTag();
|
||||
Assert.assertNotEquals(etag1, etag2);
|
||||
}
|
||||
|
||||
|
@ -3363,7 +3361,7 @@ public class CachingClusteredClientTest
|
|||
private static ResponseContext initializeResponseContext()
|
||||
{
|
||||
final ResponseContext context = ResponseContext.createEmpty();
|
||||
context.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
|
||||
context.initializeRemainingResponses();
|
||||
return context;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.druid.java.util.common.io.Closer;
|
|||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||
import org.apache.druid.query.context.ConcurrentResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.timeseries.TimeseriesResultValue;
|
||||
import org.apache.druid.query.topn.TopNQueryConfig;
|
||||
import org.apache.druid.segment.QueryableIndex;
|
||||
|
@ -65,7 +64,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
@ -247,7 +245,7 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
|
|||
protected static ResponseContext responseContext()
|
||||
{
|
||||
final ResponseContext responseContext = ConcurrentResponseContext.createEmpty();
|
||||
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
|
||||
responseContext.initializeRemainingResponses();
|
||||
return responseContext;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.server;
|
||||
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
/**
|
||||
* Tests for low-level bits of the query resource mechanism.
|
||||
*/
|
||||
public class QueryDetailsTest
|
||||
{
|
||||
@Test
|
||||
public void testEtag()
|
||||
{
|
||||
// No ETAG
|
||||
{
|
||||
ResponseContext responseContext = ResponseContext.createEmpty();
|
||||
Response.ResponseBuilder responseBuilder = Response.ok();
|
||||
QueryResource.transferEntityTag(responseContext, responseBuilder);
|
||||
Response response = responseBuilder.build();
|
||||
MultivaluedMap<String, Object> metadata = response.getMetadata();
|
||||
Assert.assertNull(metadata.get(QueryResource.HEADER_ETAG));
|
||||
}
|
||||
|
||||
// Provided ETAG is passed along.
|
||||
{
|
||||
ResponseContext responseContext = ResponseContext.createEmpty();
|
||||
String etagValue = "myTag";
|
||||
responseContext.putEntityTag(etagValue);
|
||||
Response.ResponseBuilder responseBuilder = Response.ok();
|
||||
QueryResource.transferEntityTag(responseContext, responseBuilder);
|
||||
Response response = responseBuilder.build();
|
||||
MultivaluedMap<String, Object> metadata = response.getMetadata();
|
||||
Assert.assertEquals(etagValue, metadata.getFirst(QueryResource.HEADER_ETAG));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -893,9 +893,9 @@ public class QueryResourceTest
|
|||
final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\","
|
||||
+ "\"context\":{\"queryId\":\"id_1\"}}";
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
Query query = mapper.readValue(queryString, Query.class);
|
||||
Query<?> query = mapper.readValue(queryString, Query.class);
|
||||
|
||||
ListenableFuture future = MoreExecutors.listeningDecorator(
|
||||
ListenableFuture<?> future = MoreExecutors.listeningDecorator(
|
||||
Execs.singleThreaded("test_query_resource_%s")
|
||||
).submit(
|
||||
new Runnable()
|
||||
|
@ -1017,9 +1017,9 @@ public class QueryResourceTest
|
|||
final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\","
|
||||
+ "\"context\":{\"queryId\":\"id_1\"}}";
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
Query query = mapper.readValue(queryString, Query.class);
|
||||
Query<?> query = mapper.readValue(queryString, Query.class);
|
||||
|
||||
ListenableFuture future = MoreExecutors.listeningDecorator(
|
||||
ListenableFuture<?> future = MoreExecutors.listeningDecorator(
|
||||
Execs.singleThreaded("test_query_resource_%s")
|
||||
).submit(
|
||||
new Runnable()
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.client.SegmentServerSelector;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.NonnullPair;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.guava.FunctionalIterable;
|
||||
import org.apache.druid.java.util.common.guava.LazySequence;
|
||||
|
@ -41,7 +40,6 @@ import org.apache.druid.query.QueryToolChest;
|
|||
import org.apache.druid.query.ReferenceCountingSegmentQueryRunner;
|
||||
import org.apache.druid.query.SegmentDescriptor;
|
||||
import org.apache.druid.query.TableDataSource;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.planning.DataSourceAnalysis;
|
||||
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
|
||||
import org.apache.druid.query.spec.SpecificSegmentSpec;
|
||||
|
@ -62,7 +60,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
@ -167,11 +164,9 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
|
|||
// the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects
|
||||
// to actually serve the queries
|
||||
return (theQuery, responseContext) -> {
|
||||
responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>());
|
||||
responseContext.add(
|
||||
Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS,
|
||||
new NonnullPair<>(theQuery.getQuery().getMostSpecificId(), 0)
|
||||
);
|
||||
responseContext.initializeRemainingResponses();
|
||||
responseContext.addRemainingResponse(
|
||||
theQuery.getQuery().getMostSpecificId(), 0);
|
||||
if (scheduler != null) {
|
||||
Set<SegmentServerSelector> segments = new HashSet<>();
|
||||
specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));
|
||||
|
|
|
@ -63,7 +63,6 @@ import org.apache.druid.query.TableDataSource;
|
|||
import org.apache.druid.query.aggregation.MetricManipulationFn;
|
||||
import org.apache.druid.query.context.DefaultResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext;
|
||||
import org.apache.druid.query.context.ResponseContext.Key;
|
||||
import org.apache.druid.query.filter.DimFilter;
|
||||
import org.apache.druid.query.filter.Filter;
|
||||
import org.apache.druid.query.planning.DataSourceAnalysis;
|
||||
|
@ -456,8 +455,8 @@ public class ServerManagerTest
|
|||
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
|
||||
Assert.assertTrue(results.isEmpty());
|
||||
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertNotNull(responseContext.getMissingSegments());
|
||||
Assert.assertEquals(unknownSegments, responseContext.getMissingSegments());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -475,8 +474,8 @@ public class ServerManagerTest
|
|||
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
|
||||
Assert.assertTrue(results.isEmpty());
|
||||
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertNotNull(responseContext.getMissingSegments());
|
||||
Assert.assertEquals(unknownSegments, responseContext.getMissingSegments());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -495,8 +494,8 @@ public class ServerManagerTest
|
|||
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
|
||||
Assert.assertTrue(results.isEmpty());
|
||||
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertNotNull(responseContext.getMissingSegments());
|
||||
Assert.assertEquals(unknownSegments, responseContext.getMissingSegments());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -526,8 +525,8 @@ public class ServerManagerTest
|
|||
final ResponseContext responseContext = DefaultResponseContext.createEmpty();
|
||||
final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
|
||||
Assert.assertTrue(results.isEmpty());
|
||||
Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertEquals(closedSegments, responseContext.get(Key.MISSING_SEGMENTS));
|
||||
Assert.assertNotNull(responseContext.getMissingSegments());
|
||||
Assert.assertEquals(closedSegments, responseContext.getMissingSegments());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue