Add support for 'maxTotalRows' to incremental publishing kafka indexing task and appenderator based realtime task (#6129)

* resolves #5898 by adding maxTotalRows to incremental publishing kafka index task and appenderator based realtime indexing task, as available in IndexTask

* address review comments

* changes due to review

* merge fail
Clint Wylie 2018-09-07 13:17:49 -07:00 committed by Jonathan Wei
parent e095f63e8e
commit e6e068ce60
17 changed files with 463 additions and 179 deletions


@ -117,7 +117,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. Normally this is computed internally and user does not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). |no (default == One-sixth of max JVM memory)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
@ -131,7 +132,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)|
|`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../../configuration/index.html#segmentwriteoutmediumfactory) for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)|
#### IndexSpec
@ -314,10 +315,10 @@ In this way, configuration changes can be applied without requiring any pause in
### On the Subject of Segments
Each Kafka Indexing Task puts events consumed from Kafka partitions assigned to it in a single segment for each segment
granular interval until maxRowsPerSegment or intermediateHandoffPeriod limit is reached, at this point a new partition
granular interval until maxRowsPerSegment, maxTotalRows or intermediateHandoffPeriod limit is reached, at this point a new partition
for this segment granularity is created for further events. Kafka Indexing Task also does incremental hand-offs which
means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment
or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off
means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment,
maxTotalRows or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off
and new set of segments will be created for further events. This means that the task can run for longer durations of time
without accumulating old segments locally on Middle Manager nodes and it is encouraged to do so.
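To make the interplay of the three handoff triggers concrete, here is a minimal standalone sketch of the rule described above. It is illustrative only (the class, method, and parameter names are not Druid's); it simply encodes that handoff fires when a single segment reaches `maxRowsPerSegment`, when the rows held across all of the task's segments reach `maxTotalRows` (if set), or when `intermediateHandoffPeriod` has elapsed since the last handoff.

```java
import org.joda.time.DateTime;
import org.joda.time.Period;

// Illustrative sketch only; names and structure are not Druid's actual implementation.
public class HandoffTriggerSketch
{
  /** Handoff fires when any one of the three limits described above is reached. */
  static boolean handoffDue(
      int rowsInCurrentSegment,
      long rowsAcrossAllSegments,
      DateTime lastHandoffTime,
      int maxRowsPerSegment,            // default 5,000,000
      Long maxTotalRows,                // null means unlimited
      Period intermediateHandoffPeriod  // default P2147483647D, i.e. effectively never
  )
  {
    boolean rowLimitHit = rowsInCurrentSegment >= maxRowsPerSegment
                          || (maxTotalRows != null && rowsAcrossAllSegments >= maxTotalRows);
    boolean timeLimitHit = !lastHandoffTime.plus(intermediateHandoffPeriod).isAfterNow();
    return rowLimitHit || timeLimitHit;
  }

  public static void main(String[] args)
  {
    // maxTotalRows forces a handoff even though no single segment is near maxRowsPerSegment.
    System.out.println(
        handoffDue(300_000, 1_200_000, DateTime.now(), 5_000_000, 1_000_000L, Period.hours(1))
    ); // prints: true
  }
}
```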


@ -431,9 +431,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
status = Status.PUBLISHING;
}
if (stopRequested.get()) {
break;
}
@ -530,10 +527,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
if (!sequenceToUse.isCheckpointed()) {
sequenceToCheckpoint = sequenceToUse;
}
if (addResult.isPushRequired(tuningConfig) && !sequenceToUse.isCheckpointed()) {
sequenceToCheckpoint = sequenceToUse;
}
isPersistRequired |= addResult.isPersistRequired();
} else {


@ -40,6 +40,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
@Nullable
private final Long maxTotalRows;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@Deprecated
@ -61,6 +63,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@ -85,6 +88,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.maxTotalRows = maxTotalRows;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
@ -123,6 +127,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
config.maxRowsInMemory,
config.maxBytesInMemory,
config.maxRowsPerSegment,
config.maxTotalRows,
config.intermediatePersistPeriod,
config.basePersistDirectory,
config.maxPendingPersists,
@ -153,12 +158,22 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return maxBytesInMemory;
}
@Override
@JsonProperty
public int getMaxRowsPerSegment()
{
return maxRowsPerSegment;
}
@JsonProperty
@Override
@Nullable
public Long getMaxTotalRows()
{
return maxTotalRows;
}
@Override
@JsonProperty
public Period getIntermediatePersistPeriod()
@ -255,6 +270,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
dir,
maxPendingPersists,
@ -284,6 +300,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return maxRowsInMemory == that.maxRowsInMemory &&
maxRowsPerSegment == that.maxRowsPerSegment &&
maxBytesInMemory == that.maxBytesInMemory &&
Objects.equals(maxTotalRows, that.maxTotalRows) &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
handoffConditionTimeout == that.handoffConditionTimeout &&
@ -305,6 +322,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
maxRowsInMemory,
maxRowsPerSegment,
maxBytesInMemory,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
@ -326,6 +344,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return "KafkaTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxRowsPerSegment=" + maxRowsPerSegment +
", maxTotalRows=" + maxTotalRows +
", maxBytesInMemory=" + maxBytesInMemory +
", intermediatePersistPeriod=" + intermediatePersistPeriod +
", basePersistDirectory=" + basePersistDirectory +


@ -97,6 +97,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");


@ -43,6 +43,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@ -69,6 +70,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
@ -134,6 +136,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
return "KafkaSupervisorTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +


@ -104,7 +104,6 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -169,7 +168,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -200,6 +198,7 @@ public class KafkaIndexTaskTest
private boolean resetOffsetAutomatically = false;
private boolean doHandoff = true;
private Integer maxRowsPerSegment = null;
private Long maxTotalRows = null;
private Period intermediateHandoffPeriod = null;
private TaskToolboxFactory toolboxFactory;
@ -214,6 +213,8 @@ public class KafkaIndexTaskTest
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private int handoffCount = 0;
// This should be removed in versions greater than 0.12.x
// isIncrementalHandoffSupported should always be set to true in those later versions
@Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
@ -476,7 +477,7 @@ public class KafkaIndexTaskTest
}
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 1;
maxRowsPerSegment = 2;
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@ -560,6 +561,125 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}
@Test(timeout = 60_000L)
public void testIncrementalHandOffMaxTotalRows() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// incremental publish should happen every 3 records
maxRowsPerSegment = Integer.MAX_VALUE;
maxTotalRows = 3L;
// Insert data
int numToAdd = records.size() - 2;
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (int i = 0; i < numToAdd; i++) {
kafkaProducer.send(records.get(i)).get();
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 3L, 1, 0L));
final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 0L));
final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L));
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
0,
baseSequenceName,
startPartitions,
endPartitions,
consumerProps,
true,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets));
task.getRunner().setEndOffsets(currentOffsets, false);
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
// add remaining records
for (int i = numToAdd; i < records.size(); i++) {
kafkaProducer.send(records.get(i)).get();
}
final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint2.getPartitionOffsetMap().equals(nextOffsets));
task.getRunner().setEndOffsets(nextOffsets, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(2, checkpointRequestsHash.size());
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
)
)
);
Assert.assertTrue(
checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
)
)
);
// Check metrics
Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
SegmentDescriptor desc3 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc4 = SD(task, "2011/P1D", 0);
SegmentDescriptor desc5 = SD(task, "2011/P1D", 1);
SegmentDescriptor desc6 = SD(task, "2012/P1D", 0);
SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3));
Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4))
&& ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) ||
(ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4))
&& ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5))));
Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6));
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}
}
@Test(timeout = 60_000L)
public void testTimeBasedIncrementalHandOff() throws Exception
{
@ -1821,23 +1941,18 @@ public class KafkaIndexTaskTest
runningTasks.add(task);
}
return taskExec.submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call()
{
try {
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
} else {
throw new ISE("Task is not ready");
}
}
catch (Exception e) {
log.warn(e, "Task failed");
return TaskStatus.failure(task.getId());
() -> {
try {
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
} else {
throw new ISE("Task is not ready");
}
}
catch (Exception e) {
log.warn(e, "Task failed");
return TaskStatus.failure(task.getId());
}
}
);
}
@ -1884,6 +1999,7 @@ public class KafkaIndexTaskTest
1000,
null,
maxRowsPerSegment,
maxTotalRows,
new Period("P1Y"),
null,
null,
@ -1928,6 +2044,7 @@ public class KafkaIndexTaskTest
1000,
null,
maxRowsPerSegment,
null,
new Period("P1Y"),
null,
null,
@ -1995,13 +2112,8 @@ public class KafkaIndexTaskTest
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
new TimeseriesQueryEngine(),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
// do nothing
}
(query, future) -> {
// do nothing
}
)
)
@ -2084,37 +2196,30 @@ public class KafkaIndexTaskTest
taskStorage,
taskActionToolbox
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory()
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
return new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
if (doHandoff) {
// Simulate immediate handoff
exec.execute(handOffRunnable);
}
return true;
}
if (doHandoff) {
// Simulate immediate handoff
exec.execute(handOffRunnable);
}
return true;
}
@Override
public void start()
{
//Noop
}
@Override
public void start()
{
//Noop
}
@Override
public void close()
{
//Noop
}
};
@Override
public void close()
{
//Noop
}
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();


@ -58,6 +58,7 @@ public class KafkaTuningConfigTest
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
Assert.assertEquals(null, config.getMaxTotalRows());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@ -73,6 +74,7 @@ public class KafkaTuningConfigTest
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"maxTotalRows\": 1000,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
@ -92,6 +94,8 @@ public class KafkaTuningConfigTest
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertNotEquals(null, config.getMaxTotalRows());
Assert.assertEquals(1000, config.getMaxTotalRows().longValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
@ -105,6 +109,7 @@ public class KafkaTuningConfigTest
1,
null,
2,
10L,
new Period("PT3S"),
new File("/tmp/xxx"),
4,
@ -123,6 +128,8 @@ public class KafkaTuningConfigTest
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment());
Assert.assertNotEquals(null, copy.getMaxTotalRows());
Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
Assert.assertEquals(0, copy.getMaxPendingPersists());


@ -194,6 +194,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
1000,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,


@ -53,8 +53,10 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
}
private final int maxRowsInMemory;
private final int maxRowsPerSegment;
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
@Nullable
private final Long maxTotalRows;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
private final int maxPendingPersists;
@ -73,8 +75,9 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@JsonCreator
public RealtimeAppenderatorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@ -94,6 +97,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.maxTotalRows = maxTotalRows;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaultIntermediatePersistPeriod
: intermediatePersistPeriod;
@ -105,8 +109,8 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
? defaultReportParseExceptions
: reportParseExceptions;
this.publishAndHandoffTimeout = publishAndHandoffTimeout == null
? defaultPublishAndHandoffTimeout
: publishAndHandoffTimeout;
? defaultPublishAndHandoffTimeout
: publishAndHandoffTimeout;
Preconditions.checkArgument(this.publishAndHandoffTimeout >= 0, "publishAndHandoffTimeout must be >= 0");
this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout;
@ -117,12 +121,16 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
this.maxParseExceptions = 0;
this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
} else {
this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
this.maxParseExceptions = maxParseExceptions == null
? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
: maxParseExceptions;
this.maxSavedParseExceptions = maxSavedParseExceptions == null
? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
: maxSavedParseExceptions;
}
this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
this.logParseExceptions = logParseExceptions == null
? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
: logParseExceptions;
}
@Override
@ -138,12 +146,21 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return maxBytesInMemory;
}
@Override
@JsonProperty
public int getMaxRowsPerSegment()
{
return maxRowsPerSegment;
}
@Override
@JsonProperty
@Nullable
public Long getMaxTotalRows()
{
return maxTotalRows;
}
@Override
@JsonProperty
public Period getIntermediatePersistPeriod()
@ -227,8 +244,9 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
{
return new RealtimeAppenderatorTuningConfig(
maxRowsInMemory,
maxRowsPerSegment,
maxBytesInMemory,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
dir,
maxPendingPersists,


@ -322,9 +322,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
if (addResult.isOk()) {
if (addResult.getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment()) {
if (addResult.isPushRequired(tuningConfig)) {
publishSegments(driver, publisher, committerSupplier, sequenceName);
sequenceNumber++;
sequenceName = makeSequenceName(getId(), sequenceNumber);
}
@ -542,7 +541,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
private Map<String, Object> getTaskCompletionUnparseableEvents()
{
Map<String, Object> unparseableEventsMap = Maps.newHashMap();
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
savedParseExceptions);
if (buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
}


@ -1485,6 +1485,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
@JsonProperty
@Override
@Nullable
public Long getMaxTotalRows()
{
return maxTotalRows;


@ -102,7 +102,6 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -158,7 +157,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -528,6 +526,73 @@ public class AppenderatorDriverRealtimeIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 60_000L)
public void testMaxTotalRows() throws Exception
{
// Expect 2 segments as we will hit maxTotalRows
expectPublishedSegments(2);
final AppenderatorDriverRealtimeIndexTask task =
makeRealtimeTask(null, Integer.MAX_VALUE, 1500L);
final ListenableFuture<TaskStatus> statusFuture = runTask(task);
// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}
final TestFirehose firehose = (TestFirehose) task.getFirehose();
// maxTotalRows is 1500
for (int i = 0; i < 2000; i++) {
firehose.addRows(
ImmutableList.of(
ImmutableMap.of("t", now.getMillis(), "dim1", "foo-" + i, "met1", "1")
)
);
}
// Stop the firehose, this will drain out existing events.
firehose.close();
// Wait for publish.
Collection<DataSegment> publishedSegments = awaitSegments();
// Check metrics.
Assert.assertEquals(2000, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(2000, sumMetric(task, null, "rows").longValue());
Assert.assertEquals(2000, sumMetric(task, null, "met1").longValue());
awaitHandoffs();
Assert.assertEquals(2, publishedSegments.size());
for (DataSegment publishedSegment : publishedSegments) {
Pair<Executor, Runnable> executorRunnablePair = handOffCallbacks.get(
new SegmentDescriptor(
publishedSegment.getInterval(),
publishedSegment.getVersion(),
publishedSegment.getShardSpec().getPartitionNum()
)
);
Assert.assertNotNull(
publishedSegment + " missing from handoff callbacks: " + handOffCallbacks,
executorRunnablePair
);
// Simulate handoff.
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 60_000L)
public void testTransformSpec() throws Exception
{
@ -1209,35 +1274,65 @@ public class AppenderatorDriverRealtimeIndexTaskTest
taskLockbox.syncFromStorage();
final TaskToolbox toolbox = taskToolboxFactory.build(task);
return taskExec.submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call() throws Exception
{
try {
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
} else {
throw new ISE("Task is not ready");
}
}
catch (Exception e) {
log.warn(e, "Task failed");
throw e;
() -> {
try {
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
} else {
throw new ISE("Task is not ready");
}
}
catch (Exception e) {
log.warn(e, "Task failed");
throw e;
}
}
);
}
private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(final String taskId)
{
return makeRealtimeTask(taskId, TransformSpec.NONE, true, 0, true, 0, 1);
return makeRealtimeTask(
taskId,
TransformSpec.NONE,
true,
0,
true,
0,
1
);
}
private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(
final String taskId,
final Integer maxRowsPerSegment,
final Long maxTotalRows
)
{
return makeRealtimeTask(
taskId,
TransformSpec.NONE,
true,
0,
true,
0,
1,
maxRowsPerSegment,
maxTotalRows
);
}
private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
{
return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0, true, null, 1);
return makeRealtimeTask(
taskId,
TransformSpec.NONE,
reportParseExceptions,
0,
true,
null,
1
);
}
private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(
@ -1249,6 +1344,32 @@ public class AppenderatorDriverRealtimeIndexTaskTest
final Integer maxParseExceptions,
final Integer maxSavedParseExceptions
)
{
return makeRealtimeTask(
taskId,
transformSpec,
reportParseExceptions,
handoffTimeout,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
1000,
null
);
}
private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(
final String taskId,
final TransformSpec transformSpec,
final boolean reportParseExceptions,
final long handoffTimeout,
final Boolean logParseExceptions,
final Integer maxParseExceptions,
final Integer maxSavedParseExceptions,
final Integer maxRowsPerSegment,
final Long maxTotalRows
)
{
ObjectMapper objectMapper = new DefaultObjectMapper();
DataSchema dataSchema = new DataSchema(
@ -1283,9 +1404,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest
null
);
RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(
1000,
1000,
null,
maxRowsPerSegment,
maxTotalRows,
null,
null,
null,
@ -1425,49 +1547,37 @@ public class AppenderatorDriverRealtimeIndexTaskTest
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
new TimeseriesQueryEngine(),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
// do nothing
}
(query, future) -> {
// do nothing
}
)
)
);
handOffCallbacks = new ConcurrentHashMap<>();
final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory()
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
return new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable));
handoffLatch.countDown();
return true;
}
@Override
public void start()
{
//Noop
}
@Override
public void close()
{
//Noop
}
};
handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable));
handoffLatch.countDown();
return true;
}
@Override
public void start()
{
//Noop
}
@Override
public void close()
{
//Noop
}
};
final TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();


@ -119,7 +119,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
int getRowCount(SegmentIdentifier identifier);
/**
* Returns the number of total rows in this appenderator.
* Returns the total number of rows in this appenderator, across all segments pending push.
*
* @return total number of rows
*/


@ -30,12 +30,38 @@ public interface AppenderatorConfig
{
boolean isReportParseExceptions();
/**
* Maximum number of rows in memory before persisting to local storage
*/
int getMaxRowsInMemory();
/**
* Maximum number of bytes (estimated) to store in memory before persisting to local storage
*/
long getMaxBytesInMemory();
int getMaxPendingPersists();
/**
* Maximum number of rows in a single segment before pushing to deep storage
*/
default int getMaxRowsPerSegment()
{
return Integer.MAX_VALUE;
}
/**
* Maximum number of rows across all segments before pushing to deep storage
*/
@Nullable
default Long getMaxTotalRows()
{
throw new UnsupportedOperationException("maxTotalRows is not implemented.");
}
/**
* Period that sets frequency to persist to local storage if no other thresholds are met
*/
Period getIntermediatePersistPeriod();
IndexSpec getIndexSpec();
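Note the asymmetry in the interface above: a null from `getMaxTotalRows()` means "no total-row limit", while the default implementation throws. A caller that may receive a config which has not opted in could guard for both cases. The helper below is a hypothetical sketch, not part of this commit; it assumes Druid's `AppenderatorConfig` (package path taken from the server module's source layout) is on the classpath.

```java
import javax.annotation.Nullable;

import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;

// Hypothetical helper (not part of this commit): treat both an explicit null and the
// throwing default of getMaxTotalRows() as "no total-row limit".
public final class MaxTotalRowsHelper
{
  private MaxTotalRowsHelper() {}

  @Nullable
  public static Long maxTotalRowsOrNull(AppenderatorConfig config)
  {
    try {
      return config.getMaxTotalRows();
    }
    catch (UnsupportedOperationException e) {
      return null;
    }
  }
}
```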


@ -100,6 +100,16 @@ public class AppenderatorDriverAddResult
return isPersistRequired;
}
public boolean isPushRequired(AppenderatorConfig tuningConfig)
{
boolean overThreshold = getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment();
Long maxTotal = tuningConfig.getMaxTotalRows();
if (maxTotal != null) {
overThreshold |= getTotalNumRowsInAppenderator() >= maxTotal;
}
return overThreshold;
}
@Nullable
public ParseException getParseException()
{


@ -83,7 +83,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
@ -91,11 +90,9 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -264,8 +261,8 @@ public class AppenderatorImpl implements Appenderator
final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
rowsCurrentlyInMemory.addAndGet(numAddedRows);
totalRows.addAndGet(numAddedRows);
bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);
totalRows.addAndGet(numAddedRows);
boolean isPersistRequired = false;
boolean persist = false;
@ -438,20 +435,15 @@ public class AppenderatorImpl implements Appenderator
if (persistExecutor != null) {
final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
try {
commitLock.lock();
objectMapper.writeValue(computeCommitFile(), Committed.nil());
}
finally {
commitLock.unlock();
}
return null;
() -> {
try {
commitLock.lock();
objectMapper.writeValue(computeCommitFile(), Committed.nil());
}
finally {
commitLock.unlock();
}
return null;
}
);
@ -610,7 +602,9 @@ public class AppenderatorImpl implements Appenderator
throw new ISE("No sink for identifier: %s", identifier);
}
theSinks.put(identifier, sink);
sink.finishWriting();
if (sink.finishWriting()) {
totalRows.addAndGet(-sink.getNumRows());
}
}
return Futures.transform(
@ -656,8 +650,8 @@ public class AppenderatorImpl implements Appenderator
* Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
* be run in the single-threaded pushExecutor.
*
* @param identifier sink identifier
* @param sink sink to push
* @param identifier sink identifier
* @param sink sink to push
* @param useUniquePath true if the segment should be written to a path with a unique identifier
*
* @return segment descriptor, or null if the sink is no longer valid
@ -986,6 +980,8 @@ public class AppenderatorImpl implements Appenderator
commitLock.unlock();
}
int rowsSoFar = 0;
log.info("Loading sinks from[%s]: %s", baseDir, committed.getHydrants().keySet());
for (File sinkDir : files) {
@ -1011,26 +1007,12 @@ public class AppenderatorImpl implements Appenderator
// To avoid reading and listing of "merged" dir and other special files
final File[] sinkFiles = sinkDir.listFiles(
new FilenameFilter()
{
@Override
public boolean accept(File dir, String fileName)
{
return !(Ints.tryParse(fileName) == null);
}
}
(dir, fileName) -> !(Ints.tryParse(fileName) == null)
);
Arrays.sort(
sinkFiles,
new Comparator<File>()
{
@Override
public int compare(File o1, File o2)
{
return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()));
}
}
(o1, o2) -> Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()))
);
List<FireHydrant> hydrants = Lists.newArrayList();
@ -1074,6 +1056,7 @@ public class AppenderatorImpl implements Appenderator
null,
hydrants
);
rowsSoFar += currSink.getNumRows();
sinks.put(identifier, currSink);
sinkTimeline.add(
currSink.getInterval(),
@ -1094,14 +1077,7 @@ public class AppenderatorImpl implements Appenderator
final Set<String> loadedSinks = Sets.newHashSet(
Iterables.transform(
sinks.keySet(),
new Function<SegmentIdentifier, String>()
{
@Override
public String apply(SegmentIdentifier input)
{
return input.getIdentifierAsString();
}
}
input -> input.getIdentifierAsString()
)
);
final Set<String> missingSinks = Sets.difference(committed.getHydrants().keySet(), loadedSinks);
@ -1109,6 +1085,7 @@ public class AppenderatorImpl implements Appenderator
throw new ISE("Missing committed sinks [%s]", Joiner.on(", ").join(missingSinks));
}
totalRows.set(rowsSoFar);
return committed.getMetadata();
}
@ -1119,16 +1096,17 @@ public class AppenderatorImpl implements Appenderator
)
{
// Ensure no future writes will be made to this sink.
sink.finishWriting();
if (sink.finishWriting()) {
// Decrement this sink's rows from the counters. We only count active sinks so that we don't double decrement,
// i.e. those that haven't been persisted for the *InMemory counters, or pushed to deep storage for the total counter.
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
totalRows.addAndGet(-sink.getNumRows());
}
// Mark this identifier as dropping, so no future push tasks will pick it up.
droppingSinks.add(identifier);
// Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
totalRows.addAndGet(-sink.getNumRows());
bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
// Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform(
pushBarrier(),


@ -223,12 +223,20 @@ public class Sink implements Iterable<FireHydrant>
return !writable;
}
public void finishWriting()
/**
* Marks sink as 'finished', preventing further writes.
* @return 'true' if sink was successfully finished, 'false' if sink was already finished
*/
public boolean finishWriting()
{
synchronized (hydrantLock) {
if (!writable) {
return false;
}
writable = false;
clearDedupCache();
}
return true;
}
public DataSegment getSegment()
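The boolean return from `finishWriting()` exists so that `AppenderatorImpl` (above) can decrement its shared row counters exactly once per sink, whether the sink is retired via a push or via an abandon. The following is a minimal standalone sketch of that "decrement only on the first finish" pattern; the classes here are illustrative stand-ins, not Druid's.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-ins for Sink and the appenderator's totalRows counter.
public final class SinkAccountingSketch
{
  static final class FakeSink
  {
    private boolean writable = true;
    private int numRows;

    synchronized void add(int rows)
    {
      if (writable) {
        numRows += rows;
      }
    }

    /** @return true only the first time the sink is finished, mirroring Sink.finishWriting(). */
    synchronized boolean finishWriting()
    {
      if (!writable) {
        return false;
      }
      writable = false;
      return true;
    }

    synchronized int getNumRows()
    {
      return numRows;
    }
  }

  private final AtomicLong totalRows = new AtomicLong();

  void add(FakeSink sink, int rows)
  {
    sink.add(rows);
    totalRows.addAndGet(rows);
  }

  /** Called from both the push path and the abandon path; only the first call decrements. */
  void retire(FakeSink sink)
  {
    if (sink.finishWriting()) {
      totalRows.addAndGet(-sink.getNumRows());
    }
  }

  public static void main(String[] args)
  {
    SinkAccountingSketch sketch = new SinkAccountingSketch();
    FakeSink sink = new FakeSink();
    sketch.add(sink, 100);
    sketch.retire(sink); // push path: totalRows drops to 0
    sketch.retire(sink); // abandon path afterwards: no-op, no double decrement
    System.out.println(sketch.totalRows.get()); // prints: 0
  }
}
```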