Merge pull request #805 from metamx/long-column

Long column Support
This commit is contained in:
Fangjin Yang 2014-10-21 23:11:26 -06:00
commit 1ebc8a2a72
44 changed files with 438 additions and 122 deletions

View File

@ -87,6 +87,13 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getLong()");
}
@Override
public void close()
{

View File

@ -91,6 +91,12 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getLong()");
}
@Override
public void close()
{

View File

@ -51,10 +51,11 @@ import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
@ -250,7 +251,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
@Override
public Sequence<InputRow> apply(final Cursor cursor)
{
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
@ -287,7 +288,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getTimestamp();
final long timestamp = timestampColumnSelector.get();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {

View File

@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.7</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.10</druid.api.version>
<druid.api.version>0.2.16</druid.api.version>
</properties>
<modules>

View File

@ -95,6 +95,13 @@ public class Aggregators
return 0;
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return 0L;
}
@Override
public void close()
{

View File

@ -89,6 +89,23 @@ public interface BufferAggregator
*/
float getFloat(ByteBuffer buf, int position);
/**
* Returns the long representation of the given aggregate byte array
*
* Converts the given byte buffer representation into the intermediate aggregate value.
*
* <b>Implementations must not change the position, limit or mark of the given buffer</b>
*
* Implementations are only required to support this method if they are aggregations which
* have an {@link AggregatorFactory#getTypeName()} of "long".
* If unimplemented, throwing an {@link UnsupportedOperationException} is common and recommended.
*
* @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the aggregate value is stored
* @return the long representation of the aggregate
*/
long getLong(ByteBuffer buf, int position);
/**
* Release any resources used by the aggregator
*/

View File

@ -50,6 +50,13 @@ public class CountBufferAggregator implements BufferAggregator
return buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{

View File

@ -60,6 +60,13 @@ public class DoubleSumBufferAggregator implements BufferAggregator
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{

View File

@ -90,6 +90,12 @@ public class HistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("HistogramBufferAggregator does not support getLong()");
}
@Override
public void close()
{

View File

@ -63,6 +63,13 @@ public class JavaScriptBufferAggregator implements BufferAggregator
return (float)buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close() {
script.close();

View File

@ -21,6 +21,7 @@ package io.druid.query.aggregation;
import com.google.common.primitives.Longs;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.util.Comparator;
@ -41,12 +42,12 @@ public class LongSumAggregator implements Aggregator
return ((Number) lhs).longValue() + ((Number) rhs).longValue();
}
private final FloatColumnSelector selector;
private final LongColumnSelector selector;
private final String name;
private long sum;
public LongSumAggregator(String name, FloatColumnSelector selector)
public LongSumAggregator(String name, LongColumnSelector selector)
{
this.name = name;
this.selector = selector;
@ -57,7 +58,7 @@ public class LongSumAggregator implements Aggregator
@Override
public void aggregate()
{
sum += (long) selector.get();
sum += selector.get();
}
@Override

View File

@ -58,14 +58,14 @@ public class LongSumAggregatorFactory implements AggregatorFactory
{
return new LongSumAggregator(
name,
metricFactory.makeFloatColumnSelector(fieldName)
metricFactory.makeLongColumnSelector(fieldName)
);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongSumBufferAggregator(metricFactory.makeFloatColumnSelector(fieldName));
return new LongSumBufferAggregator(metricFactory.makeLongColumnSelector(fieldName));
}
@Override
@ -134,7 +134,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory
@Override
public String getTypeName()
{
return "float";
return "long";
}
@Override

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
@ -27,10 +28,10 @@ import java.nio.ByteBuffer;
*/
public class LongSumBufferAggregator implements BufferAggregator
{
private final FloatColumnSelector selector;
private final LongColumnSelector selector;
public LongSumBufferAggregator(
FloatColumnSelector selector
LongColumnSelector selector
)
{
this.selector = selector;
@ -45,7 +46,7 @@ public class LongSumBufferAggregator implements BufferAggregator
@Override
public void aggregate(ByteBuffer buf, int position)
{
buf.putLong(position, buf.getLong(position) + (long) selector.get());
buf.putLong(position, buf.getLong(position) + selector.get());
}
@Override
@ -60,6 +61,13 @@ public class LongSumBufferAggregator implements BufferAggregator
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{

View File

@ -58,6 +58,12 @@ public class MaxBufferAggregator implements BufferAggregator
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{

View File

@ -58,6 +58,13 @@ public class MinBufferAggregator implements BufferAggregator
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{

View File

@ -79,7 +79,14 @@ public class CardinalityBufferAggregator implements BufferAggregator
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("CardinalityBufferAggregator does not support getLong()");
}
@Override

View File

@ -79,7 +79,14 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("HyperUniquesBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("HyperUniquesBufferAggregator does not support getLong()");
}
@Override

View File

@ -84,7 +84,7 @@ public class SegmentAnalyzer
columns.put(columnName, analysis);
}
columns.put("__time", lengthBasedAnalysis(index.getTimeColumn(), NUM_BYTES_IN_TIMESTAMP));
columns.put(Column.TIME_COLUMN_NAME, lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP));
return columns;
}

View File

@ -28,10 +28,11 @@ import io.druid.query.QueryRunnerHelper;
import io.druid.query.Result;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
@ -83,7 +84,7 @@ public class SelectQueryEngine
.getThreshold()
);
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
@ -110,7 +111,7 @@ public class SelectQueryEngine
int offset = 0;
while (!cursor.isDone() && offset < query.getPagingSpec().getThreshold()) {
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.getTimestamp()));
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();

View File

@ -23,6 +23,5 @@ package io.druid.segment;import io.druid.segment.column.Column;
*/
public interface ColumnSelector
{
public Column getTimeColumn();
public Column getColumn(String columnName);
}

View File

@ -91,7 +91,7 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
{
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return column.length();
}
finally {

View File

@ -24,8 +24,8 @@ package io.druid.segment;
*/
public interface ColumnSelectorFactory
{
public TimestampColumnSelector makeTimestampColumnSelector();
public DimensionSelector makeDimensionSelector(String dimensionName);
public FloatColumnSelector makeFloatColumnSelector(String columnName);
public LongColumnSelector makeLongColumnSelector(String columnName);
public ObjectColumnSelector makeObjectColumnSelector(String columnName);
}

View File

@ -715,15 +715,14 @@ public class IndexIO
}
String[] cols = colSet.toArray(new String[colSet.size()]);
columns.put(Column.TIME_COLUMN_NAME, new ColumnBuilder()
.setType(ValueType.LONG)
.setGenericColumn(new LongGenericColumnSupplier(index.timestamps))
.build());
return new SimpleQueryableIndex(
index.getDataInterval(),
new ArrayIndexed<>(cols, String.class),
index.getAvailableDimensions(),
new ColumnBuilder()
.setType(ValueType.LONG)
.setGenericColumn(new LongGenericColumnSupplier(index.timestamps))
.build(),
columns,
index.getFileMapper()
);
@ -756,8 +755,10 @@ public class IndexIO
columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName)));
}
columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time")));
final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns, smooshedFiles
dataInterval, cols, dims, columns, smooshedFiles
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);

