mirror of https://github.com/apache/druid.git
Add support hash partitioning by a subset of dimensions to indexTask (#6326)
* Add support hash partitioning by a subset of dimensions to indexTask * add doc * fix style * fix test * fix doc * fix build
This commit is contained in:
parent
c5872bef41
commit
45aa51a00c
|
@ -475,6 +475,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|
||||||
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|
||||||
|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|
|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|
||||||
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|
||||||
|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
|
||||||
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|
||||||
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|
||||||
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|
||||||
|
|
|
@ -112,6 +112,7 @@ import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -654,6 +655,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
|
final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
|
||||||
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
||||||
numShards,
|
numShards,
|
||||||
|
tuningConfig.getPartitionDimensions(),
|
||||||
jsonMapper
|
jsonMapper
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -721,6 +723,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
// Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
|
// Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
|
||||||
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
||||||
numShards,
|
numShards,
|
||||||
|
tuningConfig.getPartitionDimensions(),
|
||||||
jsonMapper
|
jsonMapper
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -839,6 +842,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
|
|
||||||
private static BiFunction<Integer, Integer, ShardSpec> getShardSpecCreateFunction(
|
private static BiFunction<Integer, Integer, ShardSpec> getShardSpecCreateFunction(
|
||||||
Integer numShards,
|
Integer numShards,
|
||||||
|
List<String> partitionDimensions,
|
||||||
ObjectMapper jsonMapper
|
ObjectMapper jsonMapper
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
@ -847,7 +851,12 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
if (numShards == 1) {
|
if (numShards == 1) {
|
||||||
return (shardId, totalNumShards) -> NoneShardSpec.instance();
|
return (shardId, totalNumShards) -> NoneShardSpec.instance();
|
||||||
} else {
|
} else {
|
||||||
return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper);
|
return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(
|
||||||
|
shardId,
|
||||||
|
totalNumShards,
|
||||||
|
partitionDimensions,
|
||||||
|
jsonMapper
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1347,6 +1356,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
private final Long maxTotalRows;
|
private final Long maxTotalRows;
|
||||||
@Nullable
|
@Nullable
|
||||||
private final Integer numShards;
|
private final Integer numShards;
|
||||||
|
private final List<String> partitionDimensions;
|
||||||
private final IndexSpec indexSpec;
|
private final IndexSpec indexSpec;
|
||||||
private final File basePersistDirectory;
|
private final File basePersistDirectory;
|
||||||
private final int maxPendingPersists;
|
private final int maxPendingPersists;
|
||||||
|
@ -1386,6 +1396,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||||
@JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED
|
@JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED
|
||||||
@JsonProperty("numShards") @Nullable Integer numShards,
|
@JsonProperty("numShards") @Nullable Integer numShards,
|
||||||
|
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
|
||||||
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
|
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
|
||||||
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
|
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
|
||||||
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
|
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
|
||||||
|
@ -1408,6 +1419,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
maxBytesInMemory != null ? maxBytesInMemory : 0,
|
maxBytesInMemory != null ? maxBytesInMemory : 0,
|
||||||
maxTotalRows,
|
maxTotalRows,
|
||||||
numShards,
|
numShards,
|
||||||
|
partitionDimensions,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
maxPendingPersists,
|
maxPendingPersists,
|
||||||
forceExtendableShardSpecs,
|
forceExtendableShardSpecs,
|
||||||
|
@ -1424,7 +1436,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
|
|
||||||
private IndexTuningConfig()
|
private IndexTuningConfig()
|
||||||
{
|
{
|
||||||
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private IndexTuningConfig(
|
private IndexTuningConfig(
|
||||||
|
@ -1433,6 +1445,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
@Nullable Long maxBytesInMemory,
|
@Nullable Long maxBytesInMemory,
|
||||||
@Nullable Long maxTotalRows,
|
@Nullable Long maxTotalRows,
|
||||||
@Nullable Integer numShards,
|
@Nullable Integer numShards,
|
||||||
|
@Nullable List<String> partitionDimensions,
|
||||||
@Nullable IndexSpec indexSpec,
|
@Nullable IndexSpec indexSpec,
|
||||||
@Nullable Integer maxPendingPersists,
|
@Nullable Integer maxPendingPersists,
|
||||||
@Nullable Boolean forceExtendableShardSpecs,
|
@Nullable Boolean forceExtendableShardSpecs,
|
||||||
|
@ -1460,6 +1473,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
|
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
|
||||||
this.maxTotalRows = maxTotalRows;
|
this.maxTotalRows = maxTotalRows;
|
||||||
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
|
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
|
||||||
|
this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
|
||||||
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
|
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
|
||||||
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
|
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
|
||||||
this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
|
this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
|
||||||
|
@ -1498,6 +1512,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
maxBytesInMemory,
|
maxBytesInMemory,
|
||||||
maxTotalRows,
|
maxTotalRows,
|
||||||
numShards,
|
numShards,
|
||||||
|
partitionDimensions,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
maxPendingPersists,
|
maxPendingPersists,
|
||||||
forceExtendableShardSpecs,
|
forceExtendableShardSpecs,
|
||||||
|
@ -1520,6 +1535,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
maxBytesInMemory,
|
maxBytesInMemory,
|
||||||
maxTotalRows,
|
maxTotalRows,
|
||||||
numShards,
|
numShards,
|
||||||
|
partitionDimensions,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
maxPendingPersists,
|
maxPendingPersists,
|
||||||
forceExtendableShardSpecs,
|
forceExtendableShardSpecs,
|
||||||
|
@ -1577,6 +1593,12 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
||||||
return numShards;
|
return numShards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
public List<String> getPartitionDimensions()
|
||||||
|
{
|
||||||
|
return partitionDimensions;
|
||||||
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@Override
|
@Override
|
||||||
public IndexSpec getIndexSpec()
|
public IndexSpec getIndexSpec()
|
||||||
|
|
|
@ -296,6 +296,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
|
||||||
tuningConfig.getMaxTotalRows(),
|
tuningConfig.getMaxTotalRows(),
|
||||||
null,
|
null,
|
||||||
tuningConfig.getNumShards(),
|
tuningConfig.getNumShards(),
|
||||||
|
null,
|
||||||
tuningConfig.getIndexSpec(),
|
tuningConfig.getIndexSpec(),
|
||||||
tuningConfig.getMaxPendingPersists(),
|
tuningConfig.getMaxPendingPersists(),
|
||||||
true,
|
true,
|
||||||
|
|
|
@ -105,6 +105,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
|
||||||
maxTotalRows,
|
maxTotalRows,
|
||||||
null,
|
null,
|
||||||
numShards,
|
numShards,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
maxPendingPersists,
|
maxPendingPersists,
|
||||||
null,
|
null,
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.indexing.common;
|
||||||
import com.fasterxml.jackson.databind.InjectableValues;
|
import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
import com.fasterxml.jackson.databind.Module;
|
import com.fasterxml.jackson.databind.Module;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.druid.client.indexing.IndexingServiceClient;
|
import org.apache.druid.client.indexing.IndexingServiceClient;
|
||||||
|
@ -37,6 +38,8 @@ import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
|
||||||
import org.apache.druid.segment.IndexIO;
|
import org.apache.druid.segment.IndexIO;
|
||||||
import org.apache.druid.segment.IndexMergerV9;
|
import org.apache.druid.segment.IndexMergerV9;
|
||||||
import org.apache.druid.segment.column.ColumnConfig;
|
import org.apache.druid.segment.column.ColumnConfig;
|
||||||
|
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
|
||||||
|
import org.apache.druid.segment.loading.LocalLoadSpec;
|
||||||
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
|
||||||
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
|
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
|
||||||
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
|
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
|
||||||
|
@ -93,6 +96,18 @@ public class TestUtils
|
||||||
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
|
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
|
||||||
.addValue(IndexingServiceClient.class, new NoopIndexingServiceClient())
|
.addValue(IndexingServiceClient.class, new NoopIndexingServiceClient())
|
||||||
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
|
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
|
||||||
|
.addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
|
||||||
|
);
|
||||||
|
|
||||||
|
jsonMapper.registerModule(
|
||||||
|
new SimpleModule()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void setupModule(SetupContext context)
|
||||||
|
{
|
||||||
|
context.registerSubtypes(LocalLoadSpec.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -273,6 +273,7 @@ public class CompactionTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
new IndexSpec(
|
new IndexSpec(
|
||||||
new RoaringBitmapSerdeFactory(true),
|
new RoaringBitmapSerdeFactory(true),
|
||||||
CompressionStrategy.LZ4,
|
CompressionStrategy.LZ4,
|
||||||
|
@ -423,6 +424,7 @@ public class CompactionTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
new IndexSpec(
|
new IndexSpec(
|
||||||
new RoaringBitmapSerdeFactory(true),
|
new RoaringBitmapSerdeFactory(true),
|
||||||
CompressionStrategy.LZ4,
|
CompressionStrategy.LZ4,
|
||||||
|
@ -483,6 +485,7 @@ public class CompactionTaskTest
|
||||||
5L,
|
5L,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
new IndexSpec(
|
new IndexSpec(
|
||||||
new RoaringBitmapSerdeFactory(true),
|
new RoaringBitmapSerdeFactory(true),
|
||||||
CompressionStrategy.LZ4,
|
CompressionStrategy.LZ4,
|
||||||
|
@ -543,6 +546,7 @@ public class CompactionTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
3,
|
3,
|
||||||
|
null,
|
||||||
new IndexSpec(
|
new IndexSpec(
|
||||||
new RoaringBitmapSerdeFactory(true),
|
new RoaringBitmapSerdeFactory(true),
|
||||||
CompressionStrategy.LZ4,
|
CompressionStrategy.LZ4,
|
||||||
|
@ -758,6 +762,7 @@ public class CompactionTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
new IndexSpec(
|
new IndexSpec(
|
||||||
new RoaringBitmapSerdeFactory(true),
|
new RoaringBitmapSerdeFactory(true),
|
||||||
CompressionStrategy.LZ4,
|
CompressionStrategy.LZ4,
|
||||||
|
@ -850,6 +855,7 @@ public class CompactionTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
new IndexSpec(
|
new IndexSpec(
|
||||||
new RoaringBitmapSerdeFactory(true),
|
new RoaringBitmapSerdeFactory(true),
|
||||||
CompressionStrategy.LZ4,
|
CompressionStrategy.LZ4,
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.indexing.common.task;
|
package org.apache.druid.indexing.common.task;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
@ -61,21 +62,35 @@ import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.java.util.common.Pair;
|
import org.apache.druid.java.util.common.Pair;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||||
|
import org.apache.druid.java.util.common.guava.Comparators;
|
||||||
|
import org.apache.druid.java.util.common.guava.Sequence;
|
||||||
import org.apache.druid.math.expr.ExprMacroTable;
|
import org.apache.druid.math.expr.ExprMacroTable;
|
||||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
|
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||||
import org.apache.druid.query.filter.SelectorDimFilter;
|
import org.apache.druid.query.filter.SelectorDimFilter;
|
||||||
|
import org.apache.druid.segment.Cursor;
|
||||||
|
import org.apache.druid.segment.DimensionSelector;
|
||||||
import org.apache.druid.segment.IndexIO;
|
import org.apache.druid.segment.IndexIO;
|
||||||
import org.apache.druid.segment.IndexMergerV9;
|
import org.apache.druid.segment.IndexMergerV9;
|
||||||
import org.apache.druid.segment.IndexSpec;
|
import org.apache.druid.segment.IndexSpec;
|
||||||
|
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||||
|
import org.apache.druid.segment.VirtualColumns;
|
||||||
import org.apache.druid.segment.indexing.DataSchema;
|
import org.apache.druid.segment.indexing.DataSchema;
|
||||||
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
|
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
|
||||||
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
||||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||||
import org.apache.druid.segment.loading.DataSegmentKiller;
|
import org.apache.druid.segment.loading.DataSegmentKiller;
|
||||||
import org.apache.druid.segment.loading.DataSegmentPusher;
|
import org.apache.druid.segment.loading.DataSegmentPusher;
|
||||||
|
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
|
||||||
|
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
|
||||||
|
import org.apache.druid.segment.loading.SegmentLoader;
|
||||||
|
import org.apache.druid.segment.loading.SegmentLoaderConfig;
|
||||||
|
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
|
||||||
|
import org.apache.druid.segment.loading.StorageLocationConfig;
|
||||||
import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
|
import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
|
||||||
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
|
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
|
||||||
|
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
|
||||||
import org.apache.druid.segment.transform.ExpressionTransform;
|
import org.apache.druid.segment.transform.ExpressionTransform;
|
||||||
import org.apache.druid.segment.transform.TransformSpec;
|
import org.apache.druid.segment.transform.TransformSpec;
|
||||||
import org.apache.druid.server.security.AuthTestUtils;
|
import org.apache.druid.server.security.AuthTestUtils;
|
||||||
|
@ -93,10 +108,10 @@ import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -131,6 +146,10 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
);
|
);
|
||||||
|
|
||||||
|
private DataSegmentPusher pusher;
|
||||||
|
private SegmentLoader segmentLoader;
|
||||||
|
private List<DataSegment> segments;
|
||||||
|
|
||||||
private static final IndexSpec indexSpec = new IndexSpec();
|
private static final IndexSpec indexSpec = new IndexSpec();
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private IndexMergerV9 indexMergerV9;
|
private IndexMergerV9 indexMergerV9;
|
||||||
|
@ -143,6 +162,7 @@ public class IndexTaskTest
|
||||||
{
|
{
|
||||||
TestUtils testUtils = new TestUtils();
|
TestUtils testUtils = new TestUtils();
|
||||||
jsonMapper = testUtils.getTestObjectMapper();
|
jsonMapper = testUtils.getTestObjectMapper();
|
||||||
|
|
||||||
indexMergerV9 = testUtils.getTestIndexMergerV9();
|
indexMergerV9 = testUtils.getTestIndexMergerV9();
|
||||||
indexIO = testUtils.getTestIndexIO();
|
indexIO = testUtils.getTestIndexIO();
|
||||||
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
|
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
|
||||||
|
@ -151,7 +171,50 @@ public class IndexTaskTest
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException
|
public void setup() throws IOException
|
||||||
{
|
{
|
||||||
reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json");
|
reportsFile = temporaryFolder.newFile(
|
||||||
|
StringUtils.format("IndexTaskTestReports-%s.json", System.currentTimeMillis())
|
||||||
|
);
|
||||||
|
|
||||||
|
final File deepStorageDir = temporaryFolder.newFolder();
|
||||||
|
final File cacheDir = temporaryFolder.newFolder();
|
||||||
|
|
||||||
|
pusher = new LocalDataSegmentPusher(
|
||||||
|
new LocalDataSegmentPusherConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public File getStorageDirectory()
|
||||||
|
{
|
||||||
|
return deepStorageDir;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
jsonMapper
|
||||||
|
)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public DataSegment push(final File dataSegmentFile, final DataSegment segment, final boolean useUniquePath)
|
||||||
|
throws IOException
|
||||||
|
{
|
||||||
|
final DataSegment returnSegment = super.push(dataSegmentFile, segment, useUniquePath);
|
||||||
|
segments.add(returnSegment);
|
||||||
|
return returnSegment;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
segmentLoader = new SegmentLoaderLocalCacheManager(
|
||||||
|
indexIO,
|
||||||
|
new SegmentLoaderConfig()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public List<StorageLocationConfig> getLocations()
|
||||||
|
{
|
||||||
|
return Collections.singletonList(
|
||||||
|
new StorageLocationConfig().setPath(cacheDir)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
jsonMapper
|
||||||
|
);
|
||||||
|
segments = new ArrayList<>();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -180,7 +243,7 @@ public class IndexTaskTest
|
||||||
tmpDir,
|
tmpDir,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, false, true),
|
createTuningConfigWithTargetPartitionSize(2, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -226,7 +289,7 @@ public class IndexTaskTest
|
||||||
tmpDir,
|
tmpDir,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, true, true),
|
createTuningConfigWithTargetPartitionSize(2, true, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -278,7 +341,7 @@ public class IndexTaskTest
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, true, false),
|
createTuningConfigWithTargetPartitionSize(2, true, false),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -322,7 +385,7 @@ public class IndexTaskTest
|
||||||
Granularities.MINUTE,
|
Granularities.MINUTE,
|
||||||
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
|
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
|
||||||
),
|
),
|
||||||
createTuningConfig(10, null, false, true),
|
createTuningConfigWithTargetPartitionSize(10, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -331,7 +394,7 @@ public class IndexTaskTest
|
||||||
rowIngestionMetersFactory
|
rowIngestionMetersFactory
|
||||||
);
|
);
|
||||||
|
|
||||||
List<DataSegment> segments = runTask(indexTask).rhs;
|
final List<DataSegment> segments = runTask(indexTask).rhs;
|
||||||
|
|
||||||
Assert.assertEquals(1, segments.size());
|
Assert.assertEquals(1, segments.size());
|
||||||
}
|
}
|
||||||
|
@ -359,7 +422,7 @@ public class IndexTaskTest
|
||||||
Granularities.HOUR,
|
Granularities.HOUR,
|
||||||
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
|
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
|
||||||
),
|
),
|
||||||
createTuningConfig(50, null, false, true),
|
createTuningConfigWithTargetPartitionSize(50, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -392,7 +455,7 @@ public class IndexTaskTest
|
||||||
tmpDir,
|
tmpDir,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
createTuningConfig(null, 1, false, true),
|
createTuningConfigWithNumShards(1, null, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -407,10 +470,84 @@ public class IndexTaskTest
|
||||||
|
|
||||||
Assert.assertEquals("test", segments.get(0).getDataSource());
|
Assert.assertEquals("test", segments.get(0).getDataSource());
|
||||||
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
|
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
|
||||||
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class));
|
Assert.assertEquals(NoneShardSpec.class, segments.get(0).getShardSpec().getClass());
|
||||||
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
|
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumShardsAndPartitionDimensionsProvided() throws Exception
|
||||||
|
{
|
||||||
|
final File tmpDir = temporaryFolder.newFolder();
|
||||||
|
final File tmpFile = File.createTempFile("druid", "index", tmpDir);
|
||||||
|
|
||||||
|
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
|
||||||
|
writer.write("2014-01-01T00:00:10Z,a,1\n");
|
||||||
|
writer.write("2014-01-01T01:00:20Z,b,1\n");
|
||||||
|
writer.write("2014-01-01T02:00:30Z,c,1\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
final IndexTask indexTask = new IndexTask(
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
createIngestionSpec(
|
||||||
|
tmpDir,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
createTuningConfigWithNumShards(2, ImmutableList.of("dim"), false, true),
|
||||||
|
false
|
||||||
|
),
|
||||||
|
null,
|
||||||
|
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
|
||||||
|
null,
|
||||||
|
rowIngestionMetersFactory
|
||||||
|
);
|
||||||
|
|
||||||
|
runTask(indexTask);
|
||||||
|
|
||||||
|
Assert.assertEquals(2, segments.size());
|
||||||
|
|
||||||
|
for (DataSegment segment : segments) {
|
||||||
|
Assert.assertEquals("test", segment.getDataSource());
|
||||||
|
Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval());
|
||||||
|
Assert.assertEquals(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
|
||||||
|
|
||||||
|
final File segmentFile = segmentLoader.getSegmentFiles(segment);
|
||||||
|
|
||||||
|
final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
|
||||||
|
new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
|
||||||
|
segment.getInterval()
|
||||||
|
);
|
||||||
|
|
||||||
|
final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
|
||||||
|
null,
|
||||||
|
segment.getInterval(),
|
||||||
|
VirtualColumns.EMPTY,
|
||||||
|
Granularities.ALL,
|
||||||
|
false,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
final List<Integer> hashes = cursorSequence
|
||||||
|
.map(cursor -> {
|
||||||
|
final DimensionSelector selector = cursor.getColumnSelectorFactory()
|
||||||
|
.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
|
||||||
|
try {
|
||||||
|
final int hash = HashBasedNumberedShardSpec.hash(
|
||||||
|
jsonMapper,
|
||||||
|
Collections.singletonList(selector.getObject())
|
||||||
|
);
|
||||||
|
cursor.advance();
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
Assert.assertTrue(hashes.stream().allMatch(h -> h.intValue() == hashes.get(0)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppendToExisting() throws Exception
|
public void testAppendToExisting() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -431,7 +568,7 @@ public class IndexTaskTest
|
||||||
tmpDir,
|
tmpDir,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, false, false),
|
createTuningConfigWithTargetPartitionSize(2, false, false),
|
||||||
true
|
true
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -481,7 +618,7 @@ public class IndexTaskTest
|
||||||
Granularities.MINUTE,
|
Granularities.MINUTE,
|
||||||
null
|
null
|
||||||
),
|
),
|
||||||
createTuningConfig(2, null, false, true),
|
createTuningConfigWithTargetPartitionSize(2, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -544,7 +681,7 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, false, true),
|
createTuningConfigWithTargetPartitionSize(2, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -596,7 +733,7 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, false, true),
|
createTuningConfigWithTargetPartitionSize(2, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -643,7 +780,7 @@ public class IndexTaskTest
|
||||||
Granularities.MINUTE,
|
Granularities.MINUTE,
|
||||||
null
|
null
|
||||||
),
|
),
|
||||||
createTuningConfig(2, 2, null, 2L, null, false, false, true),
|
createTuningConfig(2, 2, null, 2L, null, null, false, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -688,7 +825,7 @@ public class IndexTaskTest
|
||||||
true,
|
true,
|
||||||
null
|
null
|
||||||
),
|
),
|
||||||
createTuningConfig(3, 2, null, 2L, null, false, true, true),
|
createTuningConfig(3, 2, null, 2L, null, null, false, true, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -732,7 +869,7 @@ public class IndexTaskTest
|
||||||
true,
|
true,
|
||||||
null
|
null
|
||||||
),
|
),
|
||||||
createTuningConfig(3, 2, null, 2L, null, false, false, true),
|
createTuningConfig(3, 2, null, 2L, null, null, false, false, true),
|
||||||
false
|
false
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
|
@ -805,7 +942,7 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, null, null, null, false, false, false), // ignore parse exception,
|
createTuningConfig(2, null, null, null, null, null, false, false, false), // ignore parse exception,
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -858,7 +995,7 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception
|
createTuningConfig(2, null, null, null, null, null, false, false, true), // report parse exception
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -912,6 +1049,7 @@ public class IndexTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
null,
|
null,
|
||||||
true,
|
true,
|
||||||
|
@ -1033,6 +1171,7 @@ public class IndexTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
null,
|
null,
|
||||||
true,
|
true,
|
||||||
|
@ -1147,6 +1286,7 @@ public class IndexTaskTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
null,
|
null,
|
||||||
true,
|
true,
|
||||||
|
@ -1283,7 +1423,7 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, 1, null, null, null, false, true, true), // report parse exception
|
createTuningConfig(2, 1, null, null, null, null, false, true, true), // report parse exception
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1353,7 +1493,7 @@ public class IndexTaskTest
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
null,
|
null,
|
||||||
createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception
|
createTuningConfig(2, null, null, null, null, null, false, false, true), // report parse exception
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1392,8 +1532,6 @@ public class IndexTaskTest
|
||||||
|
|
||||||
private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask indexTask) throws Exception
|
private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask indexTask) throws Exception
|
||||||
{
|
{
|
||||||
final List<DataSegment> segments = Lists.newArrayList();
|
|
||||||
|
|
||||||
final TaskActionClient actionClient = new TaskActionClient()
|
final TaskActionClient actionClient = new TaskActionClient()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -1450,35 +1588,6 @@ public class IndexTaskTest
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final DataSegmentPusher pusher = new DataSegmentPusher()
|
|
||||||
{
|
|
||||||
@Deprecated
|
|
||||||
@Override
|
|
||||||
public String getPathForHadoop(String dataSource)
|
|
||||||
{
|
|
||||||
return getPathForHadoop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPathForHadoop()
|
|
||||||
{
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
|
|
||||||
{
|
|
||||||
segments.add(segment);
|
|
||||||
return segment;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, Object> makeLoadSpec(URI uri)
|
|
||||||
{
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
final DataSegmentKiller killer = new DataSegmentKiller()
|
final DataSegmentKiller killer = new DataSegmentKiller()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -1526,7 +1635,14 @@ public class IndexTaskTest
|
||||||
indexTask.isReady(box.getTaskActionClient());
|
indexTask.isReady(box.getTaskActionClient());
|
||||||
TaskStatus status = indexTask.run(box);
|
TaskStatus status = indexTask.run(box);
|
||||||
|
|
||||||
Collections.sort(segments);
|
segments.sort((s1, s2) -> {
|
||||||
|
final int comp = Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval());
|
||||||
|
if (comp != 0) {
|
||||||
|
return comp;
|
||||||
|
}
|
||||||
|
//noinspection SubtractionInCompareTo
|
||||||
|
return s1.getShardSpec().getPartitionNum() - s2.getShardSpec().getPartitionNum();
|
||||||
|
});
|
||||||
|
|
||||||
return Pair.of(status, segments);
|
return Pair.of(status, segments);
|
||||||
}
|
}
|
||||||
|
@ -1584,9 +1700,8 @@ public class IndexTaskTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IndexTuningConfig createTuningConfig(
|
private static IndexTuningConfig createTuningConfigWithTargetPartitionSize(
|
||||||
Integer targetPartitionSize,
|
int targetPartitionSize,
|
||||||
Integer numShards,
|
|
||||||
boolean forceExtendableShardSpecs,
|
boolean forceExtendableShardSpecs,
|
||||||
boolean forceGuaranteedRollup
|
boolean forceGuaranteedRollup
|
||||||
)
|
)
|
||||||
|
@ -1596,7 +1711,28 @@ public class IndexTaskTest
|
||||||
1,
|
1,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
forceExtendableShardSpecs,
|
||||||
|
forceGuaranteedRollup,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IndexTuningConfig createTuningConfigWithNumShards(
|
||||||
|
int numShards,
|
||||||
|
@Nullable List<String> partitionDimensions,
|
||||||
|
boolean forceExtendableShardSpecs,
|
||||||
|
boolean forceGuaranteedRollup
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return createTuningConfig(
|
||||||
|
null,
|
||||||
|
1,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
numShards,
|
numShards,
|
||||||
|
partitionDimensions,
|
||||||
forceExtendableShardSpecs,
|
forceExtendableShardSpecs,
|
||||||
forceGuaranteedRollup,
|
forceGuaranteedRollup,
|
||||||
true
|
true
|
||||||
|
@ -1604,11 +1740,12 @@ public class IndexTaskTest
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IndexTuningConfig createTuningConfig(
|
private static IndexTuningConfig createTuningConfig(
|
||||||
Integer targetPartitionSize,
|
@Nullable Integer targetPartitionSize,
|
||||||
Integer maxRowsInMemory,
|
@Nullable Integer maxRowsInMemory,
|
||||||
Long maxBytesInMemory,
|
@Nullable Long maxBytesInMemory,
|
||||||
Long maxTotalRows,
|
@Nullable Long maxTotalRows,
|
||||||
Integer numShards,
|
@Nullable Integer numShards,
|
||||||
|
@Nullable List<String> partitionDimensions,
|
||||||
boolean forceExtendableShardSpecs,
|
boolean forceExtendableShardSpecs,
|
||||||
boolean forceGuaranteedRollup,
|
boolean forceGuaranteedRollup,
|
||||||
boolean reportParseException
|
boolean reportParseException
|
||||||
|
@ -1621,6 +1758,7 @@ public class IndexTaskTest
|
||||||
maxTotalRows,
|
maxTotalRows,
|
||||||
null,
|
null,
|
||||||
numShards,
|
numShards,
|
||||||
|
partitionDimensions,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
null,
|
null,
|
||||||
true,
|
true,
|
||||||
|
|
|
@ -193,6 +193,7 @@ public class TaskSerdeTest
|
||||||
null,
|
null,
|
||||||
9999,
|
9999,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
3,
|
3,
|
||||||
true,
|
true,
|
||||||
|
@ -278,6 +279,7 @@ public class TaskSerdeTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
3,
|
3,
|
||||||
true,
|
true,
|
||||||
|
|
|
@ -692,6 +692,7 @@ public class TaskLifecycleTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
3,
|
3,
|
||||||
true,
|
true,
|
||||||
|
@ -772,6 +773,7 @@ public class TaskLifecycleTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
3,
|
3,
|
||||||
true,
|
true,
|
||||||
|
@ -1159,6 +1161,7 @@ public class TaskLifecycleTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
indexSpec,
|
indexSpec,
|
||||||
null,
|
null,
|
||||||
false,
|
false,
|
||||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -74,7 +75,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
|
||||||
{
|
{
|
||||||
final List<Object> groupKey = getGroupKey(timestamp, inputRow);
|
final List<Object> groupKey = getGroupKey(timestamp, inputRow);
|
||||||
try {
|
try {
|
||||||
return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
|
return hash(jsonMapper, groupKey);
|
||||||
}
|
}
|
||||||
catch (JsonProcessingException e) {
|
catch (JsonProcessingException e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
|
@ -93,6 +94,12 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static int hash(ObjectMapper jsonMapper, List<Object> objects) throws JsonProcessingException
|
||||||
|
{
|
||||||
|
return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(objects)).asInt();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue