adding downstream source of QueryInterruptedException

This commit is contained in:
Slim Bouguerra 2016-02-16 13:54:53 -06:00
parent d63eec65a1
commit 77925cc061
10 changed files with 163 additions and 26 deletions

View File

@ -163,15 +163,15 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());

View File

@ -151,17 +151,17 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
indexAccumulatorPair.lhs.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
indexAccumulatorPair.lhs.close();

View File

@ -21,28 +21,88 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
public class QueryInterruptedException extends RuntimeException
{
public QueryInterruptedException() {
super();
}
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_TIMEOUT = "Query timeout";
public static final String QUERY_CANCELLED = "Query cancelled";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";
private static final Set<String> listKnownException = ImmutableSet.of(
QUERY_CANCELLED,
QUERY_INTERRUPTED,
QUERY_TIMEOUT,
UNKNOWN_EXCEPTION
);
@JsonProperty
private final String causeMessage;
@JsonProperty
private final String host;
@JsonCreator
public QueryInterruptedException(@JsonProperty("error") String message)
public QueryInterruptedException(
@JsonProperty("error") String message,
@JsonProperty("causeMessage") String causeMessage,
@JsonProperty("host") String host
)
{
super(message);
this.causeMessage = causeMessage;
this.host = host;
}
public QueryInterruptedException(Throwable cause)
{
super(cause);
this(cause, null);
}
public QueryInterruptedException(Throwable e, String host)
{
super(e);
this.host = host;
causeMessage = e.getMessage();
}
@JsonProperty("error")
@Override
public String getMessage()
{
return super.getMessage();
if (this.getCause() == null) {
return super.getMessage();
} else if (this.getCause() instanceof QueryInterruptedException) {
return getCause().getMessage();
} else if (this.getCause() instanceof InterruptedException) {
return QUERY_INTERRUPTED;
} else if (this.getCause() instanceof CancellationException) {
return QUERY_CANCELLED;
} else if (this.getCause() instanceof TimeoutException) {
return QUERY_TIMEOUT;
} else {
return UNKNOWN_EXCEPTION;
}
}
@JsonProperty("causeMessage")
public String getCauseMessage()
{
return causeMessage;
}
@JsonProperty("host")
public String getHost()
{
return host;
}
public boolean isNotKnown()
{
return !listKnownException.contains(getMessage());
}
}

View File

@ -156,15 +156,15 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());

View File

@ -184,15 +184,15 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
catch(CancellationException e) {
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());

View File

@ -339,7 +339,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
throw new QueryInterruptedException(new InterruptedException());
}
cursorOffset.increment();
}

View File

@ -264,7 +264,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
while (baseIter.hasNext()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException();
throw new QueryInterruptedException(new InterruptedException());
}
currEntry.set(baseIter.next());
@ -307,7 +307,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
if (Thread.interrupted()) {
throw new QueryInterruptedException();
throw new QueryInterruptedException( new InterruptedException());
}
boolean foundMatched = false;

View File

@ -343,7 +343,7 @@ public class ChainedExecutionQueryRunnerTest
interrupted = true;
interruptedRunners.offer(this);
stop.countDown();
throw new QueryInterruptedException("I got killed");
throw new QueryInterruptedException(e);
}
}

View File

@ -478,8 +478,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
jp = objectMapper.getFactory().createParser(future.get());
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw e;
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
//case we get an exception with an unknown message.
if (cause.isNotKnown()) {
throw new QueryInterruptedException(QueryInterruptedException.UNKNOWN_EXCEPTION, cause.getMessage(), host);
} else {
throw new QueryInterruptedException(cause, host);
}
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
@ -491,7 +497,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage());
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e, host);
}
}
}

View File

@ -19,6 +19,8 @@
package io.druid.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
@ -259,4 +261,73 @@ public class DirectDruidClientTest
EasyMock.verify(httpClient);
}
@Test
public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingException
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
SettableFuture<Object> interruptionFuture = SettableFuture.create();
Capture<Request> capturedRequest = EasyMock.newCapture();
String hostName = "localhost:8080";
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(interruptionFuture)
.anyTimes();
EasyMock.replay(httpClient);
DataSegment dataSegment = new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
new DateTime("2013-01-01").toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0L
);
final ServerSelector serverSelector = new ServerSelector( dataSegment
,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
hostName,
new NoopServiceEmitter()
);
QueryableDruidServer queryableDruidServer = new QueryableDruidServer(
new DruidServer("test1", hostName, 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String, List> context = Maps.newHashMap();
interruptionFuture.set(new ByteArrayInputStream("{\"error\":\"testing\"}".getBytes()));
Sequence results = client1.run(query, context);
QueryInterruptedException actualException = null;
try {
Sequences.toList(results, Lists.newArrayList());
}
catch (QueryInterruptedException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
Assert.assertEquals(actualException.getMessage(), QueryInterruptedException.UNKNOWN_EXCEPTION);
Assert.assertEquals(actualException.getCauseMessage(), "testing");
Assert.assertEquals(actualException.getHost(), hostName);
EasyMock.verify(httpClient);
}
}