mirror of https://github.com/apache/druid.git
Merge branch 'master' of github.com:druid-io/druid into system-table
This commit is contained in:
commit
ba7afe9dd3
|
@ -22,6 +22,8 @@ package org.apache.druid.indexer;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -29,11 +31,13 @@ import java.util.Objects;
|
|||
|
||||
public class TaskStatusPlus
|
||||
{
|
||||
private static final Logger log = new Logger(TaskStatusPlus.class);
|
||||
|
||||
private final String id;
|
||||
private final String type;
|
||||
private final DateTime createdTime;
|
||||
private final DateTime queueInsertionTime;
|
||||
private final TaskState state;
|
||||
private final TaskState statusCode;
|
||||
private final RunnerTaskState runnerTaskState;
|
||||
private final Long duration;
|
||||
private final TaskLocation location;
|
||||
|
@ -42,13 +46,43 @@ public class TaskStatusPlus
|
|||
@Nullable
|
||||
private final String errorMsg;
|
||||
|
||||
public TaskStatusPlus(
|
||||
String id,
|
||||
String type, // nullable for backward compatibility
|
||||
DateTime createdTime,
|
||||
DateTime queueInsertionTime,
|
||||
@Nullable TaskState statusCode,
|
||||
@Nullable RunnerTaskState runnerStatusCode,
|
||||
@Nullable Long duration,
|
||||
TaskLocation location,
|
||||
@Nullable String dataSource, // nullable for backward compatibility
|
||||
@Nullable String errorMsg
|
||||
)
|
||||
{
|
||||
this(
|
||||
id,
|
||||
type,
|
||||
createdTime,
|
||||
queueInsertionTime,
|
||||
statusCode,
|
||||
statusCode,
|
||||
runnerStatusCode,
|
||||
duration,
|
||||
location,
|
||||
dataSource,
|
||||
errorMsg
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@JsonCreator
|
||||
public TaskStatusPlus(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("type") @Nullable String type, // nullable for backward compatibility
|
||||
@JsonProperty("createdTime") DateTime createdTime,
|
||||
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
|
||||
@JsonProperty("statusCode") @Nullable TaskState state,
|
||||
@JsonProperty("statusCode") @Nullable TaskState statusCode,
|
||||
@Deprecated @JsonProperty("status") @Nullable TaskState status, // present for backwards compatibility
|
||||
@JsonProperty("runnerStatusCode") @Nullable RunnerTaskState runnerTaskState,
|
||||
@JsonProperty("duration") @Nullable Long duration,
|
||||
@JsonProperty("location") TaskLocation location,
|
||||
|
@ -56,14 +90,24 @@ public class TaskStatusPlus
|
|||
@JsonProperty("errorMsg") @Nullable String errorMsg
|
||||
)
|
||||
{
|
||||
if (state != null && state.isComplete()) {
|
||||
if (statusCode != null && statusCode.isComplete()) {
|
||||
Preconditions.checkNotNull(duration, "duration");
|
||||
}
|
||||
this.id = Preconditions.checkNotNull(id, "id");
|
||||
this.type = type;
|
||||
this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
|
||||
this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime");
|
||||
this.state = state;
|
||||
//checks for deserialization safety
|
||||
if (statusCode != null && status == null) {
|
||||
this.statusCode = statusCode;
|
||||
} else if (statusCode == null && status != null) {
|
||||
this.statusCode = status;
|
||||
} else {
|
||||
if (statusCode != null && status != null && statusCode != status) {
|
||||
throw new RuntimeException(StringUtils.format("statusCode[%s] and status[%s] must match", statusCode, status));
|
||||
}
|
||||
this.statusCode = statusCode;
|
||||
}
|
||||
this.runnerTaskState = runnerTaskState;
|
||||
this.duration = duration;
|
||||
this.location = Preconditions.checkNotNull(location, "location");
|
||||
|
@ -98,14 +142,22 @@ public class TaskStatusPlus
|
|||
|
||||
@Nullable
|
||||
@JsonProperty("statusCode")
|
||||
public TaskState getState()
|
||||
public TaskState getStatusCode()
|
||||
{
|
||||
return state;
|
||||
return statusCode;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Nullable
|
||||
@JsonProperty("status")
|
||||
public TaskState getStatus()
|
||||
{
|
||||
return statusCode;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@JsonProperty("runnerStatusCode")
|
||||
public RunnerTaskState getRunnerTaskState()
|
||||
public RunnerTaskState getRunnerStatusCode()
|
||||
{
|
||||
return runnerTaskState;
|
||||
}
|
||||
|
@ -150,7 +202,7 @@ public class TaskStatusPlus
|
|||
Objects.equals(getType(), that.getType()) &&
|
||||
Objects.equals(getCreatedTime(), that.getCreatedTime()) &&
|
||||
Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) &&
|
||||
getState() == that.getState() &&
|
||||
getStatusCode() == that.getStatusCode() &&
|
||||
Objects.equals(getDuration(), that.getDuration()) &&
|
||||
Objects.equals(getLocation(), that.getLocation()) &&
|
||||
Objects.equals(getDataSource(), that.getDataSource()) &&
|
||||
|
@ -165,7 +217,7 @@ public class TaskStatusPlus
|
|||
getType(),
|
||||
getCreatedTime(),
|
||||
getQueueInsertionTime(),
|
||||
getState(),
|
||||
getStatusCode(),
|
||||
getDuration(),
|
||||
getLocation(),
|
||||
getDataSource(),
|
||||
|
@ -181,7 +233,7 @@ public class TaskStatusPlus
|
|||
", type='" + type + '\'' +
|
||||
", createdTime=" + createdTime +
|
||||
", queueInsertionTime=" + queueInsertionTime +
|
||||
", state=" + state +
|
||||
", statusCode=" + statusCode +
|
||||
", duration=" + duration +
|
||||
", location=" + location +
|
||||
", dataSource='" + dataSource + '\'' +
|
||||
|
|
|
@ -61,6 +61,45 @@ public class TaskStatusPlusTest
|
|||
Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJsonAttributes() throws IOException
|
||||
{
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.registerModule(
|
||||
new SimpleModule()
|
||||
.addDeserializer(DateTime.class, new DateTimeDeserializer())
|
||||
.addSerializer(DateTime.class, ToStringSerializer.instance)
|
||||
);
|
||||
final String json = "{\n"
|
||||
+ "\"id\": \"testId\",\n"
|
||||
+ "\"type\": \"testType\",\n"
|
||||
+ "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n"
|
||||
+ "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n"
|
||||
+ "\"statusCode\": \"RUNNING\",\n"
|
||||
+ "\"status\": \"RUNNING\",\n"
|
||||
+ "\"runnerStatusCode\": \"RUNNING\",\n"
|
||||
+ "\"duration\": 1000,\n"
|
||||
+ "\"location\": {\n"
|
||||
+ "\"host\": \"testHost\",\n"
|
||||
+ "\"port\": 1010,\n"
|
||||
+ "\"tlsPort\": -1\n"
|
||||
+ "},\n"
|
||||
+ "\"dataSource\": \"ds_test\",\n"
|
||||
+ "\"errorMsg\": null\n"
|
||||
+ "}";
|
||||
TaskStatusPlus taskStatusPlus = mapper.readValue(json, TaskStatusPlus.class);
|
||||
Assert.assertNotNull(taskStatusPlus);
|
||||
Assert.assertNotNull(taskStatusPlus.getStatusCode());
|
||||
Assert.assertTrue(taskStatusPlus.getStatusCode().isRunnable());
|
||||
Assert.assertNotNull(taskStatusPlus.getRunnerStatusCode());
|
||||
|
||||
String serialized = mapper.writeValueAsString(taskStatusPlus);
|
||||
|
||||
Assert.assertTrue(serialized.contains("\"status\":"));
|
||||
Assert.assertTrue(serialized.contains("\"statusCode\":"));
|
||||
Assert.assertTrue(serialized.contains("\"runnerStatusCode\":"));
|
||||
}
|
||||
|
||||
// Copied from org.apache.druid.jackson.JodaStuff
|
||||
private static class DateTimeDeserializer extends StdDeserializer<DateTime>
|
||||
{
|
||||
|
|
|
@ -369,12 +369,7 @@ Metadata is available over the HTTP API by querying [system tables](#retrieving-
|
|||
|
||||
#### Responses
|
||||
|
||||
All Druid SQL HTTP responses include a "X-Druid-Column-Names" header with a JSON-encoded array of columns that
|
||||
will appear in the result rows and an "X-Druid-Column-Types" header with a JSON-encoded array of
|
||||
[types](#data-types-and-casts).
|
||||
|
||||
For the result rows themselves, Druid SQL supports a variety of result formats. You can
|
||||
specify these by adding a "resultFormat" parameter, like:
|
||||
Druid SQL supports a variety of result formats. You can specify these by adding a "resultFormat" parameter, like:
|
||||
|
||||
```json
|
||||
{
|
||||
|
@ -393,6 +388,20 @@ The supported result formats are:
|
|||
|`arrayLines`|Like "array", but the JSON arrays are separated by newlines instead of being wrapped in a JSON array. This can make it easier to parse the entire response set as a stream, if you do not have ready access to a streaming JSON parser. To make it possible to detect a truncated response, this format includes a trailer of one blank line.|text/plain|
|
||||
|`csv`|Comma-separated values, with one row per line. Individual field values may be escaped by being surrounded in double quotes. If double quotes appear in a field value, they will be escaped by replacing them with double-double-quotes like `""this""`. To make it possible to detect a truncated response, this format includes a trailer of one blank line.|text/csv|
|
||||
|
||||
You can additionally request a header by setting "header" to true in your request, like:
|
||||
|
||||
```json
|
||||
{
|
||||
"query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar' AND __time > TIMESTAMP '2000-01-01 00:00:00'",
|
||||
"resultFormat" : "arrayLines",
|
||||
"header" : true
|
||||
}
|
||||
```
|
||||
|
||||
In this case, the first result returned will be a header. For the `csv`, `array`, and `arrayLines` formats, the header
|
||||
will be a list of column names. For the `object` and `objectLines` formats, the header will be an object where the
|
||||
keys are column names, and the values are null.
|
||||
|
||||
Errors that occur before the response body is sent will be reported in JSON, with an HTTP 500 status code, in the
|
||||
same format as [native Druid query errors](../querying/querying.html#query-errors). If an error occurs while the response body is
|
||||
being sent, at that point it is too late to change the HTTP status code or report a JSON error, so the response will
|
||||
|
|
|
@ -50,6 +50,6 @@ public class ClientBasedTaskInfoProvider implements TaskInfoProvider
|
|||
final TaskStatusResponse response = client.getTaskStatus(id);
|
||||
return response == null ?
|
||||
Optional.absent() :
|
||||
Optional.of(TaskStatus.fromCode(id, response.getStatus().getState()));
|
||||
Optional.of(TaskStatus.fromCode(id, response.getStatus().getStatusCode()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,10 +39,10 @@ class TaskHistory<T extends Task>
|
|||
{
|
||||
attemptHistory.forEach(status -> {
|
||||
Preconditions.checkState(
|
||||
status.getState() == TaskState.SUCCESS || status.getState() == TaskState.FAILED,
|
||||
status.getStatusCode() == TaskState.SUCCESS || status.getStatusCode() == TaskState.FAILED,
|
||||
"Complete tasks should be recorded, but the state of task[%s] is [%s]",
|
||||
status.getId(),
|
||||
status.getState()
|
||||
status.getStatusCode()
|
||||
);
|
||||
});
|
||||
this.spec = spec;
|
||||
|
|
|
@ -115,7 +115,7 @@ public class TaskMonitor<T extends Task>
|
|||
final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
|
||||
final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
|
||||
if (taskStatus != null) {
|
||||
switch (Preconditions.checkNotNull(taskStatus.getState(), "taskState")) {
|
||||
switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
|
||||
case SUCCESS:
|
||||
incrementNumSucceededTasks();
|
||||
|
||||
|
@ -152,7 +152,7 @@ public class TaskMonitor<T extends Task>
|
|||
monitorEntry.updateStatus(taskStatus);
|
||||
break;
|
||||
default:
|
||||
throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getState(), taskId);
|
||||
throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -459,7 +459,7 @@ public class TaskMonitor<T extends Task>
|
|||
|
||||
TaskState getLastState()
|
||||
{
|
||||
return lastStatus == null ? TaskState.FAILED : lastStatus.getState();
|
||||
return lastStatus == null ? TaskState.FAILED : lastStatus.getStatusCode();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
|
|
@ -364,7 +364,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
|
|||
.filter(entry -> {
|
||||
final TaskStatusPlus currentStatus = entry.getValue().getCurrentStatus();
|
||||
return currentStatus != null &&
|
||||
(currentStatus.getState() == TaskState.SUCCESS || currentStatus.getState() == TaskState.FAILED);
|
||||
(currentStatus.getStatusCode() == TaskState.SUCCESS || currentStatus.getStatusCode() == TaskState.FAILED);
|
||||
})
|
||||
.map(Entry::getKey)
|
||||
.findFirst()
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TaskMonitorTest
|
|||
Assert.assertEquals("supervisorId", result.getSpec().getSupervisorTaskId());
|
||||
Assert.assertEquals("specId" + i, result.getSpec().getId());
|
||||
Assert.assertNotNull(result.getLastStatus());
|
||||
Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getState());
|
||||
Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode());
|
||||
Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
|
||||
}
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ public class TaskMonitorTest
|
|||
Assert.assertEquals("specId" + i, result.getSpec().getId());
|
||||
|
||||
Assert.assertNotNull(result.getLastStatus());
|
||||
Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getState());
|
||||
Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode());
|
||||
Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
|
||||
|
||||
final TaskHistory<TestTask> taskHistory = monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId());
|
||||
|
@ -122,8 +122,8 @@ public class TaskMonitorTest
|
|||
final List<TaskStatusPlus> attemptHistory = taskHistory.getAttemptHistory();
|
||||
Assert.assertNotNull(attemptHistory);
|
||||
Assert.assertEquals(3, attemptHistory.size());
|
||||
Assert.assertEquals(TaskState.FAILED, attemptHistory.get(0).getState());
|
||||
Assert.assertEquals(TaskState.FAILED, attemptHistory.get(1).getState());
|
||||
Assert.assertEquals(TaskState.FAILED, attemptHistory.get(0).getStatusCode());
|
||||
Assert.assertEquals(TaskState.FAILED, attemptHistory.get(1).getStatusCode());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -881,6 +881,8 @@ public class OverlordResourceTest
|
|||
TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
|
||||
TaskStatusResponse.class
|
||||
);
|
||||
TaskStatusPlus tsp = taskStatusResponse1.getStatus();
|
||||
Assert.assertEquals(tsp.getStatusCode(), tsp.getStatus());
|
||||
Assert.assertEquals(
|
||||
new TaskStatusResponse(
|
||||
"mytask",
|
||||
|
|
|
@ -248,7 +248,7 @@ public class OverlordTest
|
|||
Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
|
||||
Assert.assertEquals(
|
||||
TaskStatus.running(taskId_0).getStatusCode(),
|
||||
((TaskStatusResponse) response.getEntity()).getStatus().getState()
|
||||
((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
|
||||
);
|
||||
|
||||
// Simulate completion of task_0
|
||||
|
@ -296,7 +296,7 @@ public class OverlordTest
|
|||
{
|
||||
while (true) {
|
||||
Response response = overlordResource.getTaskStatus(taskId);
|
||||
if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getState())) {
|
||||
if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
|
|
|
@ -132,7 +132,7 @@ public class OverlordResourceTestClient
|
|||
{
|
||||
}
|
||||
);
|
||||
return taskStatusResponse.getStatus().getState();
|
||||
return taskStatusResponse.getStatus().getStatusCode();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.druid.testing.clients;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.indexer.TaskStatus;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
public class TaskResponseObject
|
||||
|
@ -30,14 +30,14 @@ public class TaskResponseObject
|
|||
private final String id;
|
||||
private final DateTime createdTime;
|
||||
private final DateTime queueInsertionTime;
|
||||
private final TaskStatus status;
|
||||
private final TaskState status;
|
||||
|
||||
@JsonCreator
|
||||
private TaskResponseObject(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("createdTime") DateTime createdTime,
|
||||
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
|
||||
@JsonProperty("status") TaskStatus status
|
||||
@JsonProperty("status") TaskState status
|
||||
)
|
||||
{
|
||||
this.id = id;
|
||||
|
@ -65,7 +65,7 @@ public class TaskResponseObject
|
|||
}
|
||||
|
||||
@SuppressWarnings("unused") // Used by Jackson serialization?
|
||||
public TaskStatus getStatus()
|
||||
public TaskState getStatus()
|
||||
{
|
||||
return status;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
public class ArrayLinesWriter implements ResultFormat.Writer
|
||||
{
|
||||
|
@ -55,6 +56,18 @@ public class ArrayLinesWriter implements ResultFormat.Writer
|
|||
outputStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final List<String> columnNames) throws IOException
|
||||
{
|
||||
jsonGenerator.writeStartArray();
|
||||
|
||||
for (String columnName : columnNames) {
|
||||
jsonGenerator.writeString(columnName);
|
||||
}
|
||||
|
||||
jsonGenerator.writeEndArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeRowStart() throws IOException
|
||||
{
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
public class ArrayWriter implements ResultFormat.Writer
|
||||
{
|
||||
|
@ -53,6 +54,18 @@ public class ArrayWriter implements ResultFormat.Writer
|
|||
outputStream.write('\n');
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final List<String> columnNames) throws IOException
|
||||
{
|
||||
jsonGenerator.writeStartArray();
|
||||
|
||||
for (String columnName : columnNames) {
|
||||
jsonGenerator.writeString(columnName);
|
||||
}
|
||||
|
||||
jsonGenerator.writeEndArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeRowStart() throws IOException
|
||||
{
|
||||
|
|
|
@ -58,6 +58,12 @@ public class CsvWriter implements ResultFormat.Writer
|
|||
outputStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final List<String> columnNames)
|
||||
{
|
||||
writer.writeNext(columnNames.toArray(new String[0]), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeRowStart()
|
||||
{
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
public class ObjectLinesWriter implements ResultFormat.Writer
|
||||
{
|
||||
|
@ -40,7 +41,7 @@ public class ObjectLinesWriter implements ResultFormat.Writer
|
|||
}
|
||||
|
||||
@Override
|
||||
public void writeResponseStart() throws IOException
|
||||
public void writeResponseStart()
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
|
@ -55,6 +56,18 @@ public class ObjectLinesWriter implements ResultFormat.Writer
|
|||
outputStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final List<String> columnNames) throws IOException
|
||||
{
|
||||
jsonGenerator.writeStartObject();
|
||||
|
||||
for (String columnName : columnNames) {
|
||||
jsonGenerator.writeNullField(columnName);
|
||||
}
|
||||
|
||||
jsonGenerator.writeEndObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeRowStart() throws IOException
|
||||
{
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
public class ObjectWriter implements ResultFormat.Writer
|
||||
{
|
||||
|
@ -53,6 +54,18 @@ public class ObjectWriter implements ResultFormat.Writer
|
|||
outputStream.write('\n');
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final List<String> columnNames) throws IOException
|
||||
{
|
||||
jsonGenerator.writeStartObject();
|
||||
|
||||
for (String columnName : columnNames) {
|
||||
jsonGenerator.writeNullField(columnName);
|
||||
}
|
||||
|
||||
jsonGenerator.writeEndObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeRowStart() throws IOException
|
||||
{
|
||||
|
|
|
@ -28,6 +28,7 @@ import javax.ws.rs.core.MediaType;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
public enum ResultFormat
|
||||
{
|
||||
|
@ -112,6 +113,8 @@ public enum ResultFormat
|
|||
*/
|
||||
void writeResponseStart() throws IOException;
|
||||
|
||||
void writeHeader(List<String> columnNames) throws IOException;
|
||||
|
||||
/**
|
||||
* Start of each result row.
|
||||
*/
|
||||
|
|
|
@ -31,17 +31,20 @@ public class SqlQuery
|
|||
{
|
||||
private final String query;
|
||||
private final ResultFormat resultFormat;
|
||||
private final boolean header;
|
||||
private final Map<String, Object> context;
|
||||
|
||||
@JsonCreator
|
||||
public SqlQuery(
|
||||
@JsonProperty("query") final String query,
|
||||
@JsonProperty("resultFormat") final ResultFormat resultFormat,
|
||||
@JsonProperty("header") final boolean header,
|
||||
@JsonProperty("context") final Map<String, Object> context
|
||||
)
|
||||
{
|
||||
this.query = Preconditions.checkNotNull(query, "query");
|
||||
this.resultFormat = resultFormat == null ? ResultFormat.OBJECT : resultFormat;
|
||||
this.header = header;
|
||||
this.context = context == null ? ImmutableMap.of() : context;
|
||||
}
|
||||
|
||||
|
@ -57,6 +60,12 @@ public class SqlQuery
|
|||
return resultFormat;
|
||||
}
|
||||
|
||||
@JsonProperty("header")
|
||||
public boolean includeHeader()
|
||||
{
|
||||
return header;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Map<String, Object> getContext()
|
||||
{
|
||||
|
@ -73,7 +82,8 @@ public class SqlQuery
|
|||
return false;
|
||||
}
|
||||
final SqlQuery sqlQuery = (SqlQuery) o;
|
||||
return Objects.equals(query, sqlQuery.query) &&
|
||||
return header == sqlQuery.header &&
|
||||
Objects.equals(query, sqlQuery.query) &&
|
||||
resultFormat == sqlQuery.resultFormat &&
|
||||
Objects.equals(context, sqlQuery.context);
|
||||
}
|
||||
|
@ -81,7 +91,7 @@ public class SqlQuery
|
|||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(query, resultFormat, context);
|
||||
return Objects.hash(query, resultFormat, header, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,6 +100,7 @@ public class SqlQuery
|
|||
return "SqlQuery{" +
|
||||
"query='" + query + '\'' +
|
||||
", resultFormat=" + resultFormat +
|
||||
", header=" + header +
|
||||
", context=" + context +
|
||||
'}';
|
||||
}
|
||||
|
|
|
@ -23,6 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
import org.apache.calcite.rel.type.RelDataTypeField;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
import org.apache.druid.guice.annotations.Json;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.guava.Yielder;
|
||||
|
@ -34,9 +37,6 @@ import org.apache.druid.sql.calcite.planner.Calcites;
|
|||
import org.apache.druid.sql.calcite.planner.DruidPlanner;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerFactory;
|
||||
import org.apache.druid.sql.calcite.planner.PlannerResult;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
import org.apache.calcite.rel.type.RelDataTypeField;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
|
||||
|
@ -52,6 +52,7 @@ import javax.ws.rs.core.Response;
|
|||
import javax.ws.rs.core.StreamingOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
@Path("/druid/v2/sql/")
|
||||
|
@ -93,14 +94,12 @@ public class SqlResource
|
|||
final boolean[] timeColumns = new boolean[fieldList.size()];
|
||||
final boolean[] dateColumns = new boolean[fieldList.size()];
|
||||
final String[] columnNames = new String[fieldList.size()];
|
||||
final String[] columnTypes = new String[fieldList.size()];
|
||||
|
||||
for (int i = 0; i < fieldList.size(); i++) {
|
||||
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
|
||||
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
|
||||
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
|
||||
columnNames[i] = fieldList.get(i).getName();
|
||||
columnTypes[i] = sqlTypeName.getName();
|
||||
}
|
||||
|
||||
final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());
|
||||
|
@ -119,6 +118,10 @@ public class SqlResource
|
|||
.createFormatter(outputStream, jsonMapper)) {
|
||||
writer.writeResponseStart();
|
||||
|
||||
if (sqlQuery.includeHeader()) {
|
||||
writer.writeHeader(Arrays.asList(columnNames));
|
||||
}
|
||||
|
||||
while (!yielder.isDone()) {
|
||||
final Object[] row = yielder.get();
|
||||
writer.writeRowStart();
|
||||
|
@ -151,8 +154,6 @@ public class SqlResource
|
|||
}
|
||||
}
|
||||
)
|
||||
.header("X-Druid-Column-Names", jsonMapper.writeValueAsString(columnNames))
|
||||
.header("X-Druid-Column-Types", jsonMapper.writeValueAsString(columnTypes))
|
||||
.build();
|
||||
}
|
||||
catch (Throwable e) {
|
||||
|
|
|
@ -34,7 +34,7 @@ public class SqlQueryTest extends CalciteTestBase
|
|||
public void testSerde() throws Exception
|
||||
{
|
||||
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
|
||||
final SqlQuery query = new SqlQuery("SELECT 1", ResultFormat.ARRAY, ImmutableMap.of("useCache", false));
|
||||
final SqlQuery query = new SqlQuery("SELECT 1", ResultFormat.ARRAY, true, ImmutableMap.of("useCache", false));
|
||||
Assert.assertEquals(query, jsonMapper.readValue(jsonMapper.writeValueAsString(query), SqlQuery.class));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.common.base.Splitter;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.calcite.tools.ValidationException;
|
||||
import org.apache.druid.client.TimelineServerView;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.discovery.DruidLeaderClient;
|
||||
|
@ -52,7 +53,6 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView;
|
|||
import org.apache.druid.sql.http.ResultFormat;
|
||||
import org.apache.druid.sql.http.SqlQuery;
|
||||
import org.apache.druid.sql.http.SqlResource;
|
||||
import org.apache.calcite.tools.ValidationException;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -159,34 +159,11 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
walker = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXDruidColumnHeaders() throws Exception
|
||||
{
|
||||
final Response response = resource.doPost(
|
||||
new SqlQuery(
|
||||
"SELECT FLOOR(__time TO DAY) as \"day\", COUNT(*) as TheCount, SUM(m1) FROM druid.foo GROUP BY 1",
|
||||
ResultFormat.OBJECT,
|
||||
null
|
||||
),
|
||||
req
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"[\"day\",\"TheCount\",\"EXPR$2\"]",
|
||||
response.getMetadata().getFirst("X-Druid-Column-Names")
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"[\"TIMESTAMP\",\"BIGINT\",\"DOUBLE\"]",
|
||||
response.getMetadata().getFirst("X-Druid-Column-Types")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountStar() throws Exception
|
||||
{
|
||||
final List<Map<String, Object>> rows = doPost(
|
||||
new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", null, null)
|
||||
new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", null, false, null)
|
||||
).rhs;
|
||||
|
||||
Assert.assertEquals(
|
||||
|
@ -204,6 +181,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
new SqlQuery(
|
||||
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
|
||||
ResultFormat.OBJECT,
|
||||
false,
|
||||
null
|
||||
)
|
||||
).rhs;
|
||||
|
@ -223,6 +201,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
new SqlQuery(
|
||||
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
|
||||
ResultFormat.OBJECT,
|
||||
false,
|
||||
ImmutableMap.of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles")
|
||||
)
|
||||
).rhs;
|
||||
|
@ -239,7 +218,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
public void testFieldAliasingSelect() throws Exception
|
||||
{
|
||||
final List<Map<String, Object>> rows = doPost(
|
||||
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", ResultFormat.OBJECT, null)
|
||||
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", ResultFormat.OBJECT, false, null)
|
||||
).rhs;
|
||||
|
||||
Assert.assertEquals(
|
||||
|
@ -254,7 +233,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
public void testFieldAliasingGroupBy() throws Exception
|
||||
{
|
||||
final List<Map<String, Object>> rows = doPost(
|
||||
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", ResultFormat.OBJECT, null)
|
||||
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", ResultFormat.OBJECT, false, null)
|
||||
).rhs;
|
||||
|
||||
Assert.assertEquals(
|
||||
|
@ -284,9 +263,43 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr),
|
||||
Arrays.asList("2000-01-02T00:00:00.000Z", 1, "10.1", nullStr, 2.0, 2.0, "org.apache.druid.hll.HLLCV1", nullStr)
|
||||
Arrays.asList(
|
||||
"2000-01-02T00:00:00.000Z",
|
||||
1,
|
||||
"10.1",
|
||||
nullStr,
|
||||
2.0,
|
||||
2.0,
|
||||
"org.apache.druid.hll.HLLCV1",
|
||||
nullStr
|
||||
)
|
||||
),
|
||||
doPost(new SqlQuery(query, ResultFormat.ARRAY, null), new TypeReference<List<List<Object>>>() {}).rhs
|
||||
doPost(new SqlQuery(query, ResultFormat.ARRAY, false, null), new TypeReference<List<List<Object>>>() {}).rhs
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayResultFormatWithHeader() throws Exception
|
||||
{
|
||||
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
|
||||
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
Arrays.asList("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1", "EXPR$7"),
|
||||
Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr),
|
||||
Arrays.asList(
|
||||
"2000-01-02T00:00:00.000Z",
|
||||
1,
|
||||
"10.1",
|
||||
nullStr,
|
||||
2.0,
|
||||
2.0,
|
||||
"org.apache.druid.hll.HLLCV1",
|
||||
nullStr
|
||||
)
|
||||
),
|
||||
doPost(new SqlQuery(query, ResultFormat.ARRAY, true, null), new TypeReference<List<List<Object>>>() {}).rhs
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -294,7 +307,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
public void testArrayLinesResultFormat() throws Exception
|
||||
{
|
||||
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, null)).rhs;
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, false, null)).rhs;
|
||||
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
|
||||
final List<String> lines = Splitter.on('\n').splitToList(response);
|
||||
|
||||
|
@ -311,6 +324,31 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
Assert.assertEquals("", lines.get(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayLinesResultFormatWithHeader() throws Exception
|
||||
{
|
||||
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, true, null)).rhs;
|
||||
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
|
||||
final List<String> lines = Splitter.on('\n').splitToList(response);
|
||||
|
||||
Assert.assertEquals(5, lines.size());
|
||||
Assert.assertEquals(
|
||||
Arrays.asList("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1", "EXPR$7"),
|
||||
JSON_MAPPER.readValue(lines.get(0), List.class)
|
||||
);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList("2000-01-01T00:00:00.000Z", 1, "", "a", 1.0, 1.0, "org.apache.druid.hll.HLLCV1", nullStr),
|
||||
JSON_MAPPER.readValue(lines.get(1), List.class)
|
||||
);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList("2000-01-02T00:00:00.000Z", 1, "10.1", nullStr, 2.0, 2.0, "org.apache.druid.hll.HLLCV1", nullStr),
|
||||
JSON_MAPPER.readValue(lines.get(2), List.class)
|
||||
);
|
||||
Assert.assertEquals("", lines.get(3));
|
||||
Assert.assertEquals("", lines.get(4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectResultFormat() throws Exception
|
||||
{
|
||||
|
@ -348,7 +386,10 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
.put("EXPR$7", "")
|
||||
.build()
|
||||
).stream().map(transformer).collect(Collectors.toList()),
|
||||
doPost(new SqlQuery(query, ResultFormat.OBJECT, null), new TypeReference<List<Map<String, Object>>>() {}).rhs
|
||||
doPost(
|
||||
new SqlQuery(query, ResultFormat.OBJECT, false, null),
|
||||
new TypeReference<List<Map<String, Object>>>() {}
|
||||
).rhs
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -356,7 +397,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
public void testObjectLinesResultFormat() throws Exception
|
||||
{
|
||||
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, null)).rhs;
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, false, null)).rhs;
|
||||
final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
|
||||
final Function<Map<String, Object>, Map<String, Object>> transformer = m -> {
|
||||
return Maps.transformEntries(
|
||||
|
@ -407,7 +448,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
public void testCsvResultFormat() throws Exception
|
||||
{
|
||||
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, null)).rhs;
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, false, null)).rhs;
|
||||
final List<String> lines = Splitter.on('\n').splitToList(response);
|
||||
|
||||
Assert.assertEquals(
|
||||
|
@ -421,11 +462,30 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCsvResultFormatWithHeaders() throws Exception
|
||||
{
|
||||
final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo LIMIT 2";
|
||||
final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, true, null)).rhs;
|
||||
final List<String> lines = Splitter.on('\n').splitToList(response);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
"__time,cnt,dim1,dim2,m1,m2,unique_dim1,EXPR$7",
|
||||
"2000-01-01T00:00:00.000Z,1,,a,1.0,1.0,org.apache.druid.hll.HLLCV1,",
|
||||
"2000-01-02T00:00:00.000Z,1,10.1,,2.0,2.0,org.apache.druid.hll.HLLCV1,",
|
||||
"",
|
||||
""
|
||||
),
|
||||
lines
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplainCountStar() throws Exception
|
||||
{
|
||||
final List<Map<String, Object>> rows = doPost(
|
||||
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", ResultFormat.OBJECT, null)
|
||||
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", ResultFormat.OBJECT, false, null)
|
||||
).rhs;
|
||||
|
||||
Assert.assertEquals(
|
||||
|
@ -446,6 +506,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
new SqlQuery(
|
||||
"SELECT dim3 FROM druid.foo",
|
||||
ResultFormat.OBJECT,
|
||||
false,
|
||||
null
|
||||
)
|
||||
).lhs;
|
||||
|
@ -461,7 +522,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
{
|
||||
// SELECT + ORDER unsupported
|
||||
final QueryInterruptedException exception = doPost(
|
||||
new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, null)
|
||||
new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, false, null)
|
||||
).lhs;
|
||||
|
||||
Assert.assertNotNull(exception);
|
||||
|
@ -480,6 +541,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
new SqlQuery(
|
||||
"SELECT DISTINCT dim1 FROM foo",
|
||||
ResultFormat.OBJECT,
|
||||
false,
|
||||
ImmutableMap.of("maxMergingDictionarySize", 1)
|
||||
)
|
||||
).lhs;
|
||||
|
|
Loading…
Reference in New Issue