complexmetricserde workaround for groupby

This commit is contained in:
Xavier Léauté 2014-06-12 12:44:26 -07:00
parent e84bcca40f
commit 150480bdf9
2 changed files with 45 additions and 52 deletions

View File

@ -83,7 +83,7 @@ public class GroupByQueryHelper
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions), false) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}

View File

@ -135,17 +135,29 @@ public class IncrementalIndex implements Iterable<Row>
);
}
public int add(InputRow row)
{
// this is an ugly workaround to call ComplexMetricExtractor.extractValue at ingestion time
return add(row, true);
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
* <p/>
* This is *not* thread-safe. Calls to add() should always happen on the same thread.
*
* Calls to add() are thread safe.
*
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* @param row the row of data to add
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
public int add(InputRow row, final boolean deserializeComplexMetrics)
{
row = spatialDimensionRowFormatter.formatRow(row);
@ -233,25 +245,7 @@ public class IncrementalIndex implements Iterable<Row>
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if (typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class<Float> classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return in.getFloatMetric(columnName);
}
};
}
if (getDimension(columnName) != null) {
return new ObjectColumnSelector<Object>()
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class<Object> classOfObject()
@ -262,39 +256,38 @@ public class IncrementalIndex implements Iterable<Row>
@Override
public Object get()
{
final String[] dimVals = in.getDimension(columnName).toArray(new String[]{});
if (dimVals.length == 1) {
return dimVals[0];
} else if (dimVals.length == 0) {
return null;
} else {
return dimVals;
}
return in.getRaw(columnName);
}
};
if(!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in, columnName);
}
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in, columnName);
}
};
}
@Override