object column selectors

This commit is contained in:
xvrl 2013-04-11 14:42:19 -07:00
parent 0c68bd1b1d
commit c83631928c
6 changed files with 507 additions and 6 deletions

View File

@ -26,4 +26,5 @@ public interface ColumnSelectorFactory
{ {
public FloatMetricSelector makeFloatMetricSelector(String metricName); public FloatMetricSelector makeFloatMetricSelector(String metricName);
public ComplexMetricSelector makeComplexMetricSelector(String metricName); public ComplexMetricSelector makeComplexMetricSelector(String metricName);
public ObjectColumnSelector makeObjectColumnSelector(String columnName);
} }

View File

@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.processing;
public interface ObjectColumnSelector<T>
{
public Class<T> classOfObject();
public T get();
}

View File

@ -36,6 +36,11 @@ import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.Aggregator; import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator; import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricExtractor; import com.metamx.druid.index.v1.serde.ComplexMetricExtractor;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde; import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics; import com.metamx.druid.index.v1.serde.ComplexMetrics;
@ -45,6 +50,7 @@ import com.metamx.druid.input.Row;
import com.metamx.druid.processing.ColumnSelectorFactory; import com.metamx.druid.processing.ColumnSelectorFactory;
import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -189,6 +195,7 @@ public class IncrementalIndex implements Iterable<Row>
public ComplexMetricSelector makeComplexMetricSelector(final String metric) public ComplexMetricSelector makeComplexMetricSelector(final String metric)
{ {
final String typeName = agg.getTypeName(); final String typeName = agg.getTypeName();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) { if (serde == null) {
@ -213,6 +220,51 @@ public class IncrementalIndex implements Iterable<Row>
} }
}; };
} }
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if(typeName.equals("float")) return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return in.getFloatMetric(columnName);
}
};
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in, columnName);
}
};
}
} }
); );
} }

View File

@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.IndexedInts; import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import com.metamx.druid.query.search.SearchHit; import com.metamx.druid.query.search.SearchHit;
import com.metamx.druid.query.search.SearchQuery; import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.search.SearchQuerySpec; import com.metamx.druid.query.search.SearchQuerySpec;
@ -359,6 +360,61 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
} }
}; };
} }
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
final Integer metricIndexInt = index.getMetricIndex(columnName);
if(metricIndexInt != null) {
final int metricIndex = metricIndexInt;
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName));
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return serde.getObjectStrategy().getClazz();
}
@Override
public Object get()
{
return currEntry.getValue()[metricIndex].get();
}
};
}
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
if(dimensionIndexInt != null) {
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
if(dimVals.length == 1) return dimVals[0];
if(dimVals.length == 0) return null;
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
};
}
return null;
}
}; };
} }
} }

View File

