use bufferSize for IndexTask

nishantmonu51 2014-08-20 22:39:23 +05:30
parent 5dd3a1dea1
commit fe105d52ee
2 changed files with 193 additions and 184 deletions

IndexTask.java

@@ -55,6 +55,7 @@ import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.Plumber;
+import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
@@ -372,7 +373,6 @@ public class IndexTask extends AbstractFixedIntervalTask
);
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
-final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary();
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
@@ -403,15 +403,10 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir
).findPlumber(
schema,
-new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec),
+new RealtimeTuningConfig(ingestionSchema.getTuningConfig().getBufferSize(), null, null, null, null, null, null, shardSpec),
metrics
);
-// rowFlushBoundary for this job
-final int myRowFlushBoundary = rowFlushBoundary > 0
-                               ? rowFlushBoundary
-                               : toolbox.getConfig().getDefaultRowFlushBoundary();
try {
plumber.startJob();
@@ -429,8 +424,8 @@ public class IndexTask extends AbstractFixedIntervalTask
);
}
metrics.incrementProcessed();
-if (numRows >= myRowFlushBoundary) {
+Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
+if (sink != null && sink.isFull()) {
plumber.persist(firehose.commit());
}
} else {
@ -548,21 +543,21 @@ public class IndexTask extends AbstractFixedIntervalTask
public static class IndexTuningConfig implements TuningConfig public static class IndexTuningConfig implements TuningConfig
{ {
private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000; private static final int DEFAULT_BUFFER_SIZE = 512 * 1024 * 1024;
private final int targetPartitionSize; private final int targetPartitionSize;
private final int rowFlushBoundary; private final int bufferSize;
private final int numShards; private final int numShards;
@JsonCreator @JsonCreator
public IndexTuningConfig( public IndexTuningConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize, @JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary, @JsonProperty("bufferSize") int bufferSize,
@JsonProperty("numShards") @Nullable Integer numShards @JsonProperty("numShards") @Nullable Integer numShards
) )
{ {
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary; this.bufferSize = bufferSize == 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.numShards = numShards == null ? -1 : numShards; this.numShards = numShards == null ? -1 : numShards;
Preconditions.checkArgument( Preconditions.checkArgument(
this.targetPartitionSize == -1 || this.numShards == -1, this.targetPartitionSize == -1 || this.numShards == -1,
@@ -577,9 +572,9 @@ public class IndexTask extends AbstractFixedIntervalTask
}
@JsonProperty
-public int getRowFlushBoundary()
+public int getBufferSize()
{
-return rowFlushBoundary;
+return bufferSize;
}
@JsonProperty
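
In short, IndexTask's tuning config now takes a bufferSize in bytes (falling back to DEFAULT_BUFFER_SIZE, 512 * 1024 * 1024) instead of a rowFlushBoundary, and intermediate persists are triggered when the current Sink reports isFull() rather than when a fixed row count is reached. As a minimal usage sketch, the variable names below are illustrative and the argument values simply mirror the TaskLifecycleTest change further down:

// Construct the renamed tuning config; passing 0 for targetPartitionSize or
// bufferSize falls back to the defaults (DEFAULT_TARGET_PARTITION_SIZE = 5000000,
// DEFAULT_BUFFER_SIZE = 512 * 1024 * 1024).
IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
    10000,            // targetPartitionSize
    5 * 1024 * 1024,  // bufferSize, in bytes
    -1                // numShards; -1 means "not specified"
);
int bufferSize = tuningConfig.getBufferSize(); // 5 * 1024 * 1024

The JSON property is renamed in the same way, so a task spec now sets "bufferSize" rather than "rowFlushBoundary" in its tuningConfig.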

TaskLifecycleTest.java

@@ -43,14 +43,12 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
-import io.druid.indexing.common.TestUtils;
-import io.druid.segment.column.ColumnConfig;
-import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
+import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
@@ -67,6 +65,8 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@@ -96,16 +96,6 @@ import java.util.Set;
public class TaskLifecycleTest
{
-private File tmp = null;
-private TaskStorage ts = null;
-private TaskLockbox tl = null;
-private TaskQueue tq = null;
-private TaskRunner tr = null;
-private MockIndexerDBCoordinator mdc = null;
-private TaskActionClientFactory tac = null;
-private TaskToolboxFactory tb = null;
-TaskStorageQueryAdapter tsqa = null;
private static final Ordering<DataSegment> byIntervalOrdering = new Ordering<DataSegment>()
{
@Override
@@ -114,6 +104,153 @@ public class TaskLifecycleTest
return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval());
}
};
TaskStorageQueryAdapter tsqa = null;
private File tmp = null;
private TaskStorage ts = null;
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerDBCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
private static MockIndexerDBCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
}
private static ServiceEmitter newMockEmitter()
{
return new ServiceEmitter(null, null, null)
{
@Override
public void emit(Event event)
{
}
@Override
public void emit(ServiceEventBuilder builder)
{
}
};
}
private static InputRow IR(String dt, String dim1, String dim2, float met)
{
return new MapBasedInputRow(
new DateTime(dt).getMillis(),
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of(
"dim1", dim1,
"dim2", dim2,
"met", met
)
);
}
private static FirehoseFactory newMockExceptionalFirehoseFactory()
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
return new Firehose()
{
@Override
public boolean hasMore()
{
return true;
}
@Override
public InputRow nextRow()
{
throw new RuntimeException("HA HA HA");
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
};
}
private static FirehoseFactory newMockFirehoseFactory(final Iterable<InputRow> inputRows)
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final Iterator<InputRow> inputRowIterator = inputRows.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return inputRowIterator.hasNext();
}
@Override
public InputRow nextRow()
{
return inputRowIterator.next();
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
};
}
@Before
public void setUp() throws Exception
@@ -231,25 +368,35 @@
{
final Task indexTask = new IndexTask(
null,
-null,
-"foo",
-new UniformGranularitySpec(
-Granularity.DAY,
-null,
-ImmutableList.of(new Interval("2010-01-01/P2D")),
-Granularity.DAY
-),
-new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
-QueryGranularity.NONE,
-10000,
-newMockFirehoseFactory(
-ImmutableList.of(
-IR("2010-01-01T01", "x", "y", 1),
-IR("2010-01-01T01", "x", "z", 1),
-IR("2010-01-02T01", "a", "b", 2),
-IR("2010-01-02T01", "a", "c", 1)
+new IndexTask.IndexIngestionSpec(
+new DataSchema(
+"foo",
+null,
+new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
+new UniformGranularitySpec(
+Granularity.DAY,
+null,
+ImmutableList.of(new Interval("2010-01-01/P2D")),
+Granularity.DAY
+)
+), new IndexTask.IndexIOConfig(
+newMockFirehoseFactory(
+ImmutableList.of(
+IR("2010-01-01T01", "x", "y", 1),
+IR("2010-01-01T01", "x", "z", 1),
+IR("2010-01-02T01", "a", "b", 2),
+IR("2010-01-02T01", "a", "c", 1)
+)
)
),
+new IndexTask.IndexTuningConfig(10000, 5 * 1024 * 1024, -1)
+),
+null,
+null,
+null,
+null,
+-1,
+null,
-1,
TestUtils.MAPPER
);
@@ -294,7 +441,12 @@
null,
null,
"foo",
-new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY),
+new UniformGranularitySpec(
+Granularity.DAY,
+null,
+ImmutableList.of(new Interval("2010-01-01/P1D")),
+Granularity.DAY
+),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
@@ -561,142 +713,4 @@
return ImmutableSet.copyOf(nuked);
}
}
private static MockIndexerDBCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
}
private static ServiceEmitter newMockEmitter()
{
return new ServiceEmitter(null, null, null)
{
@Override
public void emit(Event event)
{
}
@Override
public void emit(ServiceEventBuilder builder)
{
}
};
}
private static InputRow IR(String dt, String dim1, String dim2, float met)
{
return new MapBasedInputRow(
new DateTime(dt).getMillis(),
ImmutableList.of("dim1", "dim2"),
ImmutableMap.<String, Object>of(
"dim1", dim1,
"dim2", dim2,
"met", met
)
);
}
private static FirehoseFactory newMockExceptionalFirehoseFactory()
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
return new Firehose()
{
@Override
public boolean hasMore()
{
return true;
}
@Override
public InputRow nextRow()
{
throw new RuntimeException("HA HA HA");
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
};
}
private static FirehoseFactory newMockFirehoseFactory(final Iterable<InputRow> inputRows)
{
return new FirehoseFactory()
{
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
final Iterator<InputRow> inputRowIterator = inputRows.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return inputRowIterator.hasNext();
}
@Override
public InputRow nextRow()
{
return inputRowIterator.next();
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public InputRowParser getParser()
{
return null;
}
};
}
}