Add support for different result formats to MSQ SqlStatementResource (#14571)

* Add support for different result format

* Add tests

* Add tests

* Fix checkstyle

* Remove changes to destination

* Removed some unwanted code

* Address review comments

* Rename parameter

* Fix tests
This commit is contained in:
Adarsh Sanjeev 2023-08-07 20:48:59 +05:30 committed by GitHub
parent 2d8e0f28f3
commit 56ab81f381
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 232 additions and 47 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.sql.resources;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingOutputStream;
import com.google.common.util.concurrent.ListenableFuture;
@ -49,6 +50,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
@ -115,6 +117,7 @@ import java.util.stream.Collectors;
public class SqlStatementResource
{
public static final String RESULT_FORMAT = "__resultFormat";
private static final Logger log = new Logger(SqlStatementResource.class);
private final SqlStatementFactory msqSqlStatementFactory;
private final AuthorizerMapper authorizerMapper;
@ -167,12 +170,14 @@ public class SqlStatementResource
@Consumes(MediaType.APPLICATION_JSON)
public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletRequest req)
{
final HttpStatement stmt = msqSqlStatementFactory.httpStatement(sqlQuery, req);
SqlQuery modifiedQuery = createModifiedSqlQuery(sqlQuery);
final HttpStatement stmt = msqSqlStatementFactory.httpStatement(modifiedQuery, req);
final String sqlQueryId = stmt.sqlQueryId();
final String currThreadName = Thread.currentThread().getName();
boolean isDebug = false;
try {
QueryContext queryContext = QueryContext.of(sqlQuery.getContext());
QueryContext queryContext = QueryContext.of(modifiedQuery.getContext());
isDebug = queryContext.isDebug();
contextChecks(queryContext);
@ -190,7 +195,7 @@ public class SqlStatementResource
return buildTaskResponse(sequence, stmt.query().authResult().getIdentity());
} else {
// Used for EXPLAIN
return buildStandardResponse(sequence, sqlQuery, sqlQueryId, rowTransformer);
return buildStandardResponse(sequence, modifiedQuery, sqlQueryId, rowTransformer);
}
}
catch (DruidException e) {
@ -278,6 +283,7 @@ public class SqlStatementResource
public Response doGetResults(
@PathParam("id") final String queryId,
@QueryParam("page") Long page,
@QueryParam("resultFormat") String resultFormat,
@Context final HttpServletRequest req
)
{
@ -328,12 +334,14 @@ public class SqlStatementResource
return Response.ok().build();
}
ResultFormat preferredFormat = getPreferredResultFormat(resultFormat, msqControllerTask.getQuerySpec());
return Response.ok((StreamingOutput) outputStream -> resultPusher(
queryId,
signature,
closer,
results,
new CountingOutputStream(outputStream)
new CountingOutputStream(outputStream),
preferredFormat
)).build();
}
@ -658,6 +666,48 @@ public class SqlStatementResource
return msqControllerTask;
}
/**
* Creates a new sqlQuery from the user submitted sqlQuery after performing required modifications.
*/
private SqlQuery createModifiedSqlQuery(SqlQuery sqlQuery)
{
Map<String, Object> context = sqlQuery.getContext();
if (context.containsKey(RESULT_FORMAT)) {
throw InvalidInput.exception("Query context parameter [%s] is not allowed", RESULT_FORMAT);
}
Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder()
.putAll(context)
.put(RESULT_FORMAT, sqlQuery.getResultFormat().toString())
.build();
return new SqlQuery(
sqlQuery.getQuery(),
sqlQuery.getResultFormat(),
sqlQuery.includeHeader(),
sqlQuery.includeTypesHeader(),
sqlQuery.includeSqlTypesHeader(),
modifiedContext,
sqlQuery.getParameters()
);
}
private ResultFormat getPreferredResultFormat(String resultFormatParam, MSQSpec msqSpec)
{
if (resultFormatParam == null) {
return QueryContexts.getAsEnum(
RESULT_FORMAT,
msqSpec.getQuery().context().get(RESULT_FORMAT),
ResultFormat.class,
ResultFormat.DEFAULT_RESULT_FORMAT
);
}
return QueryContexts.getAsEnum(
"resultFormat",
resultFormatParam,
ResultFormat.class
);
}
private Optional<Yielder<Object[]>> getResultYielder(
String queryId,
Long page,
@ -769,13 +819,37 @@ public class SqlStatementResource
Optional<List<ColumnNameAndTypes>> signature,
Closer closer,
Optional<Yielder<Object[]>> results,
CountingOutputStream os
CountingOutputStream os,
ResultFormat resultFormat
) throws IOException
{
try {
try (final ResultFormat.Writer writer = ResultFormat.OBJECTLINES.createFormatter(os, jsonMapper)) {
try (final ResultFormat.Writer writer = resultFormat.createFormatter(os, jsonMapper)) {
Yielder<Object[]> yielder = results.get();
List<ColumnNameAndTypes> rowSignature = signature.get();
resultPusherInternal(writer, yielder, rowSignature);
}
catch (Exception e) {
log.error(e, "Unable to stream results back for query[%s]", queryId);
throw new ISE(e, "Unable to stream results back for query[%s]", queryId);
}
}
catch (Exception e) {
log.error(e, "Unable to stream results back for query[%s]", queryId);
throw new ISE(e, "Unable to stream results back for query[%s]", queryId);
}
finally {
closer.close();
}
}
@VisibleForTesting
static void resultPusherInternal(
ResultFormat.Writer writer,
Yielder<Object[]> yielder,
List<ColumnNameAndTypes> rowSignature
) throws IOException
{
writer.writeResponseStart();
while (!yielder.isDone()) {
@ -793,19 +867,6 @@ public class SqlStatementResource
writer.writeResponseEnd();
yielder.close();
}
catch (Exception e) {
log.error(e, "Unable to stream results back for query[%s]", queryId);
throw new ISE(e, "Unable to stream results back for query[%s]", queryId);
}
}
catch (Exception e) {
log.error(e, "Unable to stream results back for query[%s]", queryId);
throw new ISE(e, "Unable to stream results back for query[%s]", queryId);
}
finally {
closer.close();
}
}
private static void throwIfQueryIsNotSuccessful(String queryId, TaskStatusPlus statusPlus)
{

View File

@ -17,11 +17,10 @@
* under the License.
*/
package org.apache.druid.msq.indexing;
package org.apache.druid.msq.indexing.destination;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.junit.Test;
public class DataSourceMSQDestinationTest

View File

@ -26,8 +26,8 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.sql.SqlStatementResourceTest;
import org.apache.druid.msq.sql.SqlStatementState;
import org.apache.druid.msq.sql.resources.SqlStatementResourceTest;
import org.junit.Assert;
import org.junit.Test;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.msq.sql;
package org.apache.druid.msq.sql.resources;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -27,15 +27,17 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.sql.SqlStatementState;
import org.apache.druid.msq.sql.entity.ColumnNameAndTypes;
import org.apache.druid.msq.sql.entity.PageInformation;
import org.apache.druid.msq.sql.entity.ResultSetInformation;
import org.apache.druid.msq.sql.entity.SqlStatementResult;
import org.apache.druid.msq.sql.resources.SqlStatementResource;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.util.MultiStageQueryContext;
@ -43,6 +45,7 @@ import org.apache.druid.query.ExecutionMode;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.storage.NilStorageConnector;
import org.junit.Assert;
@ -54,6 +57,7 @@ import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -142,7 +146,6 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
@Test
public void nonSupportedModes()
{
SqlStatementResourceTest.assertExceptionMessage(
resource.doPost(new SqlQuery(
"select * from foo",
@ -241,7 +244,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
+ "\"resultFormat\":\"compactedList\","
+ "\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],"
+ "\"legacy\":false,"
+ "\"context\":{\"executionMode\":\"ASYNC\",\"scanSignature\":\"[{\\\"name\\\":\\\"__time\\\",\\\"type\\\":\\\"LONG\\\"},{\\\"name\\\":\\\"cnt\\\",\\\"type\\\":\\\"LONG\\\"},{\\\"name\\\":\\\"dim1\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"dim2\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"dim3\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"m1\\\",\\\"type\\\":\\\"FLOAT\\\"},{\\\"name\\\":\\\"m2\\\",\\\"type\\\":\\\"DOUBLE\\\"},{\\\"name\\\":\\\"unique_dim1\\\",\\\"type\\\":\\\"COMPLEX<hyperUnique>\\\"}]\",\"sqlQueryId\":\"queryId\"},\"granularity\":{\"type\":\"all\"}},\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"dim3\",\"type\":\"STRING\"},{\"name\":\"cnt\",\"type\":\"LONG\"},{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"unique_dim1\",\"type\":\"COMPLEX<hyperUnique>\"}],\"columnMappings\":[{\"queryColumn\":\"__time\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"dim1\",\"outputColumn\":\"dim1\"},{\"queryColumn\":\"dim2\",\"outputColumn\":\"dim2\"},{\"queryColumn\":\"dim3\",\"outputColumn\":\"dim3\"},{\"queryColumn\":\"cnt\",\"outputColumn\":\"cnt\"},{\"queryColumn\":\"m1\",\"outputColumn\":\"m1\"},{\"queryColumn\":\"m2\",\"outputColumn\":\"m2\"},{\"queryColumn\":\"unique_dim1\",\"outputColumn\":\"unique_dim1\"}]}],"
+ "\"context\":{\"__resultFormat\":\"object\",\"executionMode\":\"ASYNC\",\"scanSignature\":\"[{\\\"name\\\":\\\"__time\\\",\\\"type\\\":\\\"LONG\\\"},{\\\"name\\\":\\\"cnt\\\",\\\"type\\\":\\\"LONG\\\"},{\\\"name\\\":\\\"dim1\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"dim2\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"dim3\\\",\\\"type\\\":\\\"STRING\\\"},{\\\"name\\\":\\\"m1\\\",\\\"type\\\":\\\"FLOAT\\\"},{\\\"name\\\":\\\"m2\\\",\\\"type\\\":\\\"DOUBLE\\\"},{\\\"name\\\":\\\"unique_dim1\\\",\\\"type\\\":\\\"COMPLEX<hyperUnique>\\\"}]\",\"sqlQueryId\":\"queryId\"},\"granularity\":{\"type\":\"all\"}},\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"dim3\",\"type\":\"STRING\"},{\"name\":\"cnt\",\"type\":\"LONG\"},{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"unique_dim1\",\"type\":\"COMPLEX<hyperUnique>\"}],\"columnMappings\":[{\"queryColumn\":\"__time\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"dim1\",\"outputColumn\":\"dim1\"},{\"queryColumn\":\"dim2\",\"outputColumn\":\"dim2\"},{\"queryColumn\":\"dim3\",\"outputColumn\":\"dim3\"},{\"queryColumn\":\"cnt\",\"outputColumn\":\"cnt\"},{\"queryColumn\":\"m1\",\"outputColumn\":\"m1\"},{\"queryColumn\":\"m2\",\"outputColumn\":\"m2\"},{\"queryColumn\":\"unique_dim1\",\"outputColumn\":\"unique_dim1\"}]}],"
+ " RESOURCES=[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}],"
+ " ATTRIBUTES={\"statementType\":\"SELECT\"}}",
String.valueOf(SqlStatementResourceTest.getResultRowsFromResponse(response).get(0))
@ -329,6 +332,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
ResultFormat.OBJECTLINES.name(),
SqlStatementResourceTest.makeOkRequest()
),
objectMapper);
@ -344,11 +348,124 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
ResultFormat.OBJECTLINES.name(),
SqlStatementResourceTest.makeOkRequest()
),
objectMapper);
}
@Test
public void testResultFormat() throws Exception
{
Map<String, Object> context = defaultAsyncContext();
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.name());
SqlStatementResult sqlStatementResult = (SqlStatementResult) resource.doPost(
new SqlQuery(
"select cnt,dim1 from foo",
null,
false,
false,
false,
context,
null
),
SqlStatementResourceTest.makeOkRequest()
).getEntity();
final List<ColumnNameAndTypes> columnNameAndTypes = ImmutableList.of(
new ColumnNameAndTypes("cnt", "cnt", "cnt"),
new ColumnNameAndTypes("dim1", "dim1", "dim1")
);
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{1, ""},
new Object[]{1, "10.1"},
new Object[]{1, "2"},
new Object[]{1, "1"},
new Object[]{1, "def"},
new Object[]{1, "abc"}
);
for (ResultFormat resultFormat : ResultFormat.values()) {
Assert.assertArrayEquals(
createExpectedResultsInFormat(resultFormat, expectedRows, columnNameAndTypes, objectMapper),
responseToByteArray(
resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
resultFormat.name(),
SqlStatementResourceTest.makeOkRequest()
), objectMapper
)
);
Assert.assertArrayEquals(
createExpectedResultsInFormat(resultFormat, expectedRows, columnNameAndTypes, objectMapper),
responseToByteArray(
resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
resultFormat.name(),
SqlStatementResourceTest.makeOkRequest()
), objectMapper
)
);
}
}
@Test
public void testResultFormatWithParamInSelect() throws IOException
{
Map<String, Object> context = defaultAsyncContext();
context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLESTORAGE.name());
SqlStatementResult sqlStatementResult = (SqlStatementResult) resource.doPost(
new SqlQuery(
"select cnt,dim1 from foo",
ResultFormat.OBJECTLINES,
false,
false,
false,
context,
null
),
SqlStatementResourceTest.makeOkRequest()
).getEntity();
List<List<Object>> rows = new ArrayList<>();
rows.add(ImmutableList.of(1, ""));
rows.add(ImmutableList.of(1, "10.1"));
rows.add(ImmutableList.of(1, "2"));
rows.add(ImmutableList.of(1, "1"));
rows.add(ImmutableList.of(1, "def"));
rows.add(ImmutableList.of(1, "abc"));
Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
ResultFormat.ARRAY.name(),
SqlStatementResourceTest.makeOkRequest()
)));
Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
0L,
ResultFormat.ARRAY.name(),
SqlStatementResourceTest.makeOkRequest()
)));
}
private byte[] createExpectedResultsInFormat(ResultFormat resultFormat, List<Object[]> resultsList, List<ColumnNameAndTypes> rowSignature, ObjectMapper jsonMapper) throws Exception
{
ByteArrayOutputStream os = new ByteArrayOutputStream();
try (final ResultFormat.Writer writer = resultFormat.createFormatter(os, jsonMapper)) {
SqlStatementResource.resultPusherInternal(writer, Yielders.each(Sequences.simple(resultsList)), rowSignature);
}
return os.toByteArray();
}
private void assertExpectedResults(String expectedResult, Response resultsResponse, ObjectMapper objectMapper) throws IOException
{
byte[] bytes = responseToByteArray(resultsResponse, objectMapper);
@ -400,6 +517,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
Response resultsResponse = resource.doGetResults(
actual.getQueryId(),
0L,
null,
SqlStatementResourceTest.makeOkRequest()
);
@ -442,6 +560,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
Response resultsResponse = resource.doGetResults(
actual.getQueryId(),
0L,
null,
SqlStatementResourceTest.makeOkRequest()
);
@ -452,7 +571,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
private static Map<String, Object> defaultAsyncContext()
{
Map<String, Object> context = new HashMap<String, Object>();
Map<String, Object> context = new HashMap<>();
context.put(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.ASYNC.name());
return context;
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.msq.sql;
package org.apache.druid.msq.sql.resources;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@ -57,11 +57,12 @@ import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.indexing.report.MSQTaskReportTest;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.sql.SqlStatementState;
import org.apache.druid.msq.sql.entity.ColumnNameAndTypes;
import org.apache.druid.msq.sql.entity.PageInformation;
import org.apache.druid.msq.sql.entity.ResultSetInformation;
import org.apache.druid.msq.sql.entity.SqlStatementResult;
import org.apache.druid.msq.sql.resources.SqlStatementResource;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
@ -78,6 +79,7 @@ import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlResourceTest;
import org.apache.druid.storage.local.LocalFileStorageConnector;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@ -158,8 +160,7 @@ public class SqlStatementResourceTest extends MSQTestBase
ColumnType.STRING
)
.build()))
.destination(
TaskReportMSQDestination.INSTANCE)
.destination(TaskReportMSQDestination.instance())
.tuningConfig(
MSQTuningConfig.defaultConfig())
.build(),
@ -673,7 +674,7 @@ public class SqlStatementResourceTest extends MSQTestBase
);
assertExceptionMessage(
resource.doGetResults(ACCEPTED_SELECT_MSQ_QUERY, 0L, makeOkRequest()),
resource.doGetResults(ACCEPTED_SELECT_MSQ_QUERY, 0L, null, makeOkRequest()),
StringUtils.format(
"Query[%s] is currently in [%s] state. Please wait for it to complete.",
ACCEPTED_SELECT_MSQ_QUERY,
@ -707,7 +708,7 @@ public class SqlStatementResourceTest extends MSQTestBase
);
assertExceptionMessage(
resource.doGetResults(RUNNING_SELECT_MSQ_QUERY, 0L, makeOkRequest()),
resource.doGetResults(RUNNING_SELECT_MSQ_QUERY, 0L, null, makeOkRequest()),
StringUtils.format(
"Query[%s] is currently in [%s] state. Please wait for it to complete.",
RUNNING_SELECT_MSQ_QUERY,
@ -743,7 +744,7 @@ public class SqlStatementResourceTest extends MSQTestBase
null
)), objectMapper.writeValueAsString(response.getEntity()));
Response resultsResponse = resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, 0L, makeOkRequest());
Response resultsResponse = resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, 0L, ResultFormat.OBJECTLINES.name(), makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus());
String expectedResult = "{\"_time\":123,\"alias\":\"foo\",\"market\":\"bar\"}\n"
@ -761,6 +762,7 @@ public class SqlStatementResourceTest extends MSQTestBase
resource.doGetResults(
FINISHED_SELECT_MSQ_QUERY,
0L,
ResultFormat.OBJECTLINES.name(),
makeOkRequest()
)
);
@ -770,13 +772,14 @@ public class SqlStatementResourceTest extends MSQTestBase
resource.doGetResults(
FINISHED_SELECT_MSQ_QUERY,
null,
ResultFormat.OBJECTLINES.name(),
makeOkRequest()
)
);
Assert.assertEquals(
Response.Status.BAD_REQUEST.getStatusCode(),
resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, -1L, makeOkRequest()).getStatus()
resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, -1L, null, makeOkRequest()).getStatus()
);
}
@ -792,7 +795,7 @@ public class SqlStatementResourceTest extends MSQTestBase
for (String queryID : ImmutableList.of(ERRORED_SELECT_MSQ_QUERY, ERRORED_INSERT_MSQ_QUERY)) {
assertExceptionMessage(resource.doGetStatus(queryID, makeOkRequest()), FAILURE_MSG, Response.Status.OK);
assertExceptionMessage(
resource.doGetResults(queryID, 0L, makeOkRequest()),
resource.doGetResults(queryID, 0L, null, makeOkRequest()),
StringUtils.format(
"Query[%s] failed. Check the status api for more details.",
queryID
@ -824,16 +827,16 @@ public class SqlStatementResourceTest extends MSQTestBase
Assert.assertEquals(
Response.Status.OK.getStatusCode(),
resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, 0L, makeOkRequest()).getStatus()
resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, 0L, null, makeOkRequest()).getStatus()
);
Assert.assertEquals(
Response.Status.OK.getStatusCode(),
resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, null, makeOkRequest()).getStatus()
resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, null, null, makeOkRequest()).getStatus()
);
Assert.assertEquals(
Response.Status.BAD_REQUEST.getStatusCode(),
resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, -1L, makeOkRequest()).getStatus()
resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, -1L, null, makeOkRequest()).getStatus()
);
}
@ -842,7 +845,7 @@ public class SqlStatementResourceTest extends MSQTestBase
{
for (String queryID : ImmutableList.of(RUNNING_NON_MSQ_TASK, FAILED_NON_MSQ_TASK, FINISHED_NON_MSQ_TASK)) {
assertNotFound(resource.doGetStatus(queryID, makeOkRequest()), queryID);
assertNotFound(resource.doGetResults(queryID, 0L, makeOkRequest()), queryID);
assertNotFound(resource.doGetResults(queryID, 0L, null, makeOkRequest()), queryID);
assertNotFound(resource.deleteQuery(queryID, makeOkRequest()), queryID);
}
}
@ -866,7 +869,7 @@ public class SqlStatementResourceTest extends MSQTestBase
);
assertExceptionMessage(
resource.doGetResults(ACCEPTED_INSERT_MSQ_TASK, 0L, makeOkRequest()),
resource.doGetResults(ACCEPTED_INSERT_MSQ_TASK, 0L, null, makeOkRequest()),
StringUtils.format(
"Query[%s] is currently in [%s] state. Please wait for it to complete.",
ACCEPTED_INSERT_MSQ_TASK,
@ -899,7 +902,7 @@ public class SqlStatementResourceTest extends MSQTestBase
);
assertExceptionMessage(
resource.doGetResults(RUNNING_INSERT_MSQ_QUERY, 0L, makeOkRequest()),
resource.doGetResults(RUNNING_INSERT_MSQ_QUERY, 0L, null, makeOkRequest()),
StringUtils.format(
"Query[%s] is currently in [%s] state. Please wait for it to complete.",
RUNNING_INSERT_MSQ_QUERY,
@ -928,6 +931,7 @@ public class SqlStatementResourceTest extends MSQTestBase
resource.doGetResults(
RUNNING_SELECT_MSQ_QUERY,
1L,
null,
makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)
).getStatus()
);
@ -959,7 +963,7 @@ public class SqlStatementResourceTest extends MSQTestBase
);
Assert.assertEquals(
Response.Status.NOT_FOUND.getStatusCode(),
resource.doGetResults(taskIdNotFound, null, makeOkRequest()).getStatus()
resource.doGetResults(taskIdNotFound, null, null, makeOkRequest()).getStatus()
);
Assert.assertEquals(
Response.Status.NOT_FOUND.getStatusCode(),

View File

@ -121,6 +121,8 @@ public enum ResultFormat
return name;
}
public static final ResultFormat DEFAULT_RESULT_FORMAT = OBJECT;
public interface Writer extends Closeable
{
/**

View File

@ -66,7 +66,7 @@ public class SqlQuery
)
{
this.query = Preconditions.checkNotNull(query, "query");
this.resultFormat = resultFormat == null ? ResultFormat.OBJECT : resultFormat;
this.resultFormat = resultFormat == null ? ResultFormat.DEFAULT_RESULT_FORMAT : resultFormat;
this.header = header;
this.typesHeader = typesHeader;
this.sqlTypesHeader = sqlTypesHeader;