diff --git a/sql/src/main/java/io/druid/sql/http/SqlResource.java b/sql/src/main/java/io/druid/sql/http/SqlResource.java index 256491def84..9c8cf6bdff1 100644 --- a/sql/src/main/java/io/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/io/druid/sql/http/SqlResource.java @@ -22,6 +22,7 @@ package io.druid.sql.http; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.inject.Inject; import io.druid.guice.annotations.Json; import io.druid.java.util.common.ISE; @@ -84,6 +85,74 @@ public class SqlResource try (final DruidPlanner planner = plannerFactory.createPlanner(sqlQuery.getContext())) { plannerResult = planner.plan(sqlQuery.getQuery()); timeZone = planner.getPlannerContext().getTimeZone(); + + // Remember which columns are time-typed, so we can emit ISO8601 instead of millis values. + final List fieldList = plannerResult.rowType().getFieldList(); + final boolean[] timeColumns = new boolean[fieldList.size()]; + final boolean[] dateColumns = new boolean[fieldList.size()]; + for (int i = 0; i < fieldList.size(); i++) { + final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName(); + timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP; + dateColumns[i] = sqlTypeName == SqlTypeName.DATE; + } + + final Yielder yielder0 = Yielders.each(plannerResult.run()); + + try { + return Response.ok( + new StreamingOutput() + { + @Override + public void write(final OutputStream outputStream) throws IOException, WebApplicationException + { + Yielder yielder = yielder0; + + try (final JsonGenerator jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream)) { + jsonGenerator.writeStartArray(); + + while (!yielder.isDone()) { + final Object[] row = yielder.get(); + jsonGenerator.writeStartObject(); + for (int i = 0; i < fieldList.size(); i++) { + final Object value; + + if (timeColumns[i]) { + value = ISODateTimeFormat.dateTime().print( + Calcites.calciteTimestampToJoda((long) row[i], timeZone) + ); + } else if (dateColumns[i]) { + value = ISODateTimeFormat.dateTime().print( + Calcites.calciteDateToJoda((int) row[i], timeZone) + ); + } else { + value = row[i]; + } + + jsonGenerator.writeObjectField(fieldList.get(i).getName(), value); + } + jsonGenerator.writeEndObject(); + yielder = yielder.next(null); + } + + jsonGenerator.writeEndArray(); + jsonGenerator.flush(); + + // End with CRLF + outputStream.write('\r'); + outputStream.write('\n'); + } + finally { + yielder.close(); + } + } + } + ).build(); + } + catch (Throwable e) { + // make sure to close yielder if anything happened before starting to serialize the response. + yielder0.close(); + throw Throwables.propagate(e); + } } catch (Exception e) { log.warn(e, "Failed to handle query: %s", sqlQuery); @@ -101,66 +170,5 @@ public class SqlResource .entity(jsonMapper.writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(exceptionToReport))) .build(); } - - // Remember which columns are time-typed, so we can emit ISO8601 instead of millis values. - final List fieldList = plannerResult.rowType().getFieldList(); - final boolean[] timeColumns = new boolean[fieldList.size()]; - final boolean[] dateColumns = new boolean[fieldList.size()]; - for (int i = 0; i < fieldList.size(); i++) { - final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName(); - timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP; - dateColumns[i] = sqlTypeName == SqlTypeName.DATE; - } - - final Yielder yielder0 = Yielders.each(plannerResult.run()); - - return Response.ok( - new StreamingOutput() - { - @Override - public void write(final OutputStream outputStream) throws IOException, WebApplicationException - { - Yielder yielder = yielder0; - - try (final JsonGenerator jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream)) { - jsonGenerator.writeStartArray(); - - while (!yielder.isDone()) { - final Object[] row = yielder.get(); - jsonGenerator.writeStartObject(); - for (int i = 0; i < fieldList.size(); i++) { - final Object value; - - if (timeColumns[i]) { - value = ISODateTimeFormat.dateTime().print( - Calcites.calciteTimestampToJoda((long) row[i], timeZone) - ); - } else if (dateColumns[i]) { - value = ISODateTimeFormat.dateTime().print( - Calcites.calciteDateToJoda((int) row[i], timeZone) - ); - } else { - value = row[i]; - } - - jsonGenerator.writeObjectField(fieldList.get(i).getName(), value); - } - jsonGenerator.writeEndObject(); - yielder = yielder.next(null); - } - - jsonGenerator.writeEndArray(); - jsonGenerator.flush(); - - // End with CRLF - outputStream.write('\r'); - outputStream.write('\n'); - } - finally { - yielder.close(); - } - } - } - ).build(); } } diff --git a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java index e8798e62e94..392d19fbf74 100644 --- a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java @@ -24,7 +24,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Pair; import io.druid.query.QueryInterruptedException; +import io.druid.query.ResourceLimitExceededException; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -36,12 +39,12 @@ import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import io.druid.sql.http.SqlQuery; import io.druid.sql.http.SqlResource; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.tools.ValidationException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import javax.ws.rs.core.Response; @@ -54,9 +57,6 @@ public class SqlResourceTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -92,7 +92,7 @@ public class SqlResourceTest { final List> rows = doPost( new SqlQuery("SELECT COUNT(*) AS cnt FROM druid.foo", null) - ); + ).rhs; Assert.assertEquals( ImmutableList.of( @@ -107,7 +107,7 @@ public class SqlResourceTest { final List> rows = doPost( new SqlQuery("SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", null) - ); + ).rhs; Assert.assertEquals( ImmutableList.of( @@ -125,7 +125,7 @@ public class SqlResourceTest "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", ImmutableMap.of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles") ) - ); + ).rhs; Assert.assertEquals( ImmutableList.of( @@ -140,7 +140,7 @@ public class SqlResourceTest { final List> rows = doPost( new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", null) - ); + ).rhs; Assert.assertEquals( ImmutableList.of( @@ -155,7 +155,7 @@ public class SqlResourceTest { final List> rows = doPost( new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", null) - ); + ).rhs; Assert.assertEquals( ImmutableList.of( @@ -172,7 +172,7 @@ public class SqlResourceTest { final List> rows = doPost( new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", null) - ); + ).rhs; Assert.assertEquals( ImmutableList.of( @@ -188,43 +188,65 @@ public class SqlResourceTest @Test public void testCannotValidate() throws Exception { - expectedException.expect(QueryInterruptedException.class); - expectedException.expectMessage("Column 'dim3' not found in any table"); + final QueryInterruptedException exception = doPost(new SqlQuery("SELECT dim3 FROM druid.foo", null)).lhs; - doPost( - new SqlQuery("SELECT dim3 FROM druid.foo", null) - ); - - Assert.fail(); + Assert.assertNotNull(exception); + Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, exception.getErrorCode()); + Assert.assertEquals(ValidationException.class.getName(), exception.getErrorClass()); + Assert.assertTrue(exception.getMessage().contains("Column 'dim3' not found in any table")); } @Test public void testCannotConvert() throws Exception { - expectedException.expect(QueryInterruptedException.class); - expectedException.expectMessage("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo"); - // TRIM unsupported - doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null)); + final QueryInterruptedException exception = doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null)).lhs; - Assert.fail(); + Assert.assertNotNull(exception); + Assert.assertEquals(QueryInterruptedException.UNKNOWN_EXCEPTION, exception.getErrorCode()); + Assert.assertEquals(ISE.class.getName(), exception.getErrorClass()); + Assert.assertTrue(exception.getMessage().contains("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo")); } - private List> doPost(final SqlQuery query) throws Exception + @Test + public void testResourceLimitExceeded() throws Exception + { + final QueryInterruptedException exception = doPost( + new SqlQuery( + "SELECT DISTINCT dim1 FROM foo", + ImmutableMap.of( + "maxMergingDictionarySize", 1 + ) + ) + ).lhs; + + Assert.assertNotNull(exception); + Assert.assertEquals(exception.getErrorCode(), QueryInterruptedException.RESOURCE_LIMIT_EXCEEDED); + Assert.assertEquals(exception.getErrorClass(), ResourceLimitExceededException.class.getName()); + } + + // Returns either an error or a result. + private Pair>> doPost(final SqlQuery query) throws Exception { final Response response = resource.doPost(query); if (response.getStatus() == 200) { final StreamingOutput output = (StreamingOutput) response.getEntity(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); output.write(baos); - return JSON_MAPPER.readValue( - baos.toByteArray(), - new TypeReference>>() - { - } + return Pair.of( + null, + JSON_MAPPER.>>readValue( + baos.toByteArray(), + new TypeReference>>() + { + } + ) ); } else { - throw JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class); + return Pair.of( + JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class), + null + ); } } }