From 150480bdf98e82bb781b67d127370ca15312fe46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 12 Jun 2014 12:44:26 -0700 Subject: [PATCH] complexmetricserde workaround for groupby --- .../query/groupby/GroupByQueryHelper.java | 2 +- .../segment/incremental/IncrementalIndex.java | 95 +++++++++---------- 2 files changed, 45 insertions(+), 52 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 44fb82b7c6e..7041fa970da 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -83,7 +83,7 @@ public class GroupByQueryHelper @Override public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) { - if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { + if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions), false) > config.getMaxResults()) { throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 1c14d1a0447..25aa47463ed 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -135,17 +135,29 @@ public class IncrementalIndex implements Iterable ); } + public int add(InputRow row) + { + // this is an ugly workaround to call ComplexMetricExtractor.extractValue at ingestion time + return add(row, true); + } + /** * Adds a new row. The row might correspond with another row that already exists, in which case this will * update that row instead of inserting a new one. *

- * This is *not* thread-safe. Calls to add() should always happen on the same thread. + * + * Calls to add() are thread safe. + * + * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that + * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. * * @param row the row of data to add + * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input + * value for aggregators that return metrics other than float. * * @return the number of rows in the data set after adding the InputRow */ - public int add(InputRow row) + public int add(InputRow row, final boolean deserializeComplexMetrics) { row = spatialDimensionRowFormatter.formatRow(row); @@ -233,25 +245,7 @@ public class IncrementalIndex implements Iterable final String typeName = agg.getTypeName(); final String columnName = column.toLowerCase(); - if (typeName.equals("float")) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Float.TYPE; - } - - @Override - public Float get() - { - return in.getFloatMetric(columnName); - } - }; - } - - if (getDimension(columnName) != null) { - return new ObjectColumnSelector() + final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() { @Override public Class classOfObject() @@ -262,39 +256,38 @@ public class IncrementalIndex implements Iterable @Override public Object get() { - final String[] dimVals = in.getDimension(columnName).toArray(new String[]{}); - if (dimVals.length == 1) { - return dimVals[0]; - } else if (dimVals.length == 0) { - return null; - } else { - return dimVals; - } + return in.getRaw(columnName); + } + }; + + if(!deserializeComplexMetrics) { + return rawColumnSelector; + } else { + if (typeName.equals("float")) { + return rawColumnSelector; + } + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final ComplexMetricExtractor extractor = serde.getExtractor(); + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } + + @Override + public Object get() + { + return extractor.extractValue(in, columnName); } }; } - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } - - final ComplexMetricExtractor extractor = serde.getExtractor(); - - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); - } - - @Override - public Object get() - { - return extractor.extractValue(in, columnName); - } - }; } @Override