@ -46,11 +46,13 @@ import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.ListIndexed; import com.metamx.druid.kv.ListIndexed;
import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -148,7 +150,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter
); );
} }
final Map<String, Object> metricHolderCache = Maps.newHashMap(); final Map<String, Object> columnHolderCache = Maps.newHashMap();
// This after call is not perfect, if there is an exception during processing, it will never get called, // This after call is not perfect, if there is an exception during processing, it will never get called,
// but it's better than nothing and doing this properly all the time requires a lot more fixerating // but it's better than nothing and doing this properly all the time requires a lot more fixerating
@ -246,7 +248,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter
public FloatMetricSelector makeFloatMetricSelector(String metric) public FloatMetricSelector makeFloatMetricSelector(String metric)
{ {
String metricName = metric.toLowerCase(); String metricName = metric.toLowerCase();
IndexedFloats cachedFloats = (IndexedFloats) metricHolderCache.get(metric); IndexedFloats cachedFloats = (IndexedFloats) columnHolderCache.get(metric);
if (cachedFloats == null) { if (cachedFloats == null) {
MetricHolder holder = index.metricVals.get(metricName); MetricHolder holder = index.metricVals.get(metricName);
if (holder == null) { if (holder == null) {
@ -261,7 +263,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter
} }
cachedFloats = holder.getFloatType(); cachedFloats = holder.getFloatType();
metricHolderCache.put(metricName, cachedFloats); columnHolderCache.put(metricName, cachedFloats);
} }
final IndexedFloats metricVals = cachedFloats; final IndexedFloats metricVals = cachedFloats;
@ -279,12 +281,13 @@ public class IndexStorageAdapter extends BaseStorageAdapter
public ComplexMetricSelector makeComplexMetricSelector(String metric) public ComplexMetricSelector makeComplexMetricSelector(String metric)
{ {
final String metricName = metric.toLowerCase(); final String metricName = metric.toLowerCase();
Indexed cachedComplex = (Indexed) metricHolderCache.get(metricName); Indexed cachedComplex = (Indexed) columnHolderCache.get(metricName);
if (cachedComplex == null) { if (cachedComplex == null) {
MetricHolder holder = index.metricVals.get(metricName); MetricHolder holder = index.metricVals.get(metricName);
if (holder != null) { if (holder != null) {
cachedComplex = holder.getComplexType(); cachedComplex = holder.getComplexType();
metricHolderCache.put(metricName, cachedComplex); columnHolderCache.put(metricName, cachedComplex);
} }
} }
@ -308,6 +311,107 @@ public class IndexStorageAdapter extends BaseStorageAdapter
} }
}; };
} }
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumn = (Indexed) columnHolderCache.get(columnName);
if (cachedColumn == null) {
MetricHolder holder = index.metricVals.get(columnName);
final String[] nameLookup = index.reverseDimLookup.get(columnName);
if(nameLookup != null) {
cachedColumn = index.dimensionValues.get(columnName);
}
else if(holder != null) {
final MetricHolder.MetricType type = holder.getType();
if (type == MetricHolder.MetricType.COMPLEX) {
cachedColumn = holder.getComplexType();
}
else {
cachedColumn = holder.getFloatType();
}
}
if(cachedColumn != null) {
columnHolderCache.put(columnName, cachedColumn);
}
}
if (cachedColumn == null) {
return null;
}
if(cachedColumn instanceof IndexedFloats) {
final IndexedFloats vals = (IndexedFloats)cachedColumn;
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return vals.get(cursorOffset.getOffset());
}
};
}
if(cachedColumn instanceof Indexed) {
final Indexed vals = (Indexed)cachedColumn;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return vals.getClazz();
}
@Override
public Object get()
{
return vals.get(cursorOffset.getOffset());
}
};
}
if(cachedColumn instanceof DimensionColumn) {
final DimensionColumn vals = (DimensionColumn)cachedColumn;
final String[] nameLookup = index.reverseDimLookup.get(columnName);
final int[] dimensionRowValues = vals.getDimensionRowValues();
final int[][] dimensionExpansions = vals.getDimensionExpansions();
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
final int[] dimIds = dimensionExpansions[dimensionRowValues[cursorOffset.getOffset()]];
if(dimIds.length == 1) return nameLookup[dimIds[0]];
if(dimIds.length == 0) return null;
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
};
}
return null;
}
}; };
} }
} }
@ -317,7 +421,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter
@Override @Override
public void run() public void run()
{ {
for (Object object : metricHolderCache.values()) { for (Object object : columnHolderCache.values()) {
if (object instanceof Closeable) { if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object); Closeables.closeQuietly((Closeable) object);
} }

View File

@ -48,10 +48,12 @@ import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedIterable; import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.ComplexMetricSelector;
import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.FloatMetricSelector;
import com.metamx.druid.processing.ObjectColumnSelector;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -254,6 +256,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap(); final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap(); final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn(); final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final FunctionalIterator<Cursor> retVal = FunctionalIterator final FunctionalIterator<Cursor> retVal = FunctionalIterator
@ -465,6 +469,131 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
} }
}; };
} }
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
if(holder.getCapabilities().hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
final ValueType type = holder.getCapabilities().getType();
if(holder.getCapabilities().isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(type == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) objectColumnCache.put(columnName, cachedColumnVals);
}
if (cachedColumnVals == null) {
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(cursorOffset.getOffset());
}
};
}
if(type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(cursorOffset.getOffset());
}
};
}
if(type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(cursorOffset.getOffset());
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset()));
}
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(cursorOffset.getOffset());
}
};
}
}; };
} }
} }
@ -486,6 +615,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) { for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn); Closeables.closeQuietly(complexColumn);
} }
for (Object column : complexColumnCache.values()) {
if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column);
}
} }
} }
); );
@ -562,6 +694,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
{ {
final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap(); final Map<String, GenericColumn> genericColumnCache = Maps.newHashMap();
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap(); final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn(); final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final FunctionalIterator<Cursor> retVal = FunctionalIterator final FunctionalIterator<Cursor> retVal = FunctionalIterator
@ -769,6 +903,131 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
} }
}; };
} }
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String columnName = column.toLowerCase();
Object cachedColumnVals = objectColumnCache.get(columnName);
if (cachedColumnVals == null) {
Column holder = index.getColumn(columnName);
if(holder != null) {
if(holder.getCapabilities().hasMultipleValues()) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
);
}
final ValueType type = holder.getCapabilities().getType();
if(holder.getCapabilities().isDictionaryEncoded()) {
cachedColumnVals = holder.getDictionaryEncoding();
}
else if(type == ValueType.COMPLEX) {
cachedColumnVals = holder.getComplexColumn();
}
else {
cachedColumnVals = holder.getGenericColumn();
}
}
if(cachedColumnVals != null) objectColumnCache.put(columnName, cachedColumnVals);
}
if (cachedColumnVals == null) {
return null;
}
if(cachedColumnVals instanceof GenericColumn) {
final GenericColumn columnVals = (GenericColumn) cachedColumnVals;
final ValueType type = columnVals.getType();
if(type == ValueType.FLOAT) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return columnVals.getFloatSingleValueRow(currRow);
}
};
}
if(type == ValueType.LONG) {
return new ObjectColumnSelector<Long>()
{
@Override
public Class classOfObject()
{
return Long.TYPE;
}
@Override
public Long get()
{
return columnVals.getLongSingleValueRow(currRow);
}
};
}
if(type == ValueType.STRING) {
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.getStringSingleValueRow(currRow);
}
};
}
}
if (cachedColumnVals instanceof DictionaryEncodedColumn) {
final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals;
return new ObjectColumnSelector<String>()
{
@Override
public Class classOfObject()
{
return String.class;
}
@Override
public String get()
{
return columnVals.lookupName(columnVals.getSingleValueRow(currRow));
}
};
}
final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals;
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return columnVals.getClazz();
}
@Override
public Object get()
{
return columnVals.getRowValue(currRow);
}
};
}
}; };
} }
} }
@ -788,6 +1047,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
for (ComplexColumn complexColumn : complexColumnCache.values()) { for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn); Closeables.closeQuietly(complexColumn);
} }
for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column);
}
} }
} }
); );