mirror of https://github.com/apache/druid.git
Add support for LongColumn
This commit is contained in:
parent
c6712739dc
commit
1b0a72751b
|
@ -87,6 +87,13 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
|
|||
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -91,6 +91,12 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
|
|||
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -51,10 +51,10 @@ import io.druid.query.select.EventHolder;
|
|||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.QueryableIndexStorageAdapter;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.TimestampColumnSelector;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
|
@ -250,7 +250,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
|
|||
@Override
|
||||
public Sequence<InputRow> apply(@Nullable final Cursor cursor)
|
||||
{
|
||||
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
|
||||
final LongColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
|
||||
|
||||
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
|
||||
for (String dim : dims) {
|
||||
|
@ -282,7 +282,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
|
|||
public InputRow next()
|
||||
{
|
||||
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
|
||||
final long timestamp = timestampColumnSelector.getTimestamp();
|
||||
final long timestamp = timestampColumnSelector.get();
|
||||
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
|
||||
|
||||
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -41,7 +41,7 @@
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<metamx.java-util.version>0.26.6</metamx.java-util.version>
|
||||
<apache.curator.version>2.6.0</apache.curator.version>
|
||||
<druid.api.version>0.2.7</druid.api.version>
|
||||
<druid.api.version>0.2.8-SNAPSHOT</druid.api.version>
|
||||
</properties>
|
||||
|
||||
<modules>
|
||||
|
|
|
@ -95,6 +95,13 @@ public class Aggregators
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -89,6 +89,23 @@ public interface BufferAggregator
|
|||
*/
|
||||
float getFloat(ByteBuffer buf, int position);
|
||||
|
||||
/**
|
||||
* Returns the long representation of the given aggregate byte array
|
||||
*
|
||||
* Converts the given byte buffer representation into the intermediate aggregate value.
|
||||
*
|
||||
* <b>Implementations must not change the position, limit or mark of the given buffer</b>
|
||||
*
|
||||
* Implementations are only required to support this method if they are aggregations which
|
||||
* have an {@link AggregatorFactory#getTypeName()} of "long".
|
||||
* If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended.
|
||||
*
|
||||
* @param buf byte buffer storing the byte array representation of the aggregate
|
||||
* @param position offset within the byte buffer at which the aggregate value is stored
|
||||
* @return the long representation of the aggregate
|
||||
*/
|
||||
long getLong(ByteBuffer buf, int position);
|
||||
|
||||
/**
|
||||
* Release any resources used by the aggregator
|
||||
*/
|
||||
|
|
|
@ -50,6 +50,13 @@ public class CountBufferAggregator implements BufferAggregator
|
|||
return buf.getLong(position);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getLong(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -60,6 +60,13 @@ public class DoubleSumBufferAggregator implements BufferAggregator
|
|||
return (float) buf.getDouble(position);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return (long) buf.getDouble(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -90,6 +90,12 @@ public class HistogramBufferAggregator implements BufferAggregator
|
|||
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -63,6 +63,13 @@ public class JavaScriptBufferAggregator implements BufferAggregator
|
|||
return (float)buf.getDouble(position);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return (long) buf.getDouble(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
script.close();
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.query.aggregation;
|
|||
|
||||
import com.google.common.primitives.Longs;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
|
@ -41,12 +42,12 @@ public class LongSumAggregator implements Aggregator
|
|||
return ((Number) lhs).longValue() + ((Number) rhs).longValue();
|
||||
}
|
||||
|
||||
private final FloatColumnSelector selector;
|
||||
private final LongColumnSelector selector;
|
||||
private final String name;
|
||||
|
||||
private long sum;
|
||||
|
||||
public LongSumAggregator(String name, FloatColumnSelector selector)
|
||||
public LongSumAggregator(String name, LongColumnSelector selector)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
|
@ -57,7 +58,7 @@ public class LongSumAggregator implements Aggregator
|
|||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
sum += (long) selector.get();
|
||||
sum += selector.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -57,14 +57,14 @@ public class LongSumAggregatorFactory implements AggregatorFactory
|
|||
{
|
||||
return new LongSumAggregator(
|
||||
name,
|
||||
metricFactory.makeFloatColumnSelector(fieldName)
|
||||
metricFactory.makeLongColumnSelector(fieldName)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return new LongSumBufferAggregator(metricFactory.makeFloatColumnSelector(fieldName));
|
||||
return new LongSumBufferAggregator(metricFactory.makeLongColumnSelector(fieldName));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,7 +133,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
|
|||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return "float";
|
||||
return "long";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.query.aggregation;
|
||||
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
|
@ -27,10 +28,10 @@ import java.nio.ByteBuffer;
|
|||
*/
|
||||
public class LongSumBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final FloatColumnSelector selector;
|
||||
private final LongColumnSelector selector;
|
||||
|
||||
public LongSumBufferAggregator(
|
||||
FloatColumnSelector selector
|
||||
LongColumnSelector selector
|
||||
)
|
||||
{
|
||||
this.selector = selector;
|
||||
|
@ -45,7 +46,7 @@ public class LongSumBufferAggregator implements BufferAggregator
|
|||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, buf.getLong(position) + (long) selector.get());
|
||||
buf.putLong(position, buf.getLong(position) + selector.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -60,6 +61,13 @@ public class LongSumBufferAggregator implements BufferAggregator
|
|||
return (float) buf.getLong(position);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getLong(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -58,6 +58,13 @@ public class MaxBufferAggregator implements BufferAggregator
|
|||
return (float) buf.getDouble(position);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return (long) buf.getDouble(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -58,6 +58,13 @@ public class MinBufferAggregator implements BufferAggregator
|
|||
return (float) buf.getDouble(position);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return (long) buf.getDouble(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -82,6 +82,13 @@ public class CardinalityBufferAggregator implements BufferAggregator
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -82,6 +82,13 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
throw new UnsupportedOperationException("HyperUniquesBufferAggregator does not support getLong()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -28,10 +28,10 @@ import io.druid.query.QueryRunnerHelper;
|
|||
import io.druid.query.Result;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.TimestampColumnSelector;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -83,7 +83,7 @@ public class SelectQueryEngine
|
|||
.getThreshold()
|
||||
);
|
||||
|
||||
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
|
||||
final LongColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
|
||||
|
||||
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
|
||||
for (String dim : dims) {
|
||||
|
@ -110,7 +110,7 @@ public class SelectQueryEngine
|
|||
int offset = 0;
|
||||
while (!cursor.isDone() && offset < query.getPagingSpec().getThreshold()) {
|
||||
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
|
||||
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.getTimestamp()));
|
||||
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
|
||||
|
||||
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
|
||||
final String dim = dimSelector.getKey();
|
||||
|
|
|
@ -24,8 +24,9 @@ package io.druid.segment;
|
|||
*/
|
||||
public interface ColumnSelectorFactory
|
||||
{
|
||||
public TimestampColumnSelector makeTimestampColumnSelector();
|
||||
public LongColumnSelector makeTimestampColumnSelector();
|
||||
public DimensionSelector makeDimensionSelector(String dimensionName);
|
||||
public FloatColumnSelector makeFloatColumnSelector(String columnName);
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName);
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String columnName);
|
||||
}
|
||||
|
|
|
@ -101,14 +101,12 @@ import java.util.TreeSet;
|
|||
public class IndexMaker
|
||||
{
|
||||
private static final Logger log = new Logger(IndexMaker.class);
|
||||
|
||||
private static final SerializerUtils serializerUtils = new SerializerUtils();
|
||||
private static final int INVALID_ROW = -1;
|
||||
private static final Splitter SPLITTER = Splitter.on(",");
|
||||
// This should really be provided by DI, should be changed once we switch around to using a DI framework
|
||||
private static final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
|
||||
public static File persist(final IncrementalIndex index, File outDir) throws IOException
|
||||
{
|
||||
return persist(index, index.getInterval(), outDir);
|
||||
|
@ -777,7 +775,6 @@ public class IndexMaker
|
|||
progress.stopSection(dimSection);
|
||||
}
|
||||
|
||||
|
||||
private static void makeDimColumn(
|
||||
final FileSmoosher v9Smoosher,
|
||||
final List<IndexableAdapter> adapters,
|
||||
|
@ -1093,7 +1090,7 @@ public class IndexMaker
|
|||
ValueType type = valueTypes.get(metric);
|
||||
|
||||
switch (type) {
|
||||
case FLOAT:
|
||||
case FLOAT: {
|
||||
metBuilder.setValueType(ValueType.FLOAT);
|
||||
|
||||
float[] arr = new float[rowCount];
|
||||
|
@ -1116,6 +1113,31 @@ public class IndexMaker
|
|||
metric
|
||||
);
|
||||
break;
|
||||
}
|
||||
case LONG: {
|
||||
metBuilder.setValueType(ValueType.LONG);
|
||||
|
||||
long[] arr = new long[rowCount];
|
||||
int rowNum = 0;
|
||||
for (Rowboat theRow : theRows) {
|
||||
Object obj = theRow.getMetrics()[metricIndex];
|
||||
arr[rowNum++] = (obj == null) ? 0 : ((Number) obj).longValue();
|
||||
}
|
||||
|
||||
CompressedLongsIndexedSupplier compressedLongs = CompressedLongsIndexedSupplier.fromLongBuffer(
|
||||
LongBuffer.wrap(arr),
|
||||
IndexIO.BYTE_ORDER,
|
||||
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
|
||||
);
|
||||
|
||||
writeColumn(
|
||||
v9Smoosher,
|
||||
new LongGenericColumnPartSerde(compressedLongs, IndexIO.BYTE_ORDER),
|
||||
metBuilder,
|
||||
metric
|
||||
);
|
||||
break;
|
||||
}
|
||||
case COMPLEX:
|
||||
String complexType = metricTypeNames.get(metric);
|
||||
|
||||
|
@ -1248,11 +1270,15 @@ public class IndexMaker
|
|||
return Lists.newArrayList(retVal);
|
||||
}
|
||||
|
||||
private static interface ColumnDictionaryEntryStore
|
||||
{
|
||||
public void add(int[] vals);
|
||||
}
|
||||
|
||||
private static class DimValueConverter
|
||||
{
|
||||
private final Indexed<String> dimSet;
|
||||
private final IntBuffer conversionBuf;
|
||||
|
||||
private int currIndex;
|
||||
private String lastVal = null;
|
||||
|
||||
|
@ -1526,11 +1552,6 @@ public class IndexMaker
|
|||
}
|
||||
}
|
||||
|
||||
private static interface ColumnDictionaryEntryStore
|
||||
{
|
||||
public void add(int[] vals);
|
||||
}
|
||||
|
||||
private static class SingleValColumnDictionaryEntryStore implements ColumnDictionaryEntryStore
|
||||
{
|
||||
private final List<Integer> data = Lists.newArrayList();
|
||||
|
|
|
@ -21,7 +21,7 @@ package io.druid.segment;
|
|||
|
||||
/**
|
||||
*/
|
||||
public interface TimestampColumnSelector
|
||||
public interface LongColumnSelector
|
||||
{
|
||||
public long getTimestamp();
|
||||
public long get();
|
||||
}
|
|
@ -31,6 +31,8 @@ import io.druid.segment.column.ColumnCapabilities;
|
|||
import io.druid.segment.column.ComplexColumn;
|
||||
import io.druid.segment.column.DictionaryEncodedColumn;
|
||||
import io.druid.segment.column.GenericColumn;
|
||||
import io.druid.segment.column.IndexedFloatsGenericColumn;
|
||||
import io.druid.segment.column.IndexedLongsGenericColumn;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.ArrayBasedIndexedInts;
|
||||
import io.druid.segment.data.ConciseCompressedIndexedInts;
|
||||
|
@ -54,10 +56,8 @@ import java.util.Set;
|
|||
public class QueryableIndexIndexableAdapter implements IndexableAdapter
|
||||
{
|
||||
private static final Logger log = new Logger(QueryableIndexIndexableAdapter.class);
|
||||
|
||||
private final int numRows;
|
||||
private final QueryableIndex input;
|
||||
|
||||
private final List<String> availableDimensions;
|
||||
|
||||
public QueryableIndexIndexableAdapter(QueryableIndex input)
|
||||
|
@ -173,6 +173,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
|
|||
{
|
||||
final GenericColumn timestamps = input.getTimeColumn().getGenericColumn();
|
||||
final Object[] metrics;
|
||||
|
||||
final Map<String, DictionaryEncodedColumn> dimensions;
|
||||
|
||||
final int numMetrics = getMetricNames().size();
|
||||
|
@ -193,6 +194,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
|
|||
final ValueType type = column.getCapabilities().getType();
|
||||
switch (type) {
|
||||
case FLOAT:
|
||||
case LONG:
|
||||
metrics[i] = column.getGenericColumn();
|
||||
break;
|
||||
case COMPLEX:
|
||||
|
@ -248,8 +250,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
|
|||
|
||||
Object[] metricArray = new Object[numMetrics];
|
||||
for (int i = 0; i < metricArray.length; ++i) {
|
||||
if (metrics[i] instanceof GenericColumn) {
|
||||
if (metrics[i] instanceof IndexedFloatsGenericColumn) {
|
||||
metricArray[i] = ((GenericColumn) metrics[i]).getFloatSingleValueRow(currRow);
|
||||
} else if (metrics[i] instanceof IndexedLongsGenericColumn) {
|
||||
metricArray[i] = ((GenericColumn) metrics[i]).getLongSingleValueRow(currRow);
|
||||
} else if (metrics[i] instanceof ComplexColumn) {
|
||||
metricArray[i] = ((ComplexColumn) metrics[i]).getRowValue(currRow);
|
||||
}
|
||||
|
@ -300,6 +304,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
|
|||
switch (type) {
|
||||
case FLOAT:
|
||||
return "float";
|
||||
case LONG:
|
||||
return "long";
|
||||
case COMPLEX:
|
||||
return column.getComplexColumn().getTypeName();
|
||||
default:
|
||||
|
|
|
@ -256,12 +256,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
|
||||
@Override
|
||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||
public LongColumnSelector makeTimestampColumnSelector()
|
||||
{
|
||||
return new TimestampColumnSelector()
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long getTimestamp()
|
||||
public long get()
|
||||
{
|
||||
return timestamps.getLongSingleValueRow(cursorOffset.getOffset());
|
||||
}
|
||||
|
@ -399,6 +399,42 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName)
|
||||
{
|
||||
final String metricName = columnName.toLowerCase();
|
||||
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
|
||||
|
||||
if (cachedMetricVals == null) {
|
||||
Column holder = index.getColumn(metricName);
|
||||
if (holder != null && holder.getCapabilities().getType() == ValueType.LONG) {
|
||||
cachedMetricVals = holder.getGenericColumn();
|
||||
genericColumnCache.put(metricName, cachedMetricVals);
|
||||
}
|
||||
}
|
||||
|
||||
if (cachedMetricVals == null) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
final GenericColumn metricVals = cachedMetricVals;
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return metricVals.getLongSingleValueRow(cursorOffset.getOffset());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String column)
|
||||
{
|
||||
|
@ -711,12 +747,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
|
||||
@Override
|
||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||
public LongColumnSelector makeTimestampColumnSelector()
|
||||
{
|
||||
return new TimestampColumnSelector()
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long getTimestamp()
|
||||
public long get()
|
||||
{
|
||||
return timestamps.getLongSingleValueRow(currRow);
|
||||
}
|
||||
|
@ -854,6 +890,42 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName)
|
||||
{
|
||||
final String metricName = columnName.toLowerCase();
|
||||
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
|
||||
|
||||
if (cachedMetricVals == null) {
|
||||
Column holder = index.getColumn(metricName);
|
||||
if (holder != null && holder.getCapabilities().getType() == ValueType.LONG) {
|
||||
cachedMetricVals = holder.getGenericColumn();
|
||||
genericColumnCache.put(metricName, cachedMetricVals);
|
||||
}
|
||||
}
|
||||
|
||||
if (cachedMetricVals == null) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
final GenericColumn metricVals = cachedMetricVals;
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return metricVals.getLongSingleValueRow(currRow);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String column)
|
||||
{
|
||||
|
|
|
@ -46,8 +46,8 @@ import io.druid.query.aggregation.PostAggregator;
|
|||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.TimestampColumnSelector;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
@ -134,18 +134,32 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
|
|||
new ColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||
public LongColumnSelector makeTimestampColumnSelector()
|
||||
{
|
||||
return new TimestampColumnSelector()
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long getTimestamp()
|
||||
public long get()
|
||||
{
|
||||
return in.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName)
|
||||
{
|
||||
final String metricName = columnName.toLowerCase();
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return in.get().getLongMetric(metricName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(String columnName)
|
||||
{
|
||||
|
@ -302,6 +316,8 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
|
|||
ValueType type;
|
||||
if (entry.getValue().equalsIgnoreCase("float")) {
|
||||
type = ValueType.FLOAT;
|
||||
} else if (entry.getValue().equalsIgnoreCase("long")) {
|
||||
type = ValueType.LONG;
|
||||
} else {
|
||||
type = ValueType.COMPLEX;
|
||||
}
|
||||
|
|
|
@ -38,9 +38,9 @@ import io.druid.segment.Capabilities;
|
|||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.TimestampColumnSelector;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.data.ListIndexed;
|
||||
|
@ -262,12 +262,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
|
||||
@Override
|
||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||
public LongColumnSelector makeTimestampColumnSelector()
|
||||
{
|
||||
return new TimestampColumnSelector()
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long getTimestamp()
|
||||
public long get()
|
||||
{
|
||||
return currEntry.getKey().getTimestamp();
|
||||
}
|
||||
|
@ -378,6 +378,38 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(String columnName)
|
||||
{
|
||||
final String metricName = columnName.toLowerCase();
|
||||
final Integer metricIndexInt = index.getMetricIndex(metricName);
|
||||
if (metricIndexInt == null) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
final int metricIndex = metricIndexInt;
|
||||
final BufferAggregator agg = index.getAggregator(metricIndex);
|
||||
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return agg.getLong(
|
||||
index.getMetricBuffer(),
|
||||
index.getMetricPosition(currEntry.getValue(), metricIndex)
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String column)
|
||||
{
|
||||
|
|
|
@ -161,6 +161,17 @@ public class SpatialDimensionRowTransformer implements Function<InputRow, InputR
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String metric)
|
||||
{
|
||||
try {
|
||||
return row.getLongMetric(metric);
|
||||
}
|
||||
catch (ParseException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.Comparator;
|
|||
*/
|
||||
public class LongSumAggregatorTest
|
||||
{
|
||||
private void aggregate(TestFloatColumnSelector selector, LongSumAggregator agg)
|
||||
private void aggregate(TestLongColumnSelector selector, LongSumAggregator agg)
|
||||
{
|
||||
agg.aggregate();
|
||||
selector.increment();
|
||||
|
@ -37,7 +37,7 @@ public class LongSumAggregatorTest
|
|||
@Test
|
||||
public void testAggregate()
|
||||
{
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(new float[]{24.15f, 20f});
|
||||
final TestLongColumnSelector selector = new TestLongColumnSelector(new long[]{24L, 20L});
|
||||
LongSumAggregator agg = new LongSumAggregator("billy", selector);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
@ -58,7 +58,7 @@ public class LongSumAggregatorTest
|
|||
@Test
|
||||
public void testComparator()
|
||||
{
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(new float[]{18293f});
|
||||
final TestLongColumnSelector selector = new TestLongColumnSelector(new long[]{18293L});
|
||||
LongSumAggregator agg = new LongSumAggregator("billy", selector);
|
||||
|
||||
Assert.assertEquals("billy", agg.getName());
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TestLongColumnSelector implements LongColumnSelector
|
||||
{
|
||||
private final long[] longs;
|
||||
|
||||
private int index = 0;
|
||||
|
||||
public TestLongColumnSelector(long[] longs)
|
||||
{
|
||||
this.longs = longs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return longs[index];
|
||||
}
|
||||
|
||||
public void increment()
|
||||
{
|
||||
++index;
|
||||
}
|
||||
}
|
|
@ -64,6 +64,7 @@ import java.util.Map;
|
|||
|
||||
/**
|
||||
*/
|
||||
@Ignore
|
||||
public class AppendTest
|
||||
{
|
||||
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
|
||||
|
|
|
@ -241,6 +241,12 @@ class WikipediaIrcDecoder implements IrcDecoder
|
|||
return metrics.get(metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String metric)
|
||||
{
|
||||
return new Float(metrics.get(metric)).longValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Row o)
|
||||
{
|
||||
|
|
|
@ -95,6 +95,12 @@ public class CombiningFirehoseFactoryTest
|
|||
return metricValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String metric)
|
||||
{
|
||||
return new Float(metricValue).longValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRaw(String dimension)
|
||||
{
|
||||
|
|
|
@ -187,6 +187,12 @@ public class RealtimeManagerTest
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String metric)
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRaw(String dimension)
|
||||
{
|
||||
|
|
|
@ -100,6 +100,12 @@ public class SinkTest
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String metric)
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRaw(String dimension)
|
||||
{
|
||||
|
@ -153,6 +159,12 @@ public class SinkTest
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String metric)
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getRaw(String dimension)
|
||||
{
|
||||
|
|
|
@ -203,6 +203,12 @@ public class HashBasedNumberedShardSpecTest
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLongMetric(String s)
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Row o)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue