SQL: Fix too-long headers in http responses. (#6411)

Fixes #6409 by moving column name info from HTTP headers into the
result body.
This commit is contained in:
Gian Merlino 2018-10-01 18:13:08 -07:00 committed by Fangjin Yang
parent 42e5385e56
commit 244046fda5
11 changed files with 196 additions and 52 deletions

View File

@ -369,12 +369,7 @@ Metadata is available over the HTTP API by querying [system tables](#retrieving-
#### Responses
All Druid SQL HTTP responses include a "X-Druid-Column-Names" header with a JSON-encoded array of columns that
will appear in the result rows and an "X-Druid-Column-Types" header with a JSON-encoded array of
[types](#data-types-and-casts).
For the result rows themselves, Druid SQL supports a variety of result formats. You can
specify these by adding a "resultFormat" parameter, like:
Druid SQL supports a variety of result formats. You can specify these by adding a "resultFormat" parameter, like:
```json
{
@ -393,6 +388,20 @@ The supported result formats are:
|`arrayLines`|Like "array", but the JSON arrays are separated by newlines instead of being wrapped in a JSON array. This can make it easier to parse the entire response set as a stream, if you do not have ready access to a streaming JSON parser. To make it possible to detect a truncated response, this format includes a trailer of one blank line.|text/plain|
|`csv`|Comma-separated values, with one row per line. Individual field values may be escaped by being surrounded in double quotes. If double quotes appear in a field value, they will be escaped by replacing them with double-double-quotes like `""this""`. To make it possible to detect a truncated response, this format includes a trailer of one blank line.|text/csv|
You can additionally request a header by setting "header" to true in your request, like:
```json
{
"query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar' AND __time > TIMESTAMP '2000-01-01 00:00:00'",
"resultFormat" : "arrayLines",
"header" : true
}
```
In this case, the first result returned will be a header. For the `csv`, `array`, and `arrayLines` formats, the header
will be a list of column names. For the `object` and `objectLines` formats, the header will be an object where the
keys are column names, and the values are null.
Errors that occur before the response body is sent will be reported in JSON, with an HTTP 500 status code, in the
same format as [native Druid query errors](../querying/querying.html#query-errors). If an error occurs while the response body is
being sent, at that point it is too late to change the HTTP status code or report a JSON error, so the response will

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public class ArrayLinesWriter implements ResultFormat.Writer
{
@ -55,6 +56,18 @@ public class ArrayLinesWriter implements ResultFormat.Writer
outputStream.flush();
}
@Override
public void writeHeader(final List<String> columnNames) throws IOException
{
// Emits the optional header for the "arrayLines" format: a single JSON array
// of column-name strings, written before any result rows. Added in #6411 to
// replace the X-Druid-Column-Names HTTP header, which could exceed header
// size limits for wide result sets (#6409).
jsonGenerator.writeStartArray();
for (String columnName : columnNames) {
jsonGenerator.writeString(columnName);
}
jsonGenerator.writeEndArray();
}
@Override
public void writeRowStart() throws IOException
{

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public class ArrayWriter implements ResultFormat.Writer
{
@ -53,6 +54,18 @@ public class ArrayWriter implements ResultFormat.Writer
outputStream.write('\n');
}
@Override
public void writeHeader(final List<String> columnNames) throws IOException
{
// Header for the "array" format: a JSON array of column names emitted as the
// first element of the result array, ahead of the data rows. Mirrors
// ArrayLinesWriter.writeHeader; introduced by #6411 to move column metadata
// out of HTTP headers (#6409).
jsonGenerator.writeStartArray();
for (String columnName : columnNames) {
jsonGenerator.writeString(columnName);
}
jsonGenerator.writeEndArray();
}
@Override
public void writeRowStart() throws IOException
{

View File

@ -58,6 +58,12 @@ public class CsvWriter implements ResultFormat.Writer
outputStream.flush();
}
@Override
public void writeHeader(final List<String> columnNames)
{
// Header for the "csv" format: one CSV line of column names before the data
// rows. NOTE(review): the second argument presumably disables forced quoting
// (quote only when necessary) — confirm against the CSV writer's
// writeNext(String[], boolean) API.
writer.writeNext(columnNames.toArray(new String[0]), false);
}
@Override
public void writeRowStart()
{

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public class ObjectLinesWriter implements ResultFormat.Writer
{
@ -40,7 +41,7 @@ public class ObjectLinesWriter implements ResultFormat.Writer
}
@Override
public void writeResponseStart() throws IOException
public void writeResponseStart()
{
// Do nothing.
}
@ -55,6 +56,18 @@ public class ObjectLinesWriter implements ResultFormat.Writer
outputStream.flush();
}
@Override
public void writeHeader(final List<String> columnNames) throws IOException
{
// Header for the "objectLines" format: since rows are objects rather than
// arrays, the header is an object whose keys are the column names and whose
// values are all null (documented in the SQL docs hunk of this commit).
jsonGenerator.writeStartObject();
for (String columnName : columnNames) {
jsonGenerator.writeNullField(columnName);
}
jsonGenerator.writeEndObject();
}
@Override
public void writeRowStart() throws IOException
{

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public class ObjectWriter implements ResultFormat.Writer
{
@ -53,6 +54,18 @@ public class ObjectWriter implements ResultFormat.Writer
outputStream.write('\n');
}
@Override
public void writeHeader(final List<String> columnNames) throws IOException
{
// Header for the "object" format: an object with one null-valued field per
// column name, emitted as the first element before the data rows. Mirrors
// ObjectLinesWriter.writeHeader.
jsonGenerator.writeStartObject();
for (String columnName : columnNames) {
jsonGenerator.writeNullField(columnName);
}
jsonGenerator.writeEndObject();
}
@Override
public void writeRowStart() throws IOException
{

View File

@ -28,6 +28,7 @@ import javax.ws.rs.core.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
public enum ResultFormat
{
@ -112,6 +113,8 @@ public enum ResultFormat
*/
void writeResponseStart() throws IOException;
void writeHeader(List<String> columnNames) throws IOException;
/**
* Start of each result row.
*/

View File

@ -31,17 +31,20 @@ public class SqlQuery
{
private final String query;
private final ResultFormat resultFormat;
private final boolean header;
private final Map<String, Object> context;
// Jackson deserialization entry point for SQL-over-HTTP request bodies.
@JsonCreator
public SqlQuery(
@JsonProperty("query") final String query,
// null falls back to ResultFormat.OBJECT below
@JsonProperty("resultFormat") final ResultFormat resultFormat,
// when true, the first "row" of the response is a header (new in #6411);
// primitive boolean, so an absent "header" property deserializes as false
@JsonProperty("header") final boolean header,
@JsonProperty("context") final Map<String, Object> context
)
{
// "query" is the only required property.
this.query = Preconditions.checkNotNull(query, "query");
this.resultFormat = resultFormat == null ? ResultFormat.OBJECT : resultFormat;
this.header = header;
this.context = context == null ? ImmutableMap.of() : context;
}
@ -57,6 +60,12 @@ public class SqlQuery
return resultFormat;
}
// Whether the response should begin with a header row; serialized as the
// "header" JSON property.
@JsonProperty("header")
public boolean includeHeader()
{
return header;
}
@JsonProperty
public Map<String, Object> getContext()
{
@ -73,7 +82,8 @@ public class SqlQuery
return false;
}
final SqlQuery sqlQuery = (SqlQuery) o;
return Objects.equals(query, sqlQuery.query) &&
return header == sqlQuery.header &&
Objects.equals(query, sqlQuery.query) &&
resultFormat == sqlQuery.resultFormat &&
Objects.equals(context, sqlQuery.context);
}
@ -81,7 +91,7 @@ public class SqlQuery
@Override
public int hashCode()
{
return Objects.hash(query, resultFormat, context);
return Objects.hash(query, resultFormat, header, context);
}
@Override
@ -90,6 +100,7 @@ public class SqlQuery
return "SqlQuery{" +
"query='" + query + '\'' +
", resultFormat=" + resultFormat +
", header=" + header +
", context=" + context +
'}';
}

View File

@ -23,6 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Yielder;
@ -34,9 +37,6 @@ import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
@ -52,6 +52,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
@Path("/druid/v2/sql/")
@ -93,14 +94,12 @@ public class SqlResource
final boolean[] timeColumns = new boolean[fieldList.size()];
final boolean[] dateColumns = new boolean[fieldList.size()];
final String[] columnNames = new String[fieldList.size()];
final String[] columnTypes = new String[fieldList.size()];
for (int i = 0; i < fieldList.size(); i++) {
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
columnNames[i] = fieldList.get(i).getName();
columnTypes[i] = sqlTypeName.getName();
}
final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());
@ -119,6 +118,10 @@ public class SqlResource
.createFormatter(outputStream, jsonMapper)) {
writer.writeResponseStart();
if (sqlQuery.includeHeader()) {
writer.writeHeader(Arrays.asList(columnNames));
}
while (!yielder.isDone()) {
final Object[] row = yielder.get();
writer.writeRowStart();
@ -151,8 +154,6 @@ public class SqlResource
}
}
)
.header("X-Druid-Column-Names", jsonMapper.writeValueAsString(columnNames))
.header("X-Druid-Column-Types", jsonMapper.writeValueAsString(columnTypes))
.build();
}
catch (Throwable e) {

View File

@ -34,7 +34,7 @@ public class SqlQueryTest extends CalciteTestBase
public void testSerde() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final SqlQuery query = new SqlQuery("SELECT 1", ResultFormat.ARRAY, ImmutableMap.of("useCache", false));
final SqlQuery query = new SqlQuery("SELECT 1", ResultFormat.ARRAY, true, ImmutableMap.of("useCache", false));
Assert.assertEquals(query, jsonMapper.readValue(jsonMapper.writeValueAsString(query), SqlQuery.class));
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
@ -49,7 +50,6 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlResource;
import org.apache.calcite.tools.ValidationException;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.AfterClass;
@ -151,34 +151,11 @@ public class SqlResourceTest extends CalciteTestBase
walker = null;
}
@Test
public void testXDruidColumnHeaders() throws Exception
{
final Response response = resource.doPost(
new SqlQuery(
"SELECT FLOOR(__time TO DAY) as \"day\", COUNT(*) as TheCount, SUM(m1) FROM druid.foo GROUP BY 1",
ResultFormat.OBJECT,
null
),
req
);
Assert.assertEquals(
"[\"day\",\"TheCount\",\"EXPR$2\"]",
response.getMetadata().getFirst("X-Druid-Column-Names")
);
Assert.assertEquals(
"[\"TIMESTAMP\",\"BIGINT\",\"DOUBLE\"]",
response.getMetadata().getFirst("X-Druid-Column-Types")
);
}
@Test
public void testCountStar() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", null, null)
new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", null, false, null)
).rhs;
Assert.assertEquals(
@ -196,6 +173,7 @@ public class SqlResourceTest extends CalciteTestBase
new SqlQuery(
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
ResultFormat.OBJECT,
false,
null
)
).rhs;
@ -215,6 +193,7 @@ public class SqlResourceTest extends CalciteTestBase
new SqlQuery(
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
ResultFormat.OBJECT,
false,
ImmutableMap.of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles")
)
).rhs;
@ -231,7 +210,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testFieldAliasingSelect() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", ResultFormat.OBJECT, null)
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", ResultFormat.OBJECT, false, null)
).rhs;
Assert.assertEquals(
@ -246,7 +225,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testFieldAliasingGroupBy() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", ResultFormat.OBJECT, null)
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", ResultFormat.OBJECT, false, null)
).rhs;
Assert.assertEquals(
@ -276,9 +255,43 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertEquals(
ImmutableList.of(
Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr),
Arrays.asList("2000-01-02T00:00:00.000Z", 1, "10.1", nullStr, 2.0, 2.0, "org.apache.druid.hll.HLLCV1", nullStr)
Arrays.asList(
"2000-01-02T00:00:00.000Z",
1,
"10.1",
nullStr,
2.0,
2.0,
"org.apache.druid.hll.HLLCV1",
nullStr
)
),
doPost(new SqlQuery(query, ResultFormat.ARRAY, null), new TypeReference<List<List<Object>>>() {}).rhs
doPost(new SqlQuery(query, ResultFormat.ARRAY, false, null), new TypeReference<List<List<Object>>>() {}).rhs
);
}
@Test
public void testArrayResultFormatWithHeader() throws Exception
{
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
// Under default-null-handling, empty strings replace nulls in results.
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
Assert.assertEquals(
ImmutableList.of(
// With header=true, the first element of the "array" format response is
// the list of column names (unnamed expression gets the EXPR$7 alias).
Arrays.asList("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1", "EXPR$7"),
Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr),
Arrays.asList(
"2000-01-02T00:00:00.000Z",
1,
"10.1",
nullStr,
2.0,
2.0,
"org.apache.druid.hll.HLLCV1",
nullStr
)
),
doPost(new SqlQuery(query, ResultFormat.ARRAY, true, null), new TypeReference<List<List<Object>>>() {}).rhs
);
}
@ -286,7 +299,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testArrayLinesResultFormat() throws Exception
{
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, null)).rhs;
final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, false, null)).rhs;
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
final List<String> lines = Splitter.on('\n').splitToList(response);
@ -303,6 +316,31 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertEquals("", lines.get(3));
}
@Test
public void testArrayLinesResultFormatWithHeader() throws Exception
{
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, true, null)).rhs;
// Under default-null-handling, empty strings replace nulls in results.
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
final List<String> lines = Splitter.on('\n').splitToList(response);
// 5 lines: header + 2 data rows + blank trailer line + empty string after
// the final newline.
Assert.assertEquals(5, lines.size());
Assert.assertEquals(
// With header=true, line 0 is the JSON array of column names.
Arrays.asList("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1", "EXPR$7"),
JSON_MAPPER.readValue(lines.get(0), List.class)
);
Assert.assertEquals(
Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr),
JSON_MAPPER.readValue(lines.get(1), List.class)
);
Assert.assertEquals(
Arrays.asList("2000-01-02T00:00:00.000Z", 1, "10.1", nullStr, 2.0, 2.0, "org.apache.druid.hll.HLLCV1", nullStr),
JSON_MAPPER.readValue(lines.get(2), List.class)
);
Assert.assertEquals("", lines.get(3));
Assert.assertEquals("", lines.get(4));
}
@Test
public void testObjectResultFormat() throws Exception
{
@ -340,7 +378,10 @@ public class SqlResourceTest extends CalciteTestBase
.put("EXPR$7", "")
.build()
).stream().map(transformer).collect(Collectors.toList()),
doPost(new SqlQuery(query, ResultFormat.OBJECT, null), new TypeReference<List<Map<String, Object>>>() {}).rhs
doPost(
new SqlQuery(query, ResultFormat.OBJECT, false, null),
new TypeReference<List<Map<String, Object>>>() {}
).rhs
);
}
@ -348,7 +389,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testObjectLinesResultFormat() throws Exception
{
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
final String response = doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, null)).rhs;
final String response = doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, false, null)).rhs;
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
final Function<Map<String, Object>, Map<String, Object>> transformer = m -> {
return Maps.transformEntries(
@ -399,7 +440,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testCsvResultFormat() throws Exception
{
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, null)).rhs;
final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, false, null)).rhs;
final List<String> lines = Splitter.on('\n').splitToList(response);
Assert.assertEquals(
@ -413,11 +454,30 @@ public class SqlResourceTest extends CalciteTestBase
);
}
@Test
public void testCsvResultFormatWithHeaders() throws Exception
{
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, true, null)).rhs;
final List<String> lines = Splitter.on('\n').splitToList(response);
Assert.assertEquals(
ImmutableList.of(
// With header=true, the first CSV line is the column names; the two
// trailing empty strings are the blank trailer line plus the empty
// split segment after the final newline.
"__time,cnt,dim1,dim2,m1,m2,unique_dim1,EXPR$7",
"2000-01-01T00:00:00.000Z,1,,a,1.0,1.0,org.apache.druid.hll.HLLCV1,",
"2000-01-02T00:00:00.000Z,1,10.1,,2.0,2.0,org.apache.druid.hll.HLLCV1,",
"",
""
),
lines
);
}
@Test
public void testExplainCountStar() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", ResultFormat.OBJECT, null)
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", ResultFormat.OBJECT, false, null)
).rhs;
Assert.assertEquals(
@ -438,6 +498,7 @@ public class SqlResourceTest extends CalciteTestBase
new SqlQuery(
"SELECT dim3 FROM druid.foo",
ResultFormat.OBJECT,
false,
null
)
).lhs;
@ -453,7 +514,7 @@ public class SqlResourceTest extends CalciteTestBase
{
// SELECT + ORDER unsupported
final QueryInterruptedException exception = doPost(
new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, null)
new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, false, null)
).lhs;
Assert.assertNotNull(exception);
@ -472,6 +533,7 @@ public class SqlResourceTest extends CalciteTestBase
new SqlQuery(
"SELECT DISTINCT dim1 FROM foo",
ResultFormat.OBJECT,
false,
ImmutableMap.of("maxMergingDictionarySize", 1)
)
).lhs;