Hadoop-based batch ingestion: support range partitioning (#13303)

This PR implements range partitioning for Hadoop-based ingestion. For details about multi-dimension range partitioning, see #11848.
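
For reference, the partitionsSpec type in the tuning config is what selects this code path (see HadoopDruidDetermineConfigurationJob.java below): any DimensionRangePartitionsSpec is now routed to DeterminePartitionsJob. Below is a minimal sketch of building such a spec in Java, mirroring the constructor used by the new DetermineRangePartitionsJobTest; the dimension names and row target are illustrative assumptions, not values taken from this change.

import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;

public class RangePartitionsSpecExample
{
  public static DimensionRangePartitionsSpec rangeSpec()
  {
    // Target roughly 5,000,000 rows per segment (illustrative), leave
    // maxRowsPerSegment unset, partition on the ("host", "country") pair,
    // and do not assume the input rows are already grouped on those dimensions.
    return new DimensionRangePartitionsSpec(
        5_000_000,
        null,
        ImmutableList.of("host", "country"),
        false
    );
  }
}

Because SingleDimensionPartitionsSpec is itself a DimensionRangePartitionsSpec, existing single_dim configurations continue to flow through the same job, and the new createShardSpec helper still emits SingleDimensionShardSpec for them.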
hqx871 2023-02-23 14:08:03 +08:00 committed by GitHub
parent 17a3cd0b68
commit 79f04e71a1
5 changed files with 664 additions and 72 deletions

File: DeterminePartitionsJob.java

@@ -19,10 +19,12 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -33,12 +35,17 @@ import com.google.common.io.Closeables;
import org.apache.druid.collections.CombiningIterable;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.hadoop.conf.Configurable;
@@ -71,6 +78,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -95,7 +104,7 @@ public class DeterminePartitionsJob implements Jobby
private static final Logger log = new Logger(DeterminePartitionsJob.class);
private static final Joiner TAB_JOINER = HadoopDruidIndexerConfig.TAB_JOINER;
private static final Splitter TAB_SPLITTER = HadoopDruidIndexerConfig.TAB_SPLITTER;
private static final Joiner COMMA_JOINER = Joiner.on(",").useForNull("");
private final HadoopDruidIndexerConfig config;
@@ -119,15 +128,15 @@ public class DeterminePartitionsJob implements Jobby
* in the final segment.
*/
if (!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)) {
if (!(config.getPartitionsSpec() instanceof DimensionRangePartitionsSpec)) {
throw new ISE(
"DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]",
"DeterminePartitionsJob can only be run for DimensionRangePartitionsSpec, partitionSpec found [%s]",
config.getPartitionsSpec()
);
}
final SingleDimensionPartitionsSpec partitionsSpec =
(SingleDimensionPartitionsSpec) config.getPartitionsSpec();
final DimensionRangePartitionsSpec partitionsSpec =
(DimensionRangePartitionsSpec) config.getPartitionsSpec();
if (!partitionsSpec.isAssumeGrouped()) {
groupByJob = Job.getInstance(
@@ -382,8 +391,7 @@ public class DeterminePartitionsJob implements Jobby
protected void setup(Context context)
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
helper = new DeterminePartitionsDimSelectionMapperHelper(config);
}
@Override
@@ -412,8 +420,7 @@ public class DeterminePartitionsJob implements Jobby
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
helper = new DeterminePartitionsDimSelectionMapperHelper(config);
}
@Override
@@ -437,13 +444,25 @@ public class DeterminePartitionsJob implements Jobby
static class DeterminePartitionsDimSelectionMapperHelper
{
private final HadoopDruidIndexerConfig config;
private final String partitionDimension;
private final List<List<String>> dimensionGroupingSet;
private final Map<Long, Integer> intervalIndexes;
DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config)
{
this.config = config;
this.partitionDimension = partitionDimension;
DimensionRangePartitionsSpec spec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
final DimensionsSpec dimensionsSpec = config.getSchema().getDataSchema().getDimensionsSpec();
this.dimensionGroupingSet = new ArrayList<>();
final List<String> partitionDimensions = spec.getPartitionDimensions();
// If partitionDimensions is not set, try each dimension individually and pick the best one.
if (partitionDimensions.isEmpty()) {
for (DimensionSchema dimensionSchema : dimensionsSpec.getDimensions()) {
dimensionGroupingSet.add(Collections.singletonList(dimensionSchema.getName()));
}
} else {
dimensionGroupingSet.add(partitionDimensions);
}
final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
@@ -476,22 +495,29 @@ public class DeterminePartitionsJob implements Jobby
final byte[] groupKey = buf.array();
// Emit row-counter value.
write(context, groupKey, new DimValueCount("", "", 1));
write(context, groupKey, new DimValueCount(Collections.emptyList(), StringTuple.create(), 1));
for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey();
if (partitionDimension == null || partitionDimension.equals(dim)) {
final Iterable<String> dimValues = dimAndValues.getValue();
if (Iterables.size(dimValues) == 1) {
// Emit this value.
write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
Iterator<List<String>> dimensionGroupIterator = dimensionGroupingSet.iterator();
while (dimensionGroupIterator.hasNext()) {
List<String> dimensionGroup = dimensionGroupIterator.next();
String[] values = new String[dimensionGroup.size()];
int numRow = 1;
for (int i = 0; i < dimensionGroup.size(); i++) {
String dimension = dimensionGroup.get(i);
final Iterable<String> dimValues = dims.get(dimension);
if (dimValues != null && Iterables.size(dimValues) == 1) {
values[i] = Iterables.getOnlyElement(dimValues);
} else if (dimValues == null || Iterables.isEmpty(dimValues)) {
// Leave values[i] as null when the dimension has no value.
} else {
// This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
write(context, groupKey, new DimValueCount(dim, "", -1));
numRow = -1;
}
}
write(context, groupKey, new DimValueCount(dimensionGroup, StringTuple.create(values), numRow));
if (numRow == -1) {
dimensionGroupIterator.remove();
}
}
}
}
@@ -572,12 +598,30 @@ public class DeterminePartitionsJob implements Jobby
private static Iterable<DimValueCount> combineRows(Iterable<Text> input)
{
final Comparator<List<String>> dimsComparator = new Comparator<List<String>>()
{
@Override
public int compare(List<String> o1, List<String> o2)
{
int len = Math.min(o1.size(), o2.size());
for (int i = 0; i < len; i++) {
int comparison = o1.get(i).compareTo(o2.get(i));
if (comparison != 0) {
return comparison;
}
}
return Integer.compare(o1.size(), o2.size());
}
};
return new CombiningIterable<>(
Iterables.transform(
input,
DimValueCount::fromText
),
(o1, o2) -> ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result(),
(o1, o2) -> ComparisonChain.start()
.compare(o1.dims, o2.dims, dimsComparator)
.compare(o1.values, o2.values)
.result(),
(arg1, arg2) -> {
if (arg2 == null) {
return arg1;
@@ -585,7 +629,7 @@ public class DeterminePartitionsJob implements Jobby
// Respect "poisoning" (negative values mean we can't use this dimension)
final long newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
return new DimValueCount(arg1.dim, arg1.value, newNumRows);
return new DimValueCount(arg1.dims, arg1.values, newNumRows);
}
);
}
@@ -603,6 +647,28 @@ public class DeterminePartitionsJob implements Jobby
}
}
static DimensionRangeShardSpec createShardSpec(
boolean isSingleDim,
List<String> dimensions,
@Nullable StringTuple start,
@Nullable StringTuple end,
int partitionNum,
@Nullable Integer numCorePartitions
)
{
if (isSingleDim) {
return new SingleDimensionShardSpec(
Iterables.getOnlyElement(dimensions),
start == null ? null : start.get(0),
end == null ? null : end.get(0),
partitionNum,
numCorePartitions
);
} else {
return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, numCorePartitions);
}
}
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
{
private static final double SHARD_COMBINE_THRESHOLD = 0.25;
@@ -626,25 +692,26 @@ public class DeterminePartitionsJob implements Jobby
final DimValueCount firstDvc = iterator.next();
final long totalRows = firstDvc.numRows;
if (!"".equals(firstDvc.dim) || !"".equals(firstDvc.value)) {
if (!firstDvc.dims.isEmpty() || firstDvc.values.size() != 0) {
throw new IllegalStateException("Expected total row indicator on first k/v pair");
}
// "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
String currentDimPartitionStart = null;
StringTuple currentDimPartitionStart = null;
boolean currentDimSkip = false;
// We'll store possible partitions in here
final Map<String, DimPartitions> dimPartitionss = new HashMap<>();
final Map<List<String>, DimPartitions> dimPartitionss = new HashMap<>();
final DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
while (iterator.hasNext()) {
final DimValueCount dvc = iterator.next();
if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
if (currentDimPartitions == null || !currentDimPartitions.dims.equals(dvc.dims)) {
// Starting a new dimension! Exciting!
currentDimPartitions = new DimPartitions(dvc.dim);
currentDimPartitions = new DimPartitions(dvc.dims);
currentDimPartition = new DimPartition();
currentDimPartitionStart = null;
currentDimSkip = false;
@@ -652,7 +719,7 @@ public class DeterminePartitionsJob implements Jobby
// Respect poisoning
if (!currentDimSkip && dvc.numRows < 0) {
log.info("Cannot partition on multi-value dimension: %s", dvc.dim);
log.info("Cannot partition on multi-value dimension: %s", dvc.dims);
currentDimSkip = true;
}
@@ -662,10 +729,11 @@ public class DeterminePartitionsJob implements Jobby
// See if we need to cut a new partition ending immediately before this dimension value
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
final ShardSpec shardSpec = createShardSpec(
partitionsSpec instanceof SingleDimensionPartitionsSpec,
currentDimPartitions.dims,
currentDimPartitionStart,
dvc.value,
dvc.values,
currentDimPartitions.partitions.size(),
// Set unknown core partitions size so that the legacy way is used for checking partitionHolder
// completeness. See SingleDimensionShardSpec.createChunk().
@@ -682,14 +750,14 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartition.shardSpec = shardSpec;
currentDimPartitions.partitions.add(currentDimPartition);
currentDimPartition = new DimPartition();
currentDimPartitionStart = dvc.value;
currentDimPartitionStart = dvc.values;
}
// Update counters
currentDimPartition.cardinality++;
currentDimPartition.rows += dvc.numRows;
if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
if (!iterator.hasNext() || !currentDimPartitions.dims.equals(iterator.peek().dims)) {
// Finalize the current dimension
if (currentDimPartition.rows > 0) {
@@ -703,11 +771,12 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartitions.partitions.size() - 1
);
final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
final DimensionRangeShardSpec previousShardSpec = (DimensionRangeShardSpec) previousDimPartition.shardSpec;
shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
previousShardSpec.getStart(),
shardSpec = createShardSpec(
partitionsSpec instanceof SingleDimensionPartitionsSpec,
currentDimPartitions.dims,
previousShardSpec.getStartTuple(),
null,
previousShardSpec.getPartitionNum(),
// Set unknown core partitions size so that the legacy way is used for checking partitionHolder
@@ -721,8 +790,9 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartition.cardinality += previousDimPartition.cardinality;
} else {
// Create new shard
shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
shardSpec = createShardSpec(
partitionsSpec instanceof SingleDimensionPartitionsSpec,
currentDimPartitions.dims,
currentDimPartitionStart,
null,
currentDimPartitions.partitions.size(),
@@ -745,13 +815,13 @@ public class DeterminePartitionsJob implements Jobby
log.info(
"Completed dimension[%s]: %,d possible shards with %,d unique values",
currentDimPartitions.dim,
currentDimPartitions.dims,
currentDimPartitions.partitions.size(),
currentDimPartitions.getCardinality()
);
// Add ourselves to the partitions map
dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions);
dimPartitionss.put(currentDimPartitions.dims, currentDimPartitions);
}
}
@@ -769,7 +839,7 @@ public class DeterminePartitionsJob implements Jobby
if (dimPartitions.getRows() != totalRows) {
log.info(
"Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
dimPartitions.dim,
dimPartitions.dims,
dimPartitions.getRows(),
totalRows
);
@@ -779,11 +849,9 @@ public class DeterminePartitionsJob implements Jobby
// Make sure none of these shards are oversized
boolean oversized = false;
final SingleDimensionPartitionsSpec partitionsSpec =
(SingleDimensionPartitionsSpec) config.getPartitionsSpec();
for (final DimPartition partition : dimPartitions.partitions) {
if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
oversized = true;
}
}
@@ -877,12 +945,12 @@ public class DeterminePartitionsJob implements Jobby
private static class DimPartitions
{
public final String dim;
public final List<String> dims;
public final List<DimPartition> partitions = new ArrayList<>();
private DimPartitions(String dim)
private DimPartitions(List<String> dims)
{
this.dim = dim;
this.dims = dims;
}
int getCardinality()
@@ -923,32 +991,46 @@ public class DeterminePartitionsJob implements Jobby
public long rows = 0;
}
private static class DimValueCount
public static class DimValueCount
{
public final String dim;
public final String value;
public final long numRows;
@JsonProperty
private final List<String> dims;
@JsonProperty
private final StringTuple values;
@JsonProperty
private final long numRows;
private DimValueCount(String dim, String value, long numRows)
@JsonCreator
public DimValueCount(
@JsonProperty("dims") List<String> dims,
@JsonProperty("values") StringTuple values,
@JsonProperty("numRows") long numRows
)
{
this.dim = dim;
this.value = value;
this.dims = dims;
this.values = values;
this.numRows = numRows;
}
Text toText()
public Text toText()
{
return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value));
try {
String jsonValue = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(this);
return new Text(jsonValue);
}
catch (JsonProcessingException e) {
throw new ISE(e, "to json %s error", toString());
}
}
static DimValueCount fromText(Text text)
public static DimValueCount fromText(Text text)
{
final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator();
final String dim = splits.next();
final long numRows = Long.parseLong(splits.next());
final String value = splits.next();
return new DimValueCount(dim, value, numRows);
try {
return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(text.toString(), DimValueCount.class);
}
catch (IOException e) {
throw new ISE(e, "parse json %s error", text.toString());
}
}
}
@@ -959,8 +1041,10 @@ public class DeterminePartitionsJob implements Jobby
)
throws IOException, InterruptedException
{
byte[] sortKey = TAB_JOINER.join(dimValueCount.dim, dimValueCount.value)
.getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
byte[] sortKey = TAB_JOINER.join(
COMMA_JOINER.join(dimValueCount.dims),
COMMA_JOINER.join(dimValueCount.values.toArray())
).getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
context.write(new SortableBytes(groupKey, sortKey).toBytesWritable(), dimValueCount.toText());
}
}

File: HadoopDruidDetermineConfigurationJob.java

@@ -21,9 +21,9 @@ package org.apache.druid.indexer;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -114,7 +114,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
if (partitionsSpec instanceof HashedPartitionsSpec) {
return new DetermineHashedPartitionsJob(config);
} else if (partitionsSpec instanceof SingleDimensionPartitionsSpec) {
} else if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
return new DeterminePartitionsJob(config);
} else {
throw new ISE("Unknown partitionsSpec[%s]", partitionsSpec);

File: DetermineRangePartitionsJobTest.java (new file)

@@ -0,0 +1,450 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class DetermineRangePartitionsJobTest
{
private HadoopDruidIndexerConfig config;
private int expectedNumOfSegments;
private int[] expectedNumOfShardsForEachSegment;
private String[][][][] expectedStartEndForEachShard;
private File dataFile;
private File tmpDir;
@Parameterized.Parameters(name = "assumeGrouped={0}, "
+ "targetPartitionSize={1}, "
+ "interval={2}"
+ "expectedNumOfSegments={3}, "
+ "expectedNumOfShardsForEachSegment={4}, "
+ "expectedStartEndForEachShard={5}, "
+ "data={6}, "
+ "partitionDimensions={7}")
public static Collection<Object[]> constructFeed()
{
return Arrays.asList(
new Object[][]{
{
false,
3,
"2014-10-20T00:00:00Z/P1D",
1,
new int[]{4},
new String[][][][]{
{
{null, {"d.example.com"}},
{{"d.example.com"}, {"g.example.com"}},
{{"g.example.com"}, {"j.example.com"}},
{{"j.example.com"}, null},
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,US,50",
"2014102000,b.exmaple.com,US,50",
"2014102000,c.example.com,US,200",
"2014102000,c.example.com,US,200",
"2014102000,d.example.com,US,250",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,US,963",
"2014102000,i.example.com,US,963",
"2014102000,j.example.com,US,333",
"2014102000,j.example.com,US,333"
),
ImmutableList.of("host")
},
{
true,
3,
"2014-10-22T00:00:00Z/P1D",
1,
new int[]{4},
new String[][][][]{
{
{null, {"d.example.com", "US"}},
{{"d.example.com", "US"}, {"g.example.com", "US"}},
{{"g.example.com", "US"}, {"j.example.com", "US"}},
{{"j.example.com", "US"}, null}
}
},
ImmutableList.of(
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,US,50",
"2014102200,c.example.com,US,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
"2014102200,f.example.com,US,567",
"2014102200,g.example.com,US,11",
"2014102200,h.example.com,US,251",
"2014102200,i.example.com,US,963",
"2014102200,j.example.com,US,333"
),
ImmutableList.of("host", "country")
},
{
false,
3,
"2014-10-20T00:00:00Z/P1D",
1,
new int[]{4},
new String[][][][]{
{
{null, {"d.example.com", "US"}},
{{"d.example.com", "US"}, {"g.example.com", "US"}},
{{"g.example.com", "US"}, {"j.example.com", "US"}},
{{"j.example.com", "US"}, null}
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,US,50",
"2014102000,b.exmaple.com,US,50",
"2014102000,c.example.com,US,200",
"2014102000,c.example.com,US,200",
"2014102000,d.example.com,US,250",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,US,963",
"2014102000,i.example.com,US,963",
"2014102000,j.example.com,US,333",
"2014102000,j.example.com,US,333"
),
ImmutableList.of("host", "country")
},
{
true,
6,
"2014-10-20T00:00:00Z/P3D",
3,
new int[]{2, 2, 2},
new String[][][][]{
{
{null, {"g.example.com", "US"}},
{{"g.example.com", "US"}, null}
},
{
{null, {"g.example.com", "US"}},
{{"g.example.com", "US"}, null}
},
{
{null, {"g.example.com", "US"}},
{{"g.example.com", "US"}, null}
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,CN,50",
"2014102000,c.example.com,CN,200",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,US,963",
"2014102000,j.example.com,US,333",
"2014102000,k.example.com,US,555",
"2014102100,a.example.com,CN,100",
"2014102100,b.exmaple.com,CN,50",
"2014102100,c.example.com,CN,200",
"2014102100,d.example.com,US,250",
"2014102100,e.example.com,US,123",
"2014102100,f.example.com,US,567",
"2014102100,g.example.com,US,11",
"2014102100,h.example.com,US,251",
"2014102100,i.example.com,US,963",
"2014102100,j.example.com,US,333",
"2014102100,k.example.com,US,555",
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,CN,50",
"2014102200,c.example.com,CN,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
"2014102200,f.example.com,US,567",
"2014102200,g.example.com,US,11",
"2014102200,h.example.com,US,251",
"2014102200,i.example.com,US,963",
"2014102200,j.example.com,US,333",
"2014102200,k.example.com,US,555"
),
ImmutableList.of("host", "country")
},
{
false,
2,
"2014-10-20T00:00:00Z/P1D",
1,
new int[]{5},
new String[][][][]{
{
{null, {"c.example.com", null}},
{{"c.example.com", null}, {"e.example.com", "US"}},
{{"e.example.com", "US"}, {"g.example.com", "US"}},
{{"g.example.com", "US"}, {"i.example.com", null}},
{{"i.example.com", null}, null}
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,US,50",
"2014102000,b.exmaple.com,US,50",
"2014102000,c.example.com,,200",
"2014102000,c.example.com,,200",
"2014102000,d.example.com,US,250",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,,963",
"2014102000,i.example.com,,963",
"2014102000,j.example.com,US,333",
"2014102000,j.example.com,US,333"
),
ImmutableList.of("host", "country")
},
{
true,
2,
"2014-10-20T00:00:00Z/P1D",
1,
new int[]{5},
new String[][][][]{
{
{null, {"c.example.com", null}},
{{"c.example.com", null}, {"e.example.com", "US"}},
{{"e.example.com", "US"}, {"g.example.com", "US"}},
{{"g.example.com", "US"}, {"i.example.com", null}},
{{"i.example.com", null}, null}
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,US,50",
"2014102000,c.example.com,,200",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,,963",
"2014102000,j.example.com,US,333"
),
ImmutableList.of("host", "country")
}
}
);
}
public DetermineRangePartitionsJobTest(
boolean assumeGrouped,
Integer targetPartitionSize,
String interval,
int expectedNumOfSegments,
int[] expectedNumOfShardsForEachSegment,
String[][][][] expectedStartEndForEachShard,
List<String> data,
List<String> partitionDimensions
) throws IOException
{
this.expectedNumOfSegments = expectedNumOfSegments;
this.expectedNumOfShardsForEachSegment = expectedNumOfShardsForEachSegment;
this.expectedStartEndForEachShard = expectedStartEndForEachShard;
dataFile = Files.createTempFile("test_range_website_data", "tmp").toFile();
dataFile.deleteOnExit();
tmpDir = FileUtils.createTempDir();
tmpDir.deleteOnExit();
org.apache.commons.io.FileUtils.writeLines(dataFile, data);
config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
null,
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.NONE,
ImmutableList.of(Intervals.of(interval))
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country"))
),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num"),
false,
0
),
null
),
Map.class
),
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(
ImmutableMap.of(
"paths",
dataFile.getCanonicalPath(),
"type",
"static"
),
null,
tmpDir.getCanonicalPath()
),
new HadoopTuningConfig(
tmpDir.getCanonicalPath(),
null,
new DimensionRangePartitionsSpec(
targetPartitionSize,
null,
partitionDimensions,
assumeGrouped
),
null,
null,
null,
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
null,
null,
false,
false,
null,
null,
null,
null,
null
)
)
);
}
@Test
public void testPartitionJob()
{
DeterminePartitionsJob job = new DeterminePartitionsJob(config);
job.run();
int shardNum = 0;
int segmentNum = 0;
Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : config.getSchema()
.getTuningConfig()
.getShardSpecs()
.entrySet()) {
int partitionNum = 0;
List<HadoopyShardSpec> specs = entry.getValue();
Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size());
for (HadoopyShardSpec spec : specs) {
DimensionRangeShardSpec actualSpec = (DimensionRangeShardSpec) spec.getActualSpec();
Assert.assertEquals(shardNum, spec.getShardNum());
Assert.assertArrayEquals(
expectedStartEndForEachShard[segmentNum][partitionNum][0],
actualSpec.getStartTuple() == null ? null : actualSpec.getStartTuple().toArray()
);
Assert.assertArrayEquals(
expectedStartEndForEachShard[segmentNum][partitionNum][1],
actualSpec.getEndTuple() == null ? null : actualSpec.getEndTuple().toArray()
);
Assert.assertEquals(partitionNum, actualSpec.getPartitionNum());
shardNum++;
partitionNum++;
}
segmentNum++;
}
}
@After
public void tearDown() throws Exception
{
org.apache.commons.io.FileUtils.forceDelete(dataFile);
FileUtils.deleteDirectory(tmpDir);
}
}

File: ITHadoopIndexTest.java

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.Pair;
@@ -96,6 +97,8 @@ public class ITHadoopIndexTest extends AbstractITBatchIndexTest
{new SingleDimensionPartitionsSpec(1000, null, null, false)},
{new SingleDimensionPartitionsSpec(1000, null, "page", false)},
{new SingleDimensionPartitionsSpec(1000, null, null, true)},
{new DimensionRangePartitionsSpec(1000, null, ImmutableList.of("page"), true)},
{new DimensionRangePartitionsSpec(1000, null, ImmutableList.of("page", "user"), false)},
//{new HashedPartitionsSpec(null, 3, null)} // this results in a bug where the segments have 0 rows
};

File: DimensionRangeShardSpecTest.java

@@ -136,6 +136,61 @@ public class DimensionRangeShardSpecTest
);
}
@Test
public void testShardSpecLookupWithNull()
{
setDimensions("dim1", "dim2");
final DimensionRangeShardSpec shard0 = new DimensionRangeShardSpec(
dimensions,
null,
StringTuple.create("India", null),
1,
1
);
final DimensionRangeShardSpec shard1 = new DimensionRangeShardSpec(
dimensions,
StringTuple.create("India", null),
StringTuple.create("Spain", "Valencia"),
10,
1
);
final DimensionRangeShardSpec shard2 = new DimensionRangeShardSpec(
dimensions,
StringTuple.create("Spain", "Valencia"),
StringTuple.create("Tokyo", null),
10,
1
);
final DimensionRangeShardSpec shard3 = new DimensionRangeShardSpec(
dimensions,
StringTuple.create("Tokyo", null),
null,
100,
1
);
final ShardSpecLookup lookup = shard0.getLookup(Arrays.asList(shard0, shard1, shard2, shard3));
final long timestamp = System.currentTimeMillis();
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Delhi")));
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Kolkata")));
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Japan", "Tokyo")));
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Spain", "Barcelona")));
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Bengaluru")));
Assert.assertEquals(shard2, lookup.getShardSpec(timestamp, createRow("Spain", "Valencia")));
Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("United Kingdom", "London")));
Assert.assertEquals(shard0, lookup.getShardSpec(timestamp, createRow(null, null)));
Assert.assertEquals(shard0, lookup.getShardSpec(timestamp, createRow(null, "Lyon")));
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", null)));
Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Spain", null)));
Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("Tokyo", null)));
Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("United Kingdom", null)));
}
@Test
public void testPossibleInDomain_withNullStart()
{