From 8eb6466487d0f363939da41eddd715fa1348ddc9 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Fri, 19 Sep 2014 23:06:04 +0530
Subject: [PATCH] revert buffer size and add back rowFlushBoundary

---
 docs/content/Realtime-ingestion.md             |   8 +-
 docs/content/Tasks.md                          |   2 +-
 .../examples/rand/RandomFirehoseFactory.java   |   2 +-
 .../io/druid/indexer/HadoopTuningConfig.java   |  20 +-
 .../io/druid/indexer/IndexGeneratorJob.java    |   9 +-
 .../indexing/common/config/TaskConfig.java     |  10 +
 .../druid/indexing/common/task/IndexTask.java  |  37 +-
 .../common/task/RealtimeIndexTask.java         |  92 ++---
 .../indexing/overlord/TaskLifecycleTest.java   | 346 +++++++++---------
 .../segment/incremental/IncrementalIndex.java  |   8 -
 .../indexing/RealtimeTuningConfig.java         |  18 +-
 .../segment/realtime/RealtimeManager.java      |   4 +-
 .../segment/realtime/plumber/Plumber.java      |   2 -
 .../druid/segment/realtime/plumber/Sink.java   |  14 +-
 .../segment/realtime/RealtimeManagerTest.java  |  14 +-
 15 files changed, 284 insertions(+), 302 deletions(-)

diff --git a/docs/content/Realtime-ingestion.md b/docs/content/Realtime-ingestion.md
index a2aa1854fec..9849946f0a8 100644
--- a/docs/content/Realtime-ingestion.md
+++ b/docs/content/Realtime-ingestion.md
@@ -41,7 +41,7 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
       }
     },
     "config": {
-      "bufferSize": 500000000,
+      "maxRowsInMemory": 500000,
       "intermediatePersistPeriod": "PT10m"
     },
     "firehose": {
@@ -104,7 +104,7 @@ This provides configuration for the data processing portion of the realtime stre
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|yes|
-|bufferSize|Number|The size in bytes of buffer to be used for ingestion. When the buffer is full intermediate rows will be persisted to disk.|yes|
+|maxRowsInMemory|Number|The number of rows to aggregate before persisting. This number refers to post-aggregation rows, so it is not equivalent to the number of input events, but to the number of aggregated rows that those events result in. This is used to manage the required JVM heap size.|yes|

 ### Firehose

@@ -132,8 +132,8 @@ The following table summarizes constraints between settings in the spec file for
 | windowPeriod| when reading an InputRow, events with timestamp older than now minus this window are discarded | time jitter tolerance | use this to reject outliers |
 | segmentGranularity| time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than indexGranularity|
 | indexGranularity| time granularity (minute, hour, day, week, month) of indexes | less than segmentGranularity| minute, hour, day, week, month |
-| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | size of un-persisted rows in memory also constrained by bufferSize |
-| bufferSize| size of offheap buffer to be used to hold Input Rows before a flush to disk |
+| intermediatePersistPeriod| the max real time (ISO8601 Period) between flushes of InputRows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
+| maxRowsInMemory| the max number of InputRows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |

 The normal, expected use cases have the following overall constraints: `indexGranularity < intermediatePersistPeriod =< windowPeriod < segmentGranularity`

diff --git a/docs/content/Tasks.md b/docs/content/Tasks.md
index bce0f549e1a..6c5df35d25f 100644
--- a/docs/content/Tasks.md
+++ b/docs/content/Tasks.md
@@ -57,7 +57,7 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
 |indexGranularity|The rollup granularity for timestamps. See [Realtime Ingestion](Realtime-ingestion.html) for more information. |no|
 |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|no|
 |firehose|The input source of data. For more info, see [Firehose](Firehose.html).|yes|
-|bufferSize|Used in determining the size of offheap buffer to be used to store intermediate rows. 
When the buffer gets full, rows are persisted to disk.|no| +|rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|no| ### Index Hadoop Task diff --git a/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java b/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java index 023b6ed6681..e395cf02dcb 100644 --- a/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java +++ b/examples/src/main/java/io/druid/examples/rand/RandomFirehoseFactory.java @@ -65,7 +65,7 @@ import static java.lang.Thread.sleep; * {"type":"doubleSum","name":"outColumn","fieldName":"inColumn"} ], * "indexGranularity":"minute", * "shardSpec" : { "type": "none" } }, - * "config" : { "bufferSize" : 50000000, + * "config" : { "maxRowsInMemory" : 50000, * "intermediatePersistPeriod" : "PT2m" }, * * "firehose" : { "type" : "rand", diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 410b3a41659..4af7eaa200e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -38,7 +38,7 @@ public class HadoopTuningConfig implements TuningConfig { private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); private static final Map> defaultShardSpecs = ImmutableMap.>of(); - private static final int defaultBufferSize = 512 * 1024 * 1024; + private static final int defaultRowFlushBoundary = 80000; public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -47,7 +47,7 @@ public class HadoopTuningConfig implements TuningConfig new DateTime().toString(), defaultPartitionsSpec, defaultShardSpecs, - defaultBufferSize, + defaultRowFlushBoundary, false, true, false, @@ -62,7 +62,7 @@ public class HadoopTuningConfig implements TuningConfig private final String version; private final PartitionsSpec partitionsSpec; private final Map> shardSpecs; - private final int bufferSize; + private final int rowFlushBoundary; private final boolean leaveIntermediate; private final Boolean cleanupOnFailure; private final boolean overwriteFiles; @@ -77,7 +77,7 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("version") String version, final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("shardSpecs") Map> shardSpecs, - final @JsonProperty("bufferSize") Integer bufferSize, + final @JsonProperty("rowFlushBoundary") Integer rowFlushBoundary, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, final @JsonProperty("overwriteFiles") boolean overwriteFiles, @@ -91,7 +91,7 @@ public class HadoopTuningConfig implements TuningConfig this.version = version == null ? new DateTime().toString() : version; this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec; this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs; - this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize; + this.rowFlushBoundary = rowFlushBoundary == null ? defaultRowFlushBoundary : rowFlushBoundary; this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = cleanupOnFailure == null ? 
true : cleanupOnFailure; this.overwriteFiles = overwriteFiles; @@ -128,9 +128,9 @@ public class HadoopTuningConfig implements TuningConfig } @JsonProperty - public int getBufferSize() + public int getRowFlushBoundary() { - return bufferSize; + return rowFlushBoundary; } @JsonProperty @@ -182,7 +182,7 @@ public class HadoopTuningConfig implements TuningConfig version, partitionsSpec, shardSpecs, - bufferSize, + rowFlushBoundary, leaveIntermediate, cleanupOnFailure, overwriteFiles, @@ -200,7 +200,7 @@ public class HadoopTuningConfig implements TuningConfig ver, partitionsSpec, shardSpecs, - bufferSize, + rowFlushBoundary, leaveIntermediate, cleanupOnFailure, overwriteFiles, @@ -218,7 +218,7 @@ public class HadoopTuningConfig implements TuningConfig version, partitionsSpec, specs, - bufferSize, + rowFlushBoundary, leaveIntermediate, cleanupOnFailure, overwriteFiles, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 13b9cb9b2cf..1b6878270f4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -347,7 +347,7 @@ public class IndexGeneratorJob implements Jobby int numRows = index.add(inputRow); ++lineCount; - if (index.isFull()) { + if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) { log.info( "%,d lines to %,d rows in %,d millis", lineCount - runningTotalLineCount, @@ -630,6 +630,11 @@ public class IndexGeneratorJob implements Jobby private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) { + int aggsSize = 0; + for (AggregatorFactory agg : aggs) { + aggsSize += agg.getMaxIntermediateSize(); + } + int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary(); return new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(theBucket.time.getMillis()) @@ -637,7 +642,7 @@ public class IndexGeneratorJob implements Jobby .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) .withMetrics(aggs) .build(), - new OffheapBufferPool(config.getSchema().getTuningConfig().getBufferSize()) + new OffheapBufferPool(bufferSize) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 6d60d002484..1671fb84bd3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -41,6 +41,9 @@ public class TaskConfig @JsonProperty private final String hadoopWorkingPath; + @JsonProperty + private final int defaultRowFlushBoundary; + @JsonProperty private final List defaultHadoopCoordinates; @@ -56,6 +59,7 @@ public class TaskConfig this.baseDir = baseDir == null ? "/tmp" : baseDir; this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task")); this.hadoopWorkingPath = defaultDir(hadoopWorkingPath, "druid-indexing"); + this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 500000 : defaultRowFlushBoundary; this.defaultHadoopCoordinates = defaultHadoopCoordinates == null ? 
DEFAULT_DEFAULT_HADOOP_COORDINATES : defaultHadoopCoordinates; @@ -79,6 +83,12 @@ public class TaskConfig return hadoopWorkingPath; } + @JsonProperty + public int getDefaultRowFlushBoundary() + { + return defaultRowFlushBoundary; + } + @JsonProperty public List getDefaultHadoopCoordinates() { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 48f93214303..c477cb5d153 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -55,7 +55,6 @@ import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.Sink; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; @@ -148,7 +147,7 @@ public class IndexTask extends AbstractFixedIntervalTask granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity) ), new IndexIOConfig(firehoseFactory), - new IndexTuningConfig(targetPartitionSize, null, null) + new IndexTuningConfig(targetPartitionSize, 0, null) ); } this.jsonMapper = jsonMapper; @@ -373,6 +372,7 @@ public class IndexTask extends AbstractFixedIntervalTask ); final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); + final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary(); // We need to track published segments. final List pushedSegments = new CopyOnWriteArrayList(); @@ -403,20 +403,15 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - new RealtimeTuningConfig( - ingestionSchema.getTuningConfig().getBufferSize(), - null, - null, - null, - null, - null, - null, - shardSpec, - null - ), + new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null), metrics ); + // rowFlushBoundary for this job + final int myRowFlushBoundary = rowFlushBoundary > 0 + ? rowFlushBoundary + : toolbox.getConfig().getDefaultRowFlushBoundary(); + try { plumber.startJob(); @@ -434,8 +429,8 @@ public class IndexTask extends AbstractFixedIntervalTask ); } metrics.incrementProcessed(); - Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if (sink != null && sink.isFull()) { + + if (numRows >= myRowFlushBoundary) { plumber.persist(firehose.commit()); } } else { @@ -553,21 +548,21 @@ public class IndexTask extends AbstractFixedIntervalTask public static class IndexTuningConfig implements TuningConfig { private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; - private static final int DEFAULT_BUFFER_SIZE = 512 * 1024 * 1024; + private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000; private final int targetPartitionSize; - private final int bufferSize; + private final int rowFlushBoundary; private final int numShards; @JsonCreator public IndexTuningConfig( @JsonProperty("targetPartitionSize") int targetPartitionSize, - @JsonProperty("bufferSize") @Nullable Integer bufferSize, + @JsonProperty("rowFlushBoundary") int rowFlushBoundary, @JsonProperty("numShards") @Nullable Integer numShards ) { this.targetPartitionSize = targetPartitionSize == 0 ? 
DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; - this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize; + this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary; this.numShards = numShards == null ? -1 : numShards; Preconditions.checkArgument( this.targetPartitionSize == -1 || this.numShards == -1, @@ -582,9 +577,9 @@ public class IndexTask extends AbstractFixedIntervalTask } @JsonProperty - public int getBufferSize() + public int getRowFlushBoundary() { - return bufferSize; + return rowFlushBoundary; } @JsonProperty diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 34a1c5c9bff..6dd99d0806d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -19,6 +19,7 @@ package io.druid.indexing.common.task; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; @@ -43,6 +44,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryToolChest; +import io.druid.segment.column.ColumnConfig; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -55,7 +57,6 @@ import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.RealtimePlumberSchool; import io.druid.segment.realtime.plumber.RejectionPolicyFactory; -import io.druid.segment.realtime.plumber.Sink; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; @@ -69,10 +70,36 @@ import java.io.IOException; public class RealtimeIndexTask extends AbstractTask { private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); + + private static String makeTaskId(FireDepartment fireDepartment, Schema schema) + { + // Backwards compatible + if (fireDepartment == null) { + return String.format( + "index_realtime_%s_%d_%s", + schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString() + ); + } else { + return String.format( + "index_realtime_%s_%d_%s", + fireDepartment.getDataSchema().getDataSource(), + fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), + new DateTime().toString() + ); + } + } + + private static String makeDatasource(FireDepartment fireDepartment, Schema schema) + { + return (fireDepartment != null) ? 
fireDepartment.getDataSchema().getDataSource() : schema.getDataSource(); + } + @JsonIgnore private final FireDepartment spec; + @JsonIgnore private volatile Plumber plumber = null; + @JsonIgnore private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null; @@ -126,29 +153,6 @@ public class RealtimeIndexTask extends AbstractTask } } - private static String makeTaskId(FireDepartment fireDepartment, Schema schema) - { - // Backwards compatible - if (fireDepartment == null) { - return String.format( - "index_realtime_%s_%d_%s", - schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString() - ); - } else { - return String.format( - "index_realtime_%s_%d_%s", - fireDepartment.getDataSchema().getDataSource(), - fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), - new DateTime().toString() - ); - } - } - - private static String makeDatasource(FireDepartment fireDepartment, Schema schema) - { - return (fireDepartment != null) ? fireDepartment.getDataSchema().getDataSource() : schema.getDataSource(); - } - @Override public String getType() { @@ -260,30 +264,29 @@ public class RealtimeIndexTask extends AbstractTask // Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the // realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in // the plumber such that waiting for the coordinator doesn't block data processing. - final VersioningPolicy versioningPolicy = new - VersioningPolicy() - { - @Override - public String getVersion(final Interval interval) - { - try { - // Side effect: Calling getVersion causes a lock to be acquired - final TaskLock myLock = toolbox.getTaskActionClient() - .submit(new LockAcquireAction(interval)); + final VersioningPolicy versioningPolicy = new VersioningPolicy() + { + @Override + public String getVersion(final Interval interval) + { + try { + // Side effect: Calling getVersion causes a lock to be acquired + final TaskLock myLock = toolbox.getTaskActionClient() + .submit(new LockAcquireAction(interval)); - return myLock.getVersion(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - }; + return myLock.getVersion(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + }; DataSchema dataSchema = spec.getDataSchema(); RealtimeIOConfig realtimeIOConfig = spec.getIOConfig(); RealtimeTuningConfig tuningConfig = spec.getTuningConfig() - .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) - .withVersioningPolicy(versioningPolicy); + .withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")) + .withVersioningPolicy(versioningPolicy); final FireDepartment fireDepartment = new FireDepartment( dataSchema, @@ -350,8 +353,7 @@ public class RealtimeIndexTask extends AbstractTask } fireDepartment.getMetrics().incrementProcessed(); - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && sink.isFull()) || System.currentTimeMillis() > nextFlush) { + if (currCount >= tuningConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index a8a0e7d526d..33411036e11 100644 --- 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -43,12 +43,14 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.TestUtils; +import io.druid.segment.column.ColumnConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; -import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentInsertAction; @@ -65,8 +67,6 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; -import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; @@ -96,15 +96,6 @@ import java.util.Set; public class TaskLifecycleTest { - private static final Ordering byIntervalOrdering = new Ordering() - { - @Override - public int compare(DataSegment dataSegment, DataSegment dataSegment2) - { - return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval()); - } - }; - TaskStorageQueryAdapter tsqa = null; private File tmp = null; private TaskStorage ts = null; private TaskLockbox tl = null; @@ -113,144 +104,16 @@ public class TaskLifecycleTest private MockIndexerDBCoordinator mdc = null; private TaskActionClientFactory tac = null; private TaskToolboxFactory tb = null; + TaskStorageQueryAdapter tsqa = null; - private static MockIndexerDBCoordinator newMockMDC() + private static final Ordering byIntervalOrdering = new Ordering() { - return new MockIndexerDBCoordinator(); - } - - private static ServiceEmitter newMockEmitter() - { - return new ServiceEmitter(null, null, null) + @Override + public int compare(DataSegment dataSegment, DataSegment dataSegment2) { - @Override - public void emit(Event event) - { - - } - - @Override - public void emit(ServiceEventBuilder builder) - { - - } - }; - } - - private static InputRow IR(String dt, String dim1, String dim2, float met) - { - return new MapBasedInputRow( - new DateTime(dt).getMillis(), - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of( - "dim1", dim1, - "dim2", dim2, - "met", met - ) - ); - } - - private static FirehoseFactory newMockExceptionalFirehoseFactory() - { - return new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - return new Firehose() - { - @Override - public boolean hasMore() - { - return true; - } - - @Override - public InputRow nextRow() - { - throw new RuntimeException("HA HA HA"); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public 
InputRowParser getParser() - { - return null; - } - }; - } - - private static FirehoseFactory newMockFirehoseFactory(final Iterable inputRows) - { - return new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - final Iterator inputRowIterator = inputRows.iterator(); - - return new Firehose() - { - @Override - public boolean hasMore() - { - return inputRowIterator.hasNext(); - } - - @Override - public InputRow nextRow() - { - return inputRowIterator.next(); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public InputRowParser getParser() - { - return null; - } - }; - } + return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval()); + } + }; @Before public void setUp() throws Exception @@ -368,35 +231,25 @@ public class TaskLifecycleTest { final Task indexTask = new IndexTask( null, - new IndexTask.IndexIngestionSpec( - new DataSchema( - "foo", - null, - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - new UniformGranularitySpec( - Granularity.DAY, - null, - ImmutableList.of(new Interval("2010-01-01/P2D")), - Granularity.DAY - ) - ), new IndexTask.IndexIOConfig( - newMockFirehoseFactory( - ImmutableList.of( - IR("2010-01-01T01", "x", "y", 1), - IR("2010-01-01T01", "x", "z", 1), - IR("2010-01-02T01", "a", "b", 2), - IR("2010-01-02T01", "a", "c", 1) - ) + null, + "foo", + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of(new Interval("2010-01-01/P2D")), + Granularity.DAY + ), + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + QueryGranularity.NONE, + 10000, + newMockFirehoseFactory( + ImmutableList.of( + IR("2010-01-01T01", "x", "y", 1), + IR("2010-01-01T01", "x", "z", 1), + IR("2010-01-02T01", "a", "b", 2), + IR("2010-01-02T01", "a", "c", 1) ) ), - new IndexTask.IndexTuningConfig(10000, 5 * 1024 * 1024, -1) - ), - null, - null, - null, - null, - -1, - null, -1, TestUtils.MAPPER ); @@ -441,12 +294,7 @@ public class TaskLifecycleTest null, null, "foo", - new UniformGranularitySpec( - Granularity.DAY, - null, - ImmutableList.of(new Interval("2010-01-01/P1D")), - Granularity.DAY - ), + new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, QueryGranularity.NONE, 10000, @@ -713,4 +561,142 @@ public class TaskLifecycleTest return ImmutableSet.copyOf(nuked); } } + + private static MockIndexerDBCoordinator newMockMDC() + { + return new MockIndexerDBCoordinator(); + } + + private static ServiceEmitter newMockEmitter() + { + return new ServiceEmitter(null, null, null) + { + @Override + public void emit(Event event) + { + + } + + @Override + public void emit(ServiceEventBuilder builder) + { + + } + }; + } + + private static InputRow IR(String dt, String dim1, String dim2, float met) + { + return new MapBasedInputRow( + new DateTime(dt).getMillis(), + ImmutableList.of("dim1", "dim2"), + ImmutableMap.of( + "dim1", dim1, + "dim2", dim2, + "met", met + ) + ); + } + + private static FirehoseFactory newMockExceptionalFirehoseFactory() + { + return new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + return new Firehose() + { + @Override + public boolean hasMore() + { + return 
true; + } + + @Override + public InputRow nextRow() + { + throw new RuntimeException("HA HA HA"); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + + } + }; + } + + @Override + public InputRowParser getParser() + { + return null; + } + }; + } + + private static FirehoseFactory newMockFirehoseFactory(final Iterable inputRows) + { + return new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + final Iterator inputRowIterator = inputRows.iterator(); + + return new Firehose() + { + @Override + public boolean hasMore() + { + return inputRowIterator.hasNext(); + } + + @Override + public InputRow nextRow() + { + return inputRowIterator.next(); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + + } + }; + } + + @Override + public InputRowParser getParser() + { + return null; + } + }; + } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c1ab7d49327..14be82e7ae2 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -466,14 +466,6 @@ public class IncrementalIndex implements Iterable, Closeable return numEntries.get() == 0; } - /** - * @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows. - */ - public boolean isFull() - { - return (numEntries.get() + 1) * totalAggSize > bufferHolder.get().limit(); - } - public int size() { return numEntries.get(); diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 60c19308650..a4d60bfe77a 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -36,7 +36,7 @@ import java.io.File; */ public class RealtimeTuningConfig implements TuningConfig { - private static final int defaultBufferSize = 256 * 1024 * 1024; + private static final int defaultMaxRowsInMemory = 500000; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final Period defaultWindowPeriod = new Period("PT10M"); private static final File defaultBasePersistDirectory = Files.createTempDir(); @@ -50,7 +50,7 @@ public class RealtimeTuningConfig implements TuningConfig public static RealtimeTuningConfig makeDefaultTuningConfig() { return new RealtimeTuningConfig( - defaultBufferSize, + defaultMaxRowsInMemory, defaultIntermediatePersistPeriod, defaultWindowPeriod, defaultBasePersistDirectory, @@ -62,7 +62,7 @@ public class RealtimeTuningConfig implements TuningConfig ); } - private final int bufferSize; + private final int maxRowsInMemory; private final Period intermediatePersistPeriod; private final Period windowPeriod; private final File basePersistDirectory; @@ -74,7 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig @JsonCreator public RealtimeTuningConfig( - @JsonProperty("bufferSize") Integer bufferSize, + @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("intermediatePersistPeriod") 
Period intermediatePersistPeriod, @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -85,7 +85,7 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("persistInHeap") Boolean persistInHeap ) { - this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize; + this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -101,9 +101,9 @@ public class RealtimeTuningConfig implements TuningConfig } @JsonProperty - public int getBufferSize() + public int getMaxRowsInMemory() { - return bufferSize; + return maxRowsInMemory; } @JsonProperty @@ -157,7 +157,7 @@ public class RealtimeTuningConfig implements TuningConfig public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( - bufferSize, + maxRowsInMemory, intermediatePersistPeriod, windowPeriod, basePersistDirectory, @@ -172,7 +172,7 @@ public class RealtimeTuningConfig implements TuningConfig public RealtimeTuningConfig withBasePersistDirectory(File dir) { return new RealtimeTuningConfig( - bufferSize, + maxRowsInMemory, intermediatePersistPeriod, windowPeriod, dir, diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 5ba960925aa..a1cfb220972 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -43,7 +43,6 @@ import io.druid.query.SegmentDescriptor; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.Sink; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -209,8 +208,7 @@ public class RealtimeManager implements QuerySegmentWalker continue; } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && sink.isFull()) || System.currentTimeMillis() > nextFlush) { + if (currCount >= config.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) { plumber.persist(firehose.commit()); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index 3824d30d1eb..136d3a8a253 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -52,6 +52,4 @@ public interface Plumber * fed into sinks and persisted. 
*/ public void finishJob(); - - public Sink getSink(long timeStamp); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 03315201520..6a557b44822 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -133,13 +133,6 @@ public class Sink implements Iterable } } - public boolean isFull() - { - synchronized (currHydrant){ - return currHydrant != null && currHydrant.getIndex().isFull(); - } - } - /** * If currHydrant is A, creates a new index B, sets currHydrant to B and returns A. * @@ -183,6 +176,11 @@ public class Sink implements Iterable private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) { + int aggsSize = 0; + for (AggregatorFactory agg : schema.getAggregators()) { + aggsSize += agg.getMaxIntermediateSize(); + } + int bufferSize = aggsSize * config.getMaxRowsInMemory(); IncrementalIndex newIndex = new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(minTimestamp) @@ -190,7 +188,7 @@ public class Sink implements Iterable .withDimensionsSpec(schema.getParser()) .withMetrics(schema.getAggregators()) .build(), - new OffheapBufferPool(config.getBufferSize()) + new OffheapBufferPool(bufferSize) ); FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 669c25ca290..d8c86386b8e 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -69,11 +69,11 @@ public class RealtimeManagerTest final List rows = Arrays.asList( makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis()) ); - final AggregatorFactory[] aggs = {new CountAggregatorFactory("rows")}; + schema = new DataSchema( "test", null, - aggs, + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null, Granularity.HOUR) ); RealtimeIOConfig ioConfig = new RealtimeIOConfig( @@ -108,13 +108,8 @@ public class RealtimeManagerTest } } ); - int rowSize = 0; - for (AggregatorFactory agg : aggs) { - rowSize += agg.getMaxIntermediateSize(); - } - RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( - rowSize, + 1, new Period("P1Y"), null, null, @@ -207,6 +202,7 @@ public class RealtimeManagerTest }; } + private static class TestFirehose implements Firehose { private final Iterator rows; @@ -243,6 +239,8 @@ public class RealtimeManagerTest private static class TestPlumber implements Plumber { private final Sink sink; + + private volatile boolean startedJob = false; private volatile boolean finishedJob = false; private volatile int persistCount = 0;
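
The mechanical core of this patch is visible in `IndexGeneratorJob.makeIncrementalIndex` and `Sink.makeNewCurrIndex`: rather than accepting a raw off-heap `bufferSize`, both now size the `OffheapBufferPool` as the restored row boundary times the aggregators' worst-case intermediate sizes. The standalone sketch below mirrors that arithmetic; `AggregatorFactoryLike` is a stand-in for Druid's `AggregatorFactory` (modeling only the one method this calculation needs), and the 8-byte figure assumes a doubleSum-style aggregator.

```java
// Standalone sketch of the sizing logic this patch introduces in
// IndexGeneratorJob.makeIncrementalIndex and Sink.makeNewCurrIndex.
// AggregatorFactoryLike is a stand-in, not Druid's actual API.
interface AggregatorFactoryLike
{
  int getMaxIntermediateSize(); // worst-case bytes one aggregator needs per row
}

public class BufferSizingSketch
{
  // buffer bytes = (sum of per-row intermediate sizes over all aggregators) * row boundary
  static int requiredBufferBytes(AggregatorFactoryLike[] aggs, int rowFlushBoundary)
  {
    int aggsSize = 0;
    for (AggregatorFactoryLike agg : aggs) {
      aggsSize += agg.getMaxIntermediateSize();
    }
    return aggsSize * rowFlushBoundary;
  }

  public static void main(String[] args)
  {
    AggregatorFactoryLike doubleSum = () -> 8; // assumption: an 8-byte doubleSum-style aggregator
    // At the new default boundary of 500000 rows this prints 4000000 (~4 MB),
    // versus the fixed 256 MB / 512 MB bufferSize defaults the patch removes.
    System.out.println(requiredBufferBytes(new AggregatorFactoryLike[]{doubleSum}, 500000));
  }
}
```

One consequence worth noting: the multiplication is plain `int` arithmetic both here and in the patch, so a very large boundary combined with many or wide aggregators could overflow `Integer.MAX_VALUE`.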
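With `Sink.isFull()` and `Plumber.getSink()` removed, `RealtimeManager`, `RealtimeIndexTask`, and `IndexTask` all trigger intermediate persists from a running row count combined with the `intermediatePersistPeriod` deadline. The hunks reference a `currCount` variable whose definition lies outside the diff context, so the following is a self-contained simulation of the restored trigger condition under that assumption, not the actual ingestion loop.

```java
import java.util.Arrays;

// Simulation of the persist trigger restored by this patch: flush when the
// in-memory row count reaches maxRowsInMemory, or when the
// intermediatePersistPeriod deadline passes. In the real code the flush is
// plumber.persist(firehose.commit()); here it is just a println.
public class PersistTriggerSketch
{
  public static void main(String[] args)
  {
    final int maxRowsInMemory = 3;             // tiny value, for demonstration only
    final long persistPeriodMillis = 60_000L;  // stands in for intermediatePersistPeriod
    long nextFlush = System.currentTimeMillis() + persistPeriodMillis;
    int currCount = 0;                         // rows added since the last persist

    for (String row : Arrays.asList("a", "b", "c", "d", "e", "f", "g")) {
      currCount++;                             // plumber.add(inputRow) in the real loop
      if (currCount >= maxRowsInMemory || System.currentTimeMillis() > nextFlush) {
        System.out.println("persist after row '" + row + "' (" + currCount + " rows in memory)");
        currCount = 0;                         // a fresh in-memory index starts after a persist
        nextFlush = System.currentTimeMillis() + persistPeriodMillis;
      }
    }
  }
}
```

This is also why the patch reverts the docs: once persistence is count-driven again, `maxRowsInMemory` rather than a byte-denominated `bufferSize` is the knob that bounds un-persisted rows alongside `intermediatePersistPeriod`.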
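The reverted configs share one deserialization idiom: the Jackson-bound field is a boxed `Integer`, so an absent JSON property arrives as `null` and falls back to a default (500000 in `RealtimeTuningConfig` and `TaskConfig`, 80000 in `HadoopTuningConfig`). A minimal sketch of that pattern, assuming jackson-databind on the classpath; the class name is illustrative:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative config class, not Druid source: the boxed Integer lets Jackson
// pass null for a missing "maxRowsInMemory" property, which then falls back
// to the default this patch uses (500000).
public class TuningSketch
{
  private final int maxRowsInMemory;

  @JsonCreator
  public TuningSketch(@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory == null ? 500000 : maxRowsInMemory;
  }

  public int getMaxRowsInMemory()
  {
    return maxRowsInMemory;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    System.out.println(mapper.readValue("{}", TuningSketch.class).getMaxRowsInMemory());                           // 500000
    System.out.println(mapper.readValue("{\"maxRowsInMemory\":75000}", TuningSketch.class).getMaxRowsInMemory()); // 75000
  }
}
```

`IndexTask.IndexTuningConfig` is the deliberate exception: its `rowFlushBoundary` is a primitive `int` with 0 meaning "use the default", which is why the constructor call in `IndexTask` now passes `0` where it previously passed `null`.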