QueryResource: Don't close JSON content on error. (#17034)

* QueryResource: Don't close JSON content on error.

Following similar issues fixed in #11685 and #15880, this patch fixes
a bug where QueryResource would write a closing array marker if it
encountered an exception after starting to push results. This makes it
difficult for callers to detect errors.

The prior patches didn't catch this problem because QueryResource uses
the ObjectMapper in a unique way, through writeValuesAsArray, which
doesn't respect the global AUTO_CLOSE_JSON_CONTENT setting.

* Fix usage of customized ObjectMappers.
This commit is contained in:
Gian Merlino 2024-09-14 15:32:49 -07:00 committed by GitHub
parent 27443a0600
commit d7be12067f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 100 additions and 40 deletions

View File

@ -19,7 +19,7 @@
package org.apache.druid.server; package org.apache.druid.server;
import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -62,7 +62,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -434,7 +433,7 @@ public class QueryLifecycle
|| (!shouldFinalize && queryContext.isSerializeDateTimeAsLongInner(false)); || (!shouldFinalize && queryContext.isSerializeDateTimeAsLongInner(false));
} }
public ObjectWriter newOutputWriter(ResourceIOReaderWriter ioReaderWriter) public ObjectMapper newOutputWriter(ResourceIOReaderWriter ioReaderWriter)
{ {
return ioReaderWriter.getResponseWriter().newOutputWriter( return ioReaderWriter.getResponseWriter().newOutputWriter(
getToolChest(), getToolChest(),

View File

@ -19,11 +19,12 @@
package org.apache.druid.server; package org.apache.druid.server;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SequenceWriter; import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer; import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
@ -37,6 +38,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BadJsonQueryException; import org.apache.druid.query.BadJsonQueryException;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
@ -374,7 +376,7 @@ public class QueryResource implements QueryCountStatsProvider
return responseType; return responseType;
} }
ObjectWriter newOutputWriter( ObjectMapper newOutputWriter(
@Nullable QueryToolChest<?, Query<?>> toolChest, @Nullable QueryToolChest<?, Query<?>> toolChest,
@Nullable Query<?> query, @Nullable Query<?> query,
boolean serializeDateTimeAsLong boolean serializeDateTimeAsLong
@ -387,7 +389,7 @@ public class QueryResource implements QueryCountStatsProvider
} else { } else {
decoratedMapper = mapper; decoratedMapper = mapper;
} }
return isPretty ? decoratedMapper.writerWithDefaultPrettyPrinter() : decoratedMapper.writer(); return isPretty ? decoratedMapper.copy().enable(SerializationFeature.INDENT_OUTPUT) : decoratedMapper;
} }
Response ok(Object object) throws IOException Response ok(Object object) throws IOException
@ -531,35 +533,7 @@ public class QueryResource implements QueryCountStatsProvider
@Override @Override
public Writer makeWriter(OutputStream out) throws IOException public Writer makeWriter(OutputStream out) throws IOException
{ {
final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); return new NativeQueryWriter(queryLifecycle.newOutputWriter(io), out);
final SequenceWriter sequenceWriter = objectWriter.writeValuesAsArray(out);
return new Writer()
{
@Override
public void writeResponseStart()
{
// Do nothing
}
@Override
public void writeRow(Object obj) throws IOException
{
sequenceWriter.write(obj);
}
@Override
public void writeResponseEnd()
{
// Do nothing
}
@Override
public void close() throws IOException
{
sequenceWriter.close();
}
};
} }
@Override @Override
@ -585,8 +559,49 @@ public class QueryResource implements QueryCountStatsProvider
@Override @Override
public void writeException(Exception e, OutputStream out) throws IOException public void writeException(Exception e, OutputStream out) throws IOException
{ {
final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); final ObjectMapper objectMapper = queryLifecycle.newOutputWriter(io);
out.write(objectWriter.writeValueAsBytes(e)); out.write(objectMapper.writeValueAsBytes(e));
}
}
static class NativeQueryWriter implements QueryResultPusher.Writer
{
private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;
public NativeQueryWriter(final ObjectMapper responseMapper, final OutputStream out) throws IOException
{
// Don't use objectWriter.writeValuesAsArray(out), because that causes an end array ] to be written when the
// writer is closed, even if it's closed in case of an exception. This causes valid JSON to be emitted in case
// of an exception, which makes it difficult for callers to detect problems. Note: this means that if an error
// occurs on a Historical (or other data server) after it started to push results to the Broker, the Broker
// will experience that as "JsonEOFException: Unexpected end-of-input: expected close marker for Array".
this.serializers = responseMapper.getSerializerProviderInstance();
this.jsonGenerator = responseMapper.createGenerator(out);
}
@Override
public void writeResponseStart() throws IOException
{
jsonGenerator.writeStartArray();
}
@Override
public void writeRow(Object obj) throws IOException
{
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, obj);
}
@Override
public void writeResponseEnd() throws IOException
{
jsonGenerator.writeEndArray();
}
@Override
public void close() throws IOException
{
jsonGenerator.close();
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.apache.druid.server; package org.apache.druid.server;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
@ -80,12 +81,14 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.Resource;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -98,6 +101,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -424,7 +428,8 @@ public class QueryResourceTest
overrideConfig, overrideConfig,
new AuthConfig(), new AuthConfig(),
System.currentTimeMillis(), System.currentTimeMillis(),
System.nanoTime()) System.nanoTime()
)
{ {
@Override @Override
public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten) public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten)
@ -453,7 +458,8 @@ public class QueryResourceTest
entity.getUnderlyingException(), entity.getUnderlyingException(),
new DruidExceptionMatcher( new DruidExceptionMatcher(
DruidException.Persona.OPERATOR, DruidException.Persona.OPERATOR,
DruidException.Category.RUNTIME_FAILURE, "legacyQueryException") DruidException.Category.RUNTIME_FAILURE, "legacyQueryException"
)
.expectMessageIs("something") .expectMessageIs("something")
); );
} }
@ -1250,6 +1256,46 @@ public class QueryResourceTest
} }
} }
@Test
public void testNativeQueryWriter_goodResponse() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos);
writer.writeResponseStart();
writer.writeRow(Arrays.asList("foo", "bar"));
writer.writeRow(Collections.singletonList("baz"));
writer.writeResponseEnd();
writer.close();
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("foo", "bar"),
ImmutableList.of("baz")
),
jsonMapper.readValue(baos.toByteArray(), Object.class)
);
}
@Test
public void testNativeQueryWriter_truncatedResponse() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos);
writer.writeResponseStart();
writer.writeRow(Arrays.asList("foo", "bar"));
writer.close(); // Simulate an error that occurs midstream; close writer without calling writeResponseEnd.
final JsonProcessingException e = Assert.assertThrows(
JsonProcessingException.class,
() -> jsonMapper.readValue(baos.toByteArray(), Object.class)
);
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("expected close marker for Array"))
);
}
private void createScheduledQueryResource( private void createScheduledQueryResource(
QueryScheduler scheduler, QueryScheduler scheduler,
Collection<CountDownLatch> beforeScheduler, Collection<CountDownLatch> beforeScheduler,