mirror of https://github.com/apache/druid.git
Remove basePersistDirectory from tuning configs. (#13040)
* Remove basePersistDirectory from tuning configs. Since the removal of CliRealtime, it serves no purpose, since it is always overridden in production using withBasePersistDirectory given some subdirectory of the task work directory. Removing this from the tuning config has a benefit beyond removing no-longer-needed logic: it also avoids the side effect of empty "druid-realtime-persist" directories getting created in the systemwide temp directory. * Test adjustments to appropriately set basePersistDirectory. * Remove unused import. * Fix RATC constructor.
This commit is contained in:
parent
86e6e61e88
commit
d98c808d3f
|
@ -32,27 +32,26 @@ import java.io.File;
|
|||
|
||||
public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
|
||||
{
|
||||
@JsonCreator
|
||||
public KafkaIndexTaskTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
|
||||
@JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
|
||||
@JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
|
||||
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
|
||||
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
|
||||
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
|
||||
@Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@Nullable Integer maxRowsInMemory,
|
||||
@Nullable Long maxBytesInMemory,
|
||||
@Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@Nullable Integer maxRowsPerSegment,
|
||||
@Nullable Long maxTotalRows,
|
||||
@Nullable Period intermediatePersistPeriod,
|
||||
@Nullable File basePersistDirectory,
|
||||
@Nullable Integer maxPendingPersists,
|
||||
@Nullable IndexSpec indexSpec,
|
||||
@Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@Nullable Boolean reportParseExceptions,
|
||||
@Nullable Long handoffConditionTimeout,
|
||||
@Nullable Boolean resetOffsetAutomatically,
|
||||
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@Nullable Period intermediateHandoffPeriod,
|
||||
@Nullable Boolean logParseExceptions,
|
||||
@Nullable Integer maxParseExceptions,
|
||||
@Nullable Integer maxSavedParseExceptions
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -79,6 +78,51 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
|
|||
);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
private KafkaIndexTaskTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
|
||||
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
|
||||
@JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
|
||||
@JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
|
||||
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
|
||||
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
|
||||
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
|
||||
)
|
||||
{
|
||||
this(
|
||||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
reportParseExceptions,
|
||||
handoffConditionTimeout,
|
||||
resetOffsetAutomatically,
|
||||
segmentWriteOutMediumFactory,
|
||||
intermediateHandoffPeriod,
|
||||
logParseExceptions,
|
||||
maxParseExceptions,
|
||||
maxSavedParseExceptions
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
|
||||
{
|
||||
|
@ -116,7 +160,6 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
|
|||
", maxBytesInMemory=" + getMaxBytesInMemory() +
|
||||
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
", basePersistDirectory=" + getBasePersistDirectory() +
|
||||
", maxPendingPersists=" + getMaxPendingPersists() +
|
||||
", indexSpec=" + getIndexSpec() +
|
||||
", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.joda.time.Duration;
|
|||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
|
||||
public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
||||
implements SeekableStreamSupervisorTuningConfig
|
||||
|
@ -67,7 +66,6 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
@ -80,7 +78,6 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
|
@ -108,7 +105,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
basePersistDirectory,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
|
@ -198,7 +195,6 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
|
||||
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
", basePersistDirectory=" + getBasePersistDirectory() +
|
||||
", maxPendingPersists=" + getMaxPendingPersists() +
|
||||
", indexSpec=" + getIndexSpec() +
|
||||
", reportParseExceptions=" + isReportParseExceptions() +
|
||||
|
@ -229,7 +225,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
getMaxRowsPerSegment(),
|
||||
getMaxTotalRows(),
|
||||
getIntermediatePersistPeriod(),
|
||||
getBasePersistDirectory(),
|
||||
null,
|
||||
getMaxPendingPersists(),
|
||||
getIndexSpec(),
|
||||
getIndexSpecForIntermediatePersists(),
|
||||
|
|
|
@ -61,7 +61,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertNotNull(config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
|
||||
|
@ -102,7 +102,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(100, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
|
||||
|
@ -127,7 +127,6 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
2,
|
||||
10L,
|
||||
new Period("PT3S"),
|
||||
new File("/tmp/xxx"),
|
||||
4,
|
||||
new IndexSpec(),
|
||||
new IndexSpec(),
|
||||
|
@ -146,7 +145,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
null,
|
||||
null
|
||||
);
|
||||
KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
|
||||
KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
|
||||
|
||||
Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
|
||||
Assert.assertEquals(1, copy.getMaxRowsInMemory());
|
||||
|
@ -154,7 +153,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
Assert.assertNotEquals(null, copy.getMaxTotalRows());
|
||||
Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
|
||||
Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
|
||||
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
|
||||
Assert.assertNull(copy.getBasePersistDirectory());
|
||||
Assert.assertEquals(4, copy.getMaxPendingPersists());
|
||||
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
|
||||
Assert.assertEquals(true, copy.isReportParseExceptions());
|
||||
|
@ -197,7 +196,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
|
||||
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
|
||||
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
|
||||
Assert.assertNull(deserialized.getBasePersistDirectory());
|
||||
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
|
||||
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
|
||||
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
|
||||
|
@ -221,7 +220,6 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
2,
|
||||
10L,
|
||||
new Period("PT3S"),
|
||||
new File("/tmp/xxx"),
|
||||
4,
|
||||
new IndexSpec(),
|
||||
new IndexSpec(),
|
||||
|
|
|
@ -115,7 +115,6 @@ import org.junit.Test;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -313,7 +312,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -3771,7 +3769,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -3811,7 +3808,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -4138,7 +4134,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -4252,7 +4247,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -32,8 +32,6 @@ import org.joda.time.Period;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
public class KafkaSupervisorTuningConfigTest
|
||||
{
|
||||
private final ObjectMapper mapper;
|
||||
|
@ -59,7 +57,7 @@ public class KafkaSupervisorTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertNotNull(config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
|
||||
|
@ -110,7 +108,7 @@ public class KafkaSupervisorTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(100, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
|
|||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
|
||||
@JsonTypeName("KafkaTuningConfig")
|
||||
public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuningConfig
|
||||
|
@ -45,7 +44,6 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
|
|||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
|
@ -68,7 +66,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
|
|||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
basePersistDirectory,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
|
|
|
@ -47,33 +47,32 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
private final Integer fetchThreads;
|
||||
private final int maxRecordsPerPoll;
|
||||
|
||||
@JsonCreator
|
||||
public KinesisIndexTaskTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
|
||||
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
|
||||
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
|
||||
@JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
|
||||
@JsonProperty("recordBufferSize") Integer recordBufferSize,
|
||||
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
|
||||
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
|
||||
@JsonProperty("fetchThreads") Integer fetchThreads,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
|
||||
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
|
||||
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
|
||||
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
|
||||
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
|
||||
@Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
Integer maxRowsInMemory,
|
||||
Long maxBytesInMemory,
|
||||
@Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
Integer maxRowsPerSegment,
|
||||
Long maxTotalRows,
|
||||
Period intermediatePersistPeriod,
|
||||
File basePersistDirectory,
|
||||
Integer maxPendingPersists,
|
||||
IndexSpec indexSpec,
|
||||
@Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
Boolean reportParseExceptions,
|
||||
Long handoffConditionTimeout,
|
||||
Boolean resetOffsetAutomatically,
|
||||
Boolean skipSequenceNumberAvailabilityCheck,
|
||||
Integer recordBufferSize,
|
||||
Integer recordBufferOfferTimeout,
|
||||
Integer recordBufferFullWait,
|
||||
Integer fetchThreads,
|
||||
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@Nullable Boolean logParseExceptions,
|
||||
@Nullable Integer maxParseExceptions,
|
||||
@Nullable Integer maxSavedParseExceptions,
|
||||
@Nullable Integer maxRecordsPerPoll,
|
||||
@Nullable Period intermediateHandoffPeriod
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -112,6 +111,63 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
private KinesisIndexTaskTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
|
||||
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
|
||||
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
|
||||
@JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
|
||||
@JsonProperty("recordBufferSize") Integer recordBufferSize,
|
||||
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
|
||||
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
|
||||
@JsonProperty("fetchThreads") Integer fetchThreads,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
|
||||
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
|
||||
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
|
||||
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
|
||||
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
|
||||
)
|
||||
{
|
||||
this(
|
||||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
reportParseExceptions,
|
||||
handoffConditionTimeout,
|
||||
resetOffsetAutomatically,
|
||||
skipSequenceNumberAvailabilityCheck,
|
||||
recordBufferSize,
|
||||
recordBufferOfferTimeout,
|
||||
recordBufferFullWait,
|
||||
fetchThreads,
|
||||
segmentWriteOutMediumFactory,
|
||||
logParseExceptions,
|
||||
maxParseExceptions,
|
||||
maxSavedParseExceptions,
|
||||
maxRecordsPerPoll,
|
||||
intermediateHandoffPeriod
|
||||
);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getRecordBufferSize()
|
||||
{
|
||||
|
@ -217,7 +273,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
|
||||
", maxTotalRows=" + getMaxTotalRows() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
", basePersistDirectory=" + getBasePersistDirectory() +
|
||||
", maxPendingPersists=" + getMaxPendingPersists() +
|
||||
", indexSpec=" + getIndexSpec() +
|
||||
", reportParseExceptions=" + isReportParseExceptions() +
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.joda.time.Duration;
|
|||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
|
||||
public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
||||
implements SeekableStreamSupervisorTuningConfig
|
||||
|
@ -79,7 +78,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
@ -92,7 +90,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
|
@ -129,7 +126,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
basePersistDirectory,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
|
@ -239,7 +236,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
|
||||
", maxTotalRows=" + getMaxTotalRows() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
", basePersistDirectory=" + getBasePersistDirectory() +
|
||||
", maxPendingPersists=" + getMaxPendingPersists() +
|
||||
", indexSpec=" + getIndexSpec() +
|
||||
", reportParseExceptions=" + isReportParseExceptions() +
|
||||
|
@ -278,7 +274,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
getMaxRowsPerSegment(),
|
||||
getMaxTotalRows(),
|
||||
getIntermediatePersistPeriod(),
|
||||
getBasePersistDirectory(),
|
||||
null,
|
||||
getMaxPendingPersists(),
|
||||
getIndexSpec(),
|
||||
getIndexSpecForIntermediatePersists(),
|
||||
|
|
|
@ -67,7 +67,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertNotNull(config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
|
||||
|
@ -115,7 +115,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(100, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
|
||||
|
@ -173,7 +173,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
|
||||
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
|
||||
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
|
||||
Assert.assertNull(deserialized.getBasePersistDirectory());
|
||||
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
|
||||
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
|
||||
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
|
||||
|
@ -231,7 +231,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
|
||||
Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
|
||||
Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
|
||||
Assert.assertNull(deserialized.getBasePersistDirectory());
|
||||
Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
|
||||
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
|
||||
Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
|
||||
|
@ -286,7 +286,6 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
2,
|
||||
100L,
|
||||
new Period("PT3S"),
|
||||
new File("/tmp/xxx"),
|
||||
4,
|
||||
new IndexSpec(),
|
||||
new IndexSpec(),
|
||||
|
@ -322,7 +321,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
|
||||
Assert.assertEquals(100L, (long) copy.getMaxTotalRows());
|
||||
Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
|
||||
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
|
||||
Assert.assertNull(copy.getBasePersistDirectory());
|
||||
Assert.assertEquals(4, copy.getMaxPendingPersists());
|
||||
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
|
||||
Assert.assertTrue(copy.isReportParseExceptions());
|
||||
|
|
|
@ -103,7 +103,6 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -185,7 +184,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -3948,7 +3946,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -5061,7 +5058,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new File("/test"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -31,8 +31,6 @@ import org.joda.time.Period;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
public class KinesisSupervisorTuningConfigTest
|
||||
{
|
||||
private final ObjectMapper mapper;
|
||||
|
@ -58,7 +56,7 @@ public class KinesisSupervisorTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertNotNull(config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
|
||||
|
@ -106,7 +104,7 @@ public class KinesisSupervisorTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
|
||||
Assert.assertNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(100, config.getMaxRowsInMemory());
|
||||
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
|
|||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
|
||||
@JsonTypeName("KinesisTuningConfig")
|
||||
public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTuningConfig
|
||||
|
@ -45,7 +44,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
|
|||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
|
@ -74,7 +72,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
|
|||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
basePersistDirectory,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
||||
import org.apache.druid.java.util.common.FileUtils;
|
||||
import org.apache.druid.segment.IndexSpec;
|
||||
import org.apache.druid.segment.incremental.AppendableIndexSpec;
|
||||
import org.apache.druid.segment.indexing.TuningConfig;
|
||||
|
@ -48,11 +47,6 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
|
||||
private static final long DEFAULT_ALERT_TIMEOUT = 0;
|
||||
|
||||
private static File createNewBasePersistDirectory()
|
||||
{
|
||||
return FileUtils.createTempDir("druid-realtime-persist");
|
||||
}
|
||||
|
||||
private final AppendableIndexSpec appendableIndexSpec;
|
||||
private final int maxRowsInMemory;
|
||||
private final long maxBytesInMemory;
|
||||
|
@ -74,27 +68,26 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
private final int maxParseExceptions;
|
||||
private final int maxSavedParseExceptions;
|
||||
|
||||
@JsonCreator
|
||||
public RealtimeAppenderatorTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
|
||||
@JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
|
||||
@JsonProperty("alertTimeout") Long alertTimeout,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
|
||||
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
|
||||
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
|
||||
@Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
Integer maxRowsInMemory,
|
||||
@Nullable Long maxBytesInMemory,
|
||||
@Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@Nullable Integer maxRowsPerSegment,
|
||||
@Nullable Long maxTotalRows,
|
||||
Period intermediatePersistPeriod,
|
||||
File basePersistDirectory,
|
||||
Integer maxPendingPersists,
|
||||
ShardSpec shardSpec,
|
||||
IndexSpec indexSpec,
|
||||
@Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
Boolean reportParseExceptions,
|
||||
Long publishAndHandoffTimeout,
|
||||
Long alertTimeout,
|
||||
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@Nullable Boolean logParseExceptions,
|
||||
@Nullable Integer maxParseExceptions,
|
||||
@Nullable Integer maxSavedParseExceptions
|
||||
)
|
||||
{
|
||||
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
|
||||
|
@ -108,7 +101,7 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
this.intermediatePersistPeriod = intermediatePersistPeriod == null
|
||||
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
|
||||
: intermediatePersistPeriod;
|
||||
this.basePersistDirectory = basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory;
|
||||
this.basePersistDirectory = basePersistDirectory;
|
||||
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
|
||||
this.shardSpec = shardSpec == null ? DEFAULT_SHARD_SPEC : shardSpec;
|
||||
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
|
||||
|
@ -142,6 +135,51 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
: logParseExceptions;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
private RealtimeAppenderatorTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
|
||||
@JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
|
||||
@JsonProperty("alertTimeout") Long alertTimeout,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
|
||||
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
|
||||
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
|
||||
)
|
||||
{
|
||||
this(
|
||||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
null,
|
||||
maxPendingPersists,
|
||||
shardSpec,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
reportParseExceptions,
|
||||
publishAndHandoffTimeout,
|
||||
alertTimeout,
|
||||
segmentWriteOutMediumFactory,
|
||||
logParseExceptions,
|
||||
maxParseExceptions,
|
||||
maxSavedParseExceptions
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public AppendableIndexSpec getAppendableIndexSpec()
|
||||
|
@ -199,10 +237,9 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public File getBasePersistDirectory()
|
||||
{
|
||||
return basePersistDirectory;
|
||||
return Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory not set");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -98,7 +98,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
this.intermediatePersistPeriod = intermediatePersistPeriod == null
|
||||
? defaults.getIntermediatePersistPeriod()
|
||||
: intermediatePersistPeriod;
|
||||
this.basePersistDirectory = defaults.getBasePersistDirectory();
|
||||
this.basePersistDirectory = basePersistDirectory;
|
||||
this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists;
|
||||
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
|
||||
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
|
||||
|
@ -193,7 +193,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public File getBasePersistDirectory()
|
||||
{
|
||||
return basePersistDirectory;
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||
import org.apache.druid.java.util.common.FileUtils;
|
||||
import org.apache.druid.segment.IndexSpec;
|
||||
import org.apache.druid.segment.incremental.AppendableIndexSpec;
|
||||
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
|
||||
|
@ -56,11 +55,6 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
private static final long DEFAULT_ALERT_TIMEOUT = 0;
|
||||
private static final String DEFAULT_DEDUP_COLUMN = null;
|
||||
|
||||
private static File createNewBasePersistDirectory()
|
||||
{
|
||||
return FileUtils.createTempDir("druid-realtime-persist");
|
||||
}
|
||||
|
||||
// Might make sense for this to be a builder
|
||||
public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory)
|
||||
{
|
||||
|
@ -71,7 +65,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
|
||||
DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
|
||||
DEFAULT_WINDOW_PERIOD,
|
||||
basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory,
|
||||
basePersistDirectory,
|
||||
DEFAULT_VERSIONING_POLICY,
|
||||
DEFAULT_REJECTION_POLICY_FACTORY,
|
||||
DEFAULT_MAX_PENDING_PERSISTS,
|
||||
|
@ -111,28 +105,27 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
@Nullable
|
||||
private final String dedupColumn;
|
||||
|
||||
@JsonCreator
|
||||
public RealtimeTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@JsonProperty("persistThreadPriority") int persistThreadPriority,
|
||||
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
|
||||
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
|
||||
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
|
||||
@JsonProperty("alertTimeout") Long alertTimeout,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("dedupColumn") @Nullable String dedupColumn
|
||||
@Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
Integer maxRowsInMemory,
|
||||
Long maxBytesInMemory,
|
||||
@Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
Period intermediatePersistPeriod,
|
||||
Period windowPeriod,
|
||||
File basePersistDirectory,
|
||||
VersioningPolicy versioningPolicy,
|
||||
RejectionPolicyFactory rejectionPolicyFactory,
|
||||
Integer maxPendingPersists,
|
||||
ShardSpec shardSpec,
|
||||
IndexSpec indexSpec,
|
||||
@Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
int persistThreadPriority,
|
||||
int mergeThreadPriority,
|
||||
Boolean reportParseExceptions,
|
||||
Long handoffConditionTimeout,
|
||||
Long alertTimeout,
|
||||
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@Nullable String dedupColumn
|
||||
)
|
||||
{
|
||||
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
|
||||
|
@ -146,8 +139,8 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
|
||||
: intermediatePersistPeriod;
|
||||
this.windowPeriod = windowPeriod == null ? DEFAULT_WINDOW_PERIOD : windowPeriod;
|
||||
this.basePersistDirectory = basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory;
|
||||
this.versioningPolicy = versioningPolicy == null ? DEFAULT_VERSIONING_POLICY : versioningPolicy;
|
||||
this.basePersistDirectory = basePersistDirectory;
|
||||
this.versioningPolicy = versioningPolicy;
|
||||
this.rejectionPolicyFactory = rejectionPolicyFactory == null
|
||||
? DEFAULT_REJECTION_POLICY_FACTORY
|
||||
: rejectionPolicyFactory;
|
||||
|
@ -172,6 +165,52 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN : dedupColumn;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
private RealtimeTuningConfig(
|
||||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
@JsonProperty("persistThreadPriority") int persistThreadPriority,
|
||||
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
|
||||
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
|
||||
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
|
||||
@JsonProperty("alertTimeout") Long alertTimeout,
|
||||
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
|
||||
@JsonProperty("dedupColumn") @Nullable String dedupColumn
|
||||
)
|
||||
{
|
||||
this(
|
||||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
intermediatePersistPeriod,
|
||||
windowPeriod,
|
||||
null,
|
||||
null,
|
||||
rejectionPolicyFactory,
|
||||
maxPendingPersists,
|
||||
shardSpec,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
persistThreadPriority,
|
||||
mergeThreadPriority,
|
||||
reportParseExceptions,
|
||||
handoffConditionTimeout,
|
||||
alertTimeout,
|
||||
segmentWriteOutMediumFactory,
|
||||
dedupColumn
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public AppendableIndexSpec getAppendableIndexSpec()
|
||||
|
@ -214,16 +253,14 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public File getBasePersistDirectory()
|
||||
{
|
||||
return basePersistDirectory;
|
||||
return Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory not set");
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public VersioningPolicy getVersioningPolicy()
|
||||
{
|
||||
return versioningPolicy;
|
||||
return Preconditions.checkNotNull(versioningPolicy, "versioningPolicy not set");
|
||||
}
|
||||
|
||||
@JsonProperty("rejectionPolicy")
|
||||
|
|
|
@ -35,14 +35,6 @@ import java.util.UUID;
|
|||
|
||||
public class RealtimeTuningConfigTest
|
||||
{
|
||||
@Test
|
||||
public void testDefaultBasePersistDirectory()
|
||||
{
|
||||
final RealtimeTuningConfig tuningConfig1 = RealtimeTuningConfig.makeDefaultTuningConfig(null);
|
||||
final RealtimeTuningConfig tuningConfig2 = RealtimeTuningConfig.makeDefaultTuningConfig(null);
|
||||
Assert.assertNotEquals(tuningConfig1.getBasePersistDirectory(), tuningConfig2.getBasePersistDirectory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorMessageIsMeaningfulWhenUnableToCreateTemporaryDirectory()
|
||||
{
|
||||
|
@ -89,7 +81,6 @@ public class RealtimeTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertNotNull(config.getBasePersistDirectory());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(0, config.getHandoffConditionTimeout());
|
||||
Assert.assertEquals(0, config.getAlertTimeout());
|
||||
|
@ -102,7 +93,7 @@ public class RealtimeTuningConfigTest
|
|||
Assert.assertEquals(0, config.getMergeThreadPriority());
|
||||
Assert.assertEquals(0, config.getPersistThreadPriority());
|
||||
Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());
|
||||
Assert.assertEquals(false, config.isReportParseExceptions());
|
||||
Assert.assertFalse(config.isReportParseExceptions());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -113,7 +104,6 @@ public class RealtimeTuningConfigTest
|
|||
+ " \"maxRowsInMemory\": 100,\n"
|
||||
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
|
||||
+ " \"windowPeriod\": \"PT1H\",\n"
|
||||
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
|
||||
+ " \"maxPendingPersists\": 100,\n"
|
||||
+ " \"persistThreadPriority\": 100,\n"
|
||||
+ " \"mergeThreadPriority\": 100,\n"
|
||||
|
@ -136,7 +126,6 @@ public class RealtimeTuningConfigTest
|
|||
TuningConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
|
||||
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
|
||||
Assert.assertEquals(100, config.getHandoffConditionTimeout());
|
||||
Assert.assertEquals(70, config.getAlertTimeout());
|
||||
|
|
|
@ -29,16 +29,27 @@ import org.apache.druid.segment.realtime.plumber.NoopRejectionPolicyFactory;
|
|||
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
public class AppenderatorPlumberTest
|
||||
{
|
||||
private final AppenderatorPlumber plumber;
|
||||
private final StreamAppenderatorTester streamAppenderatorTester;
|
||||
private AppenderatorPlumber plumber;
|
||||
private StreamAppenderatorTester streamAppenderatorTester;
|
||||
|
||||
public AppenderatorPlumberTest() throws Exception
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
this.streamAppenderatorTester = new StreamAppenderatorTester(10);
|
||||
this.streamAppenderatorTester =
|
||||
new StreamAppenderatorTester.Builder()
|
||||
.maxRowsInMemory(10)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.build();
|
||||
DataSegmentAnnouncer segmentAnnouncer = EasyMock
|
||||
.createMock(DataSegmentAnnouncer.class);
|
||||
segmentAnnouncer.announceSegment(EasyMock.anyObject());
|
||||
|
@ -60,7 +71,7 @@ public class AppenderatorPlumberTest
|
|||
EasyMock.anyObject(),
|
||||
EasyMock.anyObject(),
|
||||
EasyMock.anyObject())).andReturn(true).anyTimes();
|
||||
|
||||
|
||||
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
|
||||
null,
|
||||
1,
|
||||
|
@ -68,7 +79,7 @@ public class AppenderatorPlumberTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
temporaryFolder.newFolder(),
|
||||
new IntervalStartVersioningPolicy(),
|
||||
new NoopRejectionPolicyFactory(),
|
||||
null,
|
||||
|
@ -88,7 +99,6 @@ public class AppenderatorPlumberTest
|
|||
tuningConfig, streamAppenderatorTester.getMetrics(),
|
||||
segmentAnnouncer, segmentPublisher, handoffNotifier,
|
||||
streamAppenderatorTester.getAppenderator());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -50,7 +50,9 @@ import org.joda.time.DateTime;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -100,14 +102,21 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
|
|||
private StreamAppenderatorDriver driver;
|
||||
private DataSegmentKiller dataSegmentKiller;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
static {
|
||||
NullHandling.initializeForTests();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
streamAppenderatorTester = new StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
|
||||
streamAppenderatorTester =
|
||||
new StreamAppenderatorTester.Builder()
|
||||
.maxRowsInMemory(MAX_ROWS_IN_MEMORY)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.build();
|
||||
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
|
||||
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
|
||||
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
|
||||
|
|
|
@ -51,7 +51,9 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
|
|||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.partition.LinearShardSpec;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -69,10 +71,17 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
si("2001/2002", "A", 0)
|
||||
);
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testSimpleIngestion() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
|
||||
try (final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
|
||||
.enablePushFailure(true)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
boolean thrown;
|
||||
|
||||
|
@ -128,7 +137,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
committerSupplier.get(),
|
||||
false
|
||||
).get();
|
||||
Assert.assertEquals(ImmutableMap.of("x", "3"), (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata());
|
||||
Assert.assertEquals(
|
||||
ImmutableMap.of("x", "3"),
|
||||
(Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
|
||||
);
|
||||
Assert.assertEquals(
|
||||
IDENTIFIERS.subList(0, 2),
|
||||
sorted(
|
||||
|
@ -157,15 +169,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
|
||||
{
|
||||
try (
|
||||
final StreamAppenderatorTester tester = new StreamAppenderatorTester(
|
||||
100,
|
||||
1024,
|
||||
null,
|
||||
true,
|
||||
new SimpleRowIngestionMeters(),
|
||||
true
|
||||
)
|
||||
) {
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(1024)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.skipBytesInMemoryOverheadCheck(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -209,15 +219,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
|
||||
{
|
||||
try (
|
||||
final StreamAppenderatorTester tester = new StreamAppenderatorTester(
|
||||
100,
|
||||
1024,
|
||||
null,
|
||||
true,
|
||||
new SimpleRowIngestionMeters(),
|
||||
true
|
||||
)
|
||||
) {
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(1024)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.skipBytesInMemoryOverheadCheck(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -257,7 +265,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testMaxBytesInMemory() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 15000, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(15000)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -363,7 +377,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test(expected = RuntimeException.class)
|
||||
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 5180, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(5180)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -394,15 +414,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
|
||||
{
|
||||
try (
|
||||
final StreamAppenderatorTester tester = new StreamAppenderatorTester(
|
||||
100,
|
||||
10,
|
||||
null,
|
||||
true,
|
||||
new SimpleRowIngestionMeters(),
|
||||
true
|
||||
)
|
||||
) {
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(10)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.skipBytesInMemoryOverheadCheck(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -443,7 +461,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 10000, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(10000)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -488,7 +512,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 31100, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(31100)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -628,7 +658,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testIgnoreMaxBytesInMemory() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, -1, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
|
||||
.maxSizeInBytes(-1)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -676,7 +712,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testMaxRowsInMemory() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
|
||||
|
@ -727,7 +768,11 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -774,7 +819,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
public void testRestoreFromDisk() throws Exception
|
||||
{
|
||||
final RealtimeTuningConfig tuningConfig;
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
tuningConfig = tester.getTuningConfig();
|
||||
|
||||
|
@ -816,12 +866,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
|
||||
appenderator.close();
|
||||
|
||||
try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
|
||||
2,
|
||||
-1,
|
||||
tuningConfig.getBasePersistDirectory(),
|
||||
true
|
||||
)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester2 =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
|
||||
.basePersistDirectory(tuningConfig.getBasePersistDirectory())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator2 = tester2.getAppenderator();
|
||||
Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
|
||||
Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
|
||||
|
@ -833,7 +883,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test(timeout = 60_000L)
|
||||
public void testTotalRowCount() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
|
||||
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
|
||||
|
@ -876,7 +931,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
public void testVerifyRowIngestionMetrics() throws Exception
|
||||
{
|
||||
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(5)
|
||||
.maxSizeInBytes(10000L)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.rowIngestionMeters(rowIngestionMeters)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
|
||||
|
@ -892,7 +953,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testQueryByIntervals() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
|
||||
appenderator.startJob();
|
||||
|
@ -1028,7 +1094,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testQueryBySegments() throws Exception
|
||||
{
|
||||
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
|
||||
try (
|
||||
final StreamAppenderatorTester tester =
|
||||
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
|
||||
.basePersistDirectory(temporaryFolder.newFolder())
|
||||
.enablePushFailure(true)
|
||||
.build()) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
|
||||
appenderator.startJob();
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.segment.realtime.appenderator;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.client.cache.CacheConfig;
|
||||
import org.apache.druid.client.cache.CachePopulatorStats;
|
||||
|
@ -87,57 +88,10 @@ public class StreamAppenderatorTester implements AutoCloseable
|
|||
private final ObjectMapper objectMapper;
|
||||
private final Appenderator appenderator;
|
||||
private final ExecutorService queryExecutor;
|
||||
private final IndexIO indexIO;
|
||||
private final IndexMergerV9 indexMerger;
|
||||
private final ServiceEmitter emitter;
|
||||
|
||||
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
|
||||
|
||||
public StreamAppenderatorTester(
|
||||
final int maxRowsInMemory
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, -1, null, false);
|
||||
}
|
||||
|
||||
public StreamAppenderatorTester(
|
||||
final int maxRowsInMemory,
|
||||
final boolean enablePushFailure
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, -1, null, enablePushFailure);
|
||||
}
|
||||
|
||||
public StreamAppenderatorTester(
|
||||
final int maxRowsInMemory,
|
||||
final long maxSizeInBytes,
|
||||
final boolean enablePushFailure
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
|
||||
}
|
||||
|
||||
public StreamAppenderatorTester(
|
||||
final int maxRowsInMemory,
|
||||
final long maxSizeInBytes,
|
||||
final File basePersistDirectory,
|
||||
final boolean enablePushFailure
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
|
||||
}
|
||||
|
||||
public StreamAppenderatorTester(
|
||||
final int maxRowsInMemory,
|
||||
final long maxSizeInBytes,
|
||||
final File basePersistDirectory,
|
||||
final boolean enablePushFailure,
|
||||
final RowIngestionMeters rowIngestionMeters
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
|
||||
}
|
||||
|
||||
public StreamAppenderatorTester(
|
||||
final int maxRowsInMemory,
|
||||
final long maxSizeInBytes,
|
||||
|
@ -199,7 +153,7 @@ public class StreamAppenderatorTester implements AutoCloseable
|
|||
metrics = new FireDepartmentMetrics();
|
||||
queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
|
||||
|
||||
indexIO = new IndexIO(
|
||||
IndexIO indexIO = new IndexIO(
|
||||
objectMapper,
|
||||
new ColumnConfig()
|
||||
{
|
||||
|
@ -210,7 +164,12 @@ public class StreamAppenderatorTester implements AutoCloseable
|
|||
}
|
||||
}
|
||||
);
|
||||
indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
|
||||
|
||||
IndexMergerV9 indexMerger = new IndexMergerV9(
|
||||
objectMapper,
|
||||
indexIO,
|
||||
OffHeapMemorySegmentWriteOutMediumFactory.instance()
|
||||
);
|
||||
|
||||
emitter = new ServiceEmitter(
|
||||
"test",
|
||||
|
@ -342,4 +301,62 @@ public class StreamAppenderatorTester implements AutoCloseable
|
|||
emitter.close();
|
||||
FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
|
||||
}
|
||||
|
||||
public static class Builder
|
||||
{
|
||||
private int maxRowsInMemory;
|
||||
private long maxSizeInBytes = -1;
|
||||
private File basePersistDirectory;
|
||||
private boolean enablePushFailure;
|
||||
private RowIngestionMeters rowIngestionMeters;
|
||||
private boolean skipBytesInMemoryOverheadCheck;
|
||||
|
||||
public Builder maxRowsInMemory(final int maxRowsInMemory)
|
||||
{
|
||||
this.maxRowsInMemory = maxRowsInMemory;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder maxSizeInBytes(final long maxSizeInBytes)
|
||||
{
|
||||
this.maxSizeInBytes = maxSizeInBytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder basePersistDirectory(final File basePersistDirectory)
|
||||
{
|
||||
this.basePersistDirectory = basePersistDirectory;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder enablePushFailure(final boolean enablePushFailure)
|
||||
{
|
||||
this.enablePushFailure = enablePushFailure;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder rowIngestionMeters(final RowIngestionMeters rowIngestionMeters)
|
||||
{
|
||||
this.rowIngestionMeters = rowIngestionMeters;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder skipBytesInMemoryOverheadCheck(final boolean skipBytesInMemoryOverheadCheck)
|
||||
{
|
||||
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
|
||||
return this;
|
||||
}
|
||||
|
||||
public StreamAppenderatorTester build()
|
||||
{
|
||||
return new StreamAppenderatorTester(
|
||||
maxRowsInMemory,
|
||||
maxSizeInBytes,
|
||||
Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),
|
||||
enablePushFailure,
|
||||
rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters,
|
||||
skipBytesInMemoryOverheadCheck
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,9 @@ import org.joda.time.Interval;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
|
@ -119,6 +121,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
|
|||
private FireDepartmentMetrics metrics;
|
||||
private File tmpDir;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
public RealtimePlumberSchoolTest(
|
||||
RejectionPolicyFactory rejectionPolicy,
|
||||
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
|
||||
|
@ -207,7 +212,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
temporaryFolder.newFolder(),
|
||||
new IntervalStartVersioningPolicy(),
|
||||
rejectionPolicy,
|
||||
null,
|
||||
|
|
Loading…
Reference in New Issue