Merge pull request #446 from liquidm/add_druiddirectclient_logging

add url logging to DirectDruidClient for debugging purposes
This commit is contained in:
fjy 2014-03-27 16:41:03 -06:00
commit 366872ffe9
1 changed files with 9 additions and 6 deletions

View File

@ -34,6 +34,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@ -203,7 +204,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override @Override
public JsonParserIterator<T> make() public JsonParserIterator<T> make()
{ {
return new JsonParserIterator<T>(typeRef, future); return new JsonParserIterator<T>(typeRef, future, url);
} }
@Override @Override
@ -239,11 +240,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private ObjectCodec objectCodec; private ObjectCodec objectCodec;
private final JavaType typeRef; private final JavaType typeRef;
private final Future<InputStream> future; private final Future<InputStream> future;
private final String url;
public JsonParserIterator(JavaType typeRef, Future<InputStream> future) public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url)
{ {
this.typeRef = typeRef; this.typeRef = typeRef;
this.future = future; this.future = future;
this.url = url;
jp = null; jp = null;
} }
@ -289,20 +292,20 @@ public class DirectDruidClient<T> implements QueryRunner<T>
try { try {
jp = objectMapper.getFactory().createParser(future.get()); jp = objectMapper.getFactory().createParser(future.get());
if (jp.nextToken() != JsonToken.START_ARRAY) { if (jp.nextToken() != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s]", jp.getCurrentToken()); throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else { } else {
jp.nextToken(); jp.nextToken();
objectCodec = jp.getCodec(); objectCodec = jp.getCodec();
} }
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw new RE(e, "Failure getting results from[%s]", url);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw Throwables.propagate(e); throw new RE(e, "Failure getting results from[%s]", url);
} }
catch (ExecutionException e) { catch (ExecutionException e) {
throw Throwables.propagate(e); throw new RE(e, "Failure getting results from[%s]", url);
} }
} }
} }