fix long/float/double dimension filtering for columns with nulls (#6906)

* fix long, float, double dimension filtering when SQL-compatible null handling is enabled and the column has null values

* revert unintended change

* fix tests
This commit is contained in:
Clint Wylie 2019-01-23 22:36:52 -08:00 committed by Fangjin Yang
parent 3b020fd81b
commit 66f64cd8bd
6 changed files with 237 additions and 18 deletions

View File

@ -41,6 +41,9 @@ public class DoubleValueMatcherColumnSelectorStrategy
@Override
public boolean matches()
{
// A null double can never equal the concrete match value; checking first also
// avoids reading the primitive selector, which would return a default (0.0) for null.
if (selector.isNull()) {
return false;
}
// Compare raw bit patterns so the equality is exact (NaN is canonicalized by
// doubleToLongBits, and -0.0 is distinguished from 0.0).
return Double.doubleToLongBits(selector.getDouble()) == matchValLongBits;
}

View File

@ -40,6 +40,9 @@ public class FloatValueMatcherColumnSelectorStrategy
@Override
public boolean matches()
{
// A null float can never equal the concrete match value; checking first also
// avoids reading the primitive selector, which would return a default (0.0f) for null.
if (selector.isNull()) {
return false;
}
// Compare raw bit patterns so the equality is exact (NaN is canonicalized by
// floatToIntBits, and -0.0f is distinguished from 0.0f).
return Float.floatToIntBits(selector.getFloat()) == matchValIntBits;
}

View File

@ -39,6 +39,9 @@ public class LongValueMatcherColumnSelectorStrategy
@Override
public boolean matches()
{
// A null long can never equal the concrete match value; checking first also
// avoids reading the primitive selector, which would return a default (0) for null.
if (selector.isNull()) {
return false;
}
return selector.getLong() == matchValLong;
}

View File

@ -388,13 +388,19 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
ImmutableList.of(
ROW(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", "foo"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
ROW(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", "foo2"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE2),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
ROW(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE3),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
@ -429,6 +435,12 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
Pair.of("TABLE_NAME", CalciteTests.FORBIDDEN_DATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
ROW(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE3),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
)
),
getRows(

View File

@ -272,8 +272,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
ImmutableList.of(),
ImmutableList.of(
new Object[]{"druid", "foo", "TABLE"},
new Object[]{"druid", "foo2", "TABLE"},
new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"},
new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"},
new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"},
new Object[]{"druid", "aview", "VIEW"},
new Object[]{"druid", "bview", "VIEW"},
new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"},
@ -293,20 +294,21 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.of(
new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"},
new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"},
new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"},
new Object[]{"druid", "aview", "VIEW"},
new Object[]{"druid", "bview", "VIEW"},
new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"},
new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"},
new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"},
new Object[]{"sys", "segments", "SYSTEM_TABLE"},
new Object[]{"sys", "server_segments", "SYSTEM_TABLE"},
new Object[]{"sys", "servers", "SYSTEM_TABLE"},
new Object[]{"sys", "tasks", "SYSTEM_TABLE"}
)
ImmutableList.<Object[]>builder()
.add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
.add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
.add(new Object[]{"druid", "aview", "VIEW"})
.add(new Object[]{"druid", "bview", "VIEW"})
.add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
.add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
.add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
.build()
);
}
@ -7515,4 +7517,70 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of()
);
}
// Verifies that an equality filter on a FLOAT dimension (f1 = 0.1) is planned as a
// selector filter on a scan query and matches the row whose dim1 is "10.1"
// (the 2000-01-02 row of the numfoo datasource).
@Test
public void testFilterFloatDimension() throws Exception
{
testQuery(
"SELECT dim1 FROM numfoo WHERE f1 = 0.1 LIMIT 1",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(SELECTOR("f1", "0.1", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"10.1"}
)
);
}
// Verifies that an equality filter on a DOUBLE dimension (d1 = 1.7) is planned as a
// selector filter on a scan query and matches the row whose dim1 is "10.1"
// (the 2000-01-02 row of the numfoo datasource).
@Test
public void testFilterDoubleDimension() throws Exception
{
testQuery(
"SELECT dim1 FROM numfoo WHERE d1 = 1.7 LIMIT 1",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(SELECTOR("d1", "1.7", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"10.1"}
)
);
}
// Verifies that an equality filter on a LONG dimension (l1 = 7) is planned as a
// selector filter on a scan query and matches the row whose dim1 is ""
// (the 2000-01-01 row of the numfoo datasource, which has l1 = 7).
@Test
public void testFilterLongDimension() throws Exception
{
testQuery(
"SELECT dim1 FROM numfoo WHERE l1 = 7 LIMIT 1",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(SELECTOR("l1", "7", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{""}
)
);
}
}

View File

@ -34,8 +34,12 @@ import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
@ -144,6 +148,7 @@ public class CalciteTests
{
public static final String DATASOURCE1 = "foo";
public static final String DATASOURCE2 = "foo2";
public static final String DATASOURCE3 = "numfoo";
public static final String FORBIDDEN_DATASOURCE = "forbiddenDatasource";
public static final String TEST_SUPERUSER_NAME = "testSuperuser";
@ -243,6 +248,22 @@ public class CalciteTests
)
);
private static final InputRowParser<Map<String, Object>> PARSER_NUMERIC_DIMS = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec(TIMESTAMP_COLUMN, "iso", null),
new DimensionsSpec(
ImmutableList.<DimensionSchema>builder()
.addAll(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3")))
.add(new DoubleDimensionSchema("d1"))
.add(new FloatDimensionSchema("f1"))
.add(new LongDimensionSchema("l1"))
.build(),
null,
null
)
)
);
private static final IncrementalIndexSchema INDEX_SCHEMA = new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
@ -253,6 +274,17 @@ public class CalciteTests
.withRollup(false)
.build();
private static final IncrementalIndexSchema INDEX_SCHEMA_NUMERIC_DIMS = new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new FloatSumAggregatorFactory("m1", "m1"),
new DoubleSumAggregatorFactory("m2", "m2"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
)
.withDimensionsSpec(PARSER_NUMERIC_DIMS)
.withRollup(false)
.build();
public static final List<InputRow> ROWS1 = ImmutableList.of(
createRow(
ImmutableMap.<String, Object>builder()
@ -314,6 +346,84 @@ public class CalciteTests
)
);
public static final List<InputRow> ROWS1_WITH_NUMERIC_DIMS = ImmutableList.of(
createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-01")
.put("m1", "1.0")
.put("m2", "1.0")
.put("d1", 1.0)
.put("f1", 1.0f)
.put("l1", 7L)
.put("dim1", "")
.put("dim2", ImmutableList.of("a"))
.put("dim3", ImmutableList.of("a", "b"))
.build(),
PARSER_NUMERIC_DIMS
),
createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-02")
.put("m1", "2.0")
.put("m2", "2.0")
.put("d1", 1.7)
.put("f1", 0.1f)
.put("l1", 325323L)
.put("dim1", "10.1")
.put("dim2", ImmutableList.of())
.put("dim3", ImmutableList.of("b", "c"))
.build(),
PARSER_NUMERIC_DIMS
),
createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-03")
.put("m1", "3.0")
.put("m2", "3.0")
.put("d1", 0.0)
.put("f1", 0.0)
.put("l1", 0)
.put("dim1", "2")
.put("dim2", ImmutableList.of(""))
.put("dim3", ImmutableList.of("d"))
.build(),
PARSER_NUMERIC_DIMS
),
createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2001-01-01")
.put("m1", "4.0")
.put("m2", "4.0")
.put("dim1", "1")
.put("dim2", ImmutableList.of("a"))
.put("dim3", ImmutableList.of(""))
.build(),
PARSER_NUMERIC_DIMS
),
createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2001-01-02")
.put("m1", "5.0")
.put("m2", "5.0")
.put("dim1", "def")
.put("dim2", ImmutableList.of("abc"))
.put("dim3", ImmutableList.of())
.build(),
PARSER_NUMERIC_DIMS
),
createRow(
ImmutableMap.<String, Object>builder()
.put("t", "2001-01-03")
.put("m1", "6.0")
.put("m2", "6.0")
.put("dim1", "abc")
.build(),
PARSER_NUMERIC_DIMS
)
);
public static final List<InputRow> ROWS2 = ImmutableList.of(
createRow("2000-01-01", "דרואיד", "he", 1.0),
createRow("2000-01-01", "druid", "en", 1.0),
@ -504,6 +614,14 @@ public class CalciteTests
.rows(FORBIDDEN_ROWS)
.buildMMappedIndex();
final QueryableIndex indexNumericDims = IndexBuilder
.create()
.tmpDir(new File(tmpDir, "3"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(INDEX_SCHEMA_NUMERIC_DIMS)
.rows(ROWS1_WITH_NUMERIC_DIMS)
.buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(DATASOURCE1)
@ -528,6 +646,13 @@ public class CalciteTests
.shardSpec(new LinearShardSpec(0))
.build(),
forbiddenIndex
).add(DataSegment.builder()
.dataSource(DATASOURCE3)
.interval(indexNumericDims.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.build(),
indexNumericDims
);
}
@ -594,6 +719,11 @@ public class CalciteTests
return PARSER.parseBatch((Map<String, Object>) map).get(0);
}
// Parses a single event map into an InputRow using the supplied parser, allowing
// callers to use a parser other than the default PARSER (e.g. PARSER_NUMERIC_DIMS).
// The unchecked cast narrows ImmutableMap<String, ?> to the Map<String, Object>
// signature that parseBatch expects.
public static InputRow createRow(final ImmutableMap<String, ?> map, InputRowParser<Map<String, Object>> parser)
{
return parser.parseBatch((Map<String, Object>) map).get(0);
}
public static InputRow createRow(final Object t, final String dim1, final String dim2, final double m1)
{
return PARSER.parseBatch(