Configurable Index Type (#10335)

* Introduce a Configurable Index Type

* Change to @UnstableApi

* Add AppendableIndexSpecTest

* Update doc

* Add spelling exception

* Add test coverage

* Revert some of the changes to reduce diff

* Minor fixes

* Update getMaxBytesInMemoryOrDefault() comment

* Fix typo, remove redundant interface

* Remove off-heap spec (postponed to a later PR)

* Add javadocs to AppendableIndexSpec

* Describe testCreateTask()

* Add tests for AppendableIndexSpec within TuningConfig

* Modify hashCode() to conform with equals()

* Add comment where building incremental-index

* Add "EqualsVerifier" tests

* Revert some of the API back to AppenderatorConfig

* Don't use multi-line comments

* Remove knob documentation (deferred)
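
Since most of the diff below is constructor plumbing, here is a minimal sketch of what the new knob looks like to a user, mirroring the serde tests in this commit (the mapper is assumed to have the relevant task module registered, as those tests do):

ObjectMapper mapper = new DefaultObjectMapper();
// A tuningConfig that picks the index type explicitly; "onheap" is the only
// type in this PR (the off-heap spec was postponed, per the notes above).
String json = "{\n"
              + "  \"type\": \"kafka\",\n"
              + "  \"appendableIndexSpec\": { \"type\": \"onheap\" }\n"
              + "}";
KafkaIndexTaskTuningConfig config =
    (KafkaIndexTaskTuningConfig) mapper.readValue(json, TuningConfig.class);
// Omitting the field is equivalent: the on-heap spec is the default.
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());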
Liran Funaro 2020-10-24 04:34:26 +03:00 committed by GitHub
parent 1b9a8c4687
commit f3a2903218
77 changed files with 712 additions and 198 deletions

View File

@ -181,6 +181,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
tuningConfig.getShardSpecs(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),

View File

@ -83,6 +83,7 @@ public class MaterializedViewSupervisorTest
private TaskQueue taskQueue;
private MaterializedViewSupervisor supervisor;
private String derivativeDatasourceName;
private MaterializedViewSupervisorSpec spec;
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
@Before
@ -103,7 +104,7 @@ public class MaterializedViewSupervisorTest
taskQueue = EasyMock.createMock(TaskQueue.class);
taskQueue.start();
objectMapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
spec = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
@ -317,6 +318,35 @@ public class MaterializedViewSupervisorTest
}
/**
* Verifies that creating a HadoopIndexTask completes without raising an exception.
*/
@Test
public void testCreateTask()
{
List<DataSegment> baseSegments = Collections.singletonList(
new DataSegment(
"base",
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
);
HadoopIndexTask task = spec.createTask(
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
baseSegments
);
Assert.assertNotNull(task);
}
@Test
public void testSuspendedDoesntRun()
{

View File

@ -203,6 +203,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@ -33,6 +34,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
{
@JsonCreator
public KafkaIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@ -55,6 +57,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@ -81,6 +84,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -67,11 +67,13 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
null,
null,
null,
null,
null
);
}
public KafkaSupervisorTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@ -100,6 +102,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@ -193,7 +196,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
@ -219,6 +222,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KafkaIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),

View File

@ -2497,6 +2497,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
) throws JsonProcessingException
{
final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
null,
1000,
null,
maxRowsPerSegment,

View File

@ -21,11 +21,13 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
@ -60,6 +62,7 @@ public class KafkaIndexTaskTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertNull(config.getMaxTotalRows());
@ -85,7 +88,8 @@ public class KafkaIndexTaskTuningConfigTest
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
@ -99,6 +103,7 @@ public class KafkaIndexTaskTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, config.getMaxTotalRows());
@ -115,6 +120,7 @@ public class KafkaIndexTaskTuningConfigTest
public void testConvert()
{
KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
null,
1,
null,
2,
@ -142,6 +148,7 @@ public class KafkaIndexTaskTuningConfigTest
);
KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, copy.getMaxTotalRows());
@ -158,6 +165,7 @@ public class KafkaIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
null,
1,
null,
2,
@ -183,6 +191,7 @@ public class KafkaIndexTaskTuningConfigTest
mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@ -206,6 +215,7 @@ public class KafkaIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
null,
1,
null,
2,
@ -231,6 +241,7 @@ public class KafkaIndexTaskTuningConfigTest
KafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@ -249,4 +260,12 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(KafkaIndexTaskTuningConfig.class)
.usingGetClass()
.verify();
}
}
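
A note on these EqualsVerifier tests: usingGetClass() tells the verifier that equals() compares getClass() rather than using instanceof, which matches the getClass()-based equals() implementations in these tuning configs and keeps the check sound across the subclass hierarchy.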

View File

@ -270,6 +270,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
),
null
@ -3070,6 +3071,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
kafkaHost,
dataSchema,
new KafkaSupervisorTuningConfig(
null,
1000,
null,
50000,
@ -3109,6 +3111,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig(
null,
42, // This is different
null,
50000,
@ -3404,6 +3407,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
};
final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
null,
1000,
null,
50000,
@ -3514,6 +3518,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
};
final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
null,
1000,
null,
50000,

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -59,6 +60,7 @@ public class KafkaSupervisorTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@ -94,7 +96,8 @@ public class KafkaSupervisorTuningConfigTest
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"offsetFetchPeriod\": \"PT20S\",\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
@ -108,6 +111,7 @@ public class KafkaSupervisorTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@ -37,6 +38,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
@JsonCreator
public TestModifiedKafkaIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@ -60,6 +62,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,

View File

@ -192,6 +192,11 @@
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@ -50,6 +51,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@JsonCreator
public KinesisIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@ -78,6 +80,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@ -154,6 +157,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return new KinesisIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -75,11 +76,13 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
null,
null
);
}
public KinesisSupervisorTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@ -115,6 +118,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@ -248,6 +252,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
public KinesisIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KinesisIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),

View File

@ -72,6 +72,7 @@ public class KinesisIndexTaskSerdeTest
null,
null,
null,
null,
null
);
private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(

View File

@ -2739,6 +2739,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
boolean resetOffsetAutomatically = false;
int maxRowsInMemory = 1000;
final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
null,
maxRowsInMemory,
null,
maxRowsPerSegment,

View File

@ -22,10 +22,12 @@ package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.hamcrest.CoreMatchers;
import org.joda.time.Period;
@ -66,6 +68,7 @@ public class KinesisIndexTaskTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@ -102,7 +105,8 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"fetchSequenceNumberTimeout\": 6000,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"skipSequenceNumberAvailabilityCheck\": true,\n"
+ " \"fetchThreads\": 2\n"
+ " \"fetchThreads\": 2,\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue(
@ -116,6 +120,7 @@ public class KinesisIndexTaskTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
@ -136,6 +141,7 @@ public class KinesisIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
null,
1,
3L,
2,
@ -168,6 +174,7 @@ public class KinesisIndexTaskTuningConfigTest
mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@ -195,6 +202,7 @@ public class KinesisIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
null,
1,
3L,
2,
@ -226,6 +234,7 @@ public class KinesisIndexTaskTuningConfigTest
KinesisIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class);
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@ -282,6 +291,7 @@ public class KinesisIndexTaskTuningConfigTest
public void testConvert()
{
KinesisSupervisorTuningConfig original = new KinesisSupervisorTuningConfig(
null,
1,
(long) 3,
2,
@ -317,6 +327,7 @@ public class KinesisIndexTaskTuningConfigTest
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(3, copy.getMaxBytesInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
@ -338,4 +349,12 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(100, copy.getMaxRecordsPerPoll());
Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod());
}
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(KinesisIndexTaskTuningConfig.class)
.usingGetClass()
.verify();
}
}

View File

@ -167,6 +167,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier = createMock(KinesisRecordSupplier.class);
tuningConfig = new KinesisSupervisorTuningConfig(
null,
1000,
null,
50000,
@ -3689,6 +3690,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig(
null,
1000,
null,
50000,
@ -4741,6 +4743,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
};
final KinesisSupervisorTuningConfig tuningConfig = new KinesisSupervisorTuningConfig(
null,
1000,
null,
50000,

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -58,6 +59,7 @@ public class KinesisSupervisorTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@ -92,7 +94,8 @@ public class KinesisSupervisorTuningConfigTest
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"repartitionTransitionDuration\": \"PT500S\"\n"
+ " \"repartitionTransitionDuration\": \"PT500S\",\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KinesisSupervisorTuningConfig config = (KinesisSupervisorTuningConfig) mapper.readValue(
@ -106,6 +109,7 @@ public class KinesisSupervisorTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@ -37,6 +38,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
@JsonCreator
public TestModifiedKinesisIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@ -66,6 +68,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@ -98,6 +101,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra)
{
super(
base.getAppendableIndexSpec(),
base.getMaxRowsInMemory(),
base.getMaxBytesInMemory(),
base.getMaxRowsPerSegment(),

View File

@ -29,6 +29,7 @@ import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import javax.annotation.Nullable;
@ -56,6 +57,7 @@ public class HadoopTuningConfig implements TuningConfig
DEFAULT_SHARD_SPECS,
DEFAULT_INDEX_SPEC,
DEFAULT_INDEX_SPEC,
DEFAULT_APPENDABLE_INDEX,
DEFAULT_ROW_FLUSH_BOUNDARY,
0L,
false,
@ -83,6 +85,7 @@ public class HadoopTuningConfig implements TuningConfig
private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final AppendableIndexSpec appendableIndexSpec;
private final int rowFlushBoundary;
private final long maxBytesInMemory;
private final boolean leaveIntermediate;
@ -108,6 +111,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("shardSpecs") @Nullable Map<Long, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
final @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
final @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
final @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
@ -140,8 +144,9 @@ public class HadoopTuningConfig implements TuningConfig
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_ROW_FLUSH_BOUNDARY
: maxRowsInMemoryCOMPAT : maxRowsInMemory;
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
@ -211,6 +216,13 @@ public class HadoopTuningConfig implements TuningConfig
return indexSpecForIntermediatePersists;
}
@JsonProperty
@Override
public AppendableIndexSpec getAppendableIndexSpec()
{
return appendableIndexSpec;
}
@JsonProperty("maxRowsInMemory")
public int getRowFlushBoundary()
{
@ -218,6 +230,7 @@ public class HadoopTuningConfig implements TuningConfig
}
@JsonProperty
@Override
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
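
The @see comments now point at getMaxBytesInMemoryOrDefault(), which replaces the static TuningConfigs helper removed in this commit. A sketch of the lazy-default pattern it refers to, assuming the documented behavior that maxBytesInMemory defaults to one-sixth of the max JVM memory (the body here is an illustration, not the exact implementation):

// Hypothetical default method on TuningConfig.
default long getMaxBytesInMemoryOrDefault()
{
  // Constructors store 0 for "unset" so serialized configs don't bake in
  // the heap size of whichever JVM happened to create them.
  final long maxBytesInMemory = getMaxBytesInMemory();
  return maxBytesInMemory != 0 ? maxBytesInMemory : Runtime.getRuntime().maxMemory() / 6;
}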
@ -327,6 +340,7 @@ public class HadoopTuningConfig implements TuningConfig
shardSpecs,
indexSpec,
indexSpecForIntermediatePersists,
appendableIndexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
@ -357,6 +371,7 @@ public class HadoopTuningConfig implements TuningConfig
shardSpecs,
indexSpec,
indexSpecForIntermediatePersists,
appendableIndexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
@ -387,6 +402,7 @@ public class HadoopTuningConfig implements TuningConfig
specs,
indexSpec,
indexSpecForIntermediatePersists,
appendableIndexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,

View File

@ -50,7 +50,6 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
@ -302,11 +301,12 @@ public class IndexGeneratorJob implements Jobby
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();
IncrementalIndex newIndex = new IncrementalIndex.Builder()
// Build the incremental-index according to the spec that was chosen by the user
IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getRowFlushBoundary())
.setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
.buildOnheap();
.setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
.build();
if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);
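
This call site shows the whole surface the new abstraction needs: the spec hands out a builder, and build() replaces the old hard-coded buildOnheap(). A sketch of the interface implied by this usage (AppendableIndexBuilder is the builder type imported elsewhere in this commit; anything beyond builder() is an assumption):

// Sketch of AppendableIndexSpec as implied by the call site above.
public interface AppendableIndexSpec
{
  // Each spec (e.g. OnheapIncrementalIndex.Spec) supplies a builder for its
  // concrete incremental-index implementation.
  AppendableIndexBuilder builder();
}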

View File

@ -465,6 +465,7 @@ public class BatchDeltaIngestionTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -219,6 +219,7 @@ public class DetermineHashedPartitionsJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -328,6 +328,7 @@ public class DeterminePartitionsJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -241,6 +241,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Assert;
import org.junit.Test;
@ -44,6 +45,7 @@ public class HadoopTuningConfigTest
null,
null,
null,
null,
100,
null,
true,
@ -68,6 +70,7 @@ public class HadoopTuningConfigTest
Assert.assertEquals("/tmp/workingpath", actual.getWorkingPath());
Assert.assertEquals("version", actual.getVersion());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), actual.getAppendableIndexSpec());
Assert.assertNotNull(actual.getPartitionsSpec());
Assert.assertEquals(ImmutableMap.<Long, List<HadoopyShardSpec>>of(), actual.getShardSpecs());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpec());

View File

@ -529,6 +529,7 @@ public class IndexGeneratorJobTest
null,
null,
null,
null,
maxRowsInMemory,
maxBytesInMemory,
true,

View File

@ -165,6 +165,7 @@ public class JobHelperTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -63,6 +63,7 @@ public class GranularityPathSpecTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@ -37,9 +38,8 @@ import javax.annotation.Nullable;
import java.io.File;
@JsonTypeName("realtime_appenderator")
public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig
public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
{
private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1);
@ -53,6 +53,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return FileUtils.createTempDir("druid-realtime-persist");
}
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final DynamicPartitionsSpec partitionsSpec;
@ -74,6 +75,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@JsonCreator
public RealtimeAppenderatorTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@ -93,9 +95,10 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
)
{
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily intialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
this.intermediatePersistPeriod = intermediatePersistPeriod == null
@ -135,6 +138,13 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
: logParseExceptions;
}
@Override
@JsonProperty
public AppendableIndexSpec getAppendableIndexSpec()
{
return appendableIndexSpec;
}
@Override
@JsonProperty
public int getMaxRowsInMemory()
@ -260,6 +270,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir)
{
return new RealtimeAppenderatorTuningConfig(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec.getMaxRowsPerSegment(),

View File

@ -219,6 +219,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return new ParallelIndexTuningConfig(
null,
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getAppendableIndexSpec(),
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxBytesInMemory(),
indexTuningConfig.getMaxTotalRows(),

View File

@ -74,6 +74,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.BatchIOConfig;
@ -1110,7 +1111,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
public static class IndexTuningConfig implements AppenderatorConfig
{
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
@ -1118,6 +1119,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final long DEFAULT_PUSH_TIMEOUT = 0;
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
@ -1189,6 +1191,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@ -1211,6 +1214,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
)
{
this(
appendableIndexSpec,
maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
maxBytesInMemory != null ? maxBytesInMemory : 0,
getPartitionsSpec(
@ -1242,10 +1246,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
private IndexTuningConfig()
{
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private IndexTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
@Nullable PartitionsSpec partitionsSpec,
@ -1262,9 +1267,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Nullable Integer maxSavedParseExceptions
)
{
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@ -1300,6 +1306,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public IndexTuningConfig withBasePersistDirectory(File dir)
{
return new IndexTuningConfig(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,
@ -1320,6 +1327,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new IndexTuningConfig(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,
@ -1337,6 +1345,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
);
}
@JsonProperty
@Override
public AppendableIndexSpec getAppendableIndexSpec()
{
return appendableIndexSpec;
}
@JsonProperty
@Override
public int getMaxRowsInMemory()
@ -1514,7 +1529,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return false;
}
IndexTuningConfig that = (IndexTuningConfig) o;
return maxRowsInMemory == that.maxRowsInMemory &&
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
maxPendingPersists == that.maxPendingPersists &&
forceGuaranteedRollup == that.forceGuaranteedRollup &&
@ -1534,6 +1550,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public int hashCode()
{
return Objects.hash(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,

View File

@ -929,6 +929,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return new IndexTuningConfig(
null,
null,
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
null,

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -97,6 +98,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
null,
null,
null,
null,
null
);
}
@ -105,6 +107,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
public ParallelIndexTuningConfig(
@JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@ -134,6 +137,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
super(
targetPartitionSize,
maxRowsPerSegment,
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
@ -248,6 +252,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
return new ParallelIndexTuningConfig(
null,
null,
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
null,

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@ -32,11 +33,12 @@ import javax.annotation.Nullable;
import java.io.File;
import java.util.Objects;
public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig
public abstract class SeekableStreamIndexTaskTuningConfig implements AppenderatorConfig
{
private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final DynamicPartitionsSpec partitionsSpec;
@ -59,6 +61,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
private final int maxSavedParseExceptions;
public SeekableStreamIndexTaskTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
@Nullable Integer maxRowsPerSegment,
@ -84,10 +87,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
// Cannot be static because the default basePersistDirectory is unique per-instance
final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
@ -130,6 +134,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
: logParseExceptions;
}
@Override
@JsonProperty
public AppendableIndexSpec getAppendableIndexSpec()
{
return appendableIndexSpec;
}
@Override
@JsonProperty
public int getMaxRowsInMemory()
@ -281,7 +292,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
return false;
}
SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
return maxRowsInMemory == that.maxRowsInMemory &&
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
@ -304,6 +316,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
public int hashCode()
{
return Objects.hash(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,

View File

@ -1394,6 +1394,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
null
);
RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(
null,
1000,
null,
maxRowsPerSegment,

View File

@ -202,6 +202,7 @@ public class ClientCompactionTaskQuerySerdeTest
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"))
.tuningConfig(
new ParallelIndexTuningConfig(
null,
null,
null,
40000,

View File

@ -279,6 +279,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
null,
null,
null,
null,
new HashedPartitionsSpec(null, 3, null),
null,
null,

View File

@ -305,6 +305,7 @@ public class CompactionTaskTest
return new ParallelIndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
null,
500000,
1000000L,
null,
@ -438,6 +439,7 @@ public class CompactionTaskTest
new IndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
null,
500000,
1000000L,
null,
@ -583,6 +585,7 @@ public class CompactionTaskTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
100000,
null,
null,
500000,
1000000L,
null,
@ -648,6 +651,7 @@ public class CompactionTaskTest
public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException
{
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
500000,
@ -715,6 +719,7 @@ public class CompactionTaskTest
public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException
{
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
500000,
@ -1103,6 +1108,7 @@ public class CompactionTaskTest
expectedMetricsSpec,
expectedSegmentIntervals,
new ParallelIndexTuningConfig(
null,
null,
null,
500000,

View File

@ -57,6 +57,7 @@ public class IndexTaskSerdeTest
public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
null,
100,
@ -90,6 +91,7 @@ public class IndexTaskSerdeTest
public void testSerdeTuningConfigWithHashedPartitionsSpec() throws IOException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
null,
100,
@ -125,6 +127,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
1000,
null,
100,
2000L,
3000L,
@ -156,6 +159,7 @@ public class IndexTaskSerdeTest
public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
null,
100,
@ -191,6 +195,7 @@ public class IndexTaskSerdeTest
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("DynamicPartitionsSpec cannot be used for perfect rollup");
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
null,
100,
@ -225,6 +230,7 @@ public class IndexTaskSerdeTest
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup");
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
null,
100,

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
@ -1122,6 +1123,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
null,
new HashedPartitionsSpec(2, null, null),
INDEX_SPEC,
null,
@ -1250,6 +1252,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
null,
new DynamicPartitionsSpec(2, null),
INDEX_SPEC,
null,
@ -1370,6 +1373,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
null,
new HashedPartitionsSpec(2, null, null),
INDEX_SPEC,
null,
@ -1808,6 +1812,7 @@ public class IndexTaskTest extends IngestionTestBase
return new IndexTuningConfig(
null,
maxRowsPerSegment,
null,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
@ -1998,4 +2003,12 @@ public class IndexTaskTest extends IngestionTestBase
);
}
}
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(IndexTuningConfig.class)
.usingGetClass()
.verify();
}
}

View File

@ -830,6 +830,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
null
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
null,
1000,
null,
new Period("P1Y"),

View File

@ -247,6 +247,7 @@ public class TaskSerdeTest
),
new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true),
new IndexTuningConfig(
null,
null,
null,
10,
@ -327,6 +328,7 @@ public class TaskSerdeTest
),
new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true),
new IndexTuningConfig(
null,
null,
null,
10,
@ -396,6 +398,7 @@ public class TaskSerdeTest
),
new RealtimeTuningConfig(
null,
1,
10L,
new Period("PT10M"),

View File

@ -157,6 +157,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
null,
2,
null,
null,
@ -227,6 +228,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,

View File

@ -178,6 +178,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
null,
null,
null,
null,
numTotalSubTasks,
null,
null,

View File

@ -427,6 +427,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
null,
null,
null,
null,
NUM_SUB_TASKS,
null,
null,

View File

@ -271,6 +271,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
null,
null,
null,
null,
partitionsSpec,
null,
null,

View File

@ -193,6 +193,7 @@ public class ParallelIndexSupervisorTaskTest
appendToExisting
);
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
10,

View File

@ -158,6 +158,7 @@ class ParallelIndexTestingFactory
return new ParallelIndexTuningConfig(
1,
null,
null,
3,
4L,
5L,

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@ -67,6 +68,7 @@ public class ParallelIndexTuningConfigTest
throws IOException
{
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -109,6 +111,7 @@ public class ParallelIndexTuningConfigTest
{
final int maxNumConcurrentSubTasks = 250;
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -151,6 +154,7 @@ public class ParallelIndexTuningConfigTest
{
final int maxNumSubTasks = 250;
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -195,6 +199,7 @@ public class ParallelIndexTuningConfigTest
expectedException.expectMessage("Can't use both maxNumSubTasks and maxNumConcurrentSubTasks");
final int maxNumSubTasks = 250;
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -236,6 +241,7 @@ public class ParallelIndexTuningConfigTest
expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup");
final boolean forceGuaranteedRollup = false;
new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -277,6 +283,7 @@ public class ParallelIndexTuningConfigTest
expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup");
final boolean forceGuaranteedRollup = false;
new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -318,6 +325,7 @@ public class ParallelIndexTuningConfigTest
expectedException.expectMessage("cannot be used for perfect rollup");
final boolean forceGuaranteedRollup = true;
new ParallelIndexTuningConfig(
null,
null,
null,
10,
@ -351,4 +359,12 @@ public class ParallelIndexTuningConfigTest
null
);
}
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(ParallelIndexTuningConfig.class)
.usingGetClass()
.verify();
}
}

View File

@ -352,6 +352,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
null,
null,
1,
null,
null,

View File

@ -746,6 +746,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
null,
10,
null,
null,
@ -826,6 +827,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
null,
10,
null,
null,
@ -1251,6 +1253,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
null,
10,
null,
null,
@ -1358,6 +1361,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
null,
10,
null,
null,
@ -1468,6 +1472,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
// PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
null,
1000,
null,
new Period("P1Y"),

View File

@ -898,6 +898,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
null,
null
)
{

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.jackson;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
public class AppendableIndexModule extends SimpleModule
{
public AppendableIndexModule()
{
super("AppendableIndexFactories");
setMixInAnnotation(AppendableIndexSpec.class, AppendableIndexSpecMixin.class);
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnheapIncrementalIndex.Spec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = OnheapIncrementalIndex.Spec.TYPE, value = OnheapIncrementalIndex.Spec.class),
})
public interface AppendableIndexSpecMixin
{
}
}
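
The mixin above gives Jackson a polymorphic handle on AppendableIndexSpec: the "type" field selects the subtype, and the on-heap spec doubles as the default when "type" is omitted. A minimal deserialization sketch, assuming a mapper with this module registered (as DefaultObjectMapper is, in the next file) and with exception handling omitted:

// Sketch only: both inputs resolve to OnheapIncrementalIndex.Spec,
// the first via the "onheap" subtype name, the second via defaultImpl.
ObjectMapper mapper = new DefaultObjectMapper();
AppendableIndexSpec byName = mapper.readValue("{\"type\": \"onheap\"}", AppendableIndexSpec.class);
AppendableIndexSpec byDefault = mapper.readValue("{}", AppendableIndexSpec.class);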

View File

@ -49,6 +49,7 @@ public class DefaultObjectMapper extends ObjectMapper
registerModule(new AggregatorsModule());
registerModule(new StringComparatorModule());
registerModule(new SegmentizerModule());
registerModule(new AppendableIndexModule());
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);

View File

@ -39,9 +39,12 @@ import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.incremental.AppendableIndexBuilder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -117,24 +120,24 @@ public class GroupByQueryHelper
.withMinTimestamp(granTimeStart.getMillis())
.build();
final AppendableIndexBuilder indexBuilder;
if (query.getContextValue("useOffheap", false)) {
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOffheap(bufferPool);
indexBuilder = new OffheapIncrementalIndex.Builder()
.setBufferPool(bufferPool);
} else {
index = new IncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.buildOnheap();
indexBuilder = new OnheapIncrementalIndex.Builder();
}
index = indexBuilder
.setIndexSchema(indexSchema)
.setDeserializeComplexMetrics(false)
.setConcurrentEventAdd(true)
.setSortFacts(sortResults)
.setMaxRowCount(querySpecificConfig.getMaxResults())
.build();
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{
@Override

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.incremental;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import javax.annotation.Nullable;
public abstract class AppendableIndexBuilder
{
@Nullable
protected IncrementalIndexSchema incrementalIndexSchema = null;
protected boolean deserializeComplexMetrics = true;
protected boolean concurrentEventAdd = false;
protected boolean sortFacts = true;
protected int maxRowCount = 0;
protected long maxBytesInMemory = 0;
protected final Logger log = new Logger(this.getClass().getName());
public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
{
this.incrementalIndexSchema = incrementalIndexSchema;
return this;
}
/**
* A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
* that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
* production settings.
*
* @param metrics variable array of {@link AggregatorFactory} metrics
*
* @return this
*/
@VisibleForTesting
public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
return setSimpleTestingIndexSchema(null, metrics);
}
/**
* A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
* other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
* would use it in production settings.
*
* @param metrics variable array of {@link AggregatorFactory} metrics
*
* @return this
*/
@VisibleForTesting
public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
{
IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
return this;
}
public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
{
this.deserializeComplexMetrics = deserializeComplexMetrics;
return this;
}
public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
this.concurrentEventAdd = concurrentEventAdd;
return this;
}
public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
{
this.sortFacts = sortFacts;
return this;
}
public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
{
this.maxRowCount = maxRowCount;
return this;
}
public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
{
this.maxBytesInMemory = maxBytesInMemory;
return this;
}
public void validate()
{
if (maxRowCount <= 0) {
throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
}
if (incrementalIndexSchema == null) {
throw new IllegalArgumentException("incrementIndexSchema cannot be null");
}
}
public final IncrementalIndex build()
{
log.debug("Building appendable index.");
validate();
return buildInner();
}
protected abstract IncrementalIndex buildInner();
}
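
A minimal usage sketch of the contract above, assuming the on-heap subclass added later in this change; the metric name is illustrative, and build() runs validate() before delegating to buildInner():

IncrementalIndex index = new OnheapIncrementalIndex.Builder()
    .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) // illustrative schema
    .setMaxRowCount(10_000)       // required: validate() rejects values <= 0
    .setMaxBytesInMemory(1 << 20) // optional: defaults to 0 when unset
    .build();                     // validate(), then buildInner()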

View File

@ -17,25 +17,19 @@
* under the License.
*/
package org.apache.druid.segment.indexing;
package org.apache.druid.segment.incremental;
public class TuningConfigs
import org.apache.druid.guice.annotations.UnstableApi;
/**
* AppendableIndexSpec describes the in-memory indexing method for data ingestion.
*/
@UnstableApi
public interface AppendableIndexSpec
{
private TuningConfigs()
{
}
// Returns a builder of the appendable index.
AppendableIndexBuilder builder();
public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory)
{
// In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
// maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
// the actual task node's jvm memory.
long newMaxBytesInMemory = maxBytesInMemory;
if (maxBytesInMemory == 0) {
newMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY;
} else if (maxBytesInMemory < 0) {
newMaxBytesInMemory = Long.MAX_VALUE;
}
return newMaxBytesInMemory;
}
// Returns the default max bytes in memory for this index.
long getDefaultMaxBytesInMemory();
}
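
As a sketch of the extension point, a hypothetical spec (not part of this change, and not a registered Jackson subtype) only needs to supply a builder and a default byte budget:

// Hypothetical implementation, for illustration only.
public class ExampleIndexSpec implements AppendableIndexSpec
{
  @Override
  public AppendableIndexBuilder builder()
  {
    // Reuses the on-heap builder purely for the sketch.
    return new OnheapIncrementalIndex.Builder();
  }

  @Override
  public long getDefaultMaxBytesInMemory()
  {
    return 256L << 20; // arbitrary illustrative default: 256 MiB
  }
}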

View File

@ -31,7 +31,6 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedRow;
@ -79,7 +78,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@ -88,7 +86,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -319,126 +316,62 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
}
}
public static class Builder
/**
* This class exists only for backward compatibility, to reduce the number of modified lines.
*/
public static class Builder extends OnheapIncrementalIndex.Builder
{
@Nullable
private IncrementalIndexSchema incrementalIndexSchema;
private boolean deserializeComplexMetrics;
private boolean concurrentEventAdd;
private boolean sortFacts;
private int maxRowCount;
private long maxBytesInMemory;
public Builder()
{
incrementalIndexSchema = null;
deserializeComplexMetrics = true;
concurrentEventAdd = false;
sortFacts = true;
maxRowCount = 0;
maxBytesInMemory = 0;
}
@Override
public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
{
this.incrementalIndexSchema = incrementalIndexSchema;
return this;
return (Builder) super.setIndexSchema(incrementalIndexSchema);
}
/**
* A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
* that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
* production settings.
*
* @param metrics variable array of {@link AggregatorFactory} metrics
*
* @return this
*/
@VisibleForTesting
@Override
public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
return setSimpleTestingIndexSchema(null, metrics);
return (Builder) super.setSimpleTestingIndexSchema(metrics);
}
/**
* A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
* other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
* would use it in production settings.
*
* @param metrics variable array of {@link AggregatorFactory} metrics
*
* @return this
*/
@VisibleForTesting
@Override
public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
{
IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
return this;
return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
}
@Override
public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
{
this.deserializeComplexMetrics = deserializeComplexMetrics;
return this;
return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
}
@Override
public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
this.concurrentEventAdd = concurrentEventAdd;
return this;
return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
}
@Override
public Builder setSortFacts(final boolean sortFacts)
{
this.sortFacts = sortFacts;
return this;
return (Builder) super.setSortFacts(sortFacts);
}
@Override
public Builder setMaxRowCount(final int maxRowCount)
{
this.maxRowCount = maxRowCount;
return this;
return (Builder) super.setMaxRowCount(maxRowCount);
}
//maxBytesInMemory only applies to OnHeapIncrementalIndex
@Override
public Builder setMaxBytesInMemory(final long maxBytesInMemory)
{
this.maxBytesInMemory = maxBytesInMemory;
return this;
return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
}
public OnheapIncrementalIndex buildOnheap()
{
if (maxRowCount <= 0) {
throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
}
return new OnheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
deserializeComplexMetrics,
concurrentEventAdd,
sortFacts,
maxRowCount,
maxBytesInMemory
);
}
public IncrementalIndex buildOffheap(final NonBlockingPool<ByteBuffer> bufferPool)
{
if (maxRowCount <= 0) {
throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
}
return new OffheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
deserializeComplexMetrics,
concurrentEventAdd,
sortFacts,
maxRowCount,
Objects.requireNonNull(bufferPool, "bufferPool is null")
);
return (OnheapIncrementalIndex) build();
}
}
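
Because the shim above extends OnheapIncrementalIndex.Builder and buildOnheap() now delegates to build(), pre-existing call sites keep compiling unchanged; for example:

// Legacy call site, untouched by this change; it now resolves through
// AppendableIndexBuilder.build() under the hood. The metric name is illustrative.
OnheapIncrementalIndex index = new IncrementalIndex.Builder()
    .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
    .setMaxRowCount(1000)
    .buildOnheap();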

View File

@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -115,6 +116,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
selectors = new HashMap<>();
aggOffsetInBuffer = new int[metrics.length];
int aggsCurOffsetInBuffer = 0;
for (int i = 0; i < metrics.length; i++) {
AggregatorFactory agg = metrics[i];
@ -129,15 +132,11 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
);
if (i == 0) {
aggOffsetInBuffer[i] = 0;
} else {
aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
}
aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
}
aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
- 1].getMaxIntermediateSizeWithNulls();
aggsTotalSize = aggsCurOffsetInBuffer;
return new BufferAggregator[metrics.length];
}
@ -346,4 +345,38 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
aggBuffers.clear();
}
public static class Builder extends AppendableIndexBuilder
{
@Nullable
NonBlockingPool<ByteBuffer> bufferPool = null;
public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
{
this.bufferPool = bufferPool;
return this;
}
@Override
public void validate()
{
super.validate();
if (bufferPool == null) {
throw new IllegalArgumentException("bufferPool cannot be null");
}
}
@Override
protected OffheapIncrementalIndex buildInner()
{
return new OffheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
deserializeComplexMetrics,
concurrentEventAdd,
sortFacts,
maxRowCount,
Objects.requireNonNull(bufferPool, "bufferPool is null")
);
}
}
}
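
A hedged sketch of driving this builder directly; the pool construction mirrors the test code further below, and the pool name and buffer size are illustrative. validate() enforces that a pool was supplied before buildInner() runs:

// Sketch only: a small buffer pool plus the off-heap builder.
NonBlockingPool<ByteBuffer> pool = new StupidPool<>(
    "example-pool",                    // illustrative name
    () -> ByteBuffer.allocate(1 << 20) // illustrative 1 MiB buffers
);
IncrementalIndex index = new OffheapIncrementalIndex.Builder()
    .setBufferPool(pool) // required: validate() rejects a null pool
    .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
    .setMaxRowCount(1_000_000)
    .build();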

View File

@ -32,6 +32,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@ -40,6 +41,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -434,4 +436,51 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
public static class Builder extends AppendableIndexBuilder
{
@Override
protected OnheapIncrementalIndex buildInner()
{
return new OnheapIncrementalIndex(
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
deserializeComplexMetrics,
concurrentEventAdd,
sortFacts,
maxRowCount,
maxBytesInMemory
);
}
}
public static class Spec implements AppendableIndexSpec
{
public static final String TYPE = "onheap";
@Override
public AppendableIndexBuilder builder()
{
return new Builder();
}
@Override
public long getDefaultMaxBytesInMemory()
{
// We initially estimated this to be 1/3 of the max JVM memory, but bytesCurrentlyInMemory
// only tracks the active index, not the index being flushed to disk. To account for that,
// we halved the default to 1/6 of the max JVM memory.
return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
}
@Override
public boolean equals(Object that)
{
return that != null && that.getClass().equals(this.getClass());
}
@Override
public int hashCode()
{
return Objects.hash(this.getClass());
}
}
}
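
With the Spec in place, index construction becomes a spec-then-builder chain, which is how Sink consumes it further below; the schema and limits here are illustrative:

// Sketch: building through the spec rather than naming a concrete builder class.
AppendableIndexSpec spec = new OnheapIncrementalIndex.Spec();
IncrementalIndex index = spec.builder()
    .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
    .setMaxRowCount(75_000)
    .setMaxBytesInMemory(spec.getDefaultMaxBytesInMemory()) // e.g. 1/6 of max heap
    .build();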

View File

@ -63,9 +63,10 @@ import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndex.Builder;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.AfterClass;
@ -130,10 +131,11 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
RESOURCE_CLOSER.register(pool1);
params.add(
new Object[] {
(IndexCreator) factories -> new Builder()
(IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
.setBufferPool(pool1)
.setSimpleTestingIndexSchema(factories)
.setMaxRowCount(1000000)
.buildOffheap(pool1)
.build()
}
);
params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex});
@ -144,7 +146,8 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
RESOURCE_CLOSER.register(pool2);
params.add(
new Object[] {
(IndexCreator) factories -> new Builder()
(IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
.setBufferPool(pool2)
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(factories)
@ -152,7 +155,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
.buildOffheap(pool2)
.build()
}
);
@ -173,7 +176,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
}
return new IncrementalIndex.Builder()
return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(dimensionsSpec)
@ -181,7 +184,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
.build();
}
public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
@ -190,10 +193,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
}
return new IncrementalIndex.Builder()
return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggregatorFactories)
.setMaxRowCount(1000000)
.buildOnheap();
.build();
}
public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories)
@ -202,10 +205,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
}
return new IncrementalIndex.Builder()
return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(false, aggregatorFactories)
.setMaxRowCount(1000000)
.buildOnheap();
.build();
}
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
@ -722,7 +725,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testgetDimensions()
{
final IncrementalIndex<Aggregator> incrementalIndex = new IncrementalIndex.Builder()
final IncrementalIndex<Aggregator> incrementalIndex = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(new CountAggregatorFactory("count"))
@ -736,7 +739,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
.buildOnheap();
.build();
closerRule.closeLater(incrementalIndex);
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
@ -745,10 +748,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testDynamicSchemaRollup() throws IndexSizeExceededException
{
IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
IncrementalIndex<Aggregator> index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
.buildOnheap();
.build();
closerRule.closeLater(index);
index.add(
new MapBasedInputRow(

View File

@ -117,12 +117,12 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Override
public IncrementalIndex createIndex()
{
return new IncrementalIndex.Builder()
return new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setDeserializeComplexMetrics(false)
.setSortFacts(sortFacts)
.setMaxRowCount(1000)
.buildOnheap();
.build();
}
},
Closer.create()
@ -141,11 +141,12 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Override
public IncrementalIndex createIndex()
{
return new IncrementalIndex.Builder()
return new OffheapIncrementalIndex.Builder()
.setBufferPool(stupidPool)
.setIndexSchema(schema)
.setSortFacts(sortFacts)
.setMaxRowCount(1000000)
.buildOffheap(stupidPool);
.build();
}
},
poolCloser

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory;
@ -41,9 +42,8 @@ import java.io.File;
/**
*
*/
public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public class RealtimeTuningConfig implements AppenderatorConfig
{
private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
private static final Period DEFAULT_WINDOW_PERIOD = new Period("PT10M");
private static final VersioningPolicy DEFAULT_VERSIONING_POLICY = new IntervalStartVersioningPolicy();
@ -65,6 +65,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory)
{
return new RealtimeTuningConfig(
DEFAULT_APPENDABLE_INDEX,
DEFAULT_MAX_ROWS_IN_MEMORY,
0L,
DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
@ -87,6 +88,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
);
}
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final Period intermediatePersistPeriod;
@ -110,6 +112,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonCreator
public RealtimeTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@ -132,9 +135,10 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("dedupColumn") @Nullable String dedupColumn
)
{
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// Initializing this to 0; it will be lazily resolved to a default value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
@ -166,6 +170,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN : dedupColumn;
}
@Override
@JsonProperty
public AppendableIndexSpec getAppendableIndexSpec()
{
return appendableIndexSpec;
}
@Override
@JsonProperty
public int getMaxRowsInMemory()
@ -304,6 +315,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
intermediatePersistPeriod,
@ -330,6 +342,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public RealtimeTuningConfig withBasePersistDirectory(File dir)
{
return new RealtimeTuningConfig(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
intermediatePersistPeriod,

View File

@ -21,7 +21,8 @@ package org.apache.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.utils.JvmUtils;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
/**
*/
@ -32,11 +33,37 @@ import org.apache.druid.utils.JvmUtils;
public interface TuningConfig
{
boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
// We initially estimated this to be 1/3 of the max JVM memory, but bytesCurrentlyInMemory
// only tracks the active index, not the index being flushed to disk. To account for that,
// we halved the default to 1/6 of the max JVM memory.
long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
/**
* The incremental index implementation to use
*/
AppendableIndexSpec getAppendableIndexSpec();
/**
* Maximum number of bytes (estimated) to store in memory before persisting to local storage
*/
long getMaxBytesInMemory();
/**
* Maximum number of bytes (estimated) to store in memory before persisting to local storage.
* If getMaxBytesInMemory() returns 0, the appendable index default will be used.
*/
default long getMaxBytesInMemoryOrDefault()
{
// The tuningConfig class constructors set maxBytesInMemory to 0 if it is null, to avoid resolving
// the default against the max JVM memory of whichever process creates the spec first. Instead,
// the default is resolved here, against the actual task node's JVM memory.
final long maxBytesInMemory = getMaxBytesInMemory();
if (maxBytesInMemory > 0) {
return maxBytesInMemory;
} else if (maxBytesInMemory == 0) {
return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
} else {
return Long.MAX_VALUE;
}
}
}
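
A compact sketch of how the default method above resolves the effective budget, assuming tuningConfig is any TuningConfig whose spec is the default on-heap spec; the values are illustrative:

long effective = tuningConfig.getMaxBytesInMemoryOrDefault();
// if getMaxBytesInMemory() returned 100: effective == 100 (explicit value wins)
// if it returned 0:  effective == getAppendableIndexSpec().getDefaultMaxBytesInMemory(),
//                    i.e. 1/6 of the max heap for the "onheap" spec
// if it returned -1: effective == Long.MAX_VALUE (effectively unlimited)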

View File

@ -21,13 +21,14 @@ package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
public interface AppenderatorConfig
public interface AppenderatorConfig extends TuningConfig
{
boolean isReportParseExceptions();
@ -36,11 +37,6 @@ public interface AppenderatorConfig
*/
int getMaxRowsInMemory();
/**
* Maximum number of bytes (estimated) to store in memory before persisting to local storage
*/
long getMaxBytesInMemory();
int getMaxPendingPersists();
/**

View File

@ -64,7 +64,6 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
@ -199,7 +198,7 @@ public class AppenderatorImpl implements Appenderator
this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
}
maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
}
@Override
@ -404,6 +403,7 @@ public class AppenderatorImpl implements Appenderator
schema,
identifier.getShardSpec(),
identifier.getVersion(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
null
@ -1121,6 +1121,7 @@ public class AppenderatorImpl implements Appenderator
schema,
identifier.getShardSpec(),
identifier.getVersion(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
null,

View File

@ -49,6 +49,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@ -382,6 +383,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
return baseConfig.isReportParseExceptions();
}
@Override
public AppendableIndexSpec getAppendableIndexSpec()
{
return baseConfig.getAppendableIndexSpec();
}
@Override
public int getMaxRowsInMemory()
{

View File

@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@ -260,8 +259,9 @@ public class RealtimePlumber implements Plumber
schema,
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
config.getAppendableIndexSpec(),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
config.getMaxBytesInMemoryOrDefault(),
config.getDedupColumn()
);
addSink(retVal);
@ -723,8 +723,9 @@ public class RealtimePlumber implements Plumber
schema,
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
config.getAppendableIndexSpec(),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
config.getMaxBytesInMemoryOrDefault(),
config.getDedupColumn(),
hydrants
);

View File

@ -30,6 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@ -63,6 +64,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
private final DataSchema schema;
private final ShardSpec shardSpec;
private final String version;
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
@ -79,6 +81,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
DataSchema schema,
ShardSpec shardSpec,
String version,
AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
String dedupColumn
@ -89,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
schema,
shardSpec,
version,
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
dedupColumn,
@ -101,6 +105,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
DataSchema schema,
ShardSpec shardSpec,
String version,
AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
String dedupColumn,
@ -111,6 +116,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
this.shardSpec = shardSpec;
this.interval = interval;
this.version = version;
this.appendableIndexSpec = appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.dedupColumn = dedupColumn;
@ -322,11 +328,13 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
.withMetrics(schema.getAggregators())
.withRollup(schema.getGranularitySpec().isRollup())
.build();
final IncrementalIndex newIndex = new IncrementalIndex.Builder()
// Build the incremental-index according to the spec that was chosen by the user
final IncrementalIndex newIndex = appendableIndexSpec.builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
.buildOnheap();
.build();
final FireHydrant old;
synchronized (hydrantLock) {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Period;
@ -89,6 +90,7 @@ public class RealtimeTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@ -119,7 +121,8 @@ public class RealtimeTuningConfigTest
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"alertTimeout\": 70,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
ObjectMapper mapper = TestHelper.makeJsonMapper();
@ -134,6 +137,7 @@ public class RealtimeTuningConfigTest
);
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(70, config.getAlertTimeout());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());

View File

@ -62,6 +62,7 @@ public class AppenderatorPlumberTest
EasyMock.anyObject())).andReturn(true).anyTimes();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
null,
1,
null,
null,

View File

@ -162,6 +162,7 @@ public class AppenderatorTester implements AutoCloseable
objectMapper
);
tuningConfig = new RealtimeTuningConfig(
null,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
null,

View File

@ -134,6 +134,7 @@ public class DefaultOfflineAppenderatorFactoryTest
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
null,
75000,
null,
null,

View File

@ -48,7 +48,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@ -200,6 +199,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
tuningConfig = new RealtimeTuningConfig(
null,
1,
null,
null,
@ -278,8 +278,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
@ -323,8 +324,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
@ -373,8 +375,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
schema2,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber2.getSinks().put(0L, sink);

View File

@ -34,7 +34,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.testing.InitializedNullHandlingTest;
@ -66,6 +65,7 @@ public class SinkTest extends InitializedNullHandlingTest
final Interval interval = Intervals.of("2013-01-01/2013-01-02");
final String version = DateTimes.nowUtc().toString();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
null,
100,
null,
new Period("P1Y"),
@ -91,8 +91,9 @@ public class SinkTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
version,
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
@ -220,6 +221,7 @@ public class SinkTest extends InitializedNullHandlingTest
final Interval interval = Intervals.of("2013-01-01/2013-01-02");
final String version = DateTimes.nowUtc().toString();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
null,
100,
null,
new Period("P1Y"),
@ -245,8 +247,9 @@ public class SinkTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
version,
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);

View File

@ -156,6 +156,7 @@ public class DruidJsonValidatorTest
),
new RealtimeTuningConfig(
null,
1,
null,
new Period("PT10M"),