Fix defaultQueryTimeout (#5807)

* Fix defaultQueryTimeout

- set default timeout in query context before query fail time is evaluated

Remove unused import

* Address failing checks

* Addressing code review comments

* Removed line that was no longer used
This commit is contained in:
awelsh93 2018-06-08 23:34:10 +01:00 committed by Jihoon Son
parent 96feb479cd
commit 6f0aedd6ab
4 changed files with 14 additions and 14 deletions

View File

@ -116,21 +116,16 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final boolean isSmile;
/**
* Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}.
* Removes the magical fields added by {@link #makeResponseContextForQuery()}.
*/
public static void removeMagicResponseContextFields(Map<String, Object> responseContext)
{
responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME);
responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED);
}
public static Map<String, Object> makeResponseContextForQuery(Query query, long startTimeMillis)
public static Map<String, Object> makeResponseContextForQuery()
{
final Map<String, Object> responseContext = new ConcurrentHashMap<>();
responseContext.put(
DirectDruidClient.QUERY_FAIL_TIME,
startTimeMillis + QueryContexts.getTimeout(query)
);
responseContext.put(
DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED,
new AtomicLong()
@ -199,7 +194,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final long requestStartTimeNs = System.nanoTime();
long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue();
long timeoutAt = query.getContextValue(QUERY_FAIL_TIME);
long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED);

View File

@ -247,10 +247,7 @@ public class QueryLifecycle
{
transition(State.AUTHORIZED, State.EXECUTING);
final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery(
baseQuery,
System.currentTimeMillis()
);
final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery();
final Sequence res = QueryPlus.wrap(baseQuery)
.withIdentity(authenticationResult.getIdentity())

View File

@ -25,6 +25,8 @@ import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.server.initialization.ServerConfig;
import io.druid.client.DirectDruidClient;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
@ -35,11 +37,13 @@ public class SetAndVerifyContextQueryRunner<T> implements QueryRunner<T>
{
private final ServerConfig serverConfig;
private final QueryRunner<T> baseRunner;
private final long startTimeMillis;
public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner<T> baseRunner)
{
this.serverConfig = serverConfig;
this.baseRunner = baseRunner;
this.startTimeMillis = System.currentTimeMillis();
}
@Override
@ -53,7 +57,7 @@ public class SetAndVerifyContextQueryRunner<T> implements QueryRunner<T>
public Query<T> withTimeoutAndMaxScatterGatherBytes(Query<T> query, ServerConfig serverConfig)
{
return QueryContexts.verifyMaxQueryTimeout(
Query<T> newQuery = QueryContexts.verifyMaxQueryTimeout(
QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
query,
@ -63,5 +67,6 @@ public class SetAndVerifyContextQueryRunner<T> implements QueryRunner<T>
),
serverConfig.getMaxQueryTimeout()
);
return newQuery.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, this.startTimeMillis + QueryContexts.getTimeout(newQuery)));
}
}

View File

@ -19,6 +19,7 @@
package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
@ -163,7 +164,7 @@ public class DirectDruidClientTest
serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
Sequence s1 = client1.run(QueryPlus.wrap(query), defaultContext);
Assert.assertTrue(capturedRequest.hasCaptured());
Assert.assertEquals(url, capturedRequest.getValue().getUrl());
@ -267,6 +268,7 @@ public class DirectDruidClientTest
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(QueryPlus.wrap(query), defaultContext);
Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod());
@ -338,6 +340,7 @@ public class DirectDruidClientTest
serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
interruptionFuture.set(
new ByteArrayInputStream(
StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")