Allow getDomain to return disjointed intervals (#5570)

* Allow getDomain to return disjointed intervals

* Indentation issues
This commit is contained in:
Niketh Sabbineni 2018-04-05 22:12:30 -07:00 committed by Gian Merlino
parent 969342cd28
commit 270fd1ea15
9 changed files with 55 additions and 75 deletions

View File

@ -22,7 +22,7 @@ package io.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range; import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import java.util.List; import java.util.List;
@ -71,19 +71,11 @@ public class NoneShardSpec implements ShardSpec
@Override @Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs) public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{ {
return (long timestamp, InputRow row) -> shardSpecs.get(0);
return new ShardSpecLookup()
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
return shardSpecs.get(0);
}
};
} }
@Override @Override
public Map<String, Range<String>> getDomain() public Map<String, RangeSet<String>> getDomain()
{ {
return ImmutableMap.of(); return ImmutableMap.of();
} }

View File

@ -21,7 +21,7 @@ package io.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Range; import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import java.util.List; import java.util.List;
@ -50,5 +50,5 @@ public interface ShardSpec
* *
* @return map of dimensions to its possible range. Dimensions with unknown possible range are not mapped * @return map of dimensions to its possible range. Dimensions with unknown possible range are not mapped
*/ */
Map<String, Range<String>> getDomain(); Map<String, RangeSet<String>> getDomain();
} }

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Range; import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.TestObjectMapper; import io.druid.TestObjectMapper;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -82,7 +82,7 @@ public class DataSegmentTest
} }
@Override @Override
public Map<String, Range<String>> getDomain() public Map<String, RangeSet<String>> getDomain()
{ {
return ImmutableMap.of(); return ImmutableMap.of();
} }

View File

@ -122,12 +122,13 @@ public class DimFilterUtils
boolean include = true; boolean include = true;
if (dimFilter != null && shard != null) { if (dimFilter != null && shard != null) {
Map<String, Range<String>> domain = shard.getDomain(); Map<String, RangeSet<String>> domain = shard.getDomain();
for (Map.Entry<String, Range<String>> entry : domain.entrySet()) { for (Map.Entry<String, RangeSet<String>> entry : domain.entrySet()) {
String dimension = entry.getKey(); String dimension = entry.getKey();
Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
.computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d))); .computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d)));
if (optFilterRangeSet.isPresent() && optFilterRangeSet.get().subRangeSet(entry.getValue()).isEmpty()) {
if (optFilterRangeSet.isPresent() && hasEmptyIntersection(optFilterRangeSet.get(), entry.getValue())) {
include = false; include = false;
} }
} }
@ -139,4 +140,15 @@ public class DimFilterUtils
} }
return retSet; return retSet;
} }
private static boolean hasEmptyIntersection(RangeSet<String> r1, RangeSet<String> r2)
{
for (Range<String> range : r2.asRanges()) {
if (!r1.subRangeSet(range).isEmpty()) {
return false;
}
}
return true;
}
} }

View File

@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Range; import com.google.common.collect.Range;
import com.google.common.collect.RangeSet; import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.junit.Assert; import org.junit.Assert;
@ -113,8 +114,10 @@ public class DimFilterUtilsTest
private static ShardSpec shardSpec(String dimension, Range<String> range) private static ShardSpec shardSpec(String dimension, Range<String> range)
{ {
ShardSpec shard = EasyMock.createMock(ShardSpec.class); ShardSpec shard = EasyMock.createMock(ShardSpec.class);
RangeSet<String> rangeSet = TreeRangeSet.create();
rangeSet.add(range);
EasyMock.expect(shard.getDomain()) EasyMock.expect(shard.getDomain())
.andReturn(ImmutableMap.of(dimension, range)) .andReturn(ImmutableMap.of(dimension, rangeSet))
.anyTimes(); .anyTimes();
return shard; return shard;
} }

View File

@ -25,12 +25,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Range; import com.google.common.collect.RangeSet;
import com.google.common.hash.HashFunction; import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -90,14 +89,10 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
if (partitionDimensions.isEmpty()) { if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow); return Rows.toGroupKey(timestamp, inputRow);
} else { } else {
return Lists.transform(partitionDimensions, new Function<String, Object>() return Lists.transform(
{ partitionDimensions,
@Override dim -> inputRow.getDimension(dim)
public Object apply(final String dim) );
{
return inputRow.getDimension(dim);
}
});
} }
} }
@ -114,19 +109,14 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
@Override @Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs) public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{ {
return new ShardSpecLookup() return (long timestamp, InputRow row) -> {
{ int index = Math.abs(hash(timestamp, row) % getPartitions());
@Override return shardSpecs.get(index);
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
int index = Math.abs(hash(timestamp, row) % getPartitions());
return shardSpecs.get(index);
}
}; };
} }
@Override @Override
public Map<String, Range<String>> getDomain() public Map<String, RangeSet<String>> getDomain()
{ {
return ImmutableMap.of(); return ImmutableMap.of();
} }

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range; import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import java.util.List; import java.util.List;
@ -54,18 +54,11 @@ public class LinearShardSpec implements ShardSpec
@Override @Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs) public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{ {
return new ShardSpecLookup() return (long timestamp, InputRow row) -> shardSpecs.get(0);
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
return shardSpecs.get(0);
}
};
} }
@Override @Override
public Map<String, Range<String>> getDomain() public Map<String, RangeSet<String>> getDomain()
{ {
return ImmutableMap.of(); return ImmutableMap.of();
} }

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range; import com.google.common.collect.RangeSet;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import java.util.List; import java.util.List;
@ -67,18 +67,11 @@ public class NumberedShardSpec implements ShardSpec
@Override @Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs) public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{ {
return new ShardSpecLookup() return (long timestamp, InputRow row) -> shardSpecs.get(0);
{
@Override
public ShardSpec getShardSpec(long timestamp, InputRow row)
{
return shardSpecs.get(0);
}
};
} }
@Override @Override
public Map<String, Range<String>> getDomain() public Map<String, RangeSet<String>> getDomain()
{ {
return ImmutableMap.of(); return ImmutableMap.of();
} }

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range; import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
@ -102,35 +104,30 @@ public class SingleDimensionShardSpec implements ShardSpec
@Override @Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs) public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{ {
return new ShardSpecLookup() return (long timestamp, InputRow row) -> {
{ for (ShardSpec spec : shardSpecs) {
@Override if (spec.isInChunk(timestamp, row)) {
public ShardSpec getShardSpec(long timestamp, InputRow row) return spec;
{
for (ShardSpec spec : shardSpecs) {
if (spec.isInChunk(timestamp, row)) {
return spec;
}
} }
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
} }
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
}; };
} }
@Override @Override
public Map<String, Range<String>> getDomain() public Map<String, RangeSet<String>> getDomain()
{ {
Range<String> range; RangeSet<String> rangeSet = TreeRangeSet.create();
if (start == null && end == null) { if (start == null && end == null) {
range = Range.all(); rangeSet.add(Range.all());
} else if (start == null) { } else if (start == null) {
range = Range.atMost(end); rangeSet.add(Range.atMost(end));
} else if (end == null) { } else if (end == null) {
range = Range.atLeast(start); rangeSet.add(Range.atLeast(start));
} else { } else {
range = Range.closed(start, end); rangeSet.add(Range.closed(start, end));
} }
return ImmutableMap.of(dimension, range); return ImmutableMap.of(dimension, rangeSet);
} }
public void setPartitionNum(int partitionNum) public void setPartitionNum(int partitionNum)