From 8ee8786d3ce32f39d8517b5a0dddd6211101b2a4 Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Thu, 25 Aug 2022 00:50:41 -0700
Subject: [PATCH] add maxBytesInMemory and maxClientResponseBytes to
 SamplerConfig (#12947)

* add maxBytesInMemory and maxClientResponseBytes to SamplerConfig
---
 .../util/common/parsers/ObjectFlatteners.java |   2 +-
 .../common/parsers/ObjectFlattenersTest.java  |   4 +
 .../indexing/kafka/KafkaSamplerSpecTest.java  |   8 +-
 .../kinesis/KinesisSamplerSpecTest.java       |   5 +-
 .../overlord/sampler/InputSourceSampler.java  |  45 ++++++-
 .../overlord/sampler/SamplerConfig.java       |  54 +++++++-
 .../sampler/CsvInputSourceSamplerTest.java    |   3 +-
 .../sampler/InputSourceSamplerTest.java       | 119 +++++++++++++++++-
 8 files changed, 220 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
index 77ae467e855..876885344ba 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
@@ -89,7 +89,7 @@ public class ObjectFlatteners
       @Override
       public boolean isEmpty()
       {
-        throw new UnsupportedOperationException();
+        return keySet().isEmpty();
       }
 
       @Override
diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
index e48c4dafe88..2b610690db0 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,9 +46,12 @@ public class ObjectFlattenersTest
   {
     JsonNode node = OBJECT_MAPPER.readTree(SOME_JSON);
     Map<String, Object> flat = FLATTENER.flatten(node);
+    Assert.assertEquals(ImmutableSet.of("extract", "foo", "bar"), flat.keySet());
+    Assert.assertFalse(flat.isEmpty());
     Assert.assertNull(flat.get("foo"));
     Assert.assertEquals(1L, flat.get("bar"));
     Assert.assertEquals(1L, flat.get("extract"));
+    Assert.assertEquals("{\"extract\":1,\"foo\":null,\"bar\":1}", OBJECT_MAPPER.writeValueAsString(flat));
   }
 
   @Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 92ab899bbf5..bf5831ccaef 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -160,8 +160,8 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null),
-        new InputSourceSampler(),
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(OBJECT_MAPPER),
         OBJECT_MAPPER
     );
 
@@ -335,8 +335,8 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null),
-        new InputSourceSampler(),
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(OBJECT_MAPPER),
         OBJECT_MAPPER
     );
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 918b25d4900..9a446f5265e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
 import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -170,8 +171,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
 
     KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null),
-        new InputSourceSampler(),
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(new DefaultObjectMapper()),
         null
     );
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 21685c4d745..c91d8434c21 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.sampler;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.druid.client.indexing.SamplerResponse;
@@ -33,6 +34,7 @@ import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -49,6 +51,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
 
 import javax.annotation.Nullable;
+import javax.inject.Inject;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -81,6 +84,14 @@ public class InputSourceSampler
       SamplerInputRow.SAMPLER_ORDERING_COLUMN
   );
 
+  private final ObjectMapper jsonMapper;
+
+  @Inject
+  public InputSourceSampler(@Json ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
   public SamplerResponse sample(
       final InputSource inputSource,
       // inputFormat can be null only if inputSource.needsFormat() = false or parser is specified.
@@ -118,7 +129,11 @@ public class InputSourceSampler
       List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
       int numRowsIndexed = 0;
 
-      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
+      while (
+          responseRows.size() < nonNullSamplerConfig.getNumRows() &&
+          index.getBytesInMemory().get() < nonNullSamplerConfig.getMaxBytesInMemory() &&
+          iterator.hasNext()
+      ) {
         final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
 
         final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
@@ -173,6 +188,7 @@ public class InputSourceSampler
       final List<String> columnNames = index.getColumnNames();
       columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
 
+
       for (Row row : index) {
         Map<String, Object> parsed = new LinkedHashMap<>();
 
@@ -181,7 +197,9 @@ public class InputSourceSampler
 
         Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
         if (sortKey != null) {
-          responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed));
+          SamplerResponseRow theRow = responseRows.get(sortKey.intValue()).withParsed(parsed);
+          responseRows.set(sortKey.intValue(), theRow);
+
         }
       }
 
@@ -190,7 +208,30 @@ public class InputSourceSampler
         responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows());
       }
 
+      if (nonNullSamplerConfig.getMaxClientResponseBytes() > 0) {
+        long estimatedResponseSize = 0;
+        boolean limited = false;
+        int rowCounter = 0;
+        int parsedCounter = 0;
+        for (SamplerResponseRow row : responseRows) {
+          rowCounter++;
+          if (row.getInput() != null) {
+            parsedCounter++;
+          }
+          estimatedResponseSize += jsonMapper.writeValueAsBytes(row).length;
+          if (estimatedResponseSize > nonNullSamplerConfig.getMaxClientResponseBytes()) {
+            limited = true;
+            break;
+          }
+        }
+        if (limited) {
+          responseRows = responseRows.subList(0, rowCounter);
+          numRowsIndexed = parsedCounter;
+        }
+      }
+
       int numRowsRead = responseRows.size();
+
       return new SamplerResponse(
           numRowsRead,
           numRowsIndexed,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
index 6799663cddb..aaa2cd07898 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -22,6 +22,12 @@ package org.apache.druid.indexing.overlord.sampler;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.segment.indexing.DataSchema;
+
+import javax.annotation.Nullable;
 
 public class SamplerConfig
 {
@@ -29,17 +35,27 @@ public class SamplerConfig
   private static final int MAX_NUM_ROWS = 5000;
   private static final int DEFAULT_TIMEOUT_MS = 10000;
 
+
+
   private final int numRows;
   private final int timeoutMs;
+  private final long maxBytesInMemory;
+
+  private final long maxClientResponseBytes;
+
   @JsonCreator
   public SamplerConfig(
-      @JsonProperty("numRows") Integer numRows,
-      @JsonProperty("timeoutMs") Integer timeoutMs
+      @JsonProperty("numRows") @Nullable Integer numRows,
+      @JsonProperty("timeoutMs") @Nullable Integer timeoutMs,
+      @JsonProperty("maxBytesInMemory") @Nullable HumanReadableBytes maxBytesInMemory,
@JsonProperty("maxClientResponseBytes") @Nullable HumanReadableBytes maxClientResponseBytes ) { this.numRows = numRows != null ? numRows : DEFAULT_NUM_ROWS; this.timeoutMs = timeoutMs != null ? timeoutMs : DEFAULT_TIMEOUT_MS; + this.maxBytesInMemory = maxBytesInMemory != null ? maxBytesInMemory.getBytes() : Long.MAX_VALUE; + this.maxClientResponseBytes = maxClientResponseBytes != null ? maxClientResponseBytes.getBytes() : 0; Preconditions.checkArgument(this.numRows <= MAX_NUM_ROWS, "numRows must be <= %s", MAX_NUM_ROWS); } @@ -47,9 +63,13 @@ public class SamplerConfig /** * The maximum number of rows to return in a response. The actual number of returned rows may be less if: * - The sampled source contains less data. - * - {@link SamplerConfig#timeoutMs} elapses before this value is reached. + * - {@link SamplerConfig#timeoutMs} elapses before this value is reached * - {@link org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is true and input rows get * rolled-up into fewer indexed rows. + * - The incremental index performing the sampling reaches {@link SamplerConfig#getMaxBytesInMemory()} before this + * value is reached + * - The estimated size of the {@link org.apache.druid.client.indexing.SamplerResponse} crosses + * {@link SamplerConfig#getMaxClientResponseBytes()} * * @return maximum number of sampled rows to return */ @@ -70,8 +90,34 @@ public class SamplerConfig return timeoutMs; } + /** + * Maximum number of bytes in memory that the {@link org.apache.druid.segment.incremental.IncrementalIndex} used by + * {@link InputSourceSampler#sample(InputSource, InputFormat, DataSchema, SamplerConfig)} will be allowed to + * accumulate before aborting sampling. Particularly useful for limiting footprint of sample operations as well as + * overall response size from sample requests. However, it is not directly correlated to response size since it + * also contains the "raw" input data, so actual responses will likely be at least twice the size of this value, + * depending on factors such as number of transforms, aggregations in the case of rollup, whether all columns + * of the input are present in the dimension spec, and so on. If it is preferred to control client response size, + * use {@link SamplerConfig#getMaxClientResponseBytes()} instead. + */ + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + + /** + * Maximum number of bytes to accumulate for a {@link org.apache.druid.client.indexing.SamplerResponse} before + * shutting off sampling. To directly control the size of the + * {@link org.apache.druid.segment.incremental.IncrementalIndex} used for sampling, use + * {@link SamplerConfig#getMaxBytesInMemory()} instead. 
+   */
+  public long getMaxClientResponseBytes()
+  {
+    return maxClientResponseBytes;
+  }
+
   public static SamplerConfig empty()
   {
-    return new SamplerConfig(null, null);
+    return new SamplerConfig(null, null, null, null);
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java
index 1100cb82a1e..464579764d1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.junit.Assert;
@@ -60,7 +61,7 @@ public class CsvInputSourceSamplerTest
     );
     final InputSource inputSource = new InlineInputSource(String.join("\n", strCsvRows));
     final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 0);
-    final InputSourceSampler inputSourceSampler = new InputSourceSampler();
+    final InputSourceSampler inputSourceSampler = new InputSourceSampler(new DefaultObjectMapper());
 
     final SamplerResponse response = inputSourceSampler.sample(
         inputSource,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 8244f3dcc45..50e2f63c0f8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.collect.Utils;
@@ -126,6 +127,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
   @Parameterized.Parameters(name = "parserType = {0}, useInputFormatApi={1}")
   public static Iterable<Object[]> constructorFeeder()
   {
+    OBJECT_MAPPER.registerModules(new SamplerModule().getJacksonModules());
     return ImmutableList.of(
         new Object[]{ParserType.STR_JSON, false},
         new Object[]{ParserType.STR_JSON, true},
@@ -143,7 +145,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
   @Before
   public void setupTest()
   {
-    inputSourceSampler = new InputSourceSampler();
+    inputSourceSampler = new InputSourceSampler(OBJECT_MAPPER);
 
     mapOfRows = new ArrayList<>();
     final List<String> columns = ImmutableList.of("t", "dim1", "dim2", "met1");
@@ -246,7 +248,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
         inputSource,
         createInputFormat(),
         null,
-        new SamplerConfig(3, null)
+        new SamplerConfig(3, null, null, null)
     );
 
     Assert.assertEquals(3, response.getNumRowsRead());
@@ -1227,10 +1229,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
         STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.joining())
     );
 
-    SamplerResponse response = inputSourceSampler.sample(new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true),
-                                                         createInputFormat(),
-                                                         dataSchema,
-                                                         new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/));
+    SamplerResponse response = inputSourceSampler.sample(
+        new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true),
+        createInputFormat(),
+        dataSchema,
+        new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/, null, null)
+    );
 
     //
     // the 1st json block contains STR_JSON_ROWS.size() lines, and 2nd json block contains STR_JSON_ROWS.size()-1 lines
@@ -1328,6 +1332,109 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
     inputSourceSampler.sample(failingReaderInputSource, null, null, null);
   }
 
+  @Test
+  public void testRowLimiting() throws IOException
+  {
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        null
+    );
+    final InputSource inputSource = createInputSource(getTestRows(), dataSchema);
+    final InputFormat inputFormat = createInputFormat();
+
+    SamplerResponse response = inputSourceSampler.sample(
+        inputSource,
+        inputFormat,
+        dataSchema,
+        new SamplerConfig(4, null, null, null)
+    );
+
+    Assert.assertEquals(4, response.getNumRowsRead());
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(2, response.getData().size());
+
+  }
+
+  @Test
+  public void testMaxBytesInMemoryLimiting() throws IOException
+  {
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        null
+    );
+    final InputSource inputSource = createInputSource(getTestRows(), dataSchema);
+    final InputFormat inputFormat = createInputFormat();
+
+    SamplerResponse response = inputSourceSampler.sample(
+        inputSource,
+        inputFormat,
+        dataSchema,
+        new SamplerConfig(null, null, HumanReadableBytes.valueOf(256), null)
+    );
+
+    Assert.assertEquals(4, response.getNumRowsRead());
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(2, response.getData().size());
+  }
+
+  @Test
+  public void testMaxClientResponseBytesLimiting() throws IOException
+  {
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
+    final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        null
+    );
+    final InputSource inputSource = createInputSource(getTestRows(), dataSchema);
+    final InputFormat inputFormat = createInputFormat();
+
+    SamplerResponse response = inputSourceSampler.sample(
+        inputSource,
+        inputFormat,
+        dataSchema,
+        new SamplerConfig(null, null, null, HumanReadableBytes.valueOf(300))
+    );
+
+    Assert.assertEquals(4, response.getNumRowsRead());
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(2, response.getData().size());
+  }
+
   private List<String> getTestRows()
   {
     switch (parserType) {
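
---

Note for reviewers: the following is a minimal sketch, not part of the patch, showing how the two new SamplerConfig knobs fit together end to end, modeled on the updated tests above. The inline CSV data, the byte values, and the class name SamplerLimitsExample are illustrative assumptions; passing a null DataSchema mirrors what one of the existing tests does.

import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;

public class SamplerLimitsExample
{
  public static void main(String[] args)
  {
    // Outside of Guice, pass the ObjectMapper explicitly, as the updated tests do.
    final InputSourceSampler sampler = new InputSourceSampler(new DefaultObjectMapper());

    // Hypothetical input: a two-row inline CSV with a header row, using the same
    // CsvInputFormat arguments as CsvInputSourceSamplerTest.
    final InputSource inputSource = new InlineInputSource(
        "t,dim1,met1\n2019-04-22T12:00,foo,1\n2019-04-22T12:00,bar,1"
    );
    final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 0);

    // numRows still caps the number of sampled rows; the two new knobs additionally stop
    // sampling early if the incremental index grows past ~32MB in memory, or if the
    // serialized response is estimated to exceed ~10MB. Both limits are arbitrary here.
    final SamplerConfig config = new SamplerConfig(
        200,
        10_000,
        HumanReadableBytes.valueOf(32_000_000),
        HumanReadableBytes.valueOf(10_000_000)
    );

    // A null DataSchema is allowed, as in the existing "@@ -246" test case above.
    final SamplerResponse response = sampler.sample(inputSource, inputFormat, null, config);

    System.out.println("read=" + response.getNumRowsRead() + ", indexed=" + response.getNumRowsIndexed());
  }
}

Since both new properties deserialize as HumanReadableBytes, a JSON sampler spec can supply plain byte counts for maxBytesInMemory and maxClientResponseBytes; whatever human-readable suffix forms HumanReadableBytes parses should work there as well.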