View File

@ -105,7 +105,6 @@ import java.util.TreeSet;
public class IndexMaker
{
private static final Logger log = new Logger(IndexMaker.class);
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final int INVALID_ROW = -1;
private static final Splitter SPLITTER = Splitter.on(",");
@ -116,7 +115,6 @@ public class IndexMaker
mapper = injector.getInstance(ObjectMapper.class);
}
public static File persist(final IncrementalIndex index, File outDir) throws IOException
{
return persist(index, index.getInterval(), outDir);
@ -806,7 +804,6 @@ public class IndexMaker
progress.stopSection(dimSection);
}
private static void makeDimColumn(
final FileSmoosher v9Smoosher,
final List<IndexableAdapter> adapters,
@ -1237,7 +1234,7 @@ public class IndexMaker
ValueType type = valueTypes.get(metric);
switch (type) {
case FLOAT:
case FLOAT: {
metBuilder.setValueType(ValueType.FLOAT);
float[] arr = new float[rowCount];
@ -1260,6 +1257,31 @@ public class IndexMaker
metric
);
break;
}
case LONG: {
metBuilder.setValueType(ValueType.LONG);
long[] arr = new long[rowCount];
int rowNum = 0;
for (Rowboat theRow : theRows) {
Object obj = theRow.getMetrics()[metricIndex];
arr[rowNum++] = (obj == null) ? 0 : ((Number) obj).longValue();
}
CompressedLongsIndexedSupplier compressedLongs = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(arr),
IndexIO.BYTE_ORDER,
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
);
writeColumn(
v9Smoosher,
new LongGenericColumnPartSerde(compressedLongs, IndexIO.BYTE_ORDER),
metBuilder,
metric
);
break;
}
case COMPLEX:
String complexType = metricTypeNames.get(metric);
@ -1392,11 +1414,15 @@ public class IndexMaker
return Lists.newArrayList(retVal);
}
private static interface ColumnDictionaryEntryStore
{
public void add(int[] vals);
}
private static class DimValueConverter
{
private final Indexed<String> dimSet;
private final IntBuffer conversionBuf;
private int currIndex;
private String lastVal = null;
@ -1672,11 +1698,6 @@ public class IndexMaker
}
}
private static interface ColumnDictionaryEntryStore
{
public void add(int[] vals);
}
private static class SingleValColumnDictionaryEntryStore implements ColumnDictionaryEntryStore
{
private final List<Integer> data = Lists.newArrayList();

View File

@ -21,7 +21,7 @@ package io.druid.segment;
/**
*/
public interface TimestampColumnSelector
public interface LongColumnSelector
{
public long getTimestamp();
public long get();
}

View File

@ -31,6 +31,8 @@ import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.IndexedFloatsGenericColumn;
import io.druid.segment.column.IndexedLongsGenericColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.ConciseCompressedIndexedInts;
@ -54,10 +56,8 @@ import java.util.Set;
public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(QueryableIndexIndexableAdapter.class);
private final int numRows;
private final QueryableIndex input;
private final List<String> availableDimensions;
public QueryableIndexIndexableAdapter(QueryableIndex input)
@ -171,8 +171,9 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
return new Iterator<Rowboat>()
{
final GenericColumn timestamps = input.getTimeColumn().getGenericColumn();
final GenericColumn timestamps = input.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
final Object[] metrics;
final Map<String, DictionaryEncodedColumn> dimensions;
final int numMetrics = getMetricNames().size();
@ -193,6 +194,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
final ValueType type = column.getCapabilities().getType();
switch (type) {
case FLOAT:
case LONG:
metrics[i] = column.getGenericColumn();
break;
case COMPLEX:
@ -248,8 +250,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
if (metrics[i] instanceof GenericColumn) {
if (metrics[i] instanceof IndexedFloatsGenericColumn) {
metricArray[i] = ((GenericColumn) metrics[i]).getFloatSingleValueRow(currRow);
} else if (metrics[i] instanceof IndexedLongsGenericColumn) {
metricArray[i] = ((GenericColumn) metrics[i]).getLongSingleValueRow(currRow);
} else if (metrics[i] instanceof ComplexColumn) {
metricArray[i] = ((ComplexColumn) metrics[i]).getRowValue(currRow);
}
@ -300,6 +304,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
switch (type) {
case FLOAT:
return "float";
case LONG:
return "long";
case COMPLEX:
return column.getComplexColumn().getTypeName();
default:

View File

@ -106,7 +106,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return new DateTime(column.getLongSingleValueRow(0));
}
finally {
@ -119,7 +119,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
GenericColumn column = null;
try {
column = index.getTimeColumn().getGenericColumn();
column = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return new DateTime(column.getLongSingleValueRow(column.length() - 1));
}
finally {
@ -195,7 +195,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return Sequences.withBaggage(
Sequences.map(
@ -258,19 +258,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
cursorOffset = initOffset.clone();
}
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return timestamps.getLongSingleValueRow(cursorOffset.getOffset());
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
@ -374,7 +361,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && holder.getCapabilities().getType() == ValueType.FLOAT) {
if (holder != null && (holder.getCapabilities().getType() == ValueType.FLOAT
|| holder.getCapabilities().getType() == ValueType.LONG)) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
@ -402,6 +390,43 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG
|| holder.getCapabilities().getType() == ValueType.FLOAT)) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return new LongColumnSelector()
{
@Override
public long get()
{
return 0L;
}
};
}
final GenericColumn metricVals = cachedMetricVals;
return new LongColumnSelector()
{
@Override
public long get()
{
return metricVals.getLongSingleValueRow(cursorOffset.getOffset());
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
@ -657,7 +682,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final Map<String, ComplexColumn> complexColumnCache = Maps.newHashMap();
final Map<String, Object> objectColumnCache = Maps.newHashMap();
final GenericColumn timestamps = index.getTimeColumn().getGenericColumn();
final GenericColumn timestamps = index.getColumn(Column.TIME_COLUMN_NAME).getGenericColumn();
return Sequences.withBaggage(
Sequences.map(
@ -713,19 +738,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
currRow = initRow;
}
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return timestamps.getLongSingleValueRow(currRow);
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
@ -829,7 +841,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && holder.getCapabilities().getType() == ValueType.FLOAT) {
if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG
|| holder.getCapabilities().getType() == ValueType.FLOAT)) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
@ -857,6 +870,43 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
GenericColumn cachedMetricVals = genericColumnCache.get(metricName);
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && (holder.getCapabilities().getType() == ValueType.LONG
|| holder.getCapabilities().getType() == ValueType.FLOAT)) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
}
if (cachedMetricVals == null) {
return new LongColumnSelector()
{
@Override
public long get()
{
return 0L;
}
};
}
final GenericColumn metricVals = cachedMetricVals;
return new LongColumnSelector()
{
@Override
public long get()
{
return metricVals.getLongSingleValueRow(currRow);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{

View File

@ -19,6 +19,7 @@
package io.druid.segment;
import com.google.common.base.Preconditions;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
import io.druid.segment.column.Column;
import io.druid.segment.data.Indexed;
@ -34,24 +35,22 @@ public class SimpleQueryableIndex implements QueryableIndex
private final Interval dataInterval;
private final Indexed<String> columnNames;
private final Indexed<String> availableDimensions;
private final Column timeColumn;
private final Map<String, Column> otherColumns;
private final Map<String, Column> columns;
private final SmooshedFileMapper fileMapper;
public SimpleQueryableIndex(
Interval dataInterval,
Indexed<String> columnNames,
Indexed<String> dimNames,
Column timeColumn,
Map<String, Column> otherColumns,
Map<String, Column> columns,
SmooshedFileMapper fileMapper
)
{
Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME));
this.dataInterval = dataInterval;
this.columnNames = columnNames;
this.availableDimensions = dimNames;
this.timeColumn = timeColumn;
this.otherColumns = otherColumns;
this.columns = columns;
this.fileMapper = fileMapper;
}
@ -64,7 +63,7 @@ public class SimpleQueryableIndex implements QueryableIndex
@Override
public int getNumRows()
{
return timeColumn.getLength();
return columns.get(Column.TIME_COLUMN_NAME).getLength();
}
@Override
@ -79,16 +78,10 @@ public class SimpleQueryableIndex implements QueryableIndex
return availableDimensions;
}
@Override
public Column getTimeColumn()
{
return timeColumn;
}
@Override
public Column getColumn(String columnName)
{
return otherColumns.get(columnName);
return columns.get(columnName);
}
@Override

View File

@ -23,6 +23,7 @@ package io.druid.segment.column;
*/
public interface Column
{
public static final String TIME_COLUMN_NAME = "__time";
public ColumnCapabilities getCapabilities();
public int getLength();

View File

@ -82,7 +82,7 @@ public class IndexedFloatsGenericColumn implements GenericColumn
@Override
public long getLongSingleValueRow(int rowNum)
{
throw new UnsupportedOperationException();
return (long) column.get(rowNum);
}
@Override

View File

@ -70,7 +70,7 @@ public class IndexedLongsGenericColumn implements GenericColumn
@Override
public float getFloatSingleValueRow(int rowNum)
{
throw new UnsupportedOperationException();
return (float) column.get(rowNum);
}
@Override

View File

@ -45,8 +45,9 @@ import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
@ -129,14 +130,25 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
public LongColumnSelector makeLongColumnSelector(String columnName)
{
return new TimestampColumnSelector()
if(columnName.equals(Column.TIME_COLUMN_NAME)){
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
final String metricName = columnName.toLowerCase();
return new LongColumnSelector()
{
@Override
public long getTimestamp()
public long get()
{
return in.get().getTimestampFromEpoch();
return in.get().getLongMetric(metricName);
}
};
}
@ -292,6 +304,8 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else if (entry.getValue().equalsIgnoreCase("long")) {
type = ValueType.LONG;
} else {
type = ValueType.COMPLEX;
}

View File

@ -38,9 +38,10 @@ import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
@ -264,19 +265,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
done = !foundMatched && (cursorMap.size() == 0 || !baseIter.hasNext());
}
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{
@Override
public long getTimestamp()
{
return currEntry.getKey().getTimestamp();
}
};
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
{
@ -381,6 +369,49 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
};
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
if(metricName.equals(Column.TIME_COLUMN_NAME)){
return new LongColumnSelector()
{
@Override
public long get()
{
return currEntry.getKey().getTimestamp();
}
};
}
final Integer metricIndexInt = index.getMetricIndex(metricName);
if (metricIndexInt == null) {
return new LongColumnSelector()
{
@Override
public long get()
{
return 0L;
}
};
}
final int metricIndex = metricIndexInt;
final BufferAggregator agg = index.getAggregator(metricIndex);
return new LongColumnSelector()
{
@Override
public long get()
{
return agg.getLong(
index.getMetricBuffer(),
index.getMetricPosition(currEntry.getValue(), metricIndex)
);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{

View File

@ -152,6 +152,17 @@ public class SpatialDimensionRowTransformer implements Function<InputRow, InputR
}
}
@Override
public long getLongMetric(String metric)
{
try {
return row.getLongMetric(metric);
}
catch (ParseException e) {
throw Throwables.propagate(e);
}
}
@Override
public String toString()
{

View File

@ -28,7 +28,7 @@ import java.util.Comparator;
*/
public class LongSumAggregatorTest
{
private void aggregate(TestFloatColumnSelector selector, LongSumAggregator agg)
private void aggregate(TestLongColumnSelector selector, LongSumAggregator agg)
{
agg.aggregate();
selector.increment();
@ -37,7 +37,7 @@ public class LongSumAggregatorTest
@Test
public void testAggregate()
{
final TestFloatColumnSelector selector = new TestFloatColumnSelector(new float[]{24.15f, 20f});
final TestLongColumnSelector selector = new TestLongColumnSelector(new long[]{24L, 20L});
LongSumAggregator agg = new LongSumAggregator("billy", selector);
Assert.assertEquals("billy", agg.getName());
@ -58,7 +58,7 @@ public class LongSumAggregatorTest
@Test
public void testComparator()
{
final TestFloatColumnSelector selector = new TestFloatColumnSelector(new float[]{18293f});
final TestLongColumnSelector selector = new TestLongColumnSelector(new long[]{18293L});
LongSumAggregator agg = new LongSumAggregator("billy", selector);
Assert.assertEquals("billy", agg.getName());

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
/**
*/
public class TestLongColumnSelector implements LongColumnSelector
{
private final long[] longs;
private int index = 0;
public TestLongColumnSelector(long[] longs)
{
this.longs = longs;
}
@Override
public long get()
{
return longs[index];
}
public void increment()
{
++index;
}
}

View File

@ -64,6 +64,7 @@ import java.util.Map;
/**
*/
@Ignore
public class AppendTest
{
private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import org.joda.time.Interval;
@ -59,6 +60,6 @@ public class EmptyIndexTest
Assert.assertEquals("getDimensionNames", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
Assert.assertEquals("getMetricNames", 0, Iterables.size(emptyQueryableIndex.getColumnNames()));
Assert.assertEquals("getDataInterval", new Interval("2012-08-01/P3D"), emptyQueryableIndex.getDataInterval());
Assert.assertEquals("getReadOnlyTimestamps", 0, emptyQueryableIndex.getTimeColumn().getLength());
Assert.assertEquals("getReadOnlyTimestamps", 0, emptyQueryableIndex.getColumn(Column.TIME_COLUMN_NAME).getLength());
}
}

View File

@ -27,6 +27,7 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.incremental.IncrementalIndex;
import junit.framework.Assert;
@ -51,7 +52,7 @@ public class IndexMakerTest
try {
QueryableIndex index = IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir));
Assert.assertEquals(2, index.getTimeColumn().getLength());
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(2, index.getColumnNames().size());
}
@ -90,13 +91,13 @@ public class IndexMakerTest
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1));
Assert.assertEquals(2, index1.getTimeColumn().getLength());
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getTimeColumn().getLength());
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
@ -108,7 +109,7 @@ public class IndexMakerTest
)
);
Assert.assertEquals(3, merged.getTimeColumn().getLength());
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(2, merged.getColumnNames().size());
}
@ -151,13 +152,13 @@ public class IndexMakerTest
IndexMaker.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
);
Assert.assertEquals(1, index1.getTimeColumn().getLength());
Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index1.getAvailableDimensions()));
Assert.assertEquals(1, index2.getTimeColumn().getLength());
Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
Assert.assertEquals(1, merged.getTimeColumn().getLength());
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
}
finally {

View File

@ -241,6 +241,12 @@ class WikipediaIrcDecoder implements IrcDecoder
return metrics.get(metric);
}
@Override
public long getLongMetric(String metric)
{
return new Float(metrics.get(metric)).longValue();
}
@Override
public int compareTo(Row o)
{

View File

@ -95,6 +95,12 @@ public class CombiningFirehoseFactoryTest
return metricValue;
}
@Override
public long getLongMetric(String metric)
{
return new Float(metricValue).longValue();
}
@Override
public Object getRaw(String dimension)
{

View File

@ -189,6 +189,12 @@ public class RealtimeManagerTest
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{

View File

@ -102,6 +102,12 @@ public class SinkTest
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{
@ -155,6 +161,12 @@ public class SinkTest
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{

View File

@ -202,6 +202,12 @@ public class HashBasedNumberedShardSpecTest
return 0;
}
@Override
public long getLongMetric(String s)
{
return 0L;
}
@Override
public int compareTo(Row o)
{