In DirectDruidClient, don't run Future cancellation listener in… (#8700)

* In DirectDruidClient, don't run Future cancellation listener in HTTP library executor

* extract cancelQuery as a method of DirectDruidClient

* Fix testCancel

* Add exception as the first argument to log.error
This commit is contained in:
Zhenxiao Luo 2019-11-07 10:12:18 -08:00 committed by Roman Leventov
parent fca23d0c32
commit a9aa416c3d
2 changed files with 48 additions and 33 deletions

View File

@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
@ -69,9 +70,10 @@ import java.net.URL;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -95,6 +97,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final AtomicInteger openConnections; private final AtomicInteger openConnections;
private final boolean isSmile; private final boolean isSmile;
private final ScheduledExecutorService queryCancellationExecutor;
/** /**
* Removes the magical fields added by {@link #makeResponseContextForQuery()}. * Removes the magical fields added by {@link #makeResponseContextForQuery()}.
@ -131,6 +134,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger(); this.openConnections = new AtomicInteger();
this.queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor");
} }
public int getNumOpenConnections() public int getNumOpenConnections()
@ -465,38 +469,12 @@ public class DirectDruidClient<T> implements QueryRunner<T>
{ {
openConnections.getAndDecrement(); openConnections.getAndDecrement();
if (future.isCancelled()) { if (future.isCancelled()) {
// forward the cancellation to underlying queriable node cancelQuery(query, cancelUrl);
try {
StatusResponseHolder res = httpClient.go(
new Request(
HttpMethod.DELETE,
new URL(cancelUrl)
).setContent(objectMapper.writeValueAsBytes(query))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile
? SmileMediaTypes.APPLICATION_JACKSON_SMILE
: MediaType.APPLICATION_JSON
),
StatusResponseHandler.getInstance(),
Duration.standardSeconds(1)
).get(1, TimeUnit.SECONDS);
if (res.getStatus().getCode() >= 500) {
throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].",
query,
res.getStatus().getCode(),
res.getStatus().getReasonPhrase()
);
}
}
catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
}
} }
} }
},
// The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
Execs.directExecutor()
); );
} }
catch (IOException e) { catch (IOException e) {
@ -543,6 +521,43 @@ public class DirectDruidClient<T> implements QueryRunner<T>
return retVal; return retVal;
} }
private <T> void cancelQuery(Query<T> query, String cancelUrl)
{
Runnable cancelRunnable = () -> {
try {
Future<StatusResponseHolder> responseFuture = httpClient.go(
new Request(HttpMethod.DELETE, new URL(cancelUrl))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON),
StatusResponseHandler.getInstance(),
Duration.standardSeconds(1));
Runnable checkRunnable = () -> {
try {
if (!responseFuture.isDone()) {
log.error("Error cancelling query[%s]", query);
}
StatusResponseHolder response = responseFuture.get();
if (response.getStatus().getCode() >= 500) {
log.error("Error cancelling query[%s]: queriable node returned status[%d] [%s].",
query,
response.getStatus().getCode(),
response.getStatus().getReasonPhrase());
}
}
catch (ExecutionException | InterruptedException e) {
log.error(e, "Error cancelling query[%s]", query);
}
};
queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS);
}
catch (IOException e) {
log.error(e, "Error cancelling query[%s]", query);
}
};
queryCancellationExecutor.submit(cancelRunnable);
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -252,7 +252,7 @@ public class DirectDruidClientTest
) )
) )
.andReturn(cancellationFuture) .andReturn(cancellationFuture)
.once(); .anyTimes();
EasyMock.replay(httpClient); EasyMock.replay(httpClient);
@ -261,7 +261,7 @@ public class DirectDruidClientTest
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client.run(QueryPlus.wrap(query)); Sequence results = client.run(QueryPlus.wrap(query));
Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
Assert.assertEquals(0, client.getNumOpenConnections()); Assert.assertEquals(0, client.getNumOpenConnections());