mirror of https://github.com/apache/druid.git
Merge pull request #642 from metamx/numShards-IndexTask
Add way to skip determine partitions for index task
This commit is contained in:
commit
5d53b14bd2
|
@ -179,7 +179,16 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
|
||||
} else {
|
||||
for (int i = 0; i < numberOfShards; ++i) {
|
||||
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
|
||||
actualSpecs.add(
|
||||
new HadoopyShardSpec(
|
||||
new HashBasedNumberedShardSpec(
|
||||
i,
|
||||
numberOfShards,
|
||||
HadoopDruidIndexerConfig.jsonMapper
|
||||
),
|
||||
shardCount++
|
||||
)
|
||||
);
|
||||
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
|
|||
for (int i = 0; i < shardsPerInterval; i++) {
|
||||
specs.add(
|
||||
new HadoopyShardSpec(
|
||||
new HashBasedNumberedShardSpec(i, shardsPerInterval),
|
||||
new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.jsonMapper),
|
||||
shardCount++
|
||||
)
|
||||
);
|
||||
|
|
|
@ -19,10 +19,13 @@
|
|||
|
||||
package io.druid.indexing.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -53,12 +56,14 @@ import io.druid.segment.loading.DataSegmentPusher;
|
|||
import io.druid.segment.realtime.FireDepartmentMetrics;
|
||||
import io.druid.segment.realtime.plumber.Plumber;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import io.druid.timeline.partition.ShardSpec;
|
||||
import io.druid.timeline.partition.SingleDimensionShardSpec;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -107,6 +112,8 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
@JsonIgnore
|
||||
private final IndexIngestionSpec ingestionSchema;
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@JsonCreator
|
||||
public IndexTask(
|
||||
@JsonProperty("id") String id,
|
||||
|
@ -118,7 +125,8 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
@JsonProperty("indexGranularity") final QueryGranularity indexGranularity,
|
||||
@JsonProperty("targetPartitionSize") final int targetPartitionSize,
|
||||
@JsonProperty("firehose") final FirehoseFactory firehoseFactory,
|
||||
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary
|
||||
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary,
|
||||
@JacksonInject ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -139,9 +147,10 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
|
||||
),
|
||||
new IndexIOConfig(firehoseFactory),
|
||||
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary)
|
||||
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
|
||||
);
|
||||
}
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -175,7 +184,15 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
if (targetPartitionSize > 0) {
|
||||
shardSpecs = determinePartitions(bucket, targetPartitionSize);
|
||||
} else {
|
||||
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
|
||||
int numShards = ingestionSchema.getTuningConfig().getNumShards();
|
||||
if (numShards > 0) {
|
||||
shardSpecs = Lists.newArrayList();
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
|
||||
}
|
||||
} else {
|
||||
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
|
||||
}
|
||||
}
|
||||
for (final ShardSpec shardSpec : shardSpecs) {
|
||||
final DataSegment segment = generateSegment(
|
||||
|
@ -206,6 +223,7 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
retVal.add(interval);
|
||||
}
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
@ -477,7 +495,7 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
|
||||
this.dataSchema = dataSchema;
|
||||
this.ioConfig = ioConfig;
|
||||
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0) : tuningConfig;
|
||||
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null) : tuningConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -530,15 +548,22 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
|
||||
private final int targetPartitionSize;
|
||||
private final int rowFlushBoundary;
|
||||
private final int numShards;
|
||||
|
||||
@JsonCreator
|
||||
public IndexTuningConfig(
|
||||
@JsonProperty("targetPartitionSize") int targetPartitionSize,
|
||||
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
|
||||
)
|
||||
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
|
||||
@JsonProperty("numShards") @Nullable Integer numShards
|
||||
)
|
||||
{
|
||||
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
|
||||
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
|
||||
this.numShards = numShards == null ? -1 : numShards;
|
||||
Preconditions.checkArgument(
|
||||
this.targetPartitionSize == -1 || this.numShards == -1,
|
||||
"targetPartitionsSize and shardCount both cannot be set"
|
||||
);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -552,5 +577,11 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
{
|
||||
return rowFlushBoundary;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getNumShards()
|
||||
{
|
||||
return numShards;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package io.druid.indexing.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -67,16 +69,19 @@ public class TaskSerdeTest
|
|||
QueryGranularity.NONE,
|
||||
10000,
|
||||
new LocalFirehoseFactory(new File("lol"), "rofl", null),
|
||||
-1
|
||||
-1,
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
|
||||
jsonMapper.registerModule(jacksonModule);
|
||||
}
|
||||
InjectableValues inject = new InjectableValues.Std()
|
||||
.addValue(ObjectMapper.class, jsonMapper);
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
||||
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
|
||||
final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class);
|
||||
final IndexTask task2 = jsonMapper.reader(Task.class).with(inject).readValue(json);
|
||||
|
||||
Assert.assertEquals("foo", task.getDataSource());
|
||||
Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
|
||||
|
|
|
@ -43,6 +43,7 @@ import io.druid.data.input.InputRow;
|
|||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.impl.InputRowParser;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.indexing.common.TestUtils;
|
||||
import io.druid.segment.column.ColumnConfig;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.indexing.common.SegmentLoaderFactory;
|
||||
|
@ -249,7 +250,8 @@ public class TaskLifecycleTest
|
|||
IR("2010-01-02T01", "a", "c", 1)
|
||||
)
|
||||
),
|
||||
-1
|
||||
-1,
|
||||
TestUtils.MAPPER
|
||||
);
|
||||
|
||||
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
|
||||
|
@ -297,7 +299,8 @@ public class TaskLifecycleTest
|
|||
QueryGranularity.NONE,
|
||||
10000,
|
||||
newMockExceptionalFirehoseFactory(),
|
||||
-1
|
||||
-1,
|
||||
TestUtils.MAPPER
|
||||
);
|
||||
|
||||
final TaskStatus status = runTask(indexTask);
|
||||
|
|
|
@ -34,18 +34,18 @@ import java.util.List;
|
|||
|
||||
public class HashBasedNumberedShardSpec extends NumberedShardSpec
|
||||
{
|
||||
|
||||
private static final HashFunction hashFunction = Hashing.murmur3_32();
|
||||
@JacksonInject
|
||||
private ObjectMapper jsonMapper;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@JsonCreator
|
||||
public HashBasedNumberedShardSpec(
|
||||
@JsonProperty("partitionNum") int partitionNum,
|
||||
@JsonProperty("partitions") int partitions
|
||||
@JsonProperty("partitions") int partitions,
|
||||
@JacksonInject ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(partitionNum, partitions);
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.metamx.common.ISE;
|
|||
import io.druid.TestUtil;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
|
||||
import io.druid.timeline.partition.PartitionChunk;
|
||||
import io.druid.timeline.partition.ShardSpec;
|
||||
|
@ -43,7 +44,7 @@ public class HashBasedNumberedShardSpecTest
|
|||
{
|
||||
|
||||
final ShardSpec spec = TestUtil.MAPPER.readValue(
|
||||
TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2)),
|
||||
TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, TestUtil.MAPPER)),
|
||||
ShardSpec.class
|
||||
);
|
||||
Assert.assertEquals(1, spec.getPartitionNum());
|
||||
|
@ -65,9 +66,9 @@ public class HashBasedNumberedShardSpecTest
|
|||
public void testPartitionChunks()
|
||||
{
|
||||
final List<ShardSpec> specs = ImmutableList.<ShardSpec>of(
|
||||
new HashBasedNumberedShardSpec(0, 3),
|
||||
new HashBasedNumberedShardSpec(1, 3),
|
||||
new HashBasedNumberedShardSpec(2, 3)
|
||||
new HashBasedNumberedShardSpec(0, 3, TestUtil.MAPPER),
|
||||
new HashBasedNumberedShardSpec(1, 3, TestUtil.MAPPER),
|
||||
new HashBasedNumberedShardSpec(2, 3, TestUtil.MAPPER)
|
||||
);
|
||||
|
||||
final List<PartitionChunk<String>> chunks = Lists.transform(
|
||||
|
@ -141,7 +142,7 @@ public class HashBasedNumberedShardSpecTest
|
|||
int partitions
|
||||
)
|
||||
{
|
||||
super(partitionNum, partitions);
|
||||
super(partitionNum, partitions, TestUtil.MAPPER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue