diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 60bdde4094d..06976c1a5b4 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; @@ -69,9 +70,10 @@ import java.net.URL; import java.util.Enumeration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -95,6 +97,7 @@ public class DirectDruidClient implements QueryRunner private final AtomicInteger openConnections; private final boolean isSmile; + private final ScheduledExecutorService queryCancellationExecutor; /** * Removes the magical fields added by {@link #makeResponseContextForQuery()}. @@ -131,6 +134,7 @@ public class DirectDruidClient implements QueryRunner this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; this.openConnections = new AtomicInteger(); + this.queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); } public int getNumOpenConnections() @@ -465,38 +469,12 @@ public class DirectDruidClient implements QueryRunner { openConnections.getAndDecrement(); if (future.isCancelled()) { - // forward the cancellation to underlying queriable node - try { - StatusResponseHolder res = httpClient.go( - new Request( - HttpMethod.DELETE, - new URL(cancelUrl) - ).setContent(objectMapper.writeValueAsBytes(query)) - .setHeader( - HttpHeaders.Names.CONTENT_TYPE, - isSmile - ? SmileMediaTypes.APPLICATION_JACKSON_SMILE - : MediaType.APPLICATION_JSON - ), - StatusResponseHandler.getInstance(), - Duration.standardSeconds(1) - ).get(1, TimeUnit.SECONDS); - - if (res.getStatus().getCode() >= 500) { - throw new RE( - "Error cancelling query[%s]: queriable node returned status[%d] [%s].", - query, - res.getStatus().getCode(), - res.getStatus().getReasonPhrase() - ); - } - } - catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { - throw new RuntimeException(e); - } + cancelQuery(query, cancelUrl); } } - } + }, + // The callback is non-blocking and quick, so it's OK to schedule it using directExecutor() + Execs.directExecutor() ); } catch (IOException e) { @@ -543,6 +521,43 @@ public class DirectDruidClient implements QueryRunner return retVal; } + private void cancelQuery(Query query, String cancelUrl) + { + Runnable cancelRunnable = () -> { + try { + Future responseFuture = httpClient.go( + new Request(HttpMethod.DELETE, new URL(cancelUrl)) + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON), + StatusResponseHandler.getInstance(), + Duration.standardSeconds(1)); + + Runnable checkRunnable = () -> { + try { + if (!responseFuture.isDone()) { + log.error("Error cancelling query[%s]", query); + } + StatusResponseHolder response = responseFuture.get(); + if (response.getStatus().getCode() >= 500) { + log.error("Error cancelling query[%s]: queriable node returned status[%d] [%s].", + query, + response.getStatus().getCode(), + response.getStatus().getReasonPhrase()); + } + } + catch (ExecutionException | InterruptedException e) { + log.error(e, "Error cancelling query[%s]", query); + } + }; + queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS); + } + catch (IOException e) { + log.error(e, "Error cancelling query[%s]", query); + } + }; + queryCancellationExecutor.submit(cancelRunnable); + } + @Override public String toString() { diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index caa549a5b14..6c50249693f 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -252,7 +252,7 @@ public class DirectDruidClientTest ) ) .andReturn(cancellationFuture) - .once(); + .anyTimes(); EasyMock.replay(httpClient); @@ -261,7 +261,7 @@ public class DirectDruidClientTest query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); Sequence results = client.run(QueryPlus.wrap(query)); - Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); + Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(0, client.getNumOpenConnections());