replace maxRowsInMemory with BufferSize

mirror of https://github.com/apache/druid.git
commit 33354cf7fe (parent 88a904e0b3)
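In short: this commit replaces the row-count knob `maxRowsInMemory` (and its Hadoop-side counterpart `rowFlushBoundary`) with a byte-denominated `bufferSize` that sizes the offheap ingestion buffer directly, and it moves the persist trigger from a row-count comparison to an `isFull()` check on that buffer. The hunks below cover the docs, the Hadoop indexer, the realtime ingestion path, and the tests.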
@@ -41,7 +41,7 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
         }
       },
       "config": {
-        "maxRowsInMemory": 500000,
+        "bufferSize": 500000000,
         "intermediatePersistPeriod": "PT10m"
       },
       "firehose": {
@@ -104,7 +104,7 @@ This provides configuration for the data processing portion of the realtime stre
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|yes|
-|maxRowsInMemory|Number|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|yes|
+|bufferSize|Number|The size in bytes of the buffer used for ingestion. When the buffer is full, intermediate rows are persisted to disk.|yes|


 ### Firehose
@@ -132,8 +132,8 @@ The following table summarizes constraints between settings in the spec file for
 | windowPeriod| when reading an InputRow, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers |
 | segmentGranularity| time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than indexGranularity|
 | indexGranularity| time granularity (minute, hour, day, week, month) of indexes | less than segmentGranularity| minute, hour, day, week, month |
-| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
-| maxRowsInMemory| the max number of InputRows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
+| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | size of un-persisted rows in memory also constrained by bufferSize |
+| bufferSize| size of the offheap buffer used to hold InputRows before a flush to disk | size of un-persisted rows in memory is also constrained by intermediatePersistPeriod | use this to bound the offheap memory used for ingestion |

 The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity`
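For readers migrating a spec, a rough conversion from the old `maxRowsInMemory` to a comparable `bufferSize` is the row count times the per-row aggregate footprint, i.e. the sum of `getMaxIntermediateSize()` over the configured aggregators; this mirrors how the pre-change code derived its offheap buffer (see the `Sink` and `IndexGeneratorJob` hunks below). A minimal sketch, assuming a `count` and a `doubleSum` aggregator as in the example spec further down; the specific aggregators are illustrative:

```java
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;

// Hedged sketch: estimate a bufferSize equivalent to a legacy maxRowsInMemory.
public class BufferSizeEstimate
{
  public static void main(String[] args)
  {
    AggregatorFactory[] aggs = {
        new CountAggregatorFactory("rows"),
        new DoubleSumAggregatorFactory("outColumn", "inColumn")
    };

    // Per-row footprint: sum of each aggregator's max intermediate size, in bytes.
    int rowSize = 0;
    for (AggregatorFactory agg : aggs) {
      rowSize += agg.getMaxIntermediateSize();
    }

    long maxRowsInMemory = 500000;               // the legacy setting
    long bufferSize = maxRowsInMemory * rowSize; // comparable byte-based setting
    System.out.printf("bufferSize ~= %,d bytes%n", bufferSize);
  }
}
```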
@@ -57,7 +57,7 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
 |indexGranularity|The rollup granularity for timestamps. See [Realtime Ingestion](Realtime-ingestion.html) for more information. |no|
 |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no|
 |firehose|The input source of data. For more info, see [Firehose](Firehose.html).|yes|
-|rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no|
+|bufferSize|The size in bytes of the offheap buffer used to store intermediate rows. When the buffer fills up, rows are persisted to disk.|no|

 ### Index Hadoop Task
@@ -65,7 +65,7 @@ import static java.lang.Thread.sleep;
 * {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ],
 * "indexGranularity":"minute",
 * "shardSpec" : { "type": "none" } },
-* "config" : { "maxRowsInMemory" : 50000,
+* "config" : { "bufferSize" : 50000000,
 * "intermediatePersistPeriod" : "PT2m" },
 *
 * "firehose" : { "type" : "rand",
HadoopTuningConfig.java:

@@ -38,7 +38,7 @@ public class HadoopTuningConfig implements TuningConfig
 {
   private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
   private static final Map<DateTime, List<HadoopyShardSpec>> defaultShardSpecs = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
-  private static final int defaultRowFlushBoundary = 80000;
+  private static final int defaultBufferSize = 256 * 1024 * 1024;

   public static HadoopTuningConfig makeDefaultTuningConfig()
   {
@@ -47,7 +47,7 @@ public class HadoopTuningConfig implements TuningConfig
         new DateTime().toString(),
         defaultPartitionsSpec,
         defaultShardSpecs,
-        defaultRowFlushBoundary,
+        defaultBufferSize,
         false,
         true,
         false,
@@ -61,7 +61,7 @@ public class HadoopTuningConfig implements TuningConfig
   private final String version;
   private final PartitionsSpec partitionsSpec;
   private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
-  private final int rowFlushBoundary;
+  private final int bufferSize;
   private final boolean leaveIntermediate;
   private final Boolean cleanupOnFailure;
   private final boolean overwriteFiles;
@@ -75,7 +75,7 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("version") String version,
       final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
       final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
-      final @JsonProperty("rowFlushBoundary") Integer rowFlushBoundary,
+      final @JsonProperty("bufferSize") Integer bufferSize,
       final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
       final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
       final @JsonProperty("overwriteFiles") boolean overwriteFiles,
@@ -88,7 +88,7 @@ public class HadoopTuningConfig implements TuningConfig
     this.version = version == null ? new DateTime().toString() : version;
     this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec;
     this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs;
-    this.rowFlushBoundary = rowFlushBoundary == null ? defaultRowFlushBoundary : rowFlushBoundary;
+    this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
     this.overwriteFiles = overwriteFiles;
@@ -124,9 +124,9 @@ public class HadoopTuningConfig implements TuningConfig
   }

   @JsonProperty
-  public int getRowFlushBoundary()
+  public int getBufferSize()
   {
-    return rowFlushBoundary;
+    return bufferSize;
   }

   @JsonProperty
@@ -172,7 +172,7 @@ public class HadoopTuningConfig implements TuningConfig
         version,
         partitionsSpec,
         shardSpecs,
-        rowFlushBoundary,
+        bufferSize,
         leaveIntermediate,
         cleanupOnFailure,
         overwriteFiles,
@@ -189,7 +189,7 @@ public class HadoopTuningConfig implements TuningConfig
         ver,
         partitionsSpec,
         shardSpecs,
-        rowFlushBoundary,
+        bufferSize,
         leaveIntermediate,
         cleanupOnFailure,
         overwriteFiles,
@@ -206,7 +206,7 @@ public class HadoopTuningConfig implements TuningConfig
         version,
         partitionsSpec,
         specs,
-        rowFlushBoundary,
+        bufferSize,
         leaveIntermediate,
         cleanupOnFailure,
         overwriteFiles,
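Note the shift in units: the old default was 80,000 rows, while the new default is 256 * 1024 * 1024 = 268,435,456 bytes. Because the `@JsonProperty` name also changes, a Hadoop spec that still says `rowFlushBoundary` is no longer honored (depending on mapper settings it is rejected or silently ignored, in which case the default applies). A minimal sketch of the new default, assuming `HadoopTuningConfig` lives in the `io.druid.indexer` package:

```java
import io.druid.indexer.HadoopTuningConfig; // package assumed

public class DefaultBufferSizeCheck
{
  public static void main(String[] args)
  {
    HadoopTuningConfig config = HadoopTuningConfig.makeDefaultTuningConfig();
    // 256 * 1024 * 1024 = 268,435,456 bytes, replacing the 80,000-row default.
    System.out.println(config.getBufferSize()); // 268435456
  }
}
```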
IndexGeneratorJob.java:

@@ -313,7 +313,7 @@ public class IndexGeneratorJob implements Jobby
       int numRows = index.add(inputRow);
       ++lineCount;

-      if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
+      if (index.isFull()) {
         log.info(
             "%,d lines to %,d rows in %,d millis",
             lineCount - runningTotalLineCount,
@@ -602,11 +602,6 @@ public class IndexGeneratorJob implements Jobby

     private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
     {
-      int aggsSize = 0;
-      for (AggregatorFactory agg : aggs) {
-        aggsSize += agg.getMaxIntermediateSize();
-      }
-      int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary();
       return new IncrementalIndex(
           new IncrementalIndexSchema.Builder()
               .withMinTimestamp(theBucket.time.getMillis())
@@ -614,7 +609,7 @@ public class IndexGeneratorJob implements Jobby
               .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
               .withMetrics(aggs)
               .build(),
-          new OffheapBufferPool(bufferSize)
+          new OffheapBufferPool(config.getSchema().getTuningConfig().getBufferSize())
       );
     }
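With these hunks the Hadoop indexer no longer derives its buffer as `aggsSize * rowFlushBoundary`; the byte size comes straight from the tuning config, and the flush trigger is the index's own `isFull()` check rather than a row-count comparison. The conversion sketch after the constraints table above shows how the two sizings relate.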
RealtimeIndexTask.java:

@@ -19,7 +19,6 @@

 package io.druid.indexing.common.task;

-import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -44,7 +43,6 @@ import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.query.QueryToolChest;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeIOConfig;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -57,6 +55,7 @@ import io.druid.segment.realtime.SegmentPublisher;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
 import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
+import io.druid.segment.realtime.plumber.Sink;
 import io.druid.segment.realtime.plumber.VersioningPolicy;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
@@ -70,36 +69,10 @@ import java.io.IOException;
 public class RealtimeIndexTask extends AbstractTask
 {
   private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);

-  private static String makeTaskId(FireDepartment fireDepartment, Schema schema)
-  {
-    // Backwards compatible
-    if (fireDepartment == null) {
-      return String.format(
-          "index_realtime_%s_%d_%s",
-          schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()
-      );
-    } else {
-      return String.format(
-          "index_realtime_%s_%d_%s",
-          fireDepartment.getDataSchema().getDataSource(),
-          fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(),
-          new DateTime().toString()
-      );
-    }
-  }
-
-  private static String makeDatasource(FireDepartment fireDepartment, Schema schema)
-  {
-    return (fireDepartment != null) ? fireDepartment.getDataSchema().getDataSource() : schema.getDataSource();
-  }
-
   @JsonIgnore
   private final FireDepartment spec;

   @JsonIgnore
   private volatile Plumber plumber = null;

   @JsonIgnore
   private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@@ -152,6 +125,29 @@ public class RealtimeIndexTask extends AbstractTask
     }
   }

+  private static String makeTaskId(FireDepartment fireDepartment, Schema schema)
+  {
+    // Backwards compatible
+    if (fireDepartment == null) {
+      return String.format(
+          "index_realtime_%s_%d_%s",
+          schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()
+      );
+    } else {
+      return String.format(
+          "index_realtime_%s_%d_%s",
+          fireDepartment.getDataSchema().getDataSource(),
+          fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(),
+          new DateTime().toString()
+      );
+    }
+  }
+
+  private static String makeDatasource(FireDepartment fireDepartment, Schema schema)
+  {
+    return (fireDepartment != null) ? fireDepartment.getDataSchema().getDataSource() : schema.getDataSource();
+  }
+
   @Override
   public String getType()
   {
@@ -263,7 +259,8 @@ public class RealtimeIndexTask extends AbstractTask
       // Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the
       // realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in
       // the plumber such that waiting for the coordinator doesn't block data processing.
-      final VersioningPolicy versioningPolicy = new VersioningPolicy()
+      final VersioningPolicy versioningPolicy = new
+          VersioningPolicy()
       {
         @Override
         public String getVersion(final Interval interval)
@@ -352,7 +349,8 @@ public class RealtimeIndexTask extends AbstractTask
         }

         fireDepartment.getMetrics().incrementProcessed();
-        if (currCount >= tuningConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
+        final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
+        if ((sink != null && sink.isFull()) || System.currentTimeMillis() > nextFlush) {
           plumber.persist(firehose.commit());
           nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
         }
IncrementalIndex.java:

@@ -471,6 +471,15 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     return numEntries.get() == 0;
   }

+  /**
+   * @return true if the underlying buffer for the IncrementalIndex is full and cannot accommodate more rows.
+   */
+  public boolean isFull()
+  {
+    return (numEntries.get() + 1) * totalAggSize > bufferHolder.get().limit();
+  }
+
   public int size()
   {
     return numEntries.get();
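The check reserves room for one more row: with `totalAggSize` bytes per aggregated row, the index reports full as soon as one additional row would exceed the buffer's limit. A worked illustration with hypothetical numbers (plain arithmetic, not Druid calls):

```java
// Hypothetical sizes chosen for illustration only.
public class IsFullArithmetic
{
  public static void main(String[] args)
  {
    int totalAggSize = 32; // bytes per aggregated row
    int limit = 96;        // buffer capacity in bytes

    for (int numEntries = 0; numEntries <= 3; numEntries++) {
      boolean full = (numEntries + 1) * totalAggSize > limit;
      System.out.printf("numEntries=%d full=%b%n", numEntries, full);
    }
    // full flips to true at numEntries=3: the 96-byte buffer holds exactly
    // three 32-byte rows, so a fourth row would not fit.
  }
}
```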
RealtimeTuningConfig.java:

@@ -36,7 +36,7 @@ import java.io.File;
  */
 public class RealtimeTuningConfig implements TuningConfig
 {
-  private static final int defaultMaxRowsInMemory = 500000;
+  private static final int defaultBufferSize = 256 * 1024 * 1024;
   private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
   private static final Period defaultWindowPeriod = new Period("PT10M");
   private static final File defaultBasePersistDirectory = Files.createTempDir();
@@ -49,7 +49,7 @@ public class RealtimeTuningConfig implements TuningConfig
   public static RealtimeTuningConfig makeDefaultTuningConfig()
   {
     return new RealtimeTuningConfig(
-        defaultMaxRowsInMemory,
+        defaultBufferSize,
         defaultIntermediatePersistPeriod,
         defaultWindowPeriod,
         defaultBasePersistDirectory,
@@ -60,7 +60,7 @@ public class RealtimeTuningConfig implements TuningConfig
     );
   }

-  private final int maxRowsInMemory;
+  private final int bufferSize;
   private final Period intermediatePersistPeriod;
   private final Period windowPeriod;
   private final File basePersistDirectory;
@@ -71,7 +71,7 @@ public class RealtimeTuningConfig implements TuningConfig

   @JsonCreator
   public RealtimeTuningConfig(
-      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("bufferSize") Integer bufferSize,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
       @JsonProperty("windowPeriod") Period windowPeriod,
       @JsonProperty("basePersistDirectory") File basePersistDirectory,
@@ -81,7 +81,7 @@ public class RealtimeTuningConfig implements TuningConfig
       @JsonProperty("shardSpec") ShardSpec shardSpec
   )
   {
-    this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
+    this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? defaultIntermediatePersistPeriod
                                      : intermediatePersistPeriod;
@@ -96,9 +96,9 @@ public class RealtimeTuningConfig implements TuningConfig
   }

   @JsonProperty
-  public int getMaxRowsInMemory()
+  public int getBufferSize()
   {
-    return maxRowsInMemory;
+    return bufferSize;
   }

   @JsonProperty
@@ -146,7 +146,7 @@ public class RealtimeTuningConfig implements TuningConfig
   public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
   {
     return new RealtimeTuningConfig(
-        maxRowsInMemory,
+        bufferSize,
         intermediatePersistPeriod,
         windowPeriod,
         basePersistDirectory,
@@ -160,7 +160,7 @@ public class RealtimeTuningConfig implements TuningConfig
   public RealtimeTuningConfig withBasePersistDirectory(File dir)
   {
     return new RealtimeTuningConfig(
-        maxRowsInMemory,
+        bufferSize,
         intermediatePersistPeriod,
         windowPeriod,
         dir,
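Since only the `@JsonProperty` names change, a realtime spec adopts the new field by swapping the key. A minimal deserialization sketch; a plain Jackson `ObjectMapper` is used here for illustration, while Druid's own mapper carries extra modules, and fields omitted from the JSON fall back to the defaults shown above:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.indexing.RealtimeTuningConfig;

public class TuningConfigRoundTrip
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // Only bufferSize is set; every omitted field takes its default.
    String json = "{\"bufferSize\": 500000000}";
    RealtimeTuningConfig config = mapper.readValue(json, RealtimeTuningConfig.class);
    System.out.println(config.getBufferSize()); // 500000000
  }
}
```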
RealtimeManager.java:

@@ -43,6 +43,7 @@ import io.druid.query.SegmentDescriptor;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.plumber.Plumber;
+import io.druid.segment.realtime.plumber.Sink;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -208,7 +209,8 @@ public class RealtimeManager implements QuerySegmentWalker

           continue;
         }
-        if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
+        final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
+        if ((sink != null && sink.isFull()) || System.currentTimeMillis() > nextFlush) {
           plumber.persist(firehose.commit());
           nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
         }
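The flush condition changes meaning here: rather than counting rows since the last persist, the manager asks the plumber for the sink covering the row's timestamp and persists when that sink's offheap buffer is full. The null check matters because `getSink` can return null when no sink covers the timestamp; in that case only the `intermediatePersistPeriod` timer triggers a persist.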
Plumber.java:

@@ -52,4 +52,6 @@ public interface Plumber
    * fed into sinks and persisted.
    */
  public void finishJob();
+
+  public Sink getSink(long timeStamp);
 }
Sink.java:

@@ -133,6 +133,13 @@ public class Sink implements Iterable<FireHydrant>
     }
   }

+  public boolean isFull()
+  {
+    synchronized (currHydrant) {
+      return currHydrant.getIndex().isFull();
+    }
+  }
+
   /**
    * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A.
    *
@@ -176,11 +183,6 @@ public class Sink implements Iterable<FireHydrant>

   private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
   {
-    int aggsSize = 0;
-    for (AggregatorFactory agg : schema.getAggregators()) {
-      aggsSize += agg.getMaxIntermediateSize();
-    }
-    int bufferSize = aggsSize * config.getMaxRowsInMemory();
     IncrementalIndex newIndex = new IncrementalIndex(
         new IncrementalIndexSchema.Builder()
             .withMinTimestamp(minTimestamp)
@@ -188,7 +190,7 @@ public class Sink implements Iterable<FireHydrant>
             .withDimensionsSpec(schema.getParser())
             .withMetrics(schema.getAggregators())
             .build(),
-        new OffheapBufferPool(bufferSize)
+        new OffheapBufferPool(config.getBufferSize())
     );

     FireHydrant old;
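A note on the locking: `Sink.isFull()` reads `currHydrant` under synchronization, presumably so the fullness check cannot interleave with a swap that replaces the current hydrant; the class comment above ("If currHydrant is A, creates a new index B, sets currHydrant to B and returns A") describes that swap.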
RealtimeManagerTest.java:

@@ -69,11 +69,11 @@ public class RealtimeManagerTest
     final List<InputRow> rows = Arrays.asList(
         makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
     );
+    final AggregatorFactory[] aggs = {new CountAggregatorFactory("rows")};
     schema = new DataSchema(
         "test",
         null,
-        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+        aggs,
         new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR)
     );
     RealtimeIOConfig ioConfig = new RealtimeIOConfig(
@@ -108,8 +108,13 @@ public class RealtimeManagerTest
         }
       }
     );
+    int rowSize = 0;
+    for (AggregatorFactory agg : aggs) {
+      rowSize += agg.getMaxIntermediateSize();
+    }
+
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
-        1,
+        rowSize,
         new Period("P1Y"),
         null,
         null,
@@ -201,7 +206,6 @@ public class RealtimeManagerTest
     };
   }

-
   private static class TestFirehose implements Firehose
   {
     private final Iterator<InputRow> rows;
@@ -238,8 +242,6 @@ public class RealtimeManagerTest
   private static class TestPlumber implements Plumber
   {
     private final Sink sink;
-
-
     private volatile boolean startedJob = false;
     private volatile boolean finishedJob = false;
     private volatile int persistCount = 0;
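The test formerly flushed after every row via `maxRowsInMemory = 1`. Sizing the buffer to exactly one row's aggregate footprint (`rowSize`) preserves that behavior: by the `isFull()` arithmetic above, `(numEntries + 1) * rowSize` exceeds a `rowSize`-byte buffer as soon as the first row has been added.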