mirror of https://github.com/apache/druid.git
fix hash based partition - boundary case Integer.MIN
This commit is contained in:
parent
8e3bcc8fc3
commit
c6f531168f
|
@ -51,10 +51,14 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
|
||||||
@Override
|
@Override
|
||||||
public boolean isInChunk(InputRow inputRow)
|
public boolean isInChunk(InputRow inputRow)
|
||||||
{
|
{
|
||||||
return Math.abs(hash(inputRow)) % getPartitions() == getPartitionNum();
|
int hash = Math.abs(hash(inputRow));
|
||||||
|
// if (hash == Integer.MIN_VALUE) {
|
||||||
|
// hash = 0;
|
||||||
|
// }
|
||||||
|
return hash % getPartitions() == getPartitionNum();
|
||||||
}
|
}
|
||||||
|
|
||||||
private int hash(InputRow inputRow)
|
protected int hash(InputRow inputRow)
|
||||||
{
|
{
|
||||||
final List<Object> groupKey = Rows.toGroupKey(inputRow.getTimestampFromEpoch(), inputRow);
|
final List<Object> groupKey = Rows.toGroupKey(inputRow.getTimestampFromEpoch(), inputRow);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -19,26 +19,23 @@
|
||||||
|
|
||||||
package io.druid.server.shard;
|
package io.druid.server.shard;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.BeanProperty;
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
|
||||||
import com.fasterxml.jackson.databind.InjectableValues;
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import io.druid.TestUtil;
|
import io.druid.TestUtil;
|
||||||
|
import io.druid.data.input.InputRow;
|
||||||
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
|
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
|
||||||
import io.druid.timeline.partition.PartitionChunk;
|
import io.druid.timeline.partition.PartitionChunk;
|
||||||
import io.druid.timeline.partition.ShardSpec;
|
import io.druid.timeline.partition.ShardSpec;
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class HashBasedNumberedShardSpecTest
|
public class HashBasedNumberedShardSpecTest
|
||||||
{
|
{
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSerdeRoundTrip() throws Exception
|
public void testSerdeRoundTrip() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -107,4 +104,94 @@ public class HashBasedNumberedShardSpecTest
|
||||||
Assert.assertFalse(chunks.get(2).abuts(chunks.get(2)));
|
Assert.assertFalse(chunks.get(2).abuts(chunks.get(2)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsInChunk()
|
||||||
|
{
|
||||||
|
|
||||||
|
List<ShardSpec> specs = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
specs.add(new HashOverridenShardSpec(i, 3));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
assertExistsInOneSpec(specs, new HashInputRow(Integer.MIN_VALUE));
|
||||||
|
assertExistsInOneSpec(specs, new HashInputRow(Integer.MAX_VALUE));
|
||||||
|
assertExistsInOneSpec(specs, new HashInputRow(0));
|
||||||
|
assertExistsInOneSpec(specs, new HashInputRow(1000));
|
||||||
|
assertExistsInOneSpec(specs, new HashInputRow(-1000));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
|
||||||
|
{
|
||||||
|
for (ShardSpec spec : specs) {
|
||||||
|
if (spec.isInChunk(row)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new ISE("None of the partition matches");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HashOverridenShardSpec extends HashBasedNumberedShardSpec
|
||||||
|
{
|
||||||
|
public HashOverridenShardSpec(
|
||||||
|
int partitionNum,
|
||||||
|
int partitions
|
||||||
|
)
|
||||||
|
{
|
||||||
|
super(partitionNum, partitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int hash(InputRow inputRow)
|
||||||
|
{
|
||||||
|
return inputRow.hashCode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HashInputRow implements InputRow
|
||||||
|
{
|
||||||
|
private final int hashcode;
|
||||||
|
|
||||||
|
HashInputRow(int hashcode)
|
||||||
|
{
|
||||||
|
this.hashcode = hashcode;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return super.hashCode(); //To change body of overridden methods use File | Settings | File Templates.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getDimensions()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTimestampFromEpoch()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getDimension(String s)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getRaw(String s)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getFloatMetric(String s)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue