return proper json errors to propagate query cancellation and timeout

This commit is contained in:
Xavier Léauté 2014-05-12 15:07:49 -07:00
parent def62c74f8
commit d2c729adec
3 changed files with 127 additions and 66 deletions

View File

@ -32,7 +32,12 @@ public class DefaultObjectMapper extends ObjectMapper
{ {
public DefaultObjectMapper() public DefaultObjectMapper()
{ {
this(null); this((JsonFactory)null);
}
public DefaultObjectMapper(DefaultObjectMapper mapper)
{
super(mapper);
} }
public DefaultObjectMapper(JsonFactory factory) public DefaultObjectMapper(JsonFactory factory)
@ -52,4 +57,10 @@ public class DefaultObjectMapper extends ObjectMapper
configure(MapperFeature.AUTO_DETECT_SETTERS, false); configure(MapperFeature.AUTO_DETECT_SETTERS, false);
configure(SerializationFeature.INDENT_OUTPUT, false); configure(SerializationFeature.INDENT_OUTPUT, false);
} }
@Override
public ObjectMapper copy()
{
return new DefaultObjectMapper(this);
}
} }

View File

@ -105,7 +105,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
jgen.writeStartArray(); jgen.writeStartArray();
value.accumulate( value.accumulate(
null, null,
new Accumulator() new Accumulator<Object, Object>()
{ {
@Override @Override
public Object accumulate(Object o, Object o1) public Object accumulate(Object o, Object o1)
@ -116,7 +116,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
return o; return null;
} }
} }
); );
@ -124,6 +124,28 @@ public class DruidDefaultSerializersModule extends SimpleModule
} }
} }
); );
addSerializer(
Yielder.class,
new JsonSerializer<Yielder>()
{
@Override
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeStartArray();
try {
while (!yielder.isDone()) {
final Object o = yielder.get();
jgen.writeObject(o);
yielder = yielder.next(null);
}
} finally {
yielder.close();
}
jgen.writeEndArray();
}
}
);
addSerializer(ByteOrder.class, ToStringSerializer.instance); addSerializer(ByteOrder.class, ToStringSerializer.instance);
addDeserializer( addDeserializer(
ByteOrder.class, ByteOrder.class,

View File

@ -19,16 +19,23 @@
package io.druid.server; package io.druid.server;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Accumulators;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingAccumulators;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
@ -36,21 +43,25 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil; import io.druid.query.DataSourceUtil;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryWatcher;
import io.druid.server.log.RequestLogger; import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE; import javax.ws.rs.DELETE;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -64,6 +75,8 @@ public class QueryResource
private static final EmittingLogger log = new EmittingLogger(QueryResource.class); private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8"); private static final Charset UTF8 = Charset.forName("UTF-8");
private static final Joiner COMMA_JOIN = Joiner.on(","); private static final Joiner COMMA_JOIN = Joiner.on(",");
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
@ -82,8 +95,12 @@ public class QueryResource
QueryManager queryManager QueryManager queryManager
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper.copy();
this.smileMapper = smileMapper; this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
this.smileMapper = smileMapper.copy();
this.smileMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
this.texasRanger = texasRanger; this.texasRanger = texasRanger;
this.emitter = emitter; this.emitter = emitter;
this.requestLogger = requestLogger; this.requestLogger = requestLogger;
@ -97,13 +114,13 @@ public class QueryResource
{ {
queryManager.cancelQuery(queryId); queryManager.cancelQuery(queryId);
return Response.status(Response.Status.ACCEPTED).build(); return Response.status(Response.Status.ACCEPTED).build();
} }
@POST @POST
@Produces("application/json") public Response doPost(
public void doPost(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse resp @Context final HttpServletResponse resp
) throws ServletException, IOException ) throws ServletException, IOException
{ {
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
@ -111,13 +128,12 @@ public class QueryResource
byte[] requestQuery = null; byte[] requestQuery = null;
String queryId; String queryId;
final boolean isSmile = "application/smile".equals(req.getContentType()); final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType());
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
ObjectWriter jsonWriter = req.getParameter("pretty") == null final ObjectWriter jsonWriter = req.getParameter("pretty") == null
? objectMapper.writer() ? objectMapper.writer()
: objectMapper.writerWithDefaultPrettyPrinter(); : objectMapper.writerWithDefaultPrettyPrinter();
OutputStream out = null;
try { try {
requestQuery = ByteStreams.toByteArray(req.getInputStream()); requestQuery = ByteStreams.toByteArray(req.getInputStream());
@ -132,21 +148,26 @@ public class QueryResource
log.debug("Got query [%s]", query); log.debug("Got query [%s]", query);
} }
Sequence<?> results = query.run(texasRanger); Sequence results = query.run(texasRanger);
if (results == null) { if (results == null) {
results = Sequences.empty(); results = Sequences.empty();
} }
resp.setStatus(200); try (
resp.setContentType("application/x-javascript"); final Yielder yielder = results.toYielder(
resp.setHeader("X-Druid-Query-Id", query.getId()); null,
new YieldingAccumulator()
out = resp.getOutputStream(); {
jsonWriter.writeValue(out, results); @Override
public Object accumulate(Object accumulated, Object in)
// JsonGenerator jgen = jsonWriter.getFactory().createGenerator(out); {
yield();
return in;
}
}
)
) {
long requestTime = System.currentTimeMillis() - start; long requestTime = System.currentTimeMillis() - start;
emitter.emit( emitter.emit(
@ -174,6 +195,23 @@ public class QueryResource
) )
) )
); );
return Response
.ok(
new StreamingOutput()
{
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException
{
jsonWriter.writeValue(outputStream, yielder);
outputStream.close();
}
},
isSmile ? APPLICATION_JSON : APPLICATION_SMILE
)
.header("X-Druid-Query-Id", queryId)
.build();
}
} }
catch (Exception e) { catch (Exception e) {
final String queryString = final String queryString =
@ -183,20 +221,6 @@ public class QueryResource
log.warn(e, "Exception occurred on request [%s]", queryString); log.warn(e, "Exception occurred on request [%s]", queryString);
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
try { try {
requestLogger.log( requestLogger.log(
new RequestLogLine( new RequestLogLine(
@ -216,10 +240,14 @@ public class QueryResource
.addData("query", queryString) .addData("query", queryString)
.addData("peer", req.getRemoteAddr()) .addData("peer", req.getRemoteAddr())
.emit(); .emit();
}
finally { return Response.serverError().entity(
resp.flushBuffer(); jsonWriter.writeValueAsString(
Closeables.closeQuietly(out); ImmutableMap.of(
"error", (e.getMessage() == null) ? "null Exception" : e.getMessage()
)
)
).build();
} }
} }
} }