Make float default representation for DoubleSum/Min/Max aggregators (#4944)

* Introduce a system-wide property to select how to store doubles.
Set the default to store as float

Change-Id: Id85cca04ed0e7ecbce78624168c586dcc2adafaa

* fix tests

Change-Id: Ib42db724b8a8f032d204b58c366caaeabdd0d939

* Change the property name

Change-Id: I3ed69f79fc56e3735bc8f3a097f52a9f932b4734

* add tests and make the default distribution store doubles as 64-bit values

Change-Id: I237b07829117ac61e247a6124423b03992f550f2

* adding mvn argument to parallel-test profile

Change-Id: Iae5d1328f901c4876b133894fa37e0d9a4162b05

* move property name and helper function to io.druid.segment.column.Column

Change-Id: I62ea903d332515de2b7ca45c02587a1b015cb065

* fix docs and clean style

Change-Id: I726abb8f52d25dc9dc62ad98814c5feda5e4d065

* fix docs

Change-Id: If10f4cf1e51a58285a301af4107ea17fe5e09b6d
Slim 2017-10-16 17:17:22 -07:00 committed by Jonathan Wei
parent a7e802c9d4
commit af2bc5f814
13 changed files with 448 additions and 10 deletions

View File

@@ -395,3 +395,14 @@ the following properties.
<div class="note info">
JavaScript-based functionality is disabled by default. Please refer to the Druid <a href="../development/javascript.html">JavaScript programming guide</a> for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.
</div>
### Double Column storage
Druid's storage layer uses a 32-bit float representation to store columns created by the
doubleSum, doubleMin, and doubleMax aggregators at indexing time. To instead use 64-bit floats
for these columns, please set the system-wide property `druid.indexing.doubleStorage=double`.
This will become the default behavior in a future version of Druid.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexing.doubleStorage`|Set to "double" to use 64-bit double representation for double columns.|float|
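For example, a minimal sketch of opting in to 64-bit storage, assuming the property is placed in a service's runtime properties file (the common.runtime.properties name is illustrative) or passed as a JVM system property, as the test profile further down in this change does:

# runtime properties (illustrative file); omit the property or set it to "float" to keep 32-bit storage
druid.indexing.doubleStorage=double

# equivalent JVM command-line flag
-Ddruid.indexing.doubleStorage=double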

View File

@@ -116,3 +116,8 @@ druid.selectors.coordinator.serviceName=druid/coordinator
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info
# Storage type of double columns
# omitting this will cause doubles to be indexed as floats at the storage layer
druid.indexing.doubleStorage=double

View File

@@ -115,3 +115,8 @@ druid.selectors.coordinator.serviceName=druid/coordinator
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info
# Storage type of double columns
# omitting this will cause doubles to be indexed as floats at the storage layer
druid.indexing.doubleStorage=double

View File

@@ -1036,6 +1036,8 @@
<!-- set heap size to work around https://github.com/travis-ci/travis-ci/issues/3396 -->
<argLine>-Xmx3000m -Duser.language=en -Duser.country=US -Dfile.encoding=UTF-8
-Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
<!--@TODO After fixing https://github.com/druid-io/druid/issues/4964 remove this parameter-->
-Ddruid.indexing.doubleStorage=double
</argLine>
<!-- our tests are very verbose, let's keep the volume down -->
<redirectTestOutputToFile>true</redirectTestOutputToFile>
@@ -1264,7 +1266,9 @@
<!-- locale settings must be set on the command line before startup -->
<!-- set heap size to work around https://github.com/travis-ci/travis-ci/issues/3396 -->
<argLine>-Xmx768m -Duser.language=en -Duser.country=US -Dfile.encoding=UTF-8
-Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Duser.timezone=UTC -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
<!--@TODO After fixing https://github.com/druid-io/druid/issues/4964 remove this parameter-->
-Ddruid.indexing.doubleStorage=double
</argLine>
<!-- our tests are very verbose, let's keep the volume down -->
<redirectTestOutputToFile>true</redirectTestOutputToFile>

View File

@@ -26,6 +26,7 @@ import io.druid.math.expr.ExprMacroTable;
import io.druid.math.expr.Parser;
import io.druid.segment.BaseDoubleColumnValueSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import java.util.Collections;
import java.util.Comparator;
@@ -38,6 +39,7 @@ public abstract class SimpleDoubleAggregatorFactory extends AggregatorFactory
protected final String fieldName;
protected final String expression;
protected final ExprMacroTable macroTable;
protected final boolean storeDoubleAsFloat;
public SimpleDoubleAggregatorFactory(
ExprMacroTable macroTable,
@@ -50,6 +52,7 @@ public abstract class SimpleDoubleAggregatorFactory extends AggregatorFactory
this.fieldName = fieldName;
this.name = name;
this.expression = expression;
this.storeDoubleAsFloat = Column.storeDoubleAsFloat();
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkArgument(
fieldName == null ^ expression == null,
@@ -81,6 +84,9 @@ public abstract class SimpleDoubleAggregatorFactory extends AggregatorFactory
@Override
public String getTypeName()
{
if (storeDoubleAsFloat) {
return "float";
}
return "double";
}
@@ -144,4 +150,5 @@ public abstract class SimpleDoubleAggregatorFactory extends AggregatorFactory
{
return expression;
}
}

View File

@@ -59,6 +59,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final String name;
private final boolean storeDoubleAsFloat;
@JsonCreator
public DoubleFirstAggregatorFactory(
@@ -71,6 +72,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
this.name = name;
this.fieldName = fieldName;
this.storeDoubleAsFloat = Column.storeDoubleAsFloat();
}
@Override
@@ -222,6 +224,9 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
@Override
public String getTypeName()
{
if (storeDoubleAsFloat) {
return "float";
}
return "double";
}

View File

@@ -51,6 +51,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
private final String fieldName;
private final String name;
private final boolean storeDoubleAsFloat;
@JsonCreator
public DoubleLastAggregatorFactory(
@@ -62,6 +63,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
this.storeDoubleAsFloat = Column.storeDoubleAsFloat();
}
@Override
@@ -213,6 +215,10 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
@Override
public String getTypeName()
{
if (storeDoubleAsFloat) {
return "float";
}
return "double";
}

View File

@@ -19,11 +19,21 @@
package io.druid.segment.column;
import io.druid.java.util.common.StringUtils;
/**
*/
public interface Column
{
String TIME_COLUMN_NAME = "__time";
String DOUBLE_STORAGE_TYPE_PROPERTY = "druid.indexing.doubleStorage";
static boolean storeDoubleAsFloat()
{
String value = System.getProperty(DOUBLE_STORAGE_TYPE_PROPERTY, "float");
return !StringUtils.toLowerCase(value).equals("double");
}
ColumnCapabilities getCapabilities();
int getLength();

View File

@@ -0,0 +1,369 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.metadata.SegmentMetadataQueryConfig;
import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import io.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.scan.ScanQuery;
import io.druid.query.scan.ScanQueryConfig;
import io.druid.query.scan.ScanQueryEngine;
import io.druid.query.scan.ScanQueryQueryToolChest;
import io.druid.query.scan.ScanQueryRunnerFactory;
import io.druid.query.scan.ScanResultValue;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static io.druid.segment.column.Column.DOUBLE_STORAGE_TYPE_PROPERTY;
import static io.druid.query.scan.ScanQueryRunnerTest.verify;
@RunWith(Parameterized.class)
public class DoubleStorageTest
{
private static final SegmentMetadataQueryRunnerFactory METADATA_QR_FACTORY = new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
private static final ScanQueryQueryToolChest scanQueryQueryToolChest = new ScanQueryQueryToolChest(
new ScanQueryConfig(),
DefaultGenericQueryMetricsFactory.instance()
);
private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
scanQueryQueryToolChest,
new ScanQueryEngine()
);
private ScanQuery.ScanQueryBuilder newTestQuery()
{
return ScanQuery.newScanQueryBuilder()
.dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
.columns(Arrays.<String>asList())
.intervals(QueryRunnerTestHelper.fullOnInterval)
.limit(Integer.MAX_VALUE)
.legacy(false);
}
private static final IndexMergerV9 INDEX_MERGER_V9 = TestHelper.getTestIndexMergerV9();
private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
private static final Integer MAX_ROWS = 10;
private static final String TIME_COLUMN = "__time";
private static final String DIM_NAME = "testDimName";
private static final String DIM_VALUE = "testDimValue";
private static final String DIM_FLOAT_NAME = "testDimFloatName";
private static final String SEGMENT_ID = "segmentId";
private static final Interval INTERVAL = Intervals.of("2011-01-13T00:00:00.000Z/2011-01-22T00:00:00.001Z");
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)),
ImmutableList.of(DIM_FLOAT_NAME),
ImmutableList.<SpatialDimensionSchema>of()
),
null,
null
)
);
private QueryableIndex index;
private final SegmentAnalysis expectedSegmentAnalysis;
private final String storeDoubleAs;
public DoubleStorageTest(
String storeDoubleAs,
SegmentAnalysis expectedSegmentAnalysis
) throws IOException
{
this.storeDoubleAs = storeDoubleAs;
this.expectedSegmentAnalysis = expectedSegmentAnalysis;
}
@Parameterized.Parameters
public static Collection<?> dataFeeder() throws IOException
{
SegmentAnalysis expectedSegmentAnalysisDouble = new SegmentAnalysis(
"segmentId",
ImmutableList.of(INTERVAL),
ImmutableMap.of(
TIME_COLUMN,
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
100,
null,
null,
null,
null
),
DIM_NAME,
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
120,
1,
DIM_VALUE,
DIM_VALUE,
null
),
DIM_FLOAT_NAME,
new ColumnAnalysis(
ValueType.DOUBLE.toString(),
false,
80,
null,
null,
null,
null
)
), 330,
MAX_ROWS,
null,
null,
null,
null
);
SegmentAnalysis expectedSegmentAnalysisFloat = new SegmentAnalysis(
"segmentId",
ImmutableList.of(INTERVAL),
ImmutableMap.of(
TIME_COLUMN,
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
100,
null,
null,
null,
null
),
DIM_NAME,
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
120,
1,
DIM_VALUE,
DIM_VALUE,
null
),
DIM_FLOAT_NAME,
new ColumnAnalysis(
ValueType.FLOAT.toString(),
false,
80,
null,
null,
null,
null
)
), 330,
MAX_ROWS,
null,
null,
null,
null
);
return ImmutableList.of(
new Object[]{"double", expectedSegmentAnalysisDouble},
new Object[]{"float", expectedSegmentAnalysisFloat}
);
}
@Before
public void setup() throws IOException
{
index = buildIndex(storeDoubleAs);
}
@Test
public void testMetaDataAnalysis() throws IndexSizeExceededException
{
QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
METADATA_QR_FACTORY,
SEGMENT_ID,
new QueryableIndexSegment("segmentId", index),
null
);
SegmentMetadataQuery segmentMetadataQuery = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals(ImmutableList.of(INTERVAL))
.toInclude(new ListColumnIncluderator(Arrays.asList(
TIME_COLUMN,
DIM_NAME,
DIM_FLOAT_NAME
)))
.analysisTypes(
SegmentMetadataQuery.AnalysisType.CARDINALITY,
SegmentMetadataQuery.AnalysisType.SIZE,
SegmentMetadataQuery.AnalysisType.INTERVAL,
SegmentMetadataQuery.AnalysisType.MINMAX
)
.merge(true)
.build();
List<SegmentAnalysis> results = Sequences.toList(
runner.run(QueryPlus.wrap(segmentMetadataQuery), Maps.newHashMap()),
Lists.<SegmentAnalysis>newArrayList()
);
Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis), results);
}
@Test
public void testSelectValues()
{
QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
SCAN_QUERY_RUNNER_FACTORY,
SEGMENT_ID,
new QueryableIndexSegment("segmentId", index),
null
);
ScanQuery query = newTestQuery()
.intervals(new LegacySegmentSpec(INTERVAL))
.virtualColumns()
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(QueryPlus.wrap(query), context),
Lists.<ScanResultValue>newArrayList()
);
ScanResultValue expectedScanResult = new ScanResultValue(
SEGMENT_ID,
ImmutableList.of(TIME_COLUMN, DIM_NAME, DIM_FLOAT_NAME),
getStreamOfEvents().collect(Collectors.toList())
);
List<ScanResultValue> expectedResults = Lists.newArrayList(expectedScanResult);
verify(expectedResults, results);
}
private static QueryableIndex buildIndex(String storeDoubleAsFloat) throws IOException
{
String oldValue = System.getProperty(DOUBLE_STORAGE_TYPE_PROPERTY);
System.setProperty(DOUBLE_STORAGE_TYPE_PROPERTY, storeDoubleAsFloat);
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2011-01-13T00:00:00.000Z").getMillis())
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new DoubleSumAggregatorFactory(DIM_FLOAT_NAME, DIM_FLOAT_NAME)
)
.build();
final IncrementalIndex index = new IncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(MAX_ROWS)
.buildOnheap();
getStreamOfEvents().forEach(o -> {
try {
index.add(ROW_PARSER.parse((Map) o));
}
catch (IndexSizeExceededException e) {
Throwables.propagate(e);
}
});
if (oldValue == null) {
System.clearProperty(DOUBLE_STORAGE_TYPE_PROPERTY);
} else {
System.setProperty(DOUBLE_STORAGE_TYPE_PROPERTY, oldValue);
}
File someTmpFile = File.createTempFile("billy", "yay");
someTmpFile.delete();
someTmpFile.mkdirs();
INDEX_MERGER_V9.persist(index, someTmpFile, new IndexSpec());
someTmpFile.deleteOnExit();
return INDEX_IO.loadIndex(someTmpFile);
}
@After
public void cleanUp() throws IOException
{
index.close();
}
private static Stream getStreamOfEvents()
{
return IntStream.range(0, MAX_ROWS).mapToObj(i -> ImmutableMap.of(
TIME_COLUMN, DateTimes.of("2011-01-13T00:00:00.000Z").plusDays(i).getMillis(),
DIM_NAME, DIM_VALUE,
DIM_FLOAT_NAME, i / 1.6179775280898876
));
}
}

View File

@@ -172,8 +172,8 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
final TimeseriesResultValue value = result.getValue();
Assert.assertEquals(result.toString(), 1870.061029, value.getDoubleMetric("maxIndex"), 0.0);
Assert.assertEquals(result.toString(), 59.021022, value.getDoubleMetric("minIndex"), 0.0);
Assert.assertEquals(result.toString(), 1870.061029, value.getDoubleMetric("maxIndex"), 1870.061029 * 1e-6);
Assert.assertEquals(result.toString(), 59.021022, value.getDoubleMetric("minIndex"), 59.021022 * 1e-6);
}

View File

@@ -688,7 +688,7 @@ public class ScanQueryRunnerTest
return expected;
}
private static void verify(
public static void verify(
Iterable<ScanResultValue> expectedResults,
Iterable<ScanResultValue> actualResults
)
@@ -719,8 +719,13 @@ public class ScanQueryRunnerTest
if (actVal instanceof String[]) {
actVal = Arrays.asList((String[]) actVal);
}
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
Object exValue = ex.getValue();
if (exValue instanceof Double || exValue instanceof Float) {
final double expectedDoubleValue = ((Number) exValue).doubleValue();
Assert.assertEquals("invalid value for " + ex.getKey(), expectedDoubleValue, ((Number) actVal).doubleValue(), expectedDoubleValue * 1e-6);
} else {
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}
}
for (Map.Entry<String, Object> ac : acHolder.entrySet()) {
@@ -731,7 +736,18 @@ public class ScanQueryRunnerTest
actVal = Arrays.asList((String[]) actVal);
}
Assert.assertEquals("invalid value for " + ac.getKey(), exVal, actVal);
if (exVal instanceof Double || exVal instanceof Float) {
final double exDoubleValue = ((Number) exVal).doubleValue();
Assert.assertEquals(
"invalid value for " + ac.getKey(),
exDoubleValue,
((Number) actVal).doubleValue(),
exDoubleValue * 1e-6
);
} else {
Assert.assertEquals("invalid value for " + ac.getKey(), exVal, actVal);
}
}
}

View File

@@ -308,8 +308,8 @@ public class TimeseriesQueryRunnerTest
final TimeseriesResultValue value = result.getValue();
Assert.assertEquals(result.toString(), 1870.061029, value.getDoubleMetric("maxIndex"), 0.0);
Assert.assertEquals(result.toString(), 59.021022, value.getDoubleMetric("minIndex"), 0.0);
Assert.assertEquals(result.toString(), 1870.061029, value.getDoubleMetric("maxIndex"), 1870.061029 * 1e-6);
Assert.assertEquals(result.toString(), 59.021022, value.getDoubleMetric("minIndex"), 59.021022 * 1e-6);
}
@Test

View File

@@ -323,7 +323,7 @@ public class TestHelper
StringUtils.format("%s: key[%s]", msg, key),
((Number) expectedValue).doubleValue(),
((Number) actualValue).doubleValue(),
((Number) expectedValue).doubleValue() * 1e-6
Math.abs(((Number) expectedValue).doubleValue() * 1e-6)
);
} else {
Assert.assertEquals(