mirror of https://github.com/apache/druid.git
Fix truncation detectability for SQL array, object formats. (#11685)
The SQL "array" and "object" formats are intended to return invalid JSON (lacking a ] terminator) if an error occurs midstream. This enables callers to detect truncated responses. But JsonGenerators, by default, close JSON arrays even when not explicitly told to. This patch disables automatic array closing, which fixes the problem with truncated response detection. It also adds tests for truncated responses for all result formats.
This commit is contained in:
parent
b54d989cda
commit
7220d0466b
|
@ -36,6 +36,9 @@ public class ArrayWriter implements ResultFormat.Writer
|
|||
{
|
||||
this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream);
|
||||
this.outputStream = outputStream;
|
||||
|
||||
// Disable automatic JSON termination, so clients can detect truncated responses.
|
||||
jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,6 +36,9 @@ public class ObjectWriter implements ResultFormat.Writer
|
|||
{
|
||||
this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream);
|
||||
this.outputStream = outputStream;
|
||||
|
||||
// Disable automatic JSON termination, so clients can detect truncated responses.
|
||||
jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -95,9 +95,11 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -123,6 +125,7 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> validateAndAuthorizeLatchSupplier = new SettableSupplier<>();
|
||||
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier = new SettableSupplier<>();
|
||||
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier = new SettableSupplier<>();
|
||||
private final SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier = new SettableSupplier<>();
|
||||
|
||||
private boolean sleep = false;
|
||||
|
||||
|
@ -254,7 +257,8 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
System.nanoTime(),
|
||||
validateAndAuthorizeLatchSupplier,
|
||||
planLatchSupplier,
|
||||
executeLatchSupplier
|
||||
executeLatchSupplier,
|
||||
sequenceMapFnSupplier
|
||||
);
|
||||
}
|
||||
},
|
||||
|
@ -505,6 +509,76 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayResultFormatWithErrorAfterFirstRow() throws Exception
|
||||
{
|
||||
sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
|
||||
|
||||
final String query = "SELECT cnt FROM foo";
|
||||
final Pair<QueryException, String> response =
|
||||
doPostRaw(new SqlQuery(query, ResultFormat.ARRAY, false, null, null), req);
|
||||
|
||||
// Truncated response: missing final ]
|
||||
Assert.assertNull(response.lhs);
|
||||
Assert.assertEquals("[[1],[1]", response.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectResultFormatWithErrorAfterFirstRow() throws Exception
|
||||
{
|
||||
sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
|
||||
|
||||
final String query = "SELECT cnt FROM foo";
|
||||
final Pair<QueryException, String> response =
|
||||
doPostRaw(new SqlQuery(query, ResultFormat.OBJECT, false, null, null), req);
|
||||
|
||||
// Truncated response: missing final ]
|
||||
Assert.assertNull(response.lhs);
|
||||
Assert.assertEquals("[{\"cnt\":1},{\"cnt\":1}", response.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayLinesResultFormatWithErrorAfterFirstRow() throws Exception
|
||||
{
|
||||
sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
|
||||
|
||||
final String query = "SELECT cnt FROM foo";
|
||||
final Pair<QueryException, String> response =
|
||||
doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, false, null, null), req);
|
||||
|
||||
// Truncated response: missing final LFLF
|
||||
Assert.assertNull(response.lhs);
|
||||
Assert.assertEquals("[1]\n[1]", response.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testObjectLinesResultFormatWithErrorAfterFirstRow() throws Exception
|
||||
{
|
||||
sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
|
||||
|
||||
final String query = "SELECT cnt FROM foo";
|
||||
final Pair<QueryException, String> response =
|
||||
doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, false, null, null), req);
|
||||
|
||||
// Truncated response: missing final LFLF
|
||||
Assert.assertNull(response.lhs);
|
||||
Assert.assertEquals("{\"cnt\":1}\n{\"cnt\":1}", response.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCsvResultFormatWithErrorAfterFirstRow() throws Exception
|
||||
{
|
||||
sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
|
||||
|
||||
final String query = "SELECT cnt FROM foo";
|
||||
final Pair<QueryException, String> response =
|
||||
doPostRaw(new SqlQuery(query, ResultFormat.CSV, false, null, null), req);
|
||||
|
||||
// Truncated response: missing final LFLF
|
||||
Assert.assertNull(response.lhs);
|
||||
Assert.assertEquals("1\n1\n", response.rhs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayResultFormatWithHeader() throws Exception
|
||||
{
|
||||
|
@ -1128,7 +1202,14 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
if (response.getStatus() == 200) {
|
||||
final StreamingOutput output = (StreamingOutput) response.getEntity();
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
output.write(baos);
|
||||
try {
|
||||
output.write(baos);
|
||||
}
|
||||
catch (Exception ignored) {
|
||||
// Suppress errors and return the response so far. Similar to what the real web server would do, if it
|
||||
// started writing a 200 OK and then threw an exception in the middle.
|
||||
}
|
||||
|
||||
return Pair.of(
|
||||
null,
|
||||
new String(baos.toByteArray(), StandardCharsets.UTF_8)
|
||||
|
@ -1180,11 +1261,26 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
return req;
|
||||
}
|
||||
|
||||
private static Function<Sequence<Object[]>, Sequence<Object[]>> errorAfterSecondRowMapFn()
|
||||
{
|
||||
return results -> {
|
||||
final AtomicLong rows = new AtomicLong();
|
||||
return results.map(row -> {
|
||||
if (rows.incrementAndGet() == 3) {
|
||||
throw new ISE("Oh no!");
|
||||
} else {
|
||||
return row;
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
private static class TestSqlLifecycle extends SqlLifecycle
|
||||
{
|
||||
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> validateAndAuthorizeLatchSupplier;
|
||||
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier;
|
||||
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier;
|
||||
private final SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier;
|
||||
|
||||
private TestSqlLifecycle(
|
||||
PlannerFactory plannerFactory,
|
||||
|
@ -1195,13 +1291,15 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
long startNs,
|
||||
SettableSupplier<NonnullPair<CountDownLatch, Boolean>> validateAndAuthorizeLatchSupplier,
|
||||
SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier,
|
||||
SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier
|
||||
SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier,
|
||||
SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier
|
||||
)
|
||||
{
|
||||
super(plannerFactory, emitter, requestLogger, queryScheduler, startMs, startNs);
|
||||
this.validateAndAuthorizeLatchSupplier = validateAndAuthorizeLatchSupplier;
|
||||
this.planLatchSupplier = planLatchSupplier;
|
||||
this.executeLatchSupplier = executeLatchSupplier;
|
||||
this.sequenceMapFnSupplier = sequenceMapFnSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1253,9 +1351,12 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
@Override
|
||||
public Sequence<Object[]> execute()
|
||||
{
|
||||
final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
|
||||
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
|
||||
|
||||
if (executeLatchSupplier.get() != null) {
|
||||
if (executeLatchSupplier.get().rhs) {
|
||||
Sequence<Object[]> sequence = super.execute();
|
||||
Sequence<Object[]> sequence = sequenceMapFn.apply(super.execute());
|
||||
executeLatchSupplier.get().lhs.countDown();
|
||||
return sequence;
|
||||
} else {
|
||||
|
@ -1267,10 +1368,10 @@ public class SqlResourceTest extends CalciteTestBase
|
|||
catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return super.execute();
|
||||
return sequenceMapFn.apply(super.execute());
|
||||
}
|
||||
} else {
|
||||
return super.execute();
|
||||
return sequenceMapFn.apply(super.execute());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue