add maxBytesInMemory and maxClientResponseBytes to SamplerConfig (#12947)

* add maxBytesInMemory and maxClientResponseBytes to SamplerConfig
Clint Wylie 2022-08-25 00:50:41 -07:00 committed by GitHub
parent 82ad927087
commit 8ee8786d3c
8 changed files with 220 additions and 20 deletions
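
Before the per-file diffs, a rough sketch (not part of this change) of the expanded SamplerConfig constructor in use; the numeric values are arbitrary:

// Illustrative only; a null argument falls back to the defaults wired into the constructor below.
SamplerConfig config = new SamplerConfig(
    500,                                     // numRows, still capped at MAX_NUM_ROWS = 5000
    15_000,                                  // timeoutMs, defaults to 10000 when null
    HumanReadableBytes.valueOf(50_000_000),  // new: cap on bytes held by the sampling incremental index
    HumanReadableBytes.valueOf(10_000_000)   // new: cap on the estimated serialized response size
);
// Leaving the new arguments null preserves the old behavior: maxBytesInMemory defaults to
// Long.MAX_VALUE and maxClientResponseBytes to 0, which the sampler treats as "no limit".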

View File

@ -89,7 +89,7 @@ public class ObjectFlatteners
@Override
public boolean isEmpty()
{
throw new UnsupportedOperationException();
return keySet().isEmpty();
}
@Override
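
The isEmpty() change above is likely needed because the sampler now serializes raw rows with Jackson to estimate response size (see the maxClientResponseBytes handling further down), and Jackson's map serializer consults isEmpty() on the flattened row's lazy Map view. A minimal sketch, using only the JDK and Jackson, of the pattern this enables:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class LazyMapSerializationSketch
{
  public static void main(String[] args) throws Exception
  {
    // Stands in for the flattener's lazily computed row view.
    Map<String, Object> lazyRow = new AbstractMap<String, Object>()
    {
      @Override
      public Set<Map.Entry<String, Object>> entrySet()
      {
        return Collections.<String, Object>singletonMap("bar", 1L).entrySet();
      }

      @Override
      public boolean isEmpty()
      {
        // Mirrors the ObjectFlatteners change; throwing here would abort serialization.
        return keySet().isEmpty();
      }
    };
    // Prints {"bar":1}; Jackson checks isEmpty() before writing the entries.
    System.out.println(new ObjectMapper().writeValueAsString(lazyRow));
  }
}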

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
@ -45,9 +46,12 @@ public class ObjectFlattenersTest
{
JsonNode node = OBJECT_MAPPER.readTree(SOME_JSON);
Map<String, Object> flat = FLATTENER.flatten(node);
Assert.assertEquals(ImmutableSet.of("extract", "foo", "bar"), flat.keySet());
Assert.assertFalse(flat.isEmpty());
Assert.assertNull(flat.get("foo"));
Assert.assertEquals(1L, flat.get("bar"));
Assert.assertEquals(1L, flat.get("extract"));
Assert.assertEquals("{\"extract\":1,\"foo\":null,\"bar\":1}", OBJECT_MAPPER.writeValueAsString(flat));
}
@Test

View File

@ -160,8 +160,8 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null),
new InputSourceSampler(),
new SamplerConfig(5, null, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);
@ -335,8 +335,8 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null),
new InputSourceSampler(),
new SamplerConfig(5, null, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);

View File

@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -170,8 +171,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null),
new InputSourceSampler(),
new SamplerConfig(5, null, null, null),
new InputSourceSampler(new DefaultObjectMapper()),
null
);

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.druid.client.indexing.SamplerResponse;
@ -33,6 +34,7 @@ import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
@ -49,6 +51,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@ -81,6 +84,14 @@ public class InputSourceSampler
SamplerInputRow.SAMPLER_ORDERING_COLUMN
);
private final ObjectMapper jsonMapper;
@Inject
public InputSourceSampler(@Json ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}
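// Note, not part of this patch: with @Inject and @Json on this constructor, Guice supplies
// Druid's JSON ObjectMapper in production, while the tests in this change construct the
// sampler directly, e.g. new InputSourceSampler(new DefaultObjectMapper()).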
public SamplerResponse sample(
final InputSource inputSource,
// inputFormat can be null only if inputSource.needsFormat() = false or parser is specified.
@ -118,7 +129,11 @@ public class InputSourceSampler
List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
int numRowsIndexed = 0;
while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
while (
responseRows.size() < nonNullSamplerConfig.getNumRows() &&
index.getBytesInMemory().get() < nonNullSamplerConfig.getMaxBytesInMemory() &&
iterator.hasNext()
) {
final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
@ -173,6 +188,7 @@ public class InputSourceSampler
final List<String> columnNames = index.getColumnNames();
columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
for (Row row : index) {
Map<String, Object> parsed = new LinkedHashMap<>();
@ -181,7 +197,9 @@ public class InputSourceSampler
Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
if (sortKey != null) {
responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed));
SamplerResponseRow theRow = responseRows.get(sortKey.intValue()).withParsed(parsed);
responseRows.set(sortKey.intValue(), theRow);
}
}
@ -190,7 +208,30 @@ public class InputSourceSampler
responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows());
}
if (nonNullSamplerConfig.getMaxClientResponseBytes() > 0) {
long estimatedResponseSize = 0;
boolean limited = false;
int rowCounter = 0;
int parsedCounter = 0;
for (SamplerResponseRow row : responseRows) {
rowCounter++;
if (row.getInput() != null) {
parsedCounter++;
}
estimatedResponseSize += jsonMapper.writeValueAsBytes(row).length;
if (estimatedResponseSize > nonNullSamplerConfig.getMaxClientResponseBytes()) {
limited = true;
break;
}
}
if (limited) {
responseRows = responseRows.subList(0, rowCounter);
numRowsIndexed = parsedCounter;
}
}
int numRowsRead = responseRows.size();
return new SamplerResponse(
numRowsRead,
numRowsIndexed,
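
Putting the two limits together, a hedged sketch of a call into this method; inputSource, inputFormat, and dataSchema are placeholders for whatever the caller is sampling, and the byte values are arbitrary:

// Illustrative only; not part of this patch.
InputSourceSampler sampler = new InputSourceSampler(new DefaultObjectMapper());
SamplerResponse response = sampler.sample(
    inputSource,
    inputFormat,
    dataSchema,
    new SamplerConfig(
        500,                                     // numRows
        10_000,                                  // timeoutMs
        HumanReadableBytes.valueOf(5_000_000),   // stop indexing once the incremental index holds ~5 MB
        HumanReadableBytes.valueOf(10_000_000)   // then drop trailing rows once the serialized response would pass ~10 MB
    )
);
// If the response-size trim kicks in, getNumRowsRead() reflects the truncated row list and
// getNumRowsIndexed() is recomputed to count only the parsed rows that survived the cut,
// matching the rowCounter / parsedCounter bookkeeping above.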

View File

@ -22,6 +22,12 @@ package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
public class SamplerConfig
{
@ -29,17 +35,27 @@ public class SamplerConfig
private static final int MAX_NUM_ROWS = 5000;
private static final int DEFAULT_TIMEOUT_MS = 10000;
private final int numRows;
private final int timeoutMs;
private final long maxBytesInMemory;
private final long maxClientResponseBytes;
@JsonCreator
public SamplerConfig(
@JsonProperty("numRows") Integer numRows,
@JsonProperty("timeoutMs") Integer timeoutMs
@JsonProperty("numRows") @Nullable Integer numRows,
@JsonProperty("timeoutMs") @Nullable Integer timeoutMs,
@JsonProperty("maxBytesInMemory") @Nullable HumanReadableBytes maxBytesInMemory,
@JsonProperty("maxClientResponseBytes") @Nullable HumanReadableBytes maxClientResponseBytes
)
{
this.numRows = numRows != null ? numRows : DEFAULT_NUM_ROWS;
this.timeoutMs = timeoutMs != null ? timeoutMs : DEFAULT_TIMEOUT_MS;
this.maxBytesInMemory = maxBytesInMemory != null ? maxBytesInMemory.getBytes() : Long.MAX_VALUE;
this.maxClientResponseBytes = maxClientResponseBytes != null ? maxClientResponseBytes.getBytes() : 0;
Preconditions.checkArgument(this.numRows <= MAX_NUM_ROWS, "numRows must be <= %s", MAX_NUM_ROWS);
}
@ -47,9 +63,13 @@ public class SamplerConfig
/**
* The maximum number of rows to return in a response. The actual number of returned rows may be less if:
* - The sampled source contains less data.
* - {@link SamplerConfig#timeoutMs} elapses before this value is reached.
* - {@link SamplerConfig#timeoutMs} elapses before this value is reached
* - {@link org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is true and input rows get
* rolled-up into fewer indexed rows.
* - The incremental index performing the sampling reaches {@link SamplerConfig#getMaxBytesInMemory()} before this
* value is reached
* - The estimated size of the {@link org.apache.druid.client.indexing.SamplerResponse} crosses
* {@link SamplerConfig#getMaxClientResponseBytes()}
*
* @return maximum number of sampled rows to return
*/
@ -70,8 +90,34 @@ public class SamplerConfig
return timeoutMs;
}
/**
* Maximum number of bytes in memory that the {@link org.apache.druid.segment.incremental.IncrementalIndex} used by
* {@link InputSourceSampler#sample(InputSource, InputFormat, DataSchema, SamplerConfig)} will be allowed to
* accumulate before aborting sampling. Particularly useful for limiting footprint of sample operations as well as
* overall response size from sample requests. However, it is not directly correlated to response size, since the
* response also contains the "raw" input data; actual responses will likely be at least twice the size of this value,
* depending on factors such as number of transforms, aggregations in the case of rollup, whether all columns
* of the input are present in the dimension spec, and so on. If it is preferred to control client response size,
* use {@link SamplerConfig#getMaxClientResponseBytes()} instead.
*/
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
/**
* Maximum number of bytes to accumulate for a {@link org.apache.druid.client.indexing.SamplerResponse} before
* shutting off sampling. To directly control the size of the
* {@link org.apache.druid.segment.incremental.IncrementalIndex} used for sampling, use
* {@link SamplerConfig#getMaxBytesInMemory()} instead.
*/
public long getMaxClientResponseBytes()
{
return maxClientResponseBytes;
}
public static SamplerConfig empty()
{
return new SamplerConfig(null, null);
return new SamplerConfig(null, null, null, null);
}
}
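
For completeness, a hypothetical sketch of the JSON samplerConfig these fields map to; the string forms for the byte limits are an assumption based on HumanReadableBytes and are not shown in this diff:

// Hypothetical request fragment (field names come from the @JsonProperty annotations above;
// the values and the "100MiB"-style strings are assumptions about HumanReadableBytes, not
// something shown in this change):
//
//   "samplerConfig": {
//     "numRows": 500,
//     "timeoutMs": 15000,
//     "maxBytesInMemory": "100MiB",
//     "maxClientResponseBytes": "10MiB"
//   }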

View File

@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.indexing.DataSchema;
import org.junit.Assert;
@ -60,7 +61,7 @@ public class CsvInputSourceSamplerTest
);
final InputSource inputSource = new InlineInputSource(String.join("\n", strCsvRows));
final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 0);
final InputSourceSampler inputSourceSampler = new InputSourceSampler();
final InputSourceSampler inputSourceSampler = new InputSourceSampler(new DefaultObjectMapper());
final SamplerResponse response = inputSourceSampler.sample(
inputSource,

View File

@ -49,6 +49,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
@ -126,6 +127,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
@Parameterized.Parameters(name = "parserType = {0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
{
OBJECT_MAPPER.registerModules(new SamplerModule().getJacksonModules());
return ImmutableList.of(
new Object[]{ParserType.STR_JSON, false},
new Object[]{ParserType.STR_JSON, true},
@ -143,7 +145,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
@Before
public void setupTest()
{
inputSourceSampler = new InputSourceSampler();
inputSourceSampler = new InputSourceSampler(OBJECT_MAPPER);
mapOfRows = new ArrayList<>();
final List<String> columns = ImmutableList.of("t", "dim1", "dim2", "met1");
@ -246,7 +248,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
inputSource,
createInputFormat(),
null,
new SamplerConfig(3, null)
new SamplerConfig(3, null, null, null)
);
Assert.assertEquals(3, response.getNumRowsRead());
@ -1227,10 +1229,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.joining())
);
SamplerResponse response = inputSourceSampler.sample(new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true),
createInputFormat(),
dataSchema,
new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/));
SamplerResponse response = inputSourceSampler.sample(
new RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), true),
createInputFormat(),
dataSchema,
new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/, null, null)
);
//
// the 1st json block contains STR_JSON_ROWS.size() lines, and 2nd json block contains STR_JSON_ROWS.size()-1 lines
@ -1328,6 +1332,109 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
inputSourceSampler.sample(failingReaderInputSource, null, null, null);
}
@Test
public void testRowLimiting() throws IOException
{
final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
final GranularitySpec granularitySpec = new UniformGranularitySpec(
Granularities.DAY,
Granularities.HOUR,
true,
null
);
final DataSchema dataSchema = createDataSchema(
timestampSpec,
dimensionsSpec,
aggregatorFactories,
granularitySpec,
null
);
final InputSource inputSource = createInputSource(getTestRows(), dataSchema);
final InputFormat inputFormat = createInputFormat();
SamplerResponse response = inputSourceSampler.sample(
inputSource,
inputFormat,
dataSchema,
new SamplerConfig(4, null, null, null)
);
Assert.assertEquals(4, response.getNumRowsRead());
Assert.assertEquals(4, response.getNumRowsIndexed());
Assert.assertEquals(2, response.getData().size());
}
@Test
public void testMaxBytesInMemoryLimiting() throws IOException
{
final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
final GranularitySpec granularitySpec = new UniformGranularitySpec(
Granularities.DAY,
Granularities.HOUR,
true,
null
);
final DataSchema dataSchema = createDataSchema(
timestampSpec,
dimensionsSpec,
aggregatorFactories,
granularitySpec,
null
);
final InputSource inputSource = createInputSource(getTestRows(), dataSchema);
final InputFormat inputFormat = createInputFormat();
SamplerResponse response = inputSourceSampler.sample(
inputSource,
inputFormat,
dataSchema,
new SamplerConfig(null, null, HumanReadableBytes.valueOf(256), null)
);
Assert.assertEquals(4, response.getNumRowsRead());
Assert.assertEquals(4, response.getNumRowsIndexed());
Assert.assertEquals(2, response.getData().size());
}
@Test
public void testMaxClientResponseBytesLimiting() throws IOException
{
final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
final AggregatorFactory[] aggregatorFactories = {new LongSumAggregatorFactory("met1", "met1")};
final GranularitySpec granularitySpec = new UniformGranularitySpec(
Granularities.DAY,
Granularities.HOUR,
true,
null
);
final DataSchema dataSchema = createDataSchema(
timestampSpec,
dimensionsSpec,
aggregatorFactories,
granularitySpec,
null
);
final InputSource inputSource = createInputSource(getTestRows(), dataSchema);
final InputFormat inputFormat = createInputFormat();
SamplerResponse response = inputSourceSampler.sample(
inputSource,
inputFormat,
dataSchema,
new SamplerConfig(null, null, null, HumanReadableBytes.valueOf(300))
);
Assert.assertEquals(4, response.getNumRowsRead());
Assert.assertEquals(4, response.getNumRowsIndexed());
Assert.assertEquals(2, response.getData().size());
}
private List<String> getTestRows()
{
switch (parserType) {