SQL: Better error handling for HTTP API. (#4053)

* SQL: Better error handling for HTTP API.

* Fix test.
This commit is contained in:
Gian Merlino 2017-03-15 11:18:00 -07:00 committed by Fangjin Yang
parent db15d494ca
commit 403fbae7b1
2 changed files with 120 additions and 90 deletions

View File

@ -22,6 +22,7 @@ package io.druid.sql.http;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.ISE;
@ -84,6 +85,74 @@ public class SqlResource
try (final DruidPlanner planner = plannerFactory.createPlanner(sqlQuery.getContext())) {
plannerResult = planner.plan(sqlQuery.getQuery());
timeZone = planner.getPlannerContext().getTimeZone();
// Remember which columns are time-typed, so we can emit ISO8601 instead of millis values.
final List<RelDataTypeField> fieldList = plannerResult.rowType().getFieldList();
final boolean[] timeColumns = new boolean[fieldList.size()];
final boolean[] dateColumns = new boolean[fieldList.size()];
for (int i = 0; i < fieldList.size(); i++) {
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
}
final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());
try {
return Response.ok(
new StreamingOutput()
{
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException
{
Yielder<Object[]> yielder = yielder0;
try (final JsonGenerator jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream)) {
jsonGenerator.writeStartArray();
while (!yielder.isDone()) {
final Object[] row = yielder.get();
jsonGenerator.writeStartObject();
for (int i = 0; i < fieldList.size(); i++) {
final Object value;
if (timeColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}
jsonGenerator.writeObjectField(fieldList.get(i).getName(), value);
}
jsonGenerator.writeEndObject();
yielder = yielder.next(null);
}
jsonGenerator.writeEndArray();
jsonGenerator.flush();
// End with CRLF
outputStream.write('\r');
outputStream.write('\n');
}
finally {
yielder.close();
}
}
}
).build();
}
catch (Throwable e) {
// make sure to close yielder if anything happened before starting to serialize the response.
yielder0.close();
throw Throwables.propagate(e);
}
}
catch (Exception e) {
log.warn(e, "Failed to handle query: %s", sqlQuery);
@ -101,66 +170,5 @@ public class SqlResource
.entity(jsonMapper.writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(exceptionToReport)))
.build();
}
// Remember which columns are time-typed, so we can emit ISO8601 instead of millis values.
final List<RelDataTypeField> fieldList = plannerResult.rowType().getFieldList();
final boolean[] timeColumns = new boolean[fieldList.size()];
final boolean[] dateColumns = new boolean[fieldList.size()];
for (int i = 0; i < fieldList.size(); i++) {
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
}
final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());
return Response.ok(
new StreamingOutput()
{
@Override
public void write(final OutputStream outputStream) throws IOException, WebApplicationException
{
Yielder<Object[]> yielder = yielder0;
try (final JsonGenerator jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream)) {
jsonGenerator.writeStartArray();
while (!yielder.isDone()) {
final Object[] row = yielder.get();
jsonGenerator.writeStartObject();
for (int i = 0; i < fieldList.size(); i++) {
final Object value;
if (timeColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}
jsonGenerator.writeObjectField(fieldList.get(i).getName(), value);
}
jsonGenerator.writeEndObject();
yielder = yielder.next(null);
}
jsonGenerator.writeEndArray();
jsonGenerator.flush();
// End with CRLF
outputStream.write('\r');
outputStream.write('\n');
}
finally {
yielder.close();
}
}
}
).build();
}
}

View File

@ -24,7 +24,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.query.QueryInterruptedException;
import io.druid.query.ResourceLimitExceededException;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
@ -36,12 +39,12 @@ import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.sql.http.SqlQuery;
import io.druid.sql.http.SqlResource;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.ValidationException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import javax.ws.rs.core.Response;
@ -54,9 +57,6 @@ public class SqlResourceTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -92,7 +92,7 @@ public class SqlResourceTest
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT COUNT(*) AS cnt FROM druid.foo", null)
);
).rhs;
Assert.assertEquals(
ImmutableList.of(
@ -107,7 +107,7 @@ public class SqlResourceTest
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", null)
);
).rhs;
Assert.assertEquals(
ImmutableList.of(
@ -125,7 +125,7 @@ public class SqlResourceTest
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
ImmutableMap.<String, Object>of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles")
)
);
).rhs;
Assert.assertEquals(
ImmutableList.of(
@ -140,7 +140,7 @@ public class SqlResourceTest
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", null)
);
).rhs;
Assert.assertEquals(
ImmutableList.of(
@ -155,7 +155,7 @@ public class SqlResourceTest
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", null)
);
).rhs;
Assert.assertEquals(
ImmutableList.of(
@ -172,7 +172,7 @@ public class SqlResourceTest
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", null)
);
).rhs;
Assert.assertEquals(
ImmutableList.of(
@ -188,43 +188,65 @@ public class SqlResourceTest
@Test
public void testCannotValidate() throws Exception
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectMessage("Column 'dim3' not found in any table");
final QueryInterruptedException exception = doPost(new SqlQuery("SELECT dim3 FROM druid.foo", null)).lhs;
doPost(
new SqlQuery("SELECT dim3 FROM druid.foo", null)
);
Assert.fail();
Assert.assertNotNull(exception);
Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, exception.getErrorCode());
Assert.assertEquals(ValidationException.class.getName(), exception.getErrorClass());
Assert.assertTrue(exception.getMessage().contains("Column 'dim3' not found in any table"));
}
@Test
public void testCannotConvert() throws Exception
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectMessage("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo");
// TRIM unsupported
doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null));
final QueryInterruptedException exception = doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null)).lhs;
Assert.fail();
Assert.assertNotNull(exception);
Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, exception.getErrorCode());
Assert.assertEquals(ISE.class.getName(), exception.getErrorClass());
Assert.assertTrue(exception.getMessage().contains("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo"));
}
private List<Map<String, Object>> doPost(final SqlQuery query) throws Exception
@Test
public void testResourceLimitExceeded() throws Exception
{
final QueryInterruptedException exception = doPost(
new SqlQuery(
"SELECT DISTINCT dim1 FROM foo",
ImmutableMap.<String, Object>of(
"maxMergingDictionarySize", 1
)
)
).lhs;
Assert.assertNotNull(exception);
Assert.assertEquals(exception.getErrorCode(), QueryInterruptedException.RESOURCE_LIMIT_EXCEEDED);
Assert.assertEquals(exception.getErrorClass(), ResourceLimitExceededException.class.getName());
}
// Returns either an error or a result.
private Pair<QueryInterruptedException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
{
final Response response = resource.doPost(query);
if (response.getStatus() == 200) {
final StreamingOutput output = (StreamingOutput) response.getEntity();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
output.write(baos);
return JSON_MAPPER.readValue(
baos.toByteArray(),
new TypeReference<List<Map<String, Object>>>()
{
}
return Pair.of(
null,
JSON_MAPPER.<List<Map<String, Object>>>readValue(
baos.toByteArray(),
new TypeReference<List<Map<String, Object>>>()
{
}
)
);
} else {
throw JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class);
return Pair.of(
JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class),
null
);
}
}
}