diff --git a/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 82c76b6867a..e58d83111f1 100644 --- a/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/api/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -22,6 +22,8 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -29,11 +31,13 @@ import java.util.Objects; public class TaskStatusPlus { + private static final Logger log = new Logger(TaskStatusPlus.class); + private final String id; private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; - private final TaskState state; + private final TaskState statusCode; private final RunnerTaskState runnerTaskState; private final Long duration; private final TaskLocation location; @@ -42,13 +46,43 @@ public class TaskStatusPlus @Nullable private final String errorMsg; + public TaskStatusPlus( + String id, + String type, // nullable for backward compatibility + DateTime createdTime, + DateTime queueInsertionTime, + @Nullable TaskState statusCode, + @Nullable RunnerTaskState runnerStatusCode, + @Nullable Long duration, + TaskLocation location, + @Nullable String dataSource, // nullable for backward compatibility + @Nullable String errorMsg + ) + { + this( + id, + type, + createdTime, + queueInsertionTime, + statusCode, + statusCode, + runnerStatusCode, + duration, + location, + dataSource, + errorMsg + ); + } + + @JsonCreator public TaskStatusPlus( @JsonProperty("id") String id, @JsonProperty("type") @Nullable String type, // nullable for backward compatibility @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, - @JsonProperty("statusCode") @Nullable TaskState state, + @JsonProperty("statusCode") @Nullable TaskState statusCode, + @Deprecated @JsonProperty("status") @Nullable TaskState status, // present for backwards compatibility @JsonProperty("runnerStatusCode") @Nullable RunnerTaskState runnerTaskState, @JsonProperty("duration") @Nullable Long duration, @JsonProperty("location") TaskLocation location, @@ -56,14 +90,24 @@ public class TaskStatusPlus @JsonProperty("errorMsg") @Nullable String errorMsg ) { - if (state != null && state.isComplete()) { + if (statusCode != null && statusCode.isComplete()) { Preconditions.checkNotNull(duration, "duration"); } this.id = Preconditions.checkNotNull(id, "id"); this.type = type; this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime"); - this.state = state; + //checks for deserialization safety + if (statusCode != null && status == null) { + this.statusCode = statusCode; + } else if (statusCode == null && status != null) { + this.statusCode = status; + } else { + if (statusCode != null && status != null && statusCode != status) { + throw new RuntimeException(StringUtils.format("statusCode[%s] and status[%s] must match", statusCode, status)); + } + this.statusCode = statusCode; + } this.runnerTaskState = runnerTaskState; this.duration = duration; this.location = Preconditions.checkNotNull(location, "location"); @@ 
-98,14 +142,22 @@ public class TaskStatusPlus @Nullable @JsonProperty("statusCode") - public TaskState getState() + public TaskState getStatusCode() { - return state; + return statusCode; + } + + @Deprecated + @Nullable + @JsonProperty("status") + public TaskState getStatus() + { + return statusCode; } @Nullable @JsonProperty("runnerStatusCode") - public RunnerTaskState getRunnerTaskState() + public RunnerTaskState getRunnerStatusCode() { return runnerTaskState; } @@ -150,7 +202,7 @@ public class TaskStatusPlus Objects.equals(getType(), that.getType()) && Objects.equals(getCreatedTime(), that.getCreatedTime()) && Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) && - getState() == that.getState() && + getStatusCode() == that.getStatusCode() && Objects.equals(getDuration(), that.getDuration()) && Objects.equals(getLocation(), that.getLocation()) && Objects.equals(getDataSource(), that.getDataSource()) && @@ -165,7 +217,7 @@ public class TaskStatusPlus getType(), getCreatedTime(), getQueueInsertionTime(), - getState(), + getStatusCode(), getDuration(), getLocation(), getDataSource(), @@ -181,7 +233,7 @@ public class TaskStatusPlus ", type='" + type + '\'' + ", createdTime=" + createdTime + ", queueInsertionTime=" + queueInsertionTime + - ", state=" + state + + ", statusCode=" + statusCode + ", duration=" + duration + ", location=" + location + ", dataSource='" + dataSource + '\'' + diff --git a/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java b/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java index 47dce44b91f..f99ea0acbd5 100644 --- a/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java +++ b/api/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java @@ -61,6 +61,45 @@ public class TaskStatusPlusTest Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class)); } + @Test + public void testJsonAttributes() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule( + new SimpleModule() + .addDeserializer(DateTime.class, new DateTimeDeserializer()) + .addSerializer(DateTime.class, ToStringSerializer.instance) + ); + final String json = "{\n" + + "\"id\": \"testId\",\n" + + "\"type\": \"testType\",\n" + + "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n" + + "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n" + + "\"statusCode\": \"RUNNING\",\n" + + "\"status\": \"RUNNING\",\n" + + "\"runnerStatusCode\": \"RUNNING\",\n" + + "\"duration\": 1000,\n" + + "\"location\": {\n" + + "\"host\": \"testHost\",\n" + + "\"port\": 1010,\n" + + "\"tlsPort\": -1\n" + + "},\n" + + "\"dataSource\": \"ds_test\",\n" + + "\"errorMsg\": null\n" + + "}"; + TaskStatusPlus taskStatusPlus = mapper.readValue(json, TaskStatusPlus.class); + Assert.assertNotNull(taskStatusPlus); + Assert.assertNotNull(taskStatusPlus.getStatusCode()); + Assert.assertTrue(taskStatusPlus.getStatusCode().isRunnable()); + Assert.assertNotNull(taskStatusPlus.getRunnerStatusCode()); + + String serialized = mapper.writeValueAsString(taskStatusPlus); + + Assert.assertTrue(serialized.contains("\"status\":")); + Assert.assertTrue(serialized.contains("\"statusCode\":")); + Assert.assertTrue(serialized.contains("\"runnerStatusCode\":")); + } + // Copied from org.apache.druid.jackson.JodaStuff private static class DateTimeDeserializer extends StdDeserializer { diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 5d2b4558f6b..2e28f46ff64 100644 --- a/docs/content/querying/sql.md +++ 
b/docs/content/querying/sql.md @@ -369,12 +369,7 @@ Metadata is available over the HTTP API by querying [system tables](#retrieving- #### Responses -All Druid SQL HTTP responses include a "X-Druid-Column-Names" header with a JSON-encoded array of columns that -will appear in the result rows and an "X-Druid-Column-Types" header with a JSON-encoded array of -[types](#data-types-and-casts). - -For the result rows themselves, Druid SQL supports a variety of result formats. You can -specify these by adding a "resultFormat" parameter, like: +Druid SQL supports a variety of result formats. You can specify these by adding a "resultFormat" parameter, like: ```json { @@ -393,6 +388,20 @@ The supported result formats are: |`arrayLines`|Like "array", but the JSON arrays are separated by newlines instead of being wrapped in a JSON array. This can make it easier to parse the entire response set as a stream, if you do not have ready access to a streaming JSON parser. To make it possible to detect a truncated response, this format includes a trailer of one blank line.|text/plain| |`csv`|Comma-separated values, with one row per line. Individual field values may be escaped by being surrounded in double quotes. If double quotes appear in a field value, they will be escaped by replacing them with double-double-quotes like `""this""`. To make it possible to detect a truncated response, this format includes a trailer of one blank line.|text/csv| +You can additionally request a header by setting "header" to true in your request, like: + +```json +{ + "query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar' AND __time > TIMESTAMP '2000-01-01 00:00:00'", + "resultFormat" : "arrayLines", + "header" : true +} +``` + +In this case, the first result returned will be a header. For the `csv`, `array`, and `arrayLines` formats, the header +will be a list of column names. For the `object` and `objectLines` formats, the header will be an object where the +keys are column names, and the values are null. + Errors that occur before the response body is sent will be reported in JSON, with an HTTP 500 status code, in the same format as [native Druid query errors](../querying/querying.html#query-errors). If an error occurs while the response body is being sent, at that point it is too late to change the HTTP status code or report a JSON error, so the response will diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java index f80163037fa..cc480bcf9c0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ClientBasedTaskInfoProvider.java @@ -50,6 +50,6 @@ public class ClientBasedTaskInfoProvider implements TaskInfoProvider final TaskStatusResponse response = client.getTaskStatus(id); return response == null ? 
Optional.absent() : - Optional.of(TaskStatus.fromCode(id, response.getStatus().getState())); + Optional.of(TaskStatus.fromCode(id, response.getStatus().getStatusCode())); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java index 9bce0732759..48161e9cc5c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskHistory.java @@ -39,10 +39,10 @@ class TaskHistory { attemptHistory.forEach(status -> { Preconditions.checkState( - status.getState() == TaskState.SUCCESS || status.getState() == TaskState.FAILED, + status.getStatusCode() == TaskState.SUCCESS || status.getStatusCode() == TaskState.FAILED, "Complete tasks should be recorded, but the state of task[%s] is [%s]", status.getId(), - status.getState() + status.getStatusCode() ); }); this.spec = spec; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index 91c9957787c..87440119ff0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -115,7 +115,7 @@ public class TaskMonitor final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId); final TaskStatusPlus taskStatus = taskStatusResponse.getStatus(); if (taskStatus != null) { - switch (Preconditions.checkNotNull(taskStatus.getState(), "taskState")) { + switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) { case SUCCESS: incrementNumSucceededTasks(); @@ -152,7 +152,7 @@ public class TaskMonitor monitorEntry.updateStatus(taskStatus); break; default: - throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getState(), taskId); + throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId); } } } @@ -459,7 +459,7 @@ public class TaskMonitor TaskState getLastState() { - return lastStatus == null ? TaskState.FAILED : lastStatus.getState(); + return lastStatus == null ? 
TaskState.FAILED : lastStatus.getStatusCode(); } @Nullable diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 8a7503b837b..ff3caedfab5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -364,7 +364,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd .filter(entry -> { final TaskStatusPlus currentStatus = entry.getValue().getCurrentStatus(); return currentStatus != null && - (currentStatus.getState() == TaskState.SUCCESS || currentStatus.getState() == TaskState.FAILED); + (currentStatus.getStatusCode() == TaskState.SUCCESS || currentStatus.getStatusCode() == TaskState.FAILED); }) .map(Entry::getKey) .findFirst() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index 80cbe816a63..7e86e4072fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -87,7 +87,7 @@ public class TaskMonitorTest Assert.assertEquals("supervisorId", result.getSpec().getSupervisorTaskId()); Assert.assertEquals("specId" + i, result.getSpec().getId()); Assert.assertNotNull(result.getLastStatus()); - Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getState()); + Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, result.getLastState()); } } @@ -113,7 +113,7 @@ public class TaskMonitorTest Assert.assertEquals("specId" + i, result.getSpec().getId()); Assert.assertNotNull(result.getLastStatus()); - Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getState()); + Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode()); Assert.assertEquals(TaskState.SUCCESS, result.getLastState()); final TaskHistory taskHistory = monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId()); @@ -122,8 +122,8 @@ public class TaskMonitorTest final List attemptHistory = taskHistory.getAttemptHistory(); Assert.assertNotNull(attemptHistory); Assert.assertEquals(3, attemptHistory.size()); - Assert.assertEquals(TaskState.FAILED, attemptHistory.get(0).getState()); - Assert.assertEquals(TaskState.FAILED, attemptHistory.get(1).getState()); + Assert.assertEquals(TaskState.FAILED, attemptHistory.get(0).getStatusCode()); + Assert.assertEquals(TaskState.FAILED, attemptHistory.get(1).getStatusCode()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 85302bda5fe..7964c764069 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -881,6 +881,8 @@ public class OverlordResourceTest 
TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()), TaskStatusResponse.class ); + TaskStatusPlus tsp = taskStatusResponse1.getStatus(); + Assert.assertEquals(tsp.getStatusCode(), tsp.getStatus()); Assert.assertEquals( new TaskStatusResponse( "mytask", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 1ff113c04bf..8c1f33a1fe7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -248,7 +248,7 @@ public class OverlordTest Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask()); Assert.assertEquals( TaskStatus.running(taskId_0).getStatusCode(), - ((TaskStatusResponse) response.getEntity()).getStatus().getState() + ((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode() ); // Simulate completion of task_0 @@ -296,7 +296,7 @@ public class OverlordTest { while (true) { Response response = overlordResource.getTaskStatus(taskId); - if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getState())) { + if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) { break; } Thread.sleep(10); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index eee9eaa75d9..a1d7e51b6f3 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -132,7 +132,7 @@ public class OverlordResourceTestClient { } ); - return taskStatusResponse.getStatus().getState(); + return taskStatusResponse.getStatus().getStatusCode(); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java index 1d275f7feac..501cd5e1398 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java @@ -21,7 +21,7 @@ package org.apache.druid.testing.clients; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskState; import org.joda.time.DateTime; public class TaskResponseObject @@ -30,14 +30,14 @@ public class TaskResponseObject private final String id; private final DateTime createdTime; private final DateTime queueInsertionTime; - private final TaskStatus status; + private final TaskState status; @JsonCreator private TaskResponseObject( @JsonProperty("id") String id, @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, - @JsonProperty("status") TaskStatus status + @JsonProperty("status") TaskState status ) { this.id = id; @@ -65,7 +65,7 @@ public class TaskResponseObject } @SuppressWarnings("unused") // Used by Jackson serialization? 
- public TaskStatus getStatus() + public TaskState getStatus() { return status; } diff --git a/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java b/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java index 01ee28d6923..29eae278198 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Nullable; import java.io.IOException; import java.io.OutputStream; +import java.util.List; public class ArrayLinesWriter implements ResultFormat.Writer { @@ -55,6 +56,18 @@ public class ArrayLinesWriter implements ResultFormat.Writer outputStream.flush(); } + @Override + public void writeHeader(final List columnNames) throws IOException + { + jsonGenerator.writeStartArray(); + + for (String columnName : columnNames) { + jsonGenerator.writeString(columnName); + } + + jsonGenerator.writeEndArray(); + } + @Override public void writeRowStart() throws IOException { diff --git a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java index 9871fddb503..c177cf39815 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Nullable; import java.io.IOException; import java.io.OutputStream; +import java.util.List; public class ArrayWriter implements ResultFormat.Writer { @@ -53,6 +54,18 @@ public class ArrayWriter implements ResultFormat.Writer outputStream.write('\n'); } + @Override + public void writeHeader(final List columnNames) throws IOException + { + jsonGenerator.writeStartArray(); + + for (String columnName : columnNames) { + jsonGenerator.writeString(columnName); + } + + jsonGenerator.writeEndArray(); + } + @Override public void writeRowStart() throws IOException { diff --git a/sql/src/main/java/org/apache/druid/sql/http/CsvWriter.java b/sql/src/main/java/org/apache/druid/sql/http/CsvWriter.java index a118374af68..d89c752c3d2 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/CsvWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/CsvWriter.java @@ -58,6 +58,12 @@ public class CsvWriter implements ResultFormat.Writer outputStream.flush(); } + @Override + public void writeHeader(final List columnNames) + { + writer.writeNext(columnNames.toArray(new String[0]), false); + } + @Override public void writeRowStart() { diff --git a/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java b/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java index 9b040dde1af..887b27248cc 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Nullable; import java.io.IOException; import java.io.OutputStream; +import java.util.List; public class ObjectLinesWriter implements ResultFormat.Writer { @@ -40,7 +41,7 @@ public class ObjectLinesWriter implements ResultFormat.Writer } @Override - public void writeResponseStart() throws IOException + public void writeResponseStart() { // Do nothing. 
} @@ -55,6 +56,18 @@ public class ObjectLinesWriter implements ResultFormat.Writer outputStream.flush(); } + @Override + public void writeHeader(final List columnNames) throws IOException + { + jsonGenerator.writeStartObject(); + + for (String columnName : columnNames) { + jsonGenerator.writeNullField(columnName); + } + + jsonGenerator.writeEndObject(); + } + @Override public void writeRowStart() throws IOException { diff --git a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java index 76a65f61875..b1623a53cf8 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Nullable; import java.io.IOException; import java.io.OutputStream; +import java.util.List; public class ObjectWriter implements ResultFormat.Writer { @@ -53,6 +54,18 @@ public class ObjectWriter implements ResultFormat.Writer outputStream.write('\n'); } + @Override + public void writeHeader(final List columnNames) throws IOException + { + jsonGenerator.writeStartObject(); + + for (String columnName : columnNames) { + jsonGenerator.writeNullField(columnName); + } + + jsonGenerator.writeEndObject(); + } + @Override public void writeRowStart() throws IOException { diff --git a/sql/src/main/java/org/apache/druid/sql/http/ResultFormat.java b/sql/src/main/java/org/apache/druid/sql/http/ResultFormat.java index 8d21fcbfd60..2e95993510e 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ResultFormat.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ResultFormat.java @@ -28,6 +28,7 @@ import javax.ws.rs.core.MediaType; import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; +import java.util.List; public enum ResultFormat { @@ -112,6 +113,8 @@ public enum ResultFormat */ void writeResponseStart() throws IOException; + void writeHeader(List columnNames) throws IOException; + /** * Start of each result row. */ diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java index 86e6268c440..4e2c8739a42 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java @@ -31,17 +31,20 @@ public class SqlQuery { private final String query; private final ResultFormat resultFormat; + private final boolean header; private final Map context; @JsonCreator public SqlQuery( @JsonProperty("query") final String query, @JsonProperty("resultFormat") final ResultFormat resultFormat, + @JsonProperty("header") final boolean header, @JsonProperty("context") final Map context ) { this.query = Preconditions.checkNotNull(query, "query"); this.resultFormat = resultFormat == null ? ResultFormat.OBJECT : resultFormat; + this.header = header; this.context = context == null ? 
ImmutableMap.of() : context; } @@ -57,6 +60,12 @@ public class SqlQuery return resultFormat; } + @JsonProperty("header") + public boolean includeHeader() + { + return header; + } + @JsonProperty public Map getContext() { @@ -73,7 +82,8 @@ public class SqlQuery return false; } final SqlQuery sqlQuery = (SqlQuery) o; - return Objects.equals(query, sqlQuery.query) && + return header == sqlQuery.header && + Objects.equals(query, sqlQuery.query) && resultFormat == sqlQuery.resultFormat && Objects.equals(context, sqlQuery.context); } @@ -81,7 +91,7 @@ public class SqlQuery @Override public int hashCode() { - return Objects.hash(query, resultFormat, context); + return Objects.hash(query, resultFormat, header, context); } @Override @@ -90,6 +100,7 @@ public class SqlQuery return "SqlQuery{" + "query='" + query + '\'' + ", resultFormat=" + resultFormat + + ", header=" + header + ", context=" + context + '}'; } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index 0406855a210..74a3597a105 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -23,6 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.inject.Inject; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Yielder; @@ -34,9 +37,6 @@ import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.DruidPlanner; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.planner.PlannerResult; -import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.sql.type.SqlTypeName; import org.joda.time.DateTimeZone; import org.joda.time.format.ISODateTimeFormat; @@ -52,6 +52,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.OutputStream; +import java.util.Arrays; import java.util.List; @Path("/druid/v2/sql/") @@ -93,14 +94,12 @@ public class SqlResource final boolean[] timeColumns = new boolean[fieldList.size()]; final boolean[] dateColumns = new boolean[fieldList.size()]; final String[] columnNames = new String[fieldList.size()]; - final String[] columnTypes = new String[fieldList.size()]; for (int i = 0; i < fieldList.size(); i++) { final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName(); timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP; dateColumns[i] = sqlTypeName == SqlTypeName.DATE; columnNames[i] = fieldList.get(i).getName(); - columnTypes[i] = sqlTypeName.getName(); } final Yielder yielder0 = Yielders.each(plannerResult.run()); @@ -119,6 +118,10 @@ public class SqlResource .createFormatter(outputStream, jsonMapper)) { writer.writeResponseStart(); + if (sqlQuery.includeHeader()) { + writer.writeHeader(Arrays.asList(columnNames)); + } + while (!yielder.isDone()) { final Object[] row = yielder.get(); writer.writeRowStart(); @@ -151,8 +154,6 @@ public class SqlResource } } ) - .header("X-Druid-Column-Names", jsonMapper.writeValueAsString(columnNames)) - .header("X-Druid-Column-Types", 
jsonMapper.writeValueAsString(columnTypes)) .build(); } catch (Throwable e) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlQueryTest.java index 014f0394a32..aa85c70bb6e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlQueryTest.java @@ -34,7 +34,7 @@ public class SqlQueryTest extends CalciteTestBase public void testSerde() throws Exception { final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - final SqlQuery query = new SqlQuery("SELECT 1", ResultFormat.ARRAY, ImmutableMap.of("useCache", false)); + final SqlQuery query = new SqlQuery("SELECT 1", ResultFormat.ARRAY, true, ImmutableMap.of("useCache", false)); Assert.assertEquals(query, jsonMapper.readValue(jsonMapper.writeValueAsString(query), SqlQuery.class)); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java index e75c4a7dd2e..470e400eb7e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java @@ -25,6 +25,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.calcite.tools.ValidationException; import org.apache.druid.client.TimelineServerView; import org.apache.druid.common.config.NullHandling; import org.apache.druid.discovery.DruidLeaderClient; @@ -52,7 +53,6 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.sql.http.SqlResource; -import org.apache.calcite.tools.ValidationException; import org.easymock.EasyMock; import org.junit.After; import org.junit.AfterClass; @@ -159,34 +159,11 @@ public class SqlResourceTest extends CalciteTestBase walker = null; } - @Test - public void testXDruidColumnHeaders() throws Exception - { - final Response response = resource.doPost( - new SqlQuery( - "SELECT FLOOR(__time TO DAY) as \"day\", COUNT(*) as TheCount, SUM(m1) FROM druid.foo GROUP BY 1", - ResultFormat.OBJECT, - null - ), - req - ); - - Assert.assertEquals( - "[\"day\",\"TheCount\",\"EXPR$2\"]", - response.getMetadata().getFirst("X-Druid-Column-Names") - ); - - Assert.assertEquals( - "[\"TIMESTAMP\",\"BIGINT\",\"DOUBLE\"]", - response.getMetadata().getFirst("X-Druid-Column-Types") - ); - } - @Test public void testCountStar() throws Exception { final List> rows = doPost( - new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", null, null) + new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", null, false, null) ).rhs; Assert.assertEquals( @@ -204,6 +181,7 @@ public class SqlResourceTest extends CalciteTestBase new SqlQuery( "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", ResultFormat.OBJECT, + false, null ) ).rhs; @@ -223,6 +201,7 @@ public class SqlResourceTest extends CalciteTestBase new SqlQuery( "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", ResultFormat.OBJECT, + false, ImmutableMap.of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles") ) ).rhs; @@ -239,7 +218,7 @@ public class SqlResourceTest extends CalciteTestBase public void testFieldAliasingSelect() throws Exception { 
final List> rows = doPost( - new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", ResultFormat.OBJECT, null) + new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", ResultFormat.OBJECT, false, null) ).rhs; Assert.assertEquals( @@ -254,7 +233,7 @@ public class SqlResourceTest extends CalciteTestBase public void testFieldAliasingGroupBy() throws Exception { final List> rows = doPost( - new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", ResultFormat.OBJECT, null) + new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", ResultFormat.OBJECT, false, null) ).rhs; Assert.assertEquals( @@ -284,9 +263,43 @@ public class SqlResourceTest extends CalciteTestBase Assert.assertEquals( ImmutableList.of( Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr), - Arrays.asList("2000-01-02T00:00:00.000Z", 1, "10.1", nullStr, 2.0, 2.0, "org.apache.druid.hll.HLLCV1", nullStr) + Arrays.asList( + "2000-01-02T00:00:00.000Z", + 1, + "10.1", + nullStr, + 2.0, + 2.0, + "org.apache.druid.hll.HLLCV1", + nullStr + ) ), - doPost(new SqlQuery(query, ResultFormat.ARRAY, null), new TypeReference>>() {}).rhs + doPost(new SqlQuery(query, ResultFormat.ARRAY, false, null), new TypeReference>>() {}).rhs + ); + } + + @Test + public void testArrayResultFormatWithHeader() throws Exception + { + final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2"; + final String nullStr = NullHandling.replaceWithDefault() ? "" : null; + + Assert.assertEquals( + ImmutableList.of( + Arrays.asList("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1", "EXPR$7"), + Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr), + Arrays.asList( + "2000-01-02T00:00:00.000Z", + 1, + "10.1", + nullStr, + 2.0, + 2.0, + "org.apache.druid.hll.HLLCV1", + nullStr + ) + ), + doPost(new SqlQuery(query, ResultFormat.ARRAY, true, null), new TypeReference>>() {}).rhs ); } @@ -294,7 +307,7 @@ public class SqlResourceTest extends CalciteTestBase public void testArrayLinesResultFormat() throws Exception { final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2"; - final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, null)).rhs; + final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, false, null)).rhs; final String nullStr = NullHandling.replaceWithDefault() ? "" : null; final List lines = Splitter.on('\n').splitToList(response); @@ -311,6 +324,31 @@ public class SqlResourceTest extends CalciteTestBase Assert.assertEquals("", lines.get(3)); } + @Test + public void testArrayLinesResultFormatWithHeader() throws Exception + { + final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2"; + final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, true, null)).rhs; + final String nullStr = NullHandling.replaceWithDefault() ? 
"" : null; + final List lines = Splitter.on('\n').splitToList(response); + + Assert.assertEquals(5, lines.size()); + Assert.assertEquals( + Arrays.asList("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1", "EXPR$7"), + JSON_MAPPER.readValue(lines.get(0), List.class) + ); + Assert.assertEquals( + Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr), + JSON_MAPPER.readValue(lines.get(1), List.class) + ); + Assert.assertEquals( + Arrays.asList("2000-01-02T00:00:00.000Z", 1, "10.1", nullStr, 2.0, 2.0, "org.apache.druid.hll.HLLCV1", nullStr), + JSON_MAPPER.readValue(lines.get(2), List.class) + ); + Assert.assertEquals("", lines.get(3)); + Assert.assertEquals("", lines.get(4)); + } + @Test public void testObjectResultFormat() throws Exception { @@ -348,7 +386,10 @@ public class SqlResourceTest extends CalciteTestBase .put("EXPR$7", "") .build() ).stream().map(transformer).collect(Collectors.toList()), - doPost(new SqlQuery(query, ResultFormat.OBJECT, null), new TypeReference>>() {}).rhs + doPost( + new SqlQuery(query, ResultFormat.OBJECT, false, null), + new TypeReference>>() {} + ).rhs ); } @@ -356,7 +397,7 @@ public class SqlResourceTest extends CalciteTestBase public void testObjectLinesResultFormat() throws Exception { final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2"; - final String response = doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, null)).rhs; + final String response = doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, false, null)).rhs; final String nullStr = NullHandling.replaceWithDefault() ? "" : null; final Function, Map> transformer = m -> { return Maps.transformEntries( @@ -407,7 +448,7 @@ public class SqlResourceTest extends CalciteTestBase public void testCsvResultFormat() throws Exception { final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2"; - final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, null)).rhs; + final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, false, null)).rhs; final List lines = Splitter.on('\n').splitToList(response); Assert.assertEquals( @@ -421,11 +462,30 @@ public class SqlResourceTest extends CalciteTestBase ); } + @Test + public void testCsvResultFormatWithHeaders() throws Exception + { + final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2"; + final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, true, null)).rhs; + final List lines = Splitter.on('\n').splitToList(response); + + Assert.assertEquals( + ImmutableList.of( + "__time,cnt,dim1,dim2,m1,m2,unique_dim1,EXPR$7", + "2000-01-01T00:00:00.000Z,1,,a,1.0,1.0,org.apache.druid.hll.HLLCV1,", + "2000-01-02T00:00:00.000Z,1,10.1,,2.0,2.0,org.apache.druid.hll.HLLCV1,", + "", + "" + ), + lines + ); + } + @Test public void testExplainCountStar() throws Exception { final List> rows = doPost( - new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", ResultFormat.OBJECT, null) + new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", ResultFormat.OBJECT, false, null) ).rhs; Assert.assertEquals( @@ -446,6 +506,7 @@ public class SqlResourceTest extends CalciteTestBase new SqlQuery( "SELECT dim3 FROM druid.foo", ResultFormat.OBJECT, + false, null ) ).lhs; @@ -461,7 +522,7 @@ public class SqlResourceTest extends CalciteTestBase { // SELECT + ORDER unsupported final QueryInterruptedException exception = doPost( - new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY 
dim1", ResultFormat.OBJECT, null) + new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, false, null) ).lhs; Assert.assertNotNull(exception); @@ -480,6 +541,7 @@ public class SqlResourceTest extends CalciteTestBase new SqlQuery( "SELECT DISTINCT dim1 FROM foo", ResultFormat.OBJECT, + false, ImmutableMap.of("maxMergingDictionarySize", 1) ) ).lhs;