pass through errors from computes in DirectDruidClient

This commit is contained in:
Xavier Léauté 2014-05-14 16:58:41 -07:00
parent 32f6243be0
commit 4f1e157639
2 changed files with 19 additions and 2 deletions

View File

@ -19,13 +19,17 @@
package io.druid.query; package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class QueryInterruptedException extends RuntimeException public class QueryInterruptedException extends RuntimeException
{ {
public QueryInterruptedException() { public QueryInterruptedException() {
super(); super();
} }
public QueryInterruptedException(String message) @JsonCreator
public QueryInterruptedException(@JsonProperty("error") String message)
{ {
super(message); super(message);
} }
@ -34,4 +38,11 @@ public class QueryInterruptedException extends RuntimeException
{ {
super(cause); super(cause);
} }
@JsonProperty("error")
@Override
public String getMessage()
{
return super.getMessage();
}
} }

View File

@ -45,6 +45,7 @@ import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryToolChestWarehouse;
@ -283,7 +284,12 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (jp == null) { if (jp == null) {
try { try {
jp = objectMapper.getFactory().createParser(future.get()); jp = objectMapper.getFactory().createParser(future.get());
if (jp.nextToken() != JsonToken.START_ARRAY) { final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw e;
}
else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else { } else {
jp.nextToken(); jp.nextToken();