diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 60f9f3db947..409aec7f5a9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -773,7 +773,7 @@ public class SqlStatementResource ) throws IOException { try { - try (final ResultFormat.Writer writer = ResultFormat.OBJECT.createFormatter(os, jsonMapper)) { + try (final ResultFormat.Writer writer = ResultFormat.OBJECTLINES.createFormatter(os, jsonMapper)) { Yielder yielder = results.get(); List rowSignature = signature.get(); writer.writeResponseStart(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java index 385b3a89b6b..b7421739d1a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.sql; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.calcite.sql.type.SqlTypeName; @@ -49,8 +50,10 @@ import org.junit.Before; import org.junit.Test; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -315,26 +318,52 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase SqlStatementResourceTest.makeOkRequest() ).getEntity(); + assertExpectedResults( + "{\"cnt\":1,\"dim1\":\"\"}\n" + + "{\"cnt\":1,\"dim1\":\"10.1\"}\n" + + "{\"cnt\":1,\"dim1\":\"2\"}\n" + + "{\"cnt\":1,\"dim1\":\"1\"}\n" + + "{\"cnt\":1,\"dim1\":\"def\"}\n" + + "{\"cnt\":1,\"dim1\":\"abc\"}\n" + + "\n", + resource.doGetResults( + sqlStatementResult.getQueryId(), + null, + SqlStatementResourceTest.makeOkRequest() + ), + objectMapper); - List> rows = new ArrayList<>(); - rows.add(ImmutableMap.of("cnt", 1, "dim1", "")); - rows.add(ImmutableMap.of("cnt", 1, "dim1", "10.1")); - rows.add(ImmutableMap.of("cnt", 1, "dim1", "2")); - rows.add(ImmutableMap.of("cnt", 1, "dim1", "1")); - rows.add(ImmutableMap.of("cnt", 1, "dim1", "def")); - rows.add(ImmutableMap.of("cnt", 1, "dim1", "abc")); + assertExpectedResults( + "{\"cnt\":1,\"dim1\":\"\"}\n" + + "{\"cnt\":1,\"dim1\":\"10.1\"}\n" + + "{\"cnt\":1,\"dim1\":\"2\"}\n" + + "{\"cnt\":1,\"dim1\":\"1\"}\n" + + "{\"cnt\":1,\"dim1\":\"def\"}\n" + + "{\"cnt\":1,\"dim1\":\"abc\"}\n" + + "\n", + resource.doGetResults( + sqlStatementResult.getQueryId(), + 0L, + SqlStatementResourceTest.makeOkRequest() + ), + objectMapper); + } - Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( - sqlStatementResult.getQueryId(), - null, - SqlStatementResourceTest.makeOkRequest() - ))); + private void assertExpectedResults(String expectedResult, Response resultsResponse, ObjectMapper objectMapper) throws IOException + { + byte[] bytes = responseToByteArray(resultsResponse, objectMapper); + Assert.assertEquals(expectedResult, new String(bytes, StandardCharsets.UTF_8)); + } - Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults( - sqlStatementResult.getQueryId(), - 0L, - SqlStatementResourceTest.makeOkRequest() - ))); + public static byte[] responseToByteArray(Response resp, ObjectMapper objectMapper) throws IOException + { + if (resp.getEntity() instanceof StreamingOutput) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ((StreamingOutput) resp.getEntity()).write(baos); + return baos.toByteArray(); + } else { + return objectMapper.writeValueAsBytes(resp.getEntity()); + } } @Test diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java index 0315eaa874b..2ff3f643f6c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java @@ -95,7 +95,6 @@ import javax.ws.rs.core.Response; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -747,33 +746,32 @@ public class SqlStatementResourceTest extends MSQTestBase Response resultsResponse = resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, 0L, makeOkRequest()); Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus()); - List> rows = new ArrayList<>(); - rows.add(ROW1); - rows.add(ROW2); + String expectedResult = "{\"_time\":123,\"alias\":\"foo\",\"market\":\"bar\"}\n" + + "{\"_time\":234,\"alias\":\"foo1\",\"market\":\"bar1\"}\n\n"; - Assert.assertEquals(rows, getResultRowsFromResponse(resultsResponse)); + assertExpectedResults(expectedResult, resultsResponse); Assert.assertEquals( Response.Status.OK.getStatusCode(), resource.deleteQuery(FINISHED_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() ); - Assert.assertEquals( - rows, - getResultRowsFromResponse(resource.doGetResults( + assertExpectedResults( + expectedResult, + resource.doGetResults( FINISHED_SELECT_MSQ_QUERY, 0L, makeOkRequest() - )) + ) ); - Assert.assertEquals( - rows, - getResultRowsFromResponse(resource.doGetResults( + assertExpectedResults( + expectedResult, + resource.doGetResults( FINISHED_SELECT_MSQ_QUERY, null, makeOkRequest() - )) + ) ); Assert.assertEquals( @@ -782,6 +780,12 @@ public class SqlStatementResourceTest extends MSQTestBase ); } + private void assertExpectedResults(String expectedResult, Response resultsResponse) throws IOException + { + byte[] bytes = SqlResourceTest.responseToByteArray(resultsResponse); + Assert.assertEquals(expectedResult, new String(bytes, StandardCharsets.UTF_8)); + } + @Test public void testFailedMSQQuery() {