diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 4ab040d947d..c9022fa765b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -55,6 +55,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.Sink; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; @@ -372,7 +373,6 @@ public class IndexTask extends AbstractFixedIntervalTask ); final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary(); // We need to track published segments. final List pushedSegments = new CopyOnWriteArrayList(); @@ -403,15 +403,10 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), + new RealtimeTuningConfig(ingestionSchema.getTuningConfig().getBufferSize(), null, null, null, null, null, null, shardSpec), metrics ); - // rowFlushBoundary for this job - final int myRowFlushBoundary = rowFlushBoundary > 0 - ? rowFlushBoundary - : toolbox.getConfig().getDefaultRowFlushBoundary(); - try { plumber.startJob(); @@ -429,8 +424,8 @@ public class IndexTask extends AbstractFixedIntervalTask ); } metrics.incrementProcessed(); - - if (numRows >= myRowFlushBoundary) { + Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if (sink != null && sink.isFull()) { plumber.persist(firehose.commit()); } } else { @@ -548,21 +543,21 @@ public class IndexTask extends AbstractFixedIntervalTask public static class IndexTuningConfig implements TuningConfig { private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; - private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000; + private static final int DEFAULT_BUFFER_SIZE = 512 * 1024 * 1024; private final int targetPartitionSize; - private final int rowFlushBoundary; + private final int bufferSize; private final int numShards; @JsonCreator public IndexTuningConfig( @JsonProperty("targetPartitionSize") int targetPartitionSize, - @JsonProperty("rowFlushBoundary") int rowFlushBoundary, + @JsonProperty("bufferSize") int bufferSize, @JsonProperty("numShards") @Nullable Integer numShards ) { this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; - this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary; + this.bufferSize = bufferSize == 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.numShards = numShards == null ? -1 : numShards; Preconditions.checkArgument( this.targetPartitionSize == -1 || this.numShards == -1, @@ -577,9 +572,9 @@ public class IndexTask extends AbstractFixedIntervalTask } @JsonProperty - public int getRowFlushBoundary() + public int getBufferSize() { - return rowFlushBoundary; + return bufferSize; } @JsonProperty diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 33411036e11..a8a0e7d526d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -43,14 +43,12 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; -import io.druid.indexing.common.TestUtils; -import io.druid.segment.column.ColumnConfig; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.SegmentInsertAction; @@ -67,6 +65,8 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; @@ -96,16 +96,6 @@ import java.util.Set; public class TaskLifecycleTest { - private File tmp = null; - private TaskStorage ts = null; - private TaskLockbox tl = null; - private TaskQueue tq = null; - private TaskRunner tr = null; - private MockIndexerDBCoordinator mdc = null; - private TaskActionClientFactory tac = null; - private TaskToolboxFactory tb = null; - TaskStorageQueryAdapter tsqa = null; - private static final Ordering byIntervalOrdering = new Ordering() { @Override @@ -114,6 +104,153 @@ public class TaskLifecycleTest return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval()); } }; + TaskStorageQueryAdapter tsqa = null; + private File tmp = null; + private TaskStorage ts = null; + private TaskLockbox tl = null; + private TaskQueue tq = null; + private TaskRunner tr = null; + private MockIndexerDBCoordinator mdc = null; + private TaskActionClientFactory tac = null; + private TaskToolboxFactory tb = null; + + private static MockIndexerDBCoordinator newMockMDC() + { + return new MockIndexerDBCoordinator(); + } + + private static ServiceEmitter newMockEmitter() + { + return new ServiceEmitter(null, null, null) + { + @Override + public void emit(Event event) + { + + } + + @Override + public void emit(ServiceEventBuilder builder) + { + + } + }; + } + + private static InputRow IR(String dt, String dim1, String dim2, float met) + { + return new MapBasedInputRow( + new DateTime(dt).getMillis(), + ImmutableList.of("dim1", "dim2"), + ImmutableMap.of( + "dim1", dim1, + "dim2", dim2, + "met", met + ) + ); + } + + private static FirehoseFactory newMockExceptionalFirehoseFactory() + { + return new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + return new Firehose() + { + @Override + public boolean hasMore() + { + return true; + } + + @Override + public InputRow nextRow() + { + throw new RuntimeException("HA HA HA"); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + + } + }; + } + + @Override + public InputRowParser getParser() + { + return null; + } + }; + } + + private static FirehoseFactory newMockFirehoseFactory(final Iterable inputRows) + { + return new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + final Iterator inputRowIterator = inputRows.iterator(); + + return new Firehose() + { + @Override + public boolean hasMore() + { + return inputRowIterator.hasNext(); + } + + @Override + public InputRow nextRow() + { + return inputRowIterator.next(); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + + } + }; + } + + @Override + public InputRowParser getParser() + { + return null; + } + }; + } @Before public void setUp() throws Exception @@ -231,25 +368,35 @@ public class TaskLifecycleTest { final Task indexTask = new IndexTask( null, - null, - "foo", - new UniformGranularitySpec( - Granularity.DAY, - null, - ImmutableList.of(new Interval("2010-01-01/P2D")), - Granularity.DAY - ), - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - QueryGranularity.NONE, - 10000, - newMockFirehoseFactory( - ImmutableList.of( - IR("2010-01-01T01", "x", "y", 1), - IR("2010-01-01T01", "x", "z", 1), - IR("2010-01-02T01", "a", "b", 2), - IR("2010-01-02T01", "a", "c", 1) + new IndexTask.IndexIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of(new Interval("2010-01-01/P2D")), + Granularity.DAY + ) + ), new IndexTask.IndexIOConfig( + newMockFirehoseFactory( + ImmutableList.of( + IR("2010-01-01T01", "x", "y", 1), + IR("2010-01-01T01", "x", "z", 1), + IR("2010-01-02T01", "a", "b", 2), + IR("2010-01-02T01", "a", "c", 1) + ) ) ), + new IndexTask.IndexTuningConfig(10000, 5 * 1024 * 1024, -1) + ), + null, + null, + null, + null, + -1, + null, -1, TestUtils.MAPPER ); @@ -294,7 +441,12 @@ public class TaskLifecycleTest null, null, "foo", - new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY), + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of(new Interval("2010-01-01/P1D")), + Granularity.DAY + ), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, QueryGranularity.NONE, 10000, @@ -561,142 +713,4 @@ public class TaskLifecycleTest return ImmutableSet.copyOf(nuked); } } - - private static MockIndexerDBCoordinator newMockMDC() - { - return new MockIndexerDBCoordinator(); - } - - private static ServiceEmitter newMockEmitter() - { - return new ServiceEmitter(null, null, null) - { - @Override - public void emit(Event event) - { - - } - - @Override - public void emit(ServiceEventBuilder builder) - { - - } - }; - } - - private static InputRow IR(String dt, String dim1, String dim2, float met) - { - return new MapBasedInputRow( - new DateTime(dt).getMillis(), - ImmutableList.of("dim1", "dim2"), - ImmutableMap.of( - "dim1", dim1, - "dim2", dim2, - "met", met - ) - ); - } - - private static FirehoseFactory newMockExceptionalFirehoseFactory() - { - return new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - return new Firehose() - { - @Override - public boolean hasMore() - { - return true; - } - - @Override - public InputRow nextRow() - { - throw new RuntimeException("HA HA HA"); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public InputRowParser getParser() - { - return null; - } - }; - } - - private static FirehoseFactory newMockFirehoseFactory(final Iterable inputRows) - { - return new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - final Iterator inputRowIterator = inputRows.iterator(); - - return new Firehose() - { - @Override - public boolean hasMore() - { - return inputRowIterator.hasNext(); - } - - @Override - public InputRow nextRow() - { - return inputRowIterator.next(); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - - } - }; - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public InputRowParser getParser() - { - return null; - } - }; - } }