From 34a3d45737c260497ca01eb6904d77bd628b9ee2 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Mon, 6 Dec 2021 17:03:12 -0800 Subject: [PATCH] Refactor ResponseContext (#11828) * Refactor ResponseContext Fixes a number of issues in preparation for request trailers and the query profile. * Converts keys from an enum to classes for smaller code * Wraps stored values in functions for easier capture for other uses * Reworks the "header squeezer" to handle types other than arrays. * Uses metadata for visibility, and ability to compress, to replace ad-hoc code. * Cleans up JSON serialization for the response context. * Other miscellaneous cleanup. * Handle unknown keys in deserialization Also, make "Visibility" into a boolean. * Revised comment * Renamd variable --- .gitignore | 1 + .../MovingAverageQueryRunner.java | 6 +- .../DruidDefaultSerializersModule.java | 8 + .../druid/query/CPUTimeMetricQueryRunner.java | 3 +- ...portTimelineMissingSegmentQueryRunner.java | 2 +- .../context/ConcurrentResponseContext.java | 4 +- .../query/context/DefaultResponseContext.java | 4 +- .../druid/query/context/ResponseContext.java | 789 +++++++++++++----- .../context/ResponseContextDeserializer.java | 128 +++ .../druid/query/scan/ScanQueryEngine.java | 21 +- .../query/scan/ScanQueryRunnerFactory.java | 10 +- .../spec/SpecificSegmentQueryRunner.java | 3 +- ...TimelineMissingSegmentQueryRunnerTest.java | 9 +- .../query/context/ResponseContextTest.java | 410 ++++----- .../DataSourceMetadataQueryTest.java | 3 +- .../druid/query/scan/ScanQueryRunnerTest.java | 2 +- .../spec/SpecificSegmentQueryRunnerTest.java | 10 +- .../TimeBoundaryQueryRunnerTest.java | 4 +- .../druid/client/CachingClusteredClient.java | 12 +- .../druid/client/DirectDruidClient.java | 22 +- .../query/ResultLevelCachingQueryRunner.java | 2 +- .../apache/druid/query/RetryQueryRunner.java | 23 +- .../apache/druid/server/QueryResource.java | 25 +- ...chingClusteredClientFunctionalityTest.java | 6 +- .../client/CachingClusteredClientTest.java | 10 +- ...yRunnerBasedOnClusteredClientTestBase.java | 4 +- .../apache/druid/server/QueryDetailsTest.java | 59 ++ .../druid/server/QueryResourceTest.java | 8 +- .../server/TestClusterQuerySegmentWalker.java | 11 +- .../coordination/ServerManagerTest.java | 17 +- 30 files changed, 1113 insertions(+), 503 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java create mode 100644 server/src/test/java/org/apache/druid/server/QueryDetailsTest.java diff --git a/.gitignore b/.gitignore index a8b2a0bde5c..d92e548ba4b 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ README **/.pmd **/.pmdruleset.xml .java-version +integration-tests/gen-scripts/ diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java index 5cc0964079c..80cc45fbc29 100644 --- a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java +++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java @@ -123,8 +123,7 @@ public class MovingAverageQueryRunner implements QueryRunner ResponseContext gbqResponseContext = ResponseContext.createEmpty(); gbqResponseContext.merge(responseContext); - gbqResponseContext.put( - ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS, + 
gbqResponseContext.putQueryFailDeadlineMs( System.currentTimeMillis() + QueryContexts.getTimeout(gbq) ); @@ -164,8 +163,7 @@ public class MovingAverageQueryRunner implements QueryRunner ); ResponseContext tsqResponseContext = ResponseContext.createEmpty(); tsqResponseContext.merge(responseContext); - tsqResponseContext.put( - ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS, + tsqResponseContext.putQueryFailDeadlineMs( System.currentTimeMillis() + QueryContexts.getTimeout(tsq) ); diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 214e72a9cdd..a8ec404ad8c 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -31,15 +31,20 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.context.ResponseContextDeserializer; import org.joda.time.DateTimeZone; import java.io.IOException; import java.nio.ByteOrder; /** + * */ +@SuppressWarnings("serial") public class DruidDefaultSerializersModule extends SimpleModule { + @SuppressWarnings("rawtypes") public DruidDefaultSerializersModule() { super("Druid default serializers"); @@ -78,6 +83,7 @@ public class DruidDefaultSerializersModule extends SimpleModule Sequence.class, new JsonSerializer() { + @SuppressWarnings("unchecked") @Override public void serialize(Sequence value, final JsonGenerator jgen, SerializerProvider provider) throws IOException @@ -108,6 +114,7 @@ public class DruidDefaultSerializersModule extends SimpleModule Yielder.class, new JsonSerializer() { + @SuppressWarnings("unchecked") @Override public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) throws IOException @@ -142,5 +149,6 @@ public class DruidDefaultSerializersModule extends SimpleModule } } ); + addDeserializer(ResponseContext.class, new ResponseContextDeserializer()); } } diff --git a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java index 6b51cbbcc6e..e63e4ee07d9 100644 --- a/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/CPUTimeMetricQueryRunner.java @@ -56,7 +56,6 @@ public class CPUTimeMetricQueryRunner implements QueryRunner this.report = report; } - @Override public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) { @@ -88,7 +87,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner if (report) { final long cpuTimeNs = cpuTimeAccumulator.get(); if (cpuTimeNs > 0) { - responseContext.add(ResponseContext.Key.CPU_CONSUMED_NANOS, cpuTimeNs); + responseContext.addCpuNanos(cpuTimeNs); queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter); } } diff --git a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java index 89c593256be..a3e9f6a9aa8 100644 --- 
a/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -49,7 +49,7 @@ public class ReportTimelineMissingSegmentQueryRunner implements QueryRunner run(QueryPlus queryPlus, ResponseContext responseContext) { LOG.debug("Reporting a missing segments[%s] for query[%s]", descriptors, queryPlus.getQuery().getId()); - responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, descriptors); + responseContext.addMissingSegments(descriptors); return Sequences.empty(); } } diff --git a/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java index b1e648467a7..fb58f4f96e2 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java @@ -35,10 +35,10 @@ public class ConcurrentResponseContext extends ResponseContext return new ConcurrentResponseContext(); } - private final ConcurrentHashMap delegate = new ConcurrentHashMap<>(); + private final ConcurrentHashMap delegate = new ConcurrentHashMap<>(); @Override - protected Map getDelegate() + protected Map getDelegate() { return delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java index 33724c1bf04..5e8a1fdde4a 100644 --- a/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java @@ -35,10 +35,10 @@ public class DefaultResponseContext extends ResponseContext return new DefaultResponseContext(); } - private final HashMap delegate = new HashMap<>(); + private final HashMap delegate = new HashMap<>(); @Override - protected Map getDelegate() + protected Map getDelegate() { return delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index ffcef162034..62d995ad40a 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -19,38 +19,80 @@ package org.apache.druid.query.context; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; +import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.NonnullPair; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; + import 
java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.BiFunction;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
  * The context is also transferred between Druid nodes with all the data it contains.
- */
+ * <p>
+ * The response context consists of a set of key/value pairs. Keys are those defined in
+ * the {@code Keys} registry. Keys are indexed by key instance, not by name. The
+ * key defines the type of the associated value, including logic to merge values and
+ * to deserialize JSON values for that key.
+ *
+ * <h4>Structure</h4>
+ * The context has evolved to perform multiple tasks. First, it holds two kinds
+ * of information:
+ * <ul>
+ * <li>Information to be returned in the query response header.
+ * These are values tagged as being in the header.</li>
+ * <li>Values passed within a single server. These are tagged as
+ * not being in the header.</li>
+ * </ul>
+ * Second, it performs multiple tasks:
+ * <ul>
+ * <li>Registers the keys to be used in the header. But, since it also holds
+ * internal information, the internal information also needs keys, though the
+ * corresponding values are never serialized.</li>
+ * <li>Gathers information for the query as a whole.</li>
+ * <li>Merges information back up the query tree: from multiple segments,
+ * from multiple servers, etc.</li>
+ * <li>Manages header size by dropping fields when the header would otherwise grow too
+ * large.</li>
+ * </ul>
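+ * <p>
+ * For example (an illustrative sketch only; {@code descriptors} and {@code deadlineMs}
+ * are assumed to be supplied by the caller):
+ * <pre>{@code
+ * ResponseContext ctx = ResponseContext.createEmpty();
+ * ctx.addMissingSegments(descriptors); // header value: may be returned to the caller
+ * ctx.putTimeoutTime(deadlineMs);      // internal value: never leaves the server
+ * }</pre>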
+ *
+ * As a result, the information in the context, when inspected by a calling
+ * query, may be incomplete if some of it was previously dropped by the
+ * called query.
+ *
+ * <h4>API</h4>
+ * + * The query profile needs to obtain the full, untruncated information. To do this + * it piggy-backs on the set operations to obtain the full value. To ensure this + * is possible, code that works with standard values should call the set (or add) + * functions provided which will do the needed map update. + */ @PublicApi public abstract class ResponseContext { @@ -58,65 +100,243 @@ public abstract class ResponseContext * The base interface of a response context key. * Should be implemented by every context key. */ - public interface BaseKey + @ExtensionPoint + public interface Key { @JsonValue String getName(); + /** - * Merge function associated with a key: Object (Object oldValue, Object newValue) + * Whether to return the key, value pair in the response header. + * If false, the value is for internal use only. */ - BiFunction getMergeFunction(); + boolean includeInHeader(); + + /** + * Reads a value of this key from a JSON stream. Used by {@link ResponseContextDeserializer}. + */ + Object readValue(JsonParser jp); + + /** + * Merges two values of type T. + * + * This method may modify "oldValue" but must not modify "newValue". + */ + Object mergeValues(Object oldValue, Object newValue); + + /** + * Returns true if this key can be removed to reduce header size when the + * header would otherwise be too large. + */ + @JsonIgnore + boolean canDrop(); } /** - * Keys associated with objects in the context. + * Abstract key class which provides most functionality except the + * type-specific merge logic. Parsing is provided by an associated + * parse function. + */ + public abstract static class AbstractKey implements Key + { + private final String name; + private final boolean inHeader; + private final boolean canDrop; + private final Function parseFunction; + + AbstractKey(String name, boolean inHeader, boolean canDrop, Class serializedClass) + { + this.name = name; + this.inHeader = inHeader; + this.canDrop = canDrop; + this.parseFunction = jp -> { + try { + return jp.readValueAs(serializedClass); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + AbstractKey(String name, boolean inHeader, boolean canDrop, TypeReference serializedTypeReference) + { + this.name = name; + this.inHeader = inHeader; + this.canDrop = canDrop; + this.parseFunction = jp -> { + try { + return jp.readValueAs(serializedTypeReference); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Override + public String getName() + { + return name; + } + + @Override + public boolean includeInHeader() + { + return inHeader; + } + + @Override + public boolean canDrop() + { + return canDrop; + } + + @Override + public Object readValue(JsonParser jp) + { + return parseFunction.apply(jp); + } + + @Override + public String toString() + { + return name; + } + } + + /** + * String valued attribute that holds the latest value assigned. + */ + public static class StringKey extends AbstractKey + { + StringKey(String name, boolean inHeader, boolean canDrop) + { + super(name, inHeader, canDrop, String.class); + } + + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + return newValue; + } + } + + /** + * Boolean valued attribute with the semantics that once the flag is + * set true, it stays true. 
+ */ + public static class BooleanKey extends AbstractKey + { + BooleanKey(String name, boolean inHeader) + { + super(name, inHeader, false, Boolean.class); + } + + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + return (boolean) oldValue || (boolean) newValue; + } + } + + /** + * Long valued attribute that holds the latest value assigned. + */ + public static class LongKey extends AbstractKey + { + LongKey(String name, boolean inHeader) + { + super(name, inHeader, false, Long.class); + } + + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + return newValue; + } + } + + /** + * Long valued attribute that holds the accumulation of values assigned. + */ + public static class CounterKey extends AbstractKey + { + CounterKey(String name, boolean inHeader) + { + super(name, inHeader, false, Long.class); + } + + @Override + public Object mergeValues(Object oldValue, Object newValue) + { + if (oldValue == null) { + return newValue; + } + if (newValue == null) { + return oldValue; + } + return (Long) oldValue + (Long) newValue; + } + } + + /** + * Global registry of response context keys. Also defines the standard keys + * associated with objects in the context. *
   * <p>
- * If it's necessary to have some new keys in the context then they might be listed in a separate enum:
+ * If it's necessary to add new keys in the context then they should be listed
+ * in a separate class:
   * <pre>{@code
-   * public enum ExtensionResponseContextKey implements BaseKey
+   * public class SomeClass
    * {
-   *   EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
+   *   static final Key EXTENSION_KEY_1 = new StringKey(
+   *      "extension_key_1", true, true);
+   *   static final Key EXTENSION_KEY_2 = new CounterKey(
+   *      "extension_key_2", false);
    *
    *   static {
-   *     for (BaseKey key : values()) ResponseContext.Key.registerKey(key);
+   *     Keys.instance().registerKeys(new Key[] {
+   *        EXTENSION_KEY_1,
+   *        EXTENSION_KEY_2
+   *     });
    *   }
-   *
-   *   private final String name;
-   *   private final BiFunction<Object, Object, Object> mergeFunction;
-   *
-   *   ExtensionResponseContextKey(String name)
-   *   {
-   *     this.name = name;
-   *     this.mergeFunction = (oldValue, newValue) -> newValue;
-   *   }
-   *
-   *   @Override public String getName() { return name; }
-   *
-   *   @Override public BiFunction<Object, Object, Object> getMergeFunction() { return mergeFunction; }
-   * }
    * }
    * }</pre>
- * Make sure all extension enum values added with {@link Key#registerKey} method.
+ * Make sure all extension keys are added with the {@link #registerKey(Key)} or
+ * {@link #registerKeys(Key[])} methods.
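+ *
+ * An illustrative use of the example keys above (hypothetical code, not part of this class):
+ * <pre>{@code
+ * ResponseContext ctx = ResponseContext.createEmpty();
+ * ctx.add(SomeClass.EXTENSION_KEY_2, 10L);
+ * ctx.add(SomeClass.EXTENSION_KEY_2, 5L);   // a CounterKey merges by addition: 15
+ * ctx.put(SomeClass.EXTENSION_KEY_1, "v2"); // a StringKey keeps the latest value
+ * }</pre>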
+ * <p>
+ * Create custom keys in one of two ways. As shown above, predefined key types + * exist for common values. Custom values can be created as shown in the code + * for this class. */ - public enum Key implements BaseKey + public static class Keys { /** * Lists intervals for which NO segment is present. */ - UNCOVERED_INTERVALS( + public static final Key UNCOVERED_INTERVALS = new AbstractKey( "uncoveredIntervals", - (oldValue, newValue) -> { - final ArrayList result = new ArrayList((List) oldValue); - result.addAll((List) newValue); - return result; - } - ), + true, true, + new TypeReference>() + { + }) + { + @Override + @SuppressWarnings("unchecked") + public Object mergeValues(Object oldValue, Object newValue) + { + final List result = new ArrayList((List) oldValue); + result.addAll((List) newValue); + return result; + } + }; + /** * Indicates if the number of uncovered intervals exceeded the limit (true/false). */ - UNCOVERED_INTERVALS_OVERFLOWED( + public static final Key UNCOVERED_INTERVALS_OVERFLOWED = new BooleanKey( "uncoveredIntervalsOverflowed", - (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue - ), + true); + /** * Map of most relevant query ID to remaining number of responses from query nodes. * The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query nodes, @@ -129,156 +349,213 @@ public abstract class ResponseContext * * @see org.apache.druid.query.Query#getMostSpecificId */ - REMAINING_RESPONSES_FROM_QUERY_SERVERS( + public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new AbstractKey( "remainingResponsesFromQueryServers", - (totalRemainingPerId, idAndNumResponses) -> { - final ConcurrentHashMap map = (ConcurrentHashMap) totalRemainingPerId; - final NonnullPair pair = (NonnullPair) idAndNumResponses; - map.compute( - pair.lhs, - (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs - ); - return map; - } - ), + false, true, + Object.class) + { + @Override + @SuppressWarnings("unchecked") + public Object mergeValues(Object totalRemainingPerId, Object idAndNumResponses) + { + final ConcurrentHashMap map = (ConcurrentHashMap) totalRemainingPerId; + final NonnullPair pair = (NonnullPair) idAndNumResponses; + map.compute( + pair.lhs, + (id, remaining) -> remaining == null ? pair.rhs : remaining + pair.rhs); + return map; + } + }; + /** * Lists missing segments. */ - MISSING_SEGMENTS( + public static final Key MISSING_SEGMENTS = new AbstractKey( "missingSegments", - (oldValue, newValue) -> { - final ArrayList result = new ArrayList((List) oldValue); - result.addAll((List) newValue); - return result; - } - ), + true, true, + new TypeReference>() {}) + { + @Override + @SuppressWarnings("unchecked") + public Object mergeValues(Object oldValue, Object newValue) + { + final List result = new ArrayList((List) oldValue); + result.addAll((List) newValue); + return result; + } + }; + /** * Entity tag. A part of HTTP cache validation mechanism. * Is being removed from the context before sending and used as a separate HTTP header. */ - ETAG("ETag"), - /** - * Query fail time (current time + timeout). - * It is not updated continuously as {@link Key#TIMEOUT_AT}. - */ - QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"), + public static final Key ETAG = new StringKey("ETag", false, true); + /** * Query total bytes gathered. 
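+     * The value is an {@link AtomicLong} so that the running total can be
+     * updated in place as responses stream back.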
     */
-    QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"),
+    public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
+        "queryTotalBytesGathered",
+        false, false,
+        new TypeReference<AtomicLong>() {})
+    {
+      @Override
+      public Object mergeValues(Object oldValue, Object newValue)
+      {
+        return ((AtomicLong) oldValue).addAndGet(((AtomicLong) newValue).get());
+      }
+    };
+
+    /**
+     * Query fail time (current time + timeout).
+     * Unlike {@link Keys#TIMEOUT_AT}, it is not updated continuously.
+     */
+    public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
+        "queryFailTime",
+        false);
+
     /**
      * This variable indicates when a running query should be expired,
      * and is effective only when 'timeout' of queryContext has a positive value.
      * Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine}
      * by reducing its value on the time of every scan iteration.
      */
-    TIMEOUT_AT("timeoutAt"),
+    public static final Key TIMEOUT_AT = new LongKey(
+        "timeoutAt",
+        false);
+
     /**
-     * The number of scanned rows.
-     * For backward compatibility the context key name still equals to "count".
+     * The number of rows scanned by {@link org.apache.druid.query.scan.ScanQueryEngine}.
+     *
+     * Named "count" for backwards compatibility with older data servers that still send this, even though it's now
+     * marked as internal.
      */
-    NUM_SCANNED_ROWS(
+    public static final Key NUM_SCANNED_ROWS = new CounterKey(
         "count",
-        (oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
-    ),
+        false);
+
     /**
      * The total CPU time for threads related to Sequence processing of the query.
      * Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes.
      * For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner}
      */
-    CPU_CONSUMED_NANOS(
+    public static final Key CPU_CONSUMED_NANOS = new CounterKey(
         "cpuConsumed",
-        (oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
-    ),
+        false);
+
     /**
      * Indicates if a {@link ResponseContext} was truncated during serialization.
      */
-    TRUNCATED(
+    public static final Key TRUNCATED = new BooleanKey(
         "truncated",
-        (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
-    );
+        false);
+
+    /**
+     * One and only global list of keys. This is a semi-constant: it is mutable
+     * at start-up time, but then is not thread-safe, and must remain unchanged
+     * for the duration of the server run.
+     */
+    public static final Keys INSTANCE = new Keys();
 
     /**
      * ConcurrentSkipListMap is used to have the natural ordering of its keys.
-     * Thread-safe structure is required since there is no guarantee that {@link #registerKey(BaseKey)}
+     * Thread-safe structure is required since there is no guarantee that {@link #registerKey(Key)}
      * would be called only from class static blocks.
      */
-    private static final ConcurrentMap<String, BaseKey> REGISTERED_KEYS = new ConcurrentSkipListMap<>();
+    private final ConcurrentMap<String, Key> registeredKeys = new ConcurrentSkipListMap<>();
 
     static {
-      for (BaseKey key : values()) {
-        registerKey(key);
-      }
+      instance().registerKeys(new Key[]
+      {
+          UNCOVERED_INTERVALS,
+          UNCOVERED_INTERVALS_OVERFLOWED,
+          REMAINING_RESPONSES_FROM_QUERY_SERVERS,
+          MISSING_SEGMENTS,
+          ETAG,
+          QUERY_TOTAL_BYTES_GATHERED,
+          QUERY_FAIL_DEADLINE_MILLIS,
+          TIMEOUT_AT,
+          NUM_SCANNED_ROWS,
+          CPU_CONSUMED_NANOS,
+          TRUNCATED,
+      });
+    }
+
+    /**
+     * Use {@link #instance()} to obtain the singleton instance of this class.
+ */ + private Keys() + { + } + + /** + * Returns the single, global key registry for this server. + */ + public static Keys instance() + { + return INSTANCE; } /** * Primary way of registering context keys. + * * @throws IllegalArgumentException if the key has already been registered. */ - public static void registerKey(BaseKey key) + public void registerKey(Key key) { - if (REGISTERED_KEYS.putIfAbsent(key.getName(), key) != null) { + if (registeredKeys.putIfAbsent(key.getName(), key) != null) { throw new IAE("Key [%s] has already been registered as a context key", key.getName()); } } + /** + * Register a group of keys. + */ + public void registerKeys(Key[] keys) + { + for (Key key : keys) { + registerKey(key); + } + } + /** * Returns a registered key associated with the name {@param name}. + * * @throws IllegalStateException if a corresponding key has not been registered. */ - public static BaseKey keyOf(String name) + public Key keyOf(String name) { - BaseKey key = REGISTERED_KEYS.get(name); + Key key = registeredKeys.get(name); if (key == null) { - throw new ISE("Key [%s] has not yet been registered as a context key", name); + throw new ISE("Key [%s] is not registered as a context key", name); } return key; } /** - * Returns all keys registered via {@link Key#registerKey}. + * Returns a registered key associated with the given name, or + * {@code null} if the key is not registered. This form is for testing + * and for deserialization when the existence of the key is suspect. */ - public static Collection getAllRegisteredKeys() + public Key find(String name) { - return Collections.unmodifiableCollection(REGISTERED_KEYS.values()); - } - - private final String name; - - private final BiFunction mergeFunction; - - Key(String name) - { - this.name = name; - this.mergeFunction = (oldValue, newValue) -> newValue; - } - - Key(String name, BiFunction mergeFunction) - { - this.name = name; - this.mergeFunction = mergeFunction; - } - - @Override - public String getName() - { - return name; - } - - @Override - public BiFunction getMergeFunction() - { - return mergeFunction; + return registeredKeys.get(name); } } - protected abstract Map getDelegate(); + protected abstract Map getDelegate(); + + public Map toMap() + { + return CollectionUtils.mapKeys(getDelegate(), k -> k.getName()); + } private static final Comparator> VALUE_LENGTH_REVERSED_COMPARATOR = Comparator.comparing((Map.Entry e) -> e.getValue().toString().length()).reversed(); /** * Create an empty DefaultResponseContext instance + * * @return empty DefaultResponseContext instance */ public static ResponseContext createEmpty() @@ -286,40 +563,130 @@ public abstract class ResponseContext return DefaultResponseContext.createEmpty(); } + /** + * Initialize fields for a query context. Not needed when merging. + */ + public void initialize() + { + putValue(Keys.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + initializeRemainingResponses(); + } + + public void initializeRemainingResponses() + { + putValue(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + } + + public void initializeMissingSegments() + { + putValue(Keys.MISSING_SEGMENTS, new ArrayList<>()); + } + + public void initializeRowScanCount() + { + putValue(Keys.NUM_SCANNED_ROWS, 0L); + } + /** * Deserializes a string into {@link ResponseContext} using given {@link ObjectMapper}. + * * @throws IllegalStateException if one of the deserialized map keys has not been registered. 
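+   * (Keys that are not registered on this node are skipped by
+   * {@link ResponseContextDeserializer}, since the sender may be a different Druid version.)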
*/ public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException { - final Map keyNameToObjects = objectMapper.readValue( - responseContext, - JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT - ); - final ResponseContext context = ResponseContext.createEmpty(); - keyNameToObjects.forEach((keyName, value) -> { - final BaseKey key = Key.keyOf(keyName); - context.add(key, value); - }); - return context; + return objectMapper.readValue(responseContext, ResponseContext.class); } /** - * Associates the specified object with the specified key. + * Associates the specified object with the specified extension key. + * * @throws IllegalStateException if the key has not been registered. */ - public Object put(BaseKey key, Object value) + public Object put(Key key, Object value) { - final BaseKey registeredKey = Key.keyOf(key.getName()); - return getDelegate().put(registeredKey, value); + final Key registeredKey = Keys.instance().keyOf(key.getName()); + return putValue(registeredKey, value); } - public Object get(BaseKey key) + public void putUncoveredIntervals(List intervals, boolean overflowed) + { + putValue(Keys.UNCOVERED_INTERVALS, intervals); + putValue(Keys.UNCOVERED_INTERVALS_OVERFLOWED, overflowed); + } + + public void putEntityTag(String eTag) + { + putValue(Keys.ETAG, eTag); + } + + public void putTimeoutTime(long time) + { + putValue(Keys.TIMEOUT_AT, time); + } + + public void putQueryFailDeadlineMs(long deadlineMs) + { + putValue(Keys.QUERY_FAIL_DEADLINE_MILLIS, deadlineMs); + } + + /** + * Associates the specified object with the specified key. Assumes that + * the key is validated. + */ + private Object putValue(Key key, Object value) + { + return getDelegate().put(key, value); + } + + public Object get(Key key) { return getDelegate().get(key); } - public Object remove(BaseKey key) + @SuppressWarnings("unchecked") + public ConcurrentHashMap getRemainingResponses() + { + return (ConcurrentHashMap) get(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS); + } + + @SuppressWarnings("unchecked") + public List getUncoveredIntervals() + { + return (List) get(Keys.UNCOVERED_INTERVALS); + } + + @SuppressWarnings("unchecked") + public List getMissingSegments() + { + return (List) get(Keys.MISSING_SEGMENTS); + } + + public String getEntityTag() + { + return (String) get(Keys.ETAG); + } + + public AtomicLong getTotalBytes() + { + return (AtomicLong) get(Keys.QUERY_TOTAL_BYTES_GATHERED); + } + + public Long getTimeoutTime() + { + return (Long) get(Keys.TIMEOUT_AT); + } + + public Long getRowScanCount() + { + return (Long) get(Keys.NUM_SCANNED_ROWS); + } + + public Long getCpuNanos() + { + return (Long) get(Keys.CPU_CONSUMED_NANOS); + } + + public Object remove(Key key) { return getDelegate().remove(key); } @@ -327,16 +694,44 @@ public abstract class ResponseContext /** * Adds (merges) a new value associated with a key to an old value. * See merge function of a context key for a specific implementation. + * * @throws IllegalStateException if the key has not been registered. 
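+   * Prefer a typed helper such as {@link #addCpuNanos(long)} or
+   * {@link #addMissingSegments(List)} when one exists for the key.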
*/ - public Object add(BaseKey key, Object value) + public Object add(Key key, Object value) { - final BaseKey registeredKey = Key.keyOf(key.getName()); - return getDelegate().merge(registeredKey, value, key.getMergeFunction()); + final Key registeredKey = Keys.instance().keyOf(key.getName()); + return addValue(registeredKey, value); + } + + public void addRemainingResponse(String id, int count) + { + addValue(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS, + new NonnullPair<>(id, count)); + } + + public void addMissingSegments(List descriptors) + { + addValue(Keys.MISSING_SEGMENTS, descriptors); + } + + public void addRowScanCount(long count) + { + addValue(Keys.NUM_SCANNED_ROWS, count); + } + + public void addCpuNanos(long ns) + { + addValue(Keys.CPU_CONSUMED_NANOS, ns); + } + + private Object addValue(Key key, Object value) + { + return getDelegate().merge(key, value, key::mergeValues); } /** * Merges a response context into the current. + * * @throws IllegalStateException If a key of the {@code responseContext} has not been registered. */ public void merge(ResponseContext responseContext) @@ -351,87 +746,101 @@ public abstract class ResponseContext /** * Serializes the context given that the resulting string length is less than the provided limit. * This method removes some elements from context collections if it's needed to satisfy the limit. - * There is no explicit priorities of keys which values are being truncated because for now there are only - * two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS} - * and {@link Key#MISSING_SEGMENTS}) and their values are arrays. - * Thus current implementation considers these arrays as equal prioritized and starts removing elements from + * There is no explicit priorities of keys which values are being truncated. + * Any kind of key can be removed, the key's @{code canDrop()} attribute indicates + * which can be dropped. (The unit tests use a string key.) + * Thus keys as equally prioritized and starts removing elements from * the array which serialized value length is the biggest. - * The resulting string might be correctly deserialized to {@link ResponseContext}. + * The resulting string will be correctly deserialized to {@link ResponseContext}. */ - public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException + public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) + throws JsonProcessingException { - final String fullSerializedString = objectMapper.writeValueAsString(getDelegate()); + final Map headerMap = + getDelegate().entrySet() + .stream() + .filter(entry -> entry.getKey().includeInHeader()) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue + ) + ); + + final String fullSerializedString = objectMapper.writeValueAsString(headerMap); if (fullSerializedString.length() <= maxCharsNumber) { return new SerializationResult(null, fullSerializedString); - } else { - // Indicates that the context is truncated during serialization. 
- add(Key.TRUNCATED, true); - final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate()); - final ArrayList> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields()); - sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR); - int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber; - // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size - for (Map.Entry e : sortedNodesByLength) { - final String fieldName = e.getKey(); - final JsonNode node = e.getValue(); - if (node.isArray()) { - if (needToRemoveCharsNumber >= node.toString().length()) { - // We need to remove more chars than the field's length so removing it completely - contextJsonNode.remove(fieldName); - // Since the field is completely removed (name + value) we need to do a recalculation - needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; - } else { - final ArrayNode arrayNode = (ArrayNode) node; - needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber); - if (arrayNode.size() == 0) { - // The field is empty, removing it because an empty array field may be misleading - // for the recipients of the truncated response context. - contextJsonNode.remove(fieldName); - // Since the field is completely removed (name + value) we need to do a recalculation - needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; - } - } // node is not an array - } else { - // A context should not contain nulls so we completely remove the field. + } + + int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber; + // Indicates that the context is truncated during serialization. + headerMap.put(Keys.TRUNCATED, true); + // Account for the extra field just added: "truncated: true, + // The length of ": true," is 7. + needToRemoveCharsNumber += Keys.TRUNCATED.getName().length() + 7; + final ObjectNode contextJsonNode = objectMapper.valueToTree(headerMap); + final List> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields()); + sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR); + // The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size + for (Map.Entry e : sortedNodesByLength) { + final String fieldName = e.getKey(); + if (!Keys.instance().keyOf(fieldName).canDrop()) { + continue; + } + final JsonNode node = e.getValue(); + int removeLength = fieldName.length() + node.toString().length(); + if (removeLength < needToRemoveCharsNumber || !node.isArray()) { + // Remove the field + contextJsonNode.remove(fieldName); + needToRemoveCharsNumber -= removeLength; + } else { + final ArrayNode arrayNode = (ArrayNode) node; + int removed = removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber); + if (arrayNode.size() == 0) { + // The field is now empty, removing it because an empty array field may be misleading + // for the recipients of the truncated response context. 
contextJsonNode.remove(fieldName); - // Since the field is completely removed (name + value) we need to do a recalculation - needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber; - } - if (needToRemoveCharsNumber <= 0) { - break; + needToRemoveCharsNumber -= removeLength; + } else { + needToRemoveCharsNumber -= removed; } } - return new SerializationResult(contextJsonNode.toString(), fullSerializedString); + + if (needToRemoveCharsNumber <= 0) { + break; + } } + return new SerializationResult(contextJsonNode.toString(), fullSerializedString); } /** * Removes {@code node}'s elements which total length of serialized values is greater or equal to the passed limit. * If it is impossible to satisfy the limit the method removes all {@code node}'s elements. * On every iteration it removes exactly half of the remained elements to reduce the overall complexity. - * @param node {@link ArrayNode} which elements are being removed. - * @param needToRemoveCharsNumber the number of chars need to be removed. + * + * @param node {@link ArrayNode} which elements are being removed. + * @param target the number of chars need to be removed. + * * @return the number of removed chars. */ - private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber) + private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int target) { - int removedCharsNumber = 0; - while (node.size() > 0 && needToRemoveCharsNumber > removedCharsNumber) { - final int lengthBeforeRemove = node.toString().length(); + int nodeLen = node.toString().length(); + final int startLen = nodeLen; + while (node.size() > 0 && target > startLen - nodeLen) { // Reducing complexity by removing half of array's elements final int removeUntil = node.size() / 2; for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) { node.remove(removeAt); } - final int lengthAfterRemove = node.toString().length(); - removedCharsNumber += lengthBeforeRemove - lengthAfterRemove; + nodeLen = node.toString().length(); } - return removedCharsNumber; + return startLen - nodeLen; } /** * Serialization result of {@link ResponseContext}. - * Response context might be serialized using max legth limit, in this case the context might be reduced + * Response context might be serialized using max length limit, in this case the context might be reduced * by removing max-length fields one by one unless serialization result length is less than the limit. * This structure has a reduced serialization result along with full result and boolean property * indicating if some fields were removed from the context. diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java new file mode 100644 index 00000000000..0a631175be7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.context; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import java.io.IOException; + +/** + * Deserialize a response context. The response context is created for single-thread use. + * (That is, it is non-concurrent.) Clients of this code should convert the + * context to concurrent if it will be used across threads. + */ +@SuppressWarnings("serial") +public class ResponseContextDeserializer extends StdDeserializer +{ + public ResponseContextDeserializer() + { + super(ResponseContext.class); + } + + @Override + public ResponseContext deserialize( + final JsonParser jp, + final DeserializationContext ctxt + ) throws IOException + { + if (jp.currentToken() != JsonToken.START_OBJECT) { + throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.START_OBJECT, null); + } + + final ResponseContext retVal = ResponseContext.createEmpty(); + + jp.nextToken(); + + ResponseContext.Keys keys = ResponseContext.Keys.instance(); + while (jp.currentToken() == JsonToken.FIELD_NAME) { + // Get the key. Since this is a deserialization, the sender may + // be a different version of Druid with a different set of keys. + // Ignore any keys which the sender knows about but this node + // does not know about. + final ResponseContext.Key key = keys.find(jp.getText()); + + jp.nextToken(); + if (key == null) { + skipValue(jp, jp.getText()); + } else { + retVal.add(key, key.readValue(jp)); + } + + jp.nextToken(); + } + + if (jp.currentToken() != JsonToken.END_OBJECT) { + throw ctxt.wrongTokenException(jp, ResponseContext.class, JsonToken.END_OBJECT, null); + } + + return retVal; + } + + /** + * Skip over a single JSON value: scalar or composite. + */ + private void skipValue(final JsonParser jp, String key) throws IOException + { + final JsonToken token = jp.currentToken(); + switch (token) { + case START_OBJECT: + skipTo(jp, JsonToken.END_OBJECT); + break; + case START_ARRAY: + skipTo(jp, JsonToken.END_ARRAY); + break; + default: + if (token.isScalarValue()) { + return; + } + throw new JsonMappingException(jp, "Invalid JSON inside unknown key: " + key); + } + } + + /** + * Freewheel over the contents of a structured object, including any + * nested structured objects, until the given end token. 
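+   * Returns after consuming the matching {@code end} token.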
+ */ + private void skipTo(final JsonParser jp, JsonToken end) throws IOException + { + while (true) { + jp.nextToken(); + final JsonToken token = jp.currentToken(); + if (token == null) { + throw new JsonMappingException(jp, "Premature EOF"); + } + switch (token) { + case START_OBJECT: + skipTo(jp, JsonToken.END_OBJECT); + break; + case START_ARRAY: + skipTo(jp, JsonToken.END_ARRAY); + break; + default: + if (token == end) { + return; + } + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index 1e30e15cfe5..af5b3abcdc0 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -67,15 +67,12 @@ public class ScanQueryEngine // "legacy" should be non-null due to toolChest.mergeResults final boolean legacy = Preconditions.checkNotNull(query.isLegacy(), "Expected non-null 'legacy' parameter"); - final Object numScannedRows = responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS); - if (numScannedRows != null) { - long count = (long) numScannedRows; - if (count >= query.getScanRowsLimit() && query.getTimeOrder().equals(ScanQuery.Order.NONE)) { - return Sequences.empty(); - } + final Long numScannedRows = responseContext.getRowScanCount(); + if (numScannedRows != null && numScannedRows >= query.getScanRowsLimit() && query.getTimeOrder().equals(ScanQuery.Order.NONE)) { + return Sequences.empty(); } final boolean hasTimeout = QueryContexts.hasTimeout(query); - final long timeoutAt = (long) responseContext.get(ResponseContext.Key.TIMEOUT_AT); + final Long timeoutAt = responseContext.getTimeoutTime(); final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); @@ -122,7 +119,8 @@ public class ScanQueryEngine final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); - responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, 0L); + // If the row count is not set, set it to 0, else do nothing. 
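+    // addRowScanCount() merges through the NUM_SCANNED_ROWS counter key, so adding
+    // zero initializes an absent count without changing an existing one.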
+ responseContext.addRowScanCount(0); final long limit = calculateRemainingScanRowsLimit(query, responseContext); return Sequences.concat( adapter @@ -186,10 +184,9 @@ public class ScanQueryEngine } else { throw new UOE("resultFormat[%s] is not supported", resultFormat.toString()); } - responseContext.add(ResponseContext.Key.NUM_SCANNED_ROWS, offset - lastOffset); + responseContext.addRowScanCount(offset - lastOffset); if (hasTimeout) { - responseContext.put( - ResponseContext.Key.TIMEOUT_AT, + responseContext.putTimeoutTime( timeoutAt - (System.currentTimeMillis() - start) ); } @@ -262,7 +259,7 @@ public class ScanQueryEngine private long calculateRemainingScanRowsLimit(ScanQuery query, ResponseContext responseContext) { if (query.getTimeOrder().equals(ScanQuery.Order.NONE)) { - return query.getScanRowsLimit() - (long) responseContext.get(ResponseContext.Key.NUM_SCANNED_ROWS); + return query.getScanRowsLimit() - (Long) responseContext.getRowScanCount(); } return query.getScanRowsLimit(); } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index 032c1159861..7133241836c 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -87,7 +87,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory> queryRunners ) { - // in single thread and in jetty thread instead of processing thread + // in single thread and in Jetty thread instead of processing thread return (queryPlus, responseContext) -> { ScanQuery query = (ScanQuery) queryPlus.getQuery(); ScanQuery.verifyOrderByForNativeExecution(query); @@ -95,7 +95,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory implements QueryRunner private void appendMissingSegment(ResponseContext responseContext) { - responseContext.add( - ResponseContext.Key.MISSING_SEGMENTS, + responseContext.addMissingSegments( Collections.singletonList(specificSpec.getDescriptor()) ); } diff --git a/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java index ab054d0f94c..1228c42b17d 100644 --- a/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/ReportTimelineMissingSegmentQueryRunnerTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.context.DefaultResponseContext; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; @@ -47,8 +46,8 @@ public class ReportTimelineMissingSegmentQueryRunnerTest = new ReportTimelineMissingSegmentQueryRunner<>(missingSegment); final ResponseContext responseContext = DefaultResponseContext.createEmpty(); runner.run(QueryPlus.wrap(new TestQuery()), responseContext); - Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); - Assert.assertEquals(Collections.singletonList(missingSegment), responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertNotNull(responseContext.getMissingSegments()); + 
Assert.assertEquals(Collections.singletonList(missingSegment), responseContext.getMissingSegments()); } @Test @@ -63,8 +62,8 @@ public class ReportTimelineMissingSegmentQueryRunnerTest = new ReportTimelineMissingSegmentQueryRunner<>(missingSegments); final ResponseContext responseContext = DefaultResponseContext.createEmpty(); runner.run(QueryPlus.wrap(new TestQuery()), responseContext); - Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); - Assert.assertEquals(missingSegments, responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertNotNull(responseContext.getMissingSegments()); + Assert.assertEquals(missingSegments, responseContext.getMissingSegments()); } private static class TestQuery extends BaseQuery diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 8c329441734..beca7bfc625 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -20,12 +20,17 @@ package org.apache.druid.query.context; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.context.ResponseContext.CounterKey; import org.apache.druid.query.context.ResponseContext.Key; +import org.apache.druid.query.context.ResponseContext.Keys; +import org.apache.druid.query.context.ResponseContext.StringKey; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -33,177 +38,171 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.List; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiFunction; public class ResponseContextTest { + // Droppable header key + static final Key EXTN_STRING_KEY = new StringKey( + "extn_string_key", true, true); + // Non-droppable header key + static final Key EXTN_COUNTER_KEY = new CounterKey( + "extn_counter_key", true); - enum ExtensionResponseContextKey implements ResponseContext.BaseKey - { - EXTENSION_KEY_1("extension_key_1"), - EXTENSION_KEY_2("extension_key_2", (oldValue, newValue) -> (long) oldValue + (long) newValue); - - static { - for (ResponseContext.BaseKey key : values()) { - ResponseContext.Key.registerKey(key); - } - } - - private final String name; - private final BiFunction mergeFunction; - - ExtensionResponseContextKey(String name) - { - this.name = name; - this.mergeFunction = (oldValue, newValue) -> newValue; - } - - ExtensionResponseContextKey(String name, BiFunction mergeFunction) - { - this.name = name; - this.mergeFunction = mergeFunction; - } - - @Override - public String getName() - { - return name; - } - - @Override - public BiFunction getMergeFunction() - { - return mergeFunction; - } + static { + Keys.instance().registerKeys(new Key[] { + EXTN_STRING_KEY, + EXTN_COUNTER_KEY + }); } - private final ResponseContext.BaseKey nonregisteredKey = new ResponseContext.BaseKey() - { - @Override - public String 
getName() - { - return "non-registered-key"; - } - - @Override - public BiFunction getMergeFunction() - { - return (Object a, Object b) -> a; - } - }; + static final Key UNREGISTERED_KEY = new StringKey( + "unregistered-key", true, true); @Test(expected = IllegalStateException.class) public void putISETest() { - ResponseContext.createEmpty().put(nonregisteredKey, new Object()); + ResponseContext.createEmpty().put(UNREGISTERED_KEY, new Object()); } @Test(expected = IllegalStateException.class) public void addISETest() { - ResponseContext.createEmpty().add(nonregisteredKey, new Object()); + ResponseContext.createEmpty().add(UNREGISTERED_KEY, new Object()); } @Test(expected = IllegalArgumentException.class) public void registerKeyIAETest() { - ResponseContext.Key.registerKey(ResponseContext.Key.NUM_SCANNED_ROWS); + Keys.INSTANCE.registerKey(Keys.NUM_SCANNED_ROWS); } @Test - public void mergeValueTest() + public void mergeETagTest() { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.add(ResponseContext.Key.ETAG, "dummy-etag"); - Assert.assertEquals("dummy-etag", ctx.get(ResponseContext.Key.ETAG)); - ctx.add(ResponseContext.Key.ETAG, "new-dummy-etag"); - Assert.assertEquals("new-dummy-etag", ctx.get(ResponseContext.Key.ETAG)); + ctx.putEntityTag("dummy-etag"); + Assert.assertEquals("dummy-etag", ctx.getEntityTag()); + ctx.putEntityTag("new-dummy-etag"); + Assert.assertEquals("new-dummy-etag", ctx.getEntityTag()); + } - final Interval interval01 = Intervals.of("2019-01-01/P1D"); - ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); - Assert.assertArrayEquals( - Collections.singletonList(interval01).toArray(), - ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() - ); - final Interval interval12 = Intervals.of("2019-01-02/P1D"); - final Interval interval23 = Intervals.of("2019-01-03/P1D"); - ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(interval12, interval23)); - Assert.assertArrayEquals( - Arrays.asList(interval01, interval12, interval23).toArray(), - ((List) ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() - ); + private static final Interval INTERVAL_01 = Intervals.of("2019-01-01/P1D"); + private static final Interval INTERVAL_12 = Intervals.of("2019-01-02/P1D"); + private static final Interval INTERVAL_23 = Intervals.of("2019-01-03/P1D"); + @Test + public void mergeUncoveredIntervalsTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.putUncoveredIntervals(Collections.singletonList(INTERVAL_01), false); + Assert.assertArrayEquals( + Collections.singletonList(INTERVAL_01).toArray(), + ctx.getUncoveredIntervals().toArray() + ); + ctx.add(Keys.UNCOVERED_INTERVALS, Arrays.asList(INTERVAL_12, INTERVAL_23)); + Assert.assertArrayEquals( + Arrays.asList(INTERVAL_01, INTERVAL_12, INTERVAL_23).toArray(), + ctx.getUncoveredIntervals().toArray() + ); + } + + @Test + public void mergeRemainingResponseTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); final String queryId = "queryId"; final String queryId2 = "queryId2"; - ctx.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); - ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, 3)); - ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId2, 4)); - ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -1)); - ctx.add(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new NonnullPair<>(queryId, -2)); + ctx.initialize(); + 
ctx.addRemainingResponse(queryId, 3); + ctx.addRemainingResponse(queryId2, 4); + ctx.addRemainingResponse(queryId, -1); + ctx.addRemainingResponse(queryId, -2); Assert.assertEquals( ImmutableMap.of(queryId, 0, queryId2, 4), - ctx.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS) + ctx.get(Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS) ); + } - final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); - ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); + @Test + public void mergeMissingSegmentsTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + final SegmentDescriptor sd01 = new SegmentDescriptor(INTERVAL_01, "01", 0); + ctx.addMissingSegments(Collections.singletonList(sd01)); Assert.assertArrayEquals( Collections.singletonList(sd01).toArray(), - ((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() + ctx.getMissingSegments().toArray() ); - final SegmentDescriptor sd12 = new SegmentDescriptor(interval12, "12", 1); - final SegmentDescriptor sd23 = new SegmentDescriptor(interval23, "23", 2); - ctx.add(ResponseContext.Key.MISSING_SEGMENTS, Arrays.asList(sd12, sd23)); + final SegmentDescriptor sd12 = new SegmentDescriptor(INTERVAL_12, "12", 1); + final SegmentDescriptor sd23 = new SegmentDescriptor(INTERVAL_23, "23", 2); + ctx.addMissingSegments(Arrays.asList(sd12, sd23)); Assert.assertArrayEquals( Arrays.asList(sd01, sd12, sd23).toArray(), - ((List) ctx.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() + ctx.getMissingSegments().toArray() ); + } - ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 0L); - Assert.assertEquals(0L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); - ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 1L); - Assert.assertEquals(1L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); - ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 3L); - Assert.assertEquals(4L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); + @Test + public void initScannedRowsTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + Assert.assertNull(ctx.getRowScanCount()); + ctx.initializeRowScanCount(); + Assert.assertEquals((Long) 0L, ctx.getRowScanCount()); + } - ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); - Assert.assertEquals(false, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); - ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, true); - Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); - ctx.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, false); - Assert.assertEquals(true, ctx.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); + @Test + public void mergeScannedRowsTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + Assert.assertNull(ctx.getRowScanCount()); + ctx.addRowScanCount(0L); + Assert.assertEquals((Long) 0L, ctx.getRowScanCount()); + ctx.addRowScanCount(1L); + Assert.assertEquals((Long) 1L, ctx.getRowScanCount()); + ctx.addRowScanCount(3L); + Assert.assertEquals((Long) 4L, ctx.getRowScanCount()); + } + + @Test + public void mergeUncoveredIntervalsOverflowedTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.add(Keys.UNCOVERED_INTERVALS_OVERFLOWED, false); + Assert.assertEquals(false, ctx.get(Keys.UNCOVERED_INTERVALS_OVERFLOWED)); + ctx.add(Keys.UNCOVERED_INTERVALS_OVERFLOWED, true); + Assert.assertEquals(true, ctx.get(Keys.UNCOVERED_INTERVALS_OVERFLOWED)); + ctx.add(Keys.UNCOVERED_INTERVALS_OVERFLOWED, false); + Assert.assertEquals(true, 
ctx.get(Keys.UNCOVERED_INTERVALS_OVERFLOWED)); } @Test public void mergeResponseContextTest() { final ResponseContext ctx1 = ResponseContext.createEmpty(); - ctx1.put(ResponseContext.Key.ETAG, "dummy-etag-1"); - final Interval interval01 = Intervals.of("2019-01-01/P1D"); - ctx1.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval01)); - ctx1.put(ResponseContext.Key.NUM_SCANNED_ROWS, 1L); + ctx1.putEntityTag("dummy-etag-1"); + ctx1.putUncoveredIntervals(Collections.singletonList(INTERVAL_01), false); + ctx1.addRowScanCount(1L); final ResponseContext ctx2 = ResponseContext.createEmpty(); - ctx2.put(ResponseContext.Key.ETAG, "dummy-etag-2"); - final Interval interval12 = Intervals.of("2019-01-02/P1D"); - ctx2.put(ResponseContext.Key.UNCOVERED_INTERVALS, Collections.singletonList(interval12)); - final SegmentDescriptor sd01 = new SegmentDescriptor(interval01, "01", 0); - ctx2.put(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(sd01)); - ctx2.put(ResponseContext.Key.NUM_SCANNED_ROWS, 2L); + ctx2.putEntityTag("dummy-etag-2"); + ctx2.putUncoveredIntervals(Collections.singletonList(INTERVAL_12), false); + final SegmentDescriptor sd01 = new SegmentDescriptor(INTERVAL_01, "01", 0); + ctx2.addMissingSegments(Collections.singletonList(sd01)); + ctx2.addRowScanCount(2L); ctx1.merge(ctx2); - Assert.assertEquals("dummy-etag-2", ctx1.get(ResponseContext.Key.ETAG)); - Assert.assertEquals(3L, ctx1.get(ResponseContext.Key.NUM_SCANNED_ROWS)); + Assert.assertEquals("dummy-etag-2", ctx1.getEntityTag()); + Assert.assertEquals((Long) 3L, ctx1.getRowScanCount()); Assert.assertArrayEquals( - Arrays.asList(interval01, interval12).toArray(), - ((List) ctx1.get(ResponseContext.Key.UNCOVERED_INTERVALS)).toArray() + Arrays.asList(INTERVAL_01, INTERVAL_12).toArray(), + ctx1.getUncoveredIntervals().toArray() ); Assert.assertArrayEquals( Collections.singletonList(sd01).toArray(), - ((List) ctx1.get(ResponseContext.Key.MISSING_SEGMENTS)).toArray() + ctx1.getMissingSegments().toArray() ); } @@ -213,9 +212,9 @@ public class ResponseContextTest final ResponseContext ctx = new ResponseContext() { @Override - protected Map getDelegate() + protected Map getDelegate() { - return ImmutableMap.of(nonregisteredKey, "non-registered-key"); + return ImmutableMap.of(UNREGISTERED_KEY, "non-registered-key"); } }; ResponseContext.createEmpty().merge(ctx); @@ -225,68 +224,116 @@ public class ResponseContextTest public void serializeWithCorrectnessTest() throws JsonProcessingException { final ResponseContext ctx1 = ResponseContext.createEmpty(); - ctx1.add(ResponseContext.Key.ETAG, "string-value"); + ctx1.add(EXTN_STRING_KEY, "string-value"); final DefaultObjectMapper mapper = new DefaultObjectMapper(); Assert.assertEquals( - mapper.writeValueAsString(ImmutableMap.of("ETag", "string-value")), - ctx1.serializeWith(mapper, Integer.MAX_VALUE).getResult() - ); + mapper.writeValueAsString(ImmutableMap.of( + EXTN_STRING_KEY.getName(), + "string-value")), + ctx1.serializeWith(mapper, Integer.MAX_VALUE).getResult()); final ResponseContext ctx2 = ResponseContext.createEmpty(); - ctx2.add(ResponseContext.Key.NUM_SCANNED_ROWS, 100); + // Add two non-header fields, and one that will be in the header + ctx2.putEntityTag("not in header"); + ctx2.addCpuNanos(100); + ctx2.add(EXTN_COUNTER_KEY, 100); Assert.assertEquals( - mapper.writeValueAsString(ImmutableMap.of("count", 100)), - ctx2.serializeWith(mapper, Integer.MAX_VALUE).getResult() - ); + mapper.writeValueAsString(ImmutableMap.of( + EXTN_COUNTER_KEY.getName(), 
100)), + ctx2.serializeWith(mapper, Integer.MAX_VALUE).getResult()); + } + + private Map deserializeContext(String input, ObjectMapper mapper) throws IOException + { + return ResponseContext.deserialize(input, mapper).getDelegate(); } @Test public void serializeWithTruncateValueTest() throws IOException { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100); - ctx.put(ResponseContext.Key.ETAG, "long-string-that-is-supposed-to-be-removed-from-result"); + ctx.put(EXTN_COUNTER_KEY, 100L); + ctx.put(EXTN_STRING_KEY, "long-string-that-is-supposed-to-be-removed-from-result"); final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); - final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE); - Assert.assertEquals(fullString, res1.getResult()); + Assert.assertEquals(ctx.getDelegate(), deserializeContext(res1.getResult(), objectMapper)); final ResponseContext ctxCopy = ResponseContext.createEmpty(); ctxCopy.merge(ctx); - final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 30); - ctxCopy.remove(ResponseContext.Key.ETAG); - ctxCopy.put(ResponseContext.Key.TRUNCATED, true); + final int target = EXTN_COUNTER_KEY.getName().length() + 3 + + Keys.TRUNCATED.getName().length() + 5 + + 15; // Fudge factor for quotes, separators, etc. + final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, target); + ctxCopy.remove(EXTN_STRING_KEY); + ctxCopy.put(Keys.TRUNCATED, true); Assert.assertEquals( ctxCopy.getDelegate(), - ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate() + deserializeContext(res2.getResult(), objectMapper) ); } + /** + * Tests the case in which the sender knows about a key that the + * receiver does not know about. The receiver will silently ignore + * such keys. + * @throws IOException + */ + @Test + public void deserializeWithUnknownKeyTest() throws IOException + { + Map bogus = new HashMap<>(); + bogus.put(Keys.ETAG.getName(), "eTag"); + bogus.put("scalar", "doomed"); + bogus.put("array", new String[]{"foo", "bar"}); + Map objValue = new HashMap<>(); + objValue.put("array", new String[]{"foo", "bar"}); + bogus.put("obj", objValue); + bogus.put("null", null); + final ObjectMapper mapper = new DefaultObjectMapper(); + String serialized = mapper.writeValueAsString(bogus); + ResponseContext ctx = ResponseContext.deserialize(serialized, mapper); + Assert.assertEquals(1, ctx.getDelegate().size()); + Assert.assertEquals("eTag", ctx.get(Keys.ETAG)); + } + + // Interval value for the test. Must match the deserialized value. + private static Interval interval(int n) + { + return Intervals.of(StringUtils.format("2021-01-%02d/PT1M", n)); + } + + // Length of above with quotes and comma. + private static final int INTERVAL_LEN = 52; + @Test public void serializeWithTruncateArrayTest() throws IOException { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.put(ResponseContext.Key.NUM_SCANNED_ROWS, 100); ctx.put( - ResponseContext.Key.UNCOVERED_INTERVALS, - Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + Keys.UNCOVERED_INTERVALS, + Arrays.asList(interval(1), interval(2), interval(3), interval(4), + interval(5), interval(6)) ); + // This value should be longer than the above so it is fully removed + // before we truncate the above. 
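// (Per this test's expectations below: the droppable EXTN_STRING_KEY is removed
// wholesale, while the array-valued UNCOVERED_INTERVALS is truncated element by
// element and Keys.TRUNCATED is set to true.)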
ctx.put( - ResponseContext.Key.MISSING_SEGMENTS, - Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + EXTN_STRING_KEY, + Strings.repeat("x", INTERVAL_LEN * 7) ); final DefaultObjectMapper objectMapper = new DefaultObjectMapper(); final String fullString = objectMapper.writeValueAsString(ctx.getDelegate()); final ResponseContext.SerializationResult res1 = ctx.serializeWith(objectMapper, Integer.MAX_VALUE); Assert.assertEquals(fullString, res1.getResult()); + final int maxLen = INTERVAL_LEN * 4 + Keys.UNCOVERED_INTERVALS.getName().length() + 4 + + Keys.TRUNCATED.getName().length() + 6; + final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, maxLen); final ResponseContext ctxCopy = ResponseContext.createEmpty(); - ctxCopy.merge(ctx); - final ResponseContext.SerializationResult res2 = ctx.serializeWith(objectMapper, 70); - ctxCopy.put(ResponseContext.Key.UNCOVERED_INTERVALS, Arrays.asList(0, 1, 2, 3, 4)); - ctxCopy.remove(ResponseContext.Key.MISSING_SEGMENTS); - ctxCopy.put(ResponseContext.Key.TRUNCATED, true); + // The resulting key array length will be half the start + // length. + ctxCopy.put(Keys.UNCOVERED_INTERVALS, Arrays.asList(interval(1), interval(2), interval(3))); + ctxCopy.put(Keys.TRUNCATED, true); Assert.assertEquals( ctxCopy.getDelegate(), - ResponseContext.deserialize(res2.getResult(), objectMapper).getDelegate() + deserializeContext(res2.getResult(), objectMapper) ); } @@ -297,62 +344,45 @@ public class ResponseContextTest final ResponseContext ctx = ResponseContext.deserialize( mapper.writeValueAsString( ImmutableMap.of( - "ETag", "string-value", - "count", 100L, - "cpuConsumed", 100000L + Keys.ETAG.getName(), "string-value", + Keys.NUM_SCANNED_ROWS.getName(), 100L, + Keys.CPU_CONSUMED_NANOS.getName(), 100000L ) ), mapper ); - Assert.assertEquals("string-value", ctx.get(ResponseContext.Key.ETAG)); - Assert.assertEquals(100, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); - Assert.assertEquals(100000, ctx.get(ResponseContext.Key.CPU_CONSUMED_NANOS)); - ctx.add(ResponseContext.Key.NUM_SCANNED_ROWS, 10L); - Assert.assertEquals(110L, ctx.get(ResponseContext.Key.NUM_SCANNED_ROWS)); - ctx.add(ResponseContext.Key.CPU_CONSUMED_NANOS, 100); - Assert.assertEquals(100100L, ctx.get(ResponseContext.Key.CPU_CONSUMED_NANOS)); - } - - @Test(expected = IllegalStateException.class) - public void deserializeISETest() throws IOException - { - final DefaultObjectMapper mapper = new DefaultObjectMapper(); - ResponseContext.deserialize( - mapper.writeValueAsString(ImmutableMap.of("ETag_unexpected", "string-value")), - mapper - ); - } - - @Test - public void extensionEnumIntegrityTest() - { - Assert.assertEquals( - ExtensionResponseContextKey.EXTENSION_KEY_1, - ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_1.getName()) - ); - Assert.assertEquals( - ExtensionResponseContextKey.EXTENSION_KEY_2, - ResponseContext.Key.keyOf(ExtensionResponseContextKey.EXTENSION_KEY_2.getName()) - ); - for (ResponseContext.BaseKey key : ExtensionResponseContextKey.values()) { - Assert.assertTrue(ResponseContext.Key.getAllRegisteredKeys().contains(key)); - } + Assert.assertEquals("string-value", ctx.getEntityTag()); + Assert.assertEquals((Long) 100L, ctx.getRowScanCount()); + Assert.assertEquals((Long) 100000L, ctx.getCpuNanos()); + ctx.addRowScanCount(10L); + Assert.assertEquals((Long) 110L, ctx.getRowScanCount()); + ctx.addCpuNanos(100L); + Assert.assertEquals((Long) 100100L, ctx.getCpuNanos()); } @Test public void 
extensionEnumMergeTest() { final ResponseContext ctx = ResponseContext.createEmpty(); - ctx.add(ResponseContext.Key.ETAG, "etag"); - ctx.add(ExtensionResponseContextKey.EXTENSION_KEY_1, "string-value"); - ctx.add(ExtensionResponseContextKey.EXTENSION_KEY_2, 2L); + ctx.putEntityTag("etag"); + ctx.add(EXTN_STRING_KEY, "string-value"); + ctx.add(EXTN_COUNTER_KEY, 2L); final ResponseContext ctxFinal = ResponseContext.createEmpty(); - ctxFinal.add(ResponseContext.Key.ETAG, "old-etag"); - ctxFinal.add(ExtensionResponseContextKey.EXTENSION_KEY_1, "old-string-value"); - ctxFinal.add(ExtensionResponseContextKey.EXTENSION_KEY_2, 1L); + ctxFinal.putEntityTag("old-etag"); + ctxFinal.add(EXTN_STRING_KEY, "old-string-value"); + ctxFinal.add(EXTN_COUNTER_KEY, 1L); ctxFinal.merge(ctx); - Assert.assertEquals("etag", ctxFinal.get(ResponseContext.Key.ETAG)); - Assert.assertEquals("string-value", ctxFinal.get(ExtensionResponseContextKey.EXTENSION_KEY_1)); - Assert.assertEquals(1L + 2L, ctxFinal.get(ExtensionResponseContextKey.EXTENSION_KEY_2)); + Assert.assertEquals("etag", ctxFinal.getEntityTag()); + Assert.assertEquals("string-value", ctxFinal.get(EXTN_STRING_KEY)); + Assert.assertEquals(1L + 2L, ctxFinal.get(EXTN_COUNTER_KEY)); + } + + @Test + public void toMapTest() + { + final ResponseContext ctx = ResponseContext.createEmpty(); + ctx.putEntityTag("etag"); + Map map = ctx.toMap(); + Assert.assertEquals(map.get(ResponseContext.Keys.ETAG.getName()), "etag"); } } diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index 7d6c493143e..d4caf89e757 100644 --- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -50,7 +50,6 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -140,7 +139,7 @@ public class DataSourceMetadataQueryTest .dataSource("testing") .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); + context.initializeMissingSegments(); Iterable> results = runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList(); DataSourceMetadataResultValue val = results.iterator().next().getValue(); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index 3e148c7f5e6..cb1a7951637 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -913,7 +913,7 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)) .build(); ResponseContext responseContext = DefaultResponseContext.createEmpty(); - responseContext.add(ResponseContext.Key.TIMEOUT_AT, System.currentTimeMillis()); + responseContext.putTimeoutTime(System.currentTimeMillis()); try { runner.run(QueryPlus.wrap(query), responseContext).toList(); } diff --git a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java 
b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java index f4e1c31a218..88239b44730 100644 --- a/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -197,17 +197,11 @@ public class SpecificSegmentQueryRunnerTest private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, ResponseContext responseContext) throws IOException { - Object missingSegments = responseContext.get(ResponseContext.Key.MISSING_SEGMENTS); - + List missingSegments = responseContext.getMissingSegments(); Assert.assertTrue(missingSegments != null); - Assert.assertTrue(missingSegments instanceof List); - - Object segmentDesc = ((List) missingSegments).get(0); - - Assert.assertTrue(segmentDesc instanceof SegmentDescriptor); + SegmentDescriptor segmentDesc = missingSegments.get(0); SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class); - Assert.assertEquals(descriptor, newDesc); } } diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 6834cec5e17..e1ceab34de6 100644 --- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -225,7 +225,7 @@ public class TimeBoundaryQueryRunnerTest .bound(TimeBoundaryQuery.MAX_TIME) .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); + context.initializeMissingSegments(); Iterable> results = runner.run(QueryPlus.wrap(timeBoundaryQuery), context).toList(); TimeBoundaryResultValue val = results.iterator().next().getValue(); DateTime minTime = val.getMinTime(); @@ -244,7 +244,7 @@ public class TimeBoundaryQueryRunnerTest .bound(TimeBoundaryQuery.MIN_TIME) .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); - context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); + context.initializeMissingSegments(); Iterable> results = runner.run(QueryPlus.wrap(timeBoundaryQuery), context).toList(); TimeBoundaryResultValue val = results.iterator().next().getValue(); DateTime minTime = val.getMinTime(); diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 8175e2a3ce0..1ba1abe961e 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -45,7 +45,6 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -72,7 +71,6 @@ import org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ResponseContext; -import 
org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; @@ -218,10 +216,7 @@ public class CachingClusteredClient implements QuerySegmentWalker final int numQueryServers ) { - responseContext.add( - Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, - new NonnullPair<>(query.getMostSpecificId(), numQueryServers) - ); + responseContext.addRemainingResponse(query.getMostSpecificId(), numQueryServers); } @Override @@ -361,7 +356,7 @@ public class CachingClusteredClient implements QuerySegmentWalker @Nullable final String currentEtag = cacheKeyManager.computeResultLevelCachingEtag(segmentServers, queryCacheKey); if (null != currentEtag) { - responseContext.put(Key.ETAG, currentEtag); + responseContext.putEntityTag(currentEtag); } if (currentEtag != null && currentEtag.equals(prevEtag)) { return new ClusterQueryResult<>(Sequences.empty(), 0); @@ -499,8 +494,7 @@ public class CachingClusteredClient implements QuerySegmentWalker // Which is not necessarily an indication that the data doesn't exist or is // incomplete. The data could exist and just not be loaded yet. In either // case, though, this query will not include any data from the identified intervals. - responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS, uncoveredIntervals); - responseContext.add(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED, uncoveredIntervalsOverflowed); + responseContext.putUncoveredIntervals(uncoveredIntervals, uncoveredIntervalsOverflowed); } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index ead9e2d3274..c7556b03cc9 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -27,7 +27,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -55,7 +54,6 @@ import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.server.QueryResource; import org.apache.druid.utils.CloseableUtils; import org.jboss.netty.buffer.ChannelBuffer; @@ -67,13 +65,13 @@ import org.jboss.netty.handler.codec.http.HttpResponse; import org.joda.time.Duration; import javax.ws.rs.core.MediaType; + import java.io.IOException; import java.io.InputStream; import java.io.SequenceInputStream; import java.net.URL; import java.util.Enumeration; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -110,15 +108,14 @@ public class DirectDruidClient implements QueryRunner */ public static void removeMagicResponseContextFields(ResponseContext responseContext) { - 
responseContext.remove(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED); - responseContext.remove(ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS); + responseContext.remove(ResponseContext.Keys.QUERY_TOTAL_BYTES_GATHERED); + responseContext.remove(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS); } - public static ResponseContext makeResponseContextForQuery() + public static ConcurrentResponseContext makeResponseContextForQuery() { - final ResponseContext responseContext = ConcurrentResponseContext.createEmpty(); - responseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); - responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + final ConcurrentResponseContext responseContext = ConcurrentResponseContext.createEmpty(); + responseContext.initialize(); return responseContext; } @@ -168,7 +165,7 @@ public class DirectDruidClient implements QueryRunner final long requestStartTimeNs = System.nanoTime(); final long timeoutAt = query.getContextValue(QUERY_FAIL_TIME); final long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); - final AtomicLong totalBytesGathered = (AtomicLong) context.get(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED); + final AtomicLong totalBytesGathered = context.getTotalBytes(); final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, 0); final boolean usingBackpressure = maxQueuedBytes > 0; @@ -246,10 +243,7 @@ public class DirectDruidClient implements QueryRunner query.getSubQueryId() ); final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT); - context.add( - ResponseContext.Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, - new NonnullPair<>(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES) - ); + context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES); // context may be null in case of error or query timeout if (responseContext != null) { context.merge(ResponseContext.deserialize(responseContext, objectMapper)); diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index aff8399296b..8a2d383b286 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -97,7 +97,7 @@ public class ResultLevelCachingQueryRunner implements QueryRunner QueryPlus.wrap(query), responseContext ); - String newResultSetId = (String) responseContext.get(ResponseContext.Key.ETAG); + String newResultSetId = responseContext.getEntityTag(); if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) { log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId()); diff --git a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java index 8d393b22db6..9302763d3e5 100644 --- a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java @@ -33,10 +33,9 @@ import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.common.guava.YieldingSequenceBase; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.context.ResponseContext; -import 
org.apache.druid.query.context.ResponseContext.Key;
+import org.apache.druid.query.context.ResponseContext.Keys;
 import org.apache.druid.segment.SegmentMissingException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -54,7 +53,7 @@ public class RetryQueryRunner implements QueryRunner
   private final ObjectMapper jsonMapper;

   /**
-   * Runnable executed after the broker creates query distribution tree for the first attempt. This is only
+   * Runnable executed after the broker creates the query distribution tree for the first attempt. This is only
    * for testing and must not be used in production code.
    */
   private final Runnable runnableAfterFirstAttempt;
@@ -142,10 +141,10 @@ public class RetryQueryRunner implements QueryRunner
     // The missingSegments in the responseContext is only valid when all servers have responded to the broker.
     // The remainingResponses MUST be not null but 0 in the responseContext at this point.
     final ConcurrentHashMap idToRemainingResponses =
-        (ConcurrentHashMap) Preconditions.checkNotNull(
-            context.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS),
+        Preconditions.checkNotNull(
+            context.getRemainingResponses(),
             "%s in responseContext",
-            Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
+            Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
         );

     final int remainingResponses = Preconditions.checkNotNull(
@@ -157,7 +156,11 @@ public class RetryQueryRunner implements QueryRunner
       throw new ISE("Failed to check missing segments due to missing responses from [%d] servers", remainingResponses);
     }

-    final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS);
+    // TODO: the sender's response may contain a truncated list of missing segments.
+    // Truncation is aggregated in the response context given as a parameter.
+    // Check the getTruncated() value: if true, then we don't know the full set of
+    // missing segments.
+    final List maybeMissingSegments = context.getMissingSegments();
     if (maybeMissingSegments == null) {
       return Collections.emptyList();
     }
@@ -171,7 +174,7 @@ public class RetryQueryRunner implements QueryRunner
   }

   /**
-   * A lazy iterator populating {@link Sequence} by retrying the query. The first returned sequence is always the base
+   * A lazy iterator populating a {@link Sequence} by retrying the query. The first returned sequence is always the base
    * sequence from the baseQueryRunner. Subsequent sequences are created dynamically whenever it retries the query. All
    * the sequences populated by this iterator will be merged (not combined) with the base sequence.
    *
@@ -179,7 +182,7 @@ public class RetryQueryRunner implements QueryRunner
    * each underlying sequence and pushes them to a {@link java.util.PriorityQueue}. Whenever it pops from the queue,
    * it pushes a new item from the sequence where the returned item was originally from. Since the first returned
    * sequence from this iterator is always the base sequence, the MergeSequence will call {@link Sequence#toYielder}
-   * on the base sequence first which in turn initializing query distribution tree. Once this tree is built, the query
+   * on the base sequence first, which in turn initializes the query distribution tree. Once this tree is built, the query
    * servers (historicals and realtime tasks) will lock all segments to read and report missing segments to the broker.
    * If there are missing segments reported, this iterator will rewrite the query with those reported segments and
    * reissue the rewritten query.
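For reference, a minimal, self-contained sketch of the retry loop described in the javadoc above; plain lists stand in for Druid's Sequence and ResponseContext machinery, and all names below are illustrative rather than actual Druid APIs.

import java.util.ArrayList;
import java.util.List;

// Sketch only: models retry-on-missing-segments with plain lists.
class RetryLoopSketch
{
  interface SegmentRunner
  {
    // Returns results for the requested segments and reports any segment
    // it could not serve by adding it to missingOut.
    List<String> run(List<String> segments, List<String> missingOut);
  }

  static List<String> runWithRetries(SegmentRunner runner, List<String> segments, int maxRetries)
  {
    final List<String> results = new ArrayList<>();
    List<String> toRun = segments;
    int retryCount = 0;
    while (true) {
      // Fresh missing-segment list per attempt, mirroring
      // context.initializeMissingSegments() in the patch.
      final List<String> missing = new ArrayList<>();
      results.addAll(runner.run(toRun, missing));
      if (missing.isEmpty()) {
        return results;
      }
      if (retryCount++ >= maxRetries) {
        throw new IllegalStateException("Segments still missing after retries: " + missing);
      }
      // Reissue the query for just the reported segments, analogous to
      // Queries.withSpecificSegments(...) in RetryQueryRunner.
      toRun = missing;
    }
  }
}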
@@ -229,7 +232,7 @@ public class RetryQueryRunner
       retryCount++;
       LOG.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), retryCount);

-      context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
+      context.initializeMissingSegments();
       final QueryPlus retryQueryPlus = queryPlus.withQuery(
           Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments)
       );
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index b2f7f21e4da..72ceae822f6 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -55,6 +56,7 @@ import org.apache.druid.query.QueryUnsupportedException;
 import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.TruncatedResponseContextException;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.context.ResponseContext.Keys;
 import org.apache.druid.server.metrics.QueryCountStatsProvider;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthConfig;
@@ -78,6 +80,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -215,7 +218,7 @@ public class QueryResource implements QueryCountStatsProvider
       final ResponseContext responseContext = queryResponse.getResponseContext();
       final String prevEtag = getPreviousEtag(req);

-      if (prevEtag != null && prevEtag.equals(responseContext.get(ResponseContext.Key.ETAG))) {
+      if (prevEtag != null && prevEtag.equals(responseContext.getEntityTag())) {
         queryLifecycle.emitLogsAndMetrics(null, req.getRemoteAddr(), -1);
         successfulQueryCount.incrementAndGet();
         return Response.notModified().build();
@@ -274,16 +277,13 @@ public class QueryResource implements QueryCountStatsProvider
             )
             .header("X-Druid-Query-Id", queryId);

-        Object entityTag = responseContext.remove(ResponseContext.Key.ETAG);
-        if (entityTag != null) {
-          responseBuilder.header(HEADER_ETAG, entityTag);
-        }
+        transferEntityTag(responseContext, responseBuilder);

         DirectDruidClient.removeMagicResponseContextFields(responseContext);

-        //Limit the response-context header, see https://github.com/apache/druid/issues/2331
-        //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
-        //and encodes the string using ASCII, so 1 char is = 1 byte
+        // Limit the response-context header, see https://github.com/apache/druid/issues/2331
+        // Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
+        // and encodes the string using ASCII, so 1 char = 1 byte
         final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
             jsonMapper,
             responseContextConfig.getMaxResponseContextHeaderSize()
@@ -557,4 +557,13 @@ public class 
QueryResource implements QueryCountStatsProvider { return timedOutQueryCount.get(); } + + @VisibleForTesting + public static void transferEntityTag(ResponseContext context, Response.ResponseBuilder builder) + { + Object entityTag = context.remove(Keys.ETAG); + if (entityTag != null) { + builder.header(HEADER_ETAG, entityTag); + } + } } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index a897e09d643..6e4a9b2ef61 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -128,7 +128,7 @@ public class CachingClusteredClientFunctionalityTest ResponseContext responseContext = ResponseContext.createEmpty(); runQuery(client, builder.build(), responseContext); - Assert.assertNull(responseContext.get(ResponseContext.Key.UNCOVERED_INTERVALS)); + Assert.assertNull(responseContext.getUncoveredIntervals()); builder.intervals("2015-01-01/2015-01-03"); responseContext = ResponseContext.createEmpty(); @@ -177,8 +177,8 @@ public class CachingClusteredClientFunctionalityTest for (String interval : intervals) { expectedList.add(Intervals.of(interval)); } - Assert.assertEquals((Object) expectedList, context.get(ResponseContext.Key.UNCOVERED_INTERVALS)); - Assert.assertEquals(uncoveredIntervalsOverflowed, context.get(ResponseContext.Key.UNCOVERED_INTERVALS_OVERFLOWED)); + Assert.assertEquals((Object) expectedList, context.getUncoveredIntervals()); + Assert.assertEquals(uncoveredIntervalsOverflowed, context.get(ResponseContext.Keys.UNCOVERED_INTERVALS_OVERFLOWED)); } private void addToTimeline(Interval interval, String version) diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 6fe08c4622b..91cd75337a0 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -88,7 +88,6 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.BoundDimFilter; @@ -168,7 +167,6 @@ import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -3292,7 +3290,7 @@ public class CachingClusteredClientTest final ResponseContext responseContext = initializeResponseContext(); getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.get(ResponseContext.Key.ETAG)); + Assert.assertEquals("MDs2yIUvYLVzaG6zmwTH1plqaYE=", responseContext.getEntityTag()); } @Test @@ -3340,9 +3338,9 @@ public class CachingClusteredClientTest final ResponseContext responseContext = initializeResponseContext(); 
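// Both runs below share one response context; each run overwrites the entity
// tag, so etag1 and etag2 capture the tags of query and query2 respectively.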
getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext); - final Object etag1 = responseContext.get(ResponseContext.Key.ETAG); + final String etag1 = responseContext.getEntityTag(); getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext); - final Object etag2 = responseContext.get(ResponseContext.Key.ETAG); + final String etag2 = responseContext.getEntityTag(); Assert.assertNotEquals(etag1, etag2); } @@ -3363,7 +3361,7 @@ public class CachingClusteredClientTest private static ResponseContext initializeResponseContext() { final ResponseContext context = ResponseContext.createEmpty(); - context.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + context.initializeRemainingResponses(); return context; } } diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index 97457b8688e..ba7e4da1736 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -44,7 +44,6 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.topn.TopNQueryConfig; import org.apache.druid.segment.QueryableIndex; @@ -65,7 +64,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -247,7 +245,7 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase protected static ResponseContext responseContext() { final ResponseContext responseContext = ConcurrentResponseContext.createEmpty(); - responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); + responseContext.initializeRemainingResponses(); return responseContext; } diff --git a/server/src/test/java/org/apache/druid/server/QueryDetailsTest.java b/server/src/test/java/org/apache/druid/server/QueryDetailsTest.java new file mode 100644 index 00000000000..5a2fd8d3ebf --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/QueryDetailsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import org.apache.druid.query.context.ResponseContext; +import org.junit.Assert; +import org.junit.Test; + +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; + +/** + * Tests for low-level bits of the query resource mechanism. + */ +public class QueryDetailsTest +{ + @Test + public void testEtag() + { + // No ETAG + { + ResponseContext responseContext = ResponseContext.createEmpty(); + Response.ResponseBuilder responseBuilder = Response.ok(); + QueryResource.transferEntityTag(responseContext, responseBuilder); + Response response = responseBuilder.build(); + MultivaluedMap metadata = response.getMetadata(); + Assert.assertNull(metadata.get(QueryResource.HEADER_ETAG)); + } + + // Provided ETAG is passed along. + { + ResponseContext responseContext = ResponseContext.createEmpty(); + String etagValue = "myTag"; + responseContext.putEntityTag(etagValue); + Response.ResponseBuilder responseBuilder = Response.ok(); + QueryResource.transferEntityTag(responseContext, responseBuilder); + Response response = responseBuilder.build(); + MultivaluedMap metadata = response.getMetadata(); + Assert.assertEquals(etagValue, metadata.getFirst(QueryResource.HEADER_ETAG)); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 026af182208..c6cae9e1e1b 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -893,9 +893,9 @@ public class QueryResourceTest final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\"," + "\"context\":{\"queryId\":\"id_1\"}}"; ObjectMapper mapper = new DefaultObjectMapper(); - Query query = mapper.readValue(queryString, Query.class); + Query query = mapper.readValue(queryString, Query.class); - ListenableFuture future = MoreExecutors.listeningDecorator( + ListenableFuture future = MoreExecutors.listeningDecorator( Execs.singleThreaded("test_query_resource_%s") ).submit( new Runnable() @@ -1017,9 +1017,9 @@ public class QueryResourceTest final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\"," + "\"context\":{\"queryId\":\"id_1\"}}"; ObjectMapper mapper = new DefaultObjectMapper(); - Query query = mapper.readValue(queryString, Query.class); + Query query = mapper.readValue(queryString, Query.class); - ListenableFuture future = MoreExecutors.listeningDecorator( + ListenableFuture future = MoreExecutors.listeningDecorator( Execs.singleThreaded("test_query_resource_%s") ).submit( new Runnable() diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index ddf8d4e511b..8e7a64f44cf 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.guava.LazySequence; @@ -41,7 +40,6 @@ import 
org.apache.druid.query.QueryToolChest; import org.apache.druid.query.ReferenceCountingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -62,7 +60,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -167,11 +164,9 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker // the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects // to actually serve the queries return (theQuery, responseContext) -> { - responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, new ConcurrentHashMap<>()); - responseContext.add( - Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS, - new NonnullPair<>(theQuery.getQuery().getMostSpecificId(), 0) - ); + responseContext.initializeRemainingResponses(); + responseContext.addRemainingResponse( + theQuery.getQuery().getMostSpecificId(), 0); if (scheduler != null) { Set segments = new HashSet<>(); specs.forEach(spec -> segments.add(new SegmentServerSelector(spec))); diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 2105c52b751..4d476f23a7d 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -63,7 +63,6 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.MetricManipulationFn; import org.apache.druid.query.context.DefaultResponseContext; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.context.ResponseContext.Key; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.planning.DataSourceAnalysis; @@ -456,8 +455,8 @@ public class ServerManagerTest final ResponseContext responseContext = DefaultResponseContext.createEmpty(); final List> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); Assert.assertTrue(results.isEmpty()); - Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); - Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertNotNull(responseContext.getMissingSegments()); + Assert.assertEquals(unknownSegments, responseContext.getMissingSegments()); } @Test @@ -475,8 +474,8 @@ public class ServerManagerTest final ResponseContext responseContext = DefaultResponseContext.createEmpty(); final List> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); Assert.assertTrue(results.isEmpty()); - Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); - Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertNotNull(responseContext.getMissingSegments()); + Assert.assertEquals(unknownSegments, responseContext.getMissingSegments()); } @Test @@ -495,8 +494,8 @@ public class ServerManagerTest final ResponseContext responseContext = DefaultResponseContext.createEmpty(); final List> 
results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); Assert.assertTrue(results.isEmpty()); - Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); - Assert.assertEquals(unknownSegments, responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertNotNull(responseContext.getMissingSegments()); + Assert.assertEquals(unknownSegments, responseContext.getMissingSegments()); } @Test @@ -526,8 +525,8 @@ public class ServerManagerTest final ResponseContext responseContext = DefaultResponseContext.createEmpty(); final List> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList(); Assert.assertTrue(results.isEmpty()); - Assert.assertNotNull(responseContext.get(Key.MISSING_SEGMENTS)); - Assert.assertEquals(closedSegments, responseContext.get(Key.MISSING_SEGMENTS)); + Assert.assertNotNull(responseContext.getMissingSegments()); + Assert.assertEquals(closedSegments, responseContext.getMissingSegments()); } @